Skip to content

Commit 58d9975

Browse files
mateuszmanderatimabbott
authored andcommitted
embed_links: Interrupt consume() function on worker timeout.
This fixes a bug introduced in 95b4654 which made the worker simply log a warning about the timeout and then continue consume()ing the event that should have also been interrupted. The idea here is to introduce an exception which can be used to interrupt the consume() process without triggering the regular handling of exceptions that happens in _handle_consume_exception.
1 parent cee4da6 commit 58d9975

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

zerver/worker/queue_processors.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,16 @@ def __str__(self) -> str:
115115
return f"Timed out after {self.limit * self.event_count} seconds processing {self.event_count} events"
116116

117117

118+
class InterruptConsumeException(Exception):
119+
"""
120+
This exception is to be thrown inside event consume function
121+
if the intention is to simply interrupt the processing
122+
of the current event and normally continue the work of the queue.
123+
"""
124+
125+
pass
126+
127+
118128
class WorkerDeclarationException(Exception):
119129
pass
120130

@@ -341,6 +351,11 @@ def timer_expired(
341351
raise WorkerTimeoutException(limit, len(events))
342352

343353
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
354+
if isinstance(exception, InterruptConsumeException):
355+
# The exception signals that no further error handling
356+
# is needed and the worker can proceed.
357+
return
358+
344359
with configure_scope() as scope:
345360
scope.set_context(
346361
"events",
@@ -775,6 +790,7 @@ def timer_expired(
775790
event["message_id"],
776791
event["urls"],
777792
)
793+
raise InterruptConsumeException
778794

779795

780796
@assign_queue("outgoing_webhooks")

0 commit comments

Comments
 (0)