Skip to content

Commit e46fe25

Browse files
authored
MRG: Merge pull request #581 from octue/fix/allow-sets-and-datetimes-as-input-and-output-values
Allow sets and datetimes as service input/output values
2 parents 227f664 + 1123635 commit e46fe25

File tree

8 files changed

+1174
-878
lines changed

8 files changed

+1174
-878
lines changed

docs/source/inter_service_compatibility.rst

+149-147
Large diffs are not rendered by default.

octue/cloud/pub_sub/message_handler.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from octue.definitions import GOOGLE_COMPUTE_PROVIDERS
1616
from octue.log_handlers import COLOUR_PALETTE
1717
from octue.resources.manifest import Manifest
18+
from octue.utils.decoders import OctueJSONDecoder
1819
from octue.utils.threads import RepeatingTimer
1920

2021

@@ -226,7 +227,7 @@ def _pull_and_enqueue_message(self, timeout):
226227
if not self._child_sdk_version:
227228
self._heartbeat_checker.cancel()
228229

229-
message = json.loads(answer.message.data.decode())
230+
message = json.loads(answer.message.data.decode(), cls=OctueJSONDecoder)
230231

231232
message_number = int(message["message_number"])
232233
self._waiting_messages[message_number] = message
@@ -354,7 +355,7 @@ def _handle_monitor_message(self, message):
354355
logger.debug("%r received a monitor message.", self.receiving_service)
355356

356357
if self.handle_monitor_message is not None:
357-
self.handle_monitor_message(json.loads(message["data"]))
358+
self.handle_monitor_message(json.loads(message["data"], cls=OctueJSONDecoder))
358359

359360
def _handle_log_message(self, message):
360361
"""Deserialise the message into a log record and pass it to the local log handlers, adding [<service-name>] to

octue/cloud/pub_sub/service.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -489,15 +489,15 @@ def _parse_question(self, question):
489489

490490
try:
491491
# Parse question directly from Pub/Sub or Dataflow.
492-
data = json.loads(question.data.decode())
492+
data = json.loads(question.data.decode(), cls=OctueJSONDecoder)
493493

494494
# Acknowledge it if it's directly from Pub/Sub
495495
if hasattr(question, "ack"):
496496
question.ack()
497497

498498
except Exception:
499499
# Parse question from Google Cloud Run.
500-
data = json.loads(base64.b64decode(question["data"]).decode("utf-8").strip())
500+
data = json.loads(base64.b64decode(question["data"]).decode("utf-8").strip(), cls=OctueJSONDecoder)
501501

502502
# Keep backwards compatibility with questions from Octue services running `octue<0.41.1`.
503503
if isinstance(data["input_manifest"], str):

octue/metadata/recorded_questions.jsonl

+1
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,4 @@
7070
{"parent_sdk_version": "0.44.0", "question": {"data": "{\"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"8d79f172-5565-4903-8dc2-481b95d37cd8\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmpkyd5lpg9\"}}, \"children\": null, \"message_number\": 0}", "attributes": {"question_uuid": "34f946ef-f359-436a-892a-5a91b5991a1c", "forward_logs": "1", "allow_save_diagnostics_data_on_crash": "1", "octue_sdk_version": "0.44.0"}}}
7171
{"parent_sdk_version": "0.45.0", "question": {"data": "{\"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"ef377a21-8633-4324-ac82-efab087605c8\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmpit8qx3kc\"}}, \"children\": null, \"message_number\": 0}", "attributes": {"question_uuid": "5f0a60b6-b770-465b-a7ff-2e7b1854a171", "forward_logs": "1", "allow_save_diagnostics_data_on_crash": "1", "octue_sdk_version": "0.45.0"}}}
7272
{"parent_sdk_version": "0.46.0", "question": {"data": "{\"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"a289353b-2708-4a69-a72e-54bf67b75e61\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmp56ucgays\"}}, \"children\": null, \"message_number\": 0}", "attributes": {"question_uuid": "bb08fdfc-6f67-4942-8fc4-8a4adb7bb93c", "forward_logs": "1", "allow_save_diagnostics_data_on_crash": "1", "octue_sdk_version": "0.46.0"}}}
73+
{"parent_sdk_version": "0.46.1", "question": {"data": "{\"input_values\": {\"height\": 4, \"width\": 72}, \"input_manifest\": {\"id\": \"d45acf3c-cc17-4b5c-9d5d-4036341cbcf4\", \"name\": null, \"datasets\": {\"my_dataset\": \"/var/folders/sk/hf5fbp616c77tsys9lz55qn40000gp/T/tmp11q86f7l\"}}, \"children\": null, \"message_number\": 0}", "attributes": {"question_uuid": "3b880fcc-716b-4bdd-a5f8-044c6163bd20", "forward_logs": "1", "allow_save_diagnostics_data_on_crash": "1", "octue_sdk_version": "0.46.1"}}}

0 commit comments

Comments
 (0)