@@ -118,7 +118,9 @@ def services_topic(self):
118
118
topic = Topic (name = OCTUE_SERVICES_PREFIX , project_name = self .backend .project_name )
119
119
120
120
if not topic .exists ():
121
- raise octue .exceptions .ServiceNotFound (f"{ topic !r} cannot be found." )
121
+ raise octue .exceptions .ServiceNotFound (
122
+ f"The { topic !r} topic cannot be found. Check that it's been created for this service network."
123
+ )
122
124
123
125
return topic
124
126
@@ -147,7 +149,7 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow
147
149
logger .info ("Starting %r." , self )
148
150
149
151
subscription = Subscription (
150
- name = "." . join (( OCTUE_SERVICES_PREFIX , self ._pub_sub_id )) ,
152
+ name = self ._pub_sub_id ,
151
153
topic = self .services_topic ,
152
154
filter = f'attributes.recipient = "{ self .id } " AND attributes.sender_type = "{ PARENT_SENDER_TYPE } "' ,
153
155
expiration_time = None ,
@@ -328,6 +330,8 @@ def ask(
328
330
"""
329
331
service_namespace , service_name , service_revision_tag = split_service_id (service_id )
330
332
333
+ # If using a service registry, check that the service revision is registered, or get the default service
334
+ # revision if no revision tag is provided.
331
335
if self .service_registries :
332
336
if service_revision_tag :
333
337
raise_if_revision_not_registered (sruid = service_id , service_registries = self .service_registries )
@@ -338,7 +342,17 @@ def ask(
338
342
service_registries = self .service_registries ,
339
343
)
340
344
341
- elif not service_revision_tag :
345
+ # If not using a service registry, check that the service revision exists by checking for its subscription.
346
+ elif service_revision_tag :
347
+ service_revision_subscription = Subscription (
348
+ name = convert_service_id_to_pub_sub_form (service_id ),
349
+ topic = self .services_topic ,
350
+ )
351
+
352
+ if not service_revision_subscription .exists ():
353
+ raise octue .exceptions .ServiceNotFound (f"Service revision { service_id !r} not found." )
354
+
355
+ else :
342
356
raise octue .exceptions .InvalidServiceID (
343
357
f"A service revision tag for { service_id !r} must be provided if service registries aren't being used."
344
358
)
@@ -362,10 +376,8 @@ def ask(
362
376
if asynchronous and not push_endpoint :
363
377
answer_subscription = None
364
378
else :
365
- pub_sub_id = convert_service_id_to_pub_sub_form (self .id )
366
-
367
379
answer_subscription = Subscription (
368
- name = "." .join ((OCTUE_SERVICES_PREFIX , pub_sub_id , ANSWERS_NAMESPACE , question_uuid )),
380
+ name = "." .join ((self . _pub_sub_id , ANSWERS_NAMESPACE , question_uuid )),
369
381
topic = self .services_topic ,
370
382
filter = (
371
383
f'attributes.recipient = "{ self .id } " '
0 commit comments