|
16 | 16 | from collections import deque
|
17 | 17 | from email.message import EmailMessage
|
18 | 18 | from functools import wraps
|
19 |
| -from threading import Lock, Timer |
| 19 | +from threading import RLock, Timer |
20 | 20 | from types import FrameType
|
21 | 21 | from typing import (
|
22 | 22 | Any,
|
@@ -582,8 +582,10 @@ class MissedMessageWorker(QueueProcessingWorker):
|
582 | 582 | # This lock protects access to all of the data structures declared
|
583 | 583 | # above. A lock is required because maybe_send_batched_emails, as
|
584 | 584 | # the argument to Timer, runs in a separate thread from the rest
|
585 |
| - # of the consumer. |
586 |
| - lock = Lock() |
| 585 | + # of the consumer. This is a _re-entrant_ lock because we may |
| 586 | + # need to take the lock when we already have it during shutdown |
| 587 | + # (see the stop method). |
| 588 | + lock = RLock() |
587 | 589 |
|
588 | 590 | # Because the background `maybe_send_batched_email` thread can
|
589 | 591 | # hold the lock for an indeterminate amount of time, the `consume`
|
@@ -640,7 +642,10 @@ def maybe_send_batched_emails(self) -> None:
|
640 | 642 | # self.timer_event just triggered execution of this
|
641 | 643 | # function in a thread, so now that we hold the lock, we
|
642 | 644 | # clear the timer_event attribute to record that no Timer
|
643 |
| - # is active. |
| 645 | + # is active. If it is already None, stop() is shutting us |
| 646 | + # down. |
| 647 | + if self.timer_event is None: |
| 648 | + return |
644 | 649 | self.timer_event = None
|
645 | 650 |
|
646 | 651 | current_time = timezone_now()
|
@@ -696,6 +701,29 @@ def maybe_send_batched_emails(self) -> None:
|
696 | 701 | if ScheduledMessageNotificationEmail.objects.exists():
|
697 | 702 | self.ensure_timer()
|
698 | 703 |
|
| 704 | + def stop(self) -> None: |
| 705 | + # This may be called from a signal handler when we _already_ |
| 706 | + # have the lock. Python doesn't give us a way to check if our |
| 707 | + # thread has the lock, so we instead use a re-entrant lock to |
| 708 | + # always take it. |
| 709 | + with self.lock: |
| 710 | + # With the lock,we can safely inspect the timer_event and |
| 711 | + # cancel it if it is still pending. |
| 712 | + if self.timer_event is not None: |
| 713 | + # We cancel and then join the timer with a timeout to |
| 714 | + # prevent deadlock, where we took the lock, the timer |
| 715 | + # then ran out and started maybe_send_batched_emails, |
| 716 | + # and then it started waiting for the lock. The timer |
| 717 | + # isn't running anymore so can't be canceled, and the |
| 718 | + # thread is blocked on the lock, so will never join(). |
| 719 | + self.timer_event.cancel() |
| 720 | + self.timer_event.join(timeout=1) |
| 721 | + # In case we did hit this deadlock, we signal to |
| 722 | + # maybe_send_batched_emails that it should abort by, |
| 723 | + # before releasing the lock, unsetting the timer. |
| 724 | + self.timer_event = None |
| 725 | + super().stop() |
| 726 | + |
699 | 727 |
|
700 | 728 | @assign_queue("email_senders")
|
701 | 729 | class EmailSendingWorker(LoopQueueProcessingWorker):
|
|
0 commit comments