Skip to content

Commit 2142ceb

Browse files
authored
MRG: Merge pull request #690 from octue/warn-about-runtime-timeout
Log warning when runtime timeout is near
2 parents c467cbd + 596ed00 commit 2142ceb

File tree

10 files changed

+347
-210
lines changed

10 files changed

+347
-210
lines changed

docs/source/inter_service_compatibility.rst

+101-99
Large diffs are not rendered by default.

octue/cloud/pub_sub/events.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from octue.cloud.events.handler import AbstractEventHandler
1212
from octue.cloud.events.validation import SERVICE_COMMUNICATION_SCHEMA
1313
from octue.utils.decoders import OctueJSONDecoder
14-
from octue.utils.objects import getattr_or_subscribe
14+
from octue.utils.objects import get_nested_attribute
1515
from octue.utils.threads import RepeatingTimer
1616

1717

@@ -28,7 +28,7 @@ def extract_event_and_attributes_from_pub_sub_message(message):
2828
:return (any, dict): the extracted event and its attributes
2929
"""
3030
# Cast attributes to a dictionary to avoid defaultdict-like behaviour from Pub/Sub message attributes container.
31-
attributes = dict(getattr_or_subscribe(message, "attributes"))
31+
attributes = dict(get_nested_attribute(message, "attributes"))
3232

3333
# Deserialise the `parent_question_uuid`, `forward_logs`, and `retry_count` fields if they're present
3434
# (don't assume they are before validation).

octue/cloud/pub_sub/service.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import importlib.metadata
66
import json
77
import logging
8+
import os
9+
import time
810
import uuid
911

1012
import google.api_core.exceptions
@@ -232,11 +234,12 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
232234

233235
try:
234236
self._send_delivery_acknowledgment(**routing_metadata)
237+
start_time = time.perf_counter()
235238

236239
heartbeater = RepeatingTimer(
237240
interval=heartbeat_interval,
238-
function=self._send_heartbeat,
239-
kwargs=routing_metadata,
241+
function=self._send_heartbeat_and_check_runtime,
242+
kwargs={"start_time": start_time, **routing_metadata},
240243
)
241244

242245
heartbeater.daemon = True
@@ -666,24 +669,29 @@ def _send_delivery_acknowledgment(
666669

667670
logger.info("%r acknowledged receipt of question %r.", self, question_uuid)
668671

669-
def _send_heartbeat(
672+
def _send_heartbeat_and_check_runtime(
670673
self,
671674
question_uuid,
672675
parent_question_uuid,
673676
originator_question_uuid,
674677
parent,
675678
originator,
676679
retry_count,
680+
start_time,
681+
runtime_timeout_warning_time=3480, # This is 58 minutes in seconds.
677682
timeout=30,
678683
):
679-
"""Send a heartbeat to the parent, indicating that the service is alive.
684+
"""Send a heartbeat to the parent, indicating that the service is alive. If it's running on Cloud Run and it's
685+
been running for longer than the runtime timeout warning time, log a warning that it will be stopped soon.
680686
681687
:param str question_uuid: the UUID of the question this event relates to
682688
:param str|None parent_question_uuid: the UUID of the question that triggered this question
683689
:param str|None originator_question_uuid: the UUID of the question that triggered all ancestor questions of this question
684690
:param str parent: the SRUID of the parent that asked the question this event is related to
685691
:param str originator: the SRUID of the service revision that triggered all ancestor questions of this question
686692
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
693+
:param int|float start_time: the `time.perf_counter` time that the analysis was started [s]
694+
:param int|float runtime_timeout_warning_time: the amount of time after which to warn that the runtime timeout is approaching [s]
687695
:param float timeout: time in seconds after which to give up sending
688696
:return None:
689697
"""
@@ -700,6 +708,12 @@ def _send_heartbeat(
700708
timeout=timeout,
701709
)
702710

711+
if (
712+
os.environ.get("COMPUTE_PROVIDER") == "GOOGLE_CLOUD_RUN"
713+
and time.perf_counter() - start_time > runtime_timeout_warning_time
714+
):
715+
logger.warning("This analysis will reach the maximum runtime and be stopped soon.")
716+
703717
logger.debug("Heartbeat sent by %r.", self)
704718

705719
def _send_monitor_message(

0 commit comments

Comments
 (0)