Skip to content

Commit d81fb6f

Browse files
committed
Merge branch 'mock-actors-3' of github.com:lor1113/python-sdk into mock-actors-3
Signed-off-by: Lorenzo Curcio <[email protected]>
2 parents 6406b8d + da5a2e0 commit d81fb6f

File tree

9 files changed

+105
-82
lines changed

9 files changed

+105
-82
lines changed

dapr/aio/clients/grpc/client.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -534,13 +534,14 @@ async def subscribe_with_handler(
534534
async def stream_messages(sub: Subscription):
535535
while True:
536536
try:
537-
message = await sub.next_message()
538-
if message:
539-
response = await handler_fn(message)
540-
if response:
541-
await subscription.respond(message, response.status)
542-
else:
543-
continue
537+
async for message in subscription:
538+
if message:
539+
response = await handler_fn(message)
540+
if response:
541+
await subscription.respond(message, response.status)
542+
else:
543+
continue
544+
544545
except StreamInactiveError:
545546
break
546547

dapr/aio/clients/grpc/subscription.py

+7
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,10 @@ async def close(self):
114114
raise Exception(f'Error while closing stream: {e}')
115115
except Exception as e:
116116
raise Exception(f'Error while closing stream: {e}')
117+
118+
def __aiter__(self):
119+
"""Make the subscription async iterable."""
120+
return self
121+
122+
async def __anext__(self):
123+
return await self.next_message()

dapr/clients/grpc/client.py

+13-9
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
4646
from dapr.clients.health import DaprHealth
4747
from dapr.clients.retry import RetryPolicy
48+
from dapr.common.pubsub.subscription import StreamCancelledError
4849
from dapr.conf import settings
4950
from dapr.proto import api_v1, api_service_v1, common_v1
5051
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
@@ -535,17 +536,20 @@ def subscribe_with_handler(
535536
def stream_messages(sub):
536537
while True:
537538
try:
538-
message = sub.next_message()
539-
if message:
540-
# Process the message
541-
response = handler_fn(message)
542-
if response:
543-
subscription.respond(message, response.status)
544-
else:
545-
# No message received
546-
continue
539+
for message in sub:
540+
if message:
541+
# Process the message
542+
response = handler_fn(message)
543+
if response:
544+
subscription.respond(message, response.status)
545+
else:
546+
# No message received
547+
continue
548+
547549
except StreamInactiveError:
548550
break
551+
except StreamCancelledError:
552+
break
549553

550554
def close_subscription():
551555
subscription.close()

dapr/clients/grpc/subscription.py

+6
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,9 @@ def close(self):
143143
raise Exception(f'Error while closing stream: {e}')
144144
except Exception as e:
145145
raise Exception(f'Error while closing stream: {e}')
146+
147+
def __iter__(self):
148+
return self
149+
150+
def __next__(self):
151+
return self.next_message()

daprdocs/content/en/python-sdk-docs/python-client.md

+26-21
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None:
260260
You can create a streaming subscription to a PubSub topic using either the `subscribe`
261261
or `subscribe_handler` methods.
262262

263-
The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
264-
stream by
263+
The `subscribe` method returns an iterable `Subscription` object, which allows you to pull messages from the
264+
stream by using a `for` loop (ex. `for message in subscription`) or by
265265
calling the `next_message` method. This will block on the main thread while waiting for messages.
266266
When done, you should call the close method to terminate the
267267
subscription and stop receiving messages.
@@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method:
281281
import time
282282

283283
from dapr.clients import DaprClient
284-
from dapr.clients.grpc.subscription import StreamInactiveError
284+
from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError
285285

286286
counter = 0
287287

@@ -303,30 +303,35 @@ def main():
303303
)
304304

305305
try:
306-
while counter < 5:
307-
try:
308-
message = subscription.next_message()
306+
for message in subscription:
307+
if message is None:
308+
print('No message received. The stream might have been cancelled.')
309+
continue
309310

310-
except StreamInactiveError as e:
311+
try:
312+
response_status = process_message(message)
313+
314+
if response_status == 'success':
315+
subscription.respond_success(message)
316+
elif response_status == 'retry':
317+
subscription.respond_retry(message)
318+
elif response_status == 'drop':
319+
subscription.respond_drop(message)
320+
321+
if counter >= 5:
322+
break
323+
except StreamInactiveError:
311324
print('Stream is inactive. Retrying...')
312325
time.sleep(1)
313326
continue
314-
if message is None:
315-
print('No message received within timeout period.')
316-
continue
317-
318-
# Process the message
319-
response_status = process_message(message)
320-
321-
if response_status == 'success':
322-
subscription.respond_success(message)
323-
elif response_status == 'retry':
324-
subscription.respond_retry(message)
325-
elif response_status == 'drop':
326-
subscription.respond_drop(message)
327+
except StreamCancelledError:
328+
print('Stream was cancelled')
329+
break
330+
except Exception as e:
331+
print(f'Error occurred during message processing: {e}')
327332

328333
finally:
329-
print("Closing subscription...")
334+
print('Closing subscription...')
330335
subscription.close()
331336

332337

examples/pubsub-streaming-async/subscriber-handler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async def process_message(message) -> TopicEventResponse:
1818
Asynchronously processes the message and returns a TopicEventResponse.
1919
"""
2020

21-
print(f'Processing message: {message.data()} from {message.topic()}...')
21+
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
2222
global counter
2323
counter += 1
2424
return TopicEventResponse('success')

examples/pubsub-streaming-async/subscriber.py

+23-19
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def process_message(message):
1919
global counter
2020
counter += 1
2121
# Process the message here
22-
print(f'Processing message: {message.data()} from {message.topic()}...')
22+
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
2323
return 'success'
2424

2525

@@ -31,32 +31,36 @@ async def main():
3131
)
3232

3333
try:
34-
while counter < 5:
34+
async for message in subscription:
35+
if message is None:
36+
print(
37+
'No message received within timeout period. '
38+
'The stream might have been cancelled.'
39+
)
40+
continue
41+
3542
try:
36-
message = await subscription.next_message()
37-
if message is None:
38-
print(
39-
'No message received within timeout period. '
40-
'The stream might have been cancelled.'
41-
)
42-
continue
43+
# Process the message
44+
response_status = process_message(message)
45+
46+
# Respond based on the processing result
47+
if response_status == 'success':
48+
await subscription.respond_success(message)
49+
elif response_status == 'retry':
50+
await subscription.respond_retry(message)
51+
elif response_status == 'drop':
52+
await subscription.respond_drop(message)
53+
54+
if counter >= 5:
55+
break
4356

4457
except StreamInactiveError:
4558
print('Stream is inactive. Retrying...')
4659
await asyncio.sleep(1)
4760
continue
48-
except StreamCancelledError as e:
61+
except StreamCancelledError:
4962
print('Stream was cancelled')
5063
break
51-
# Process the message
52-
response_status = process_message(message)
53-
54-
if response_status == 'success':
55-
await subscription.respond_success(message)
56-
elif response_status == 'retry':
57-
await subscription.respond_retry(message)
58-
elif response_status == 'drop':
59-
await subscription.respond_drop(message)
6064

6165
finally:
6266
print('Closing subscription...')

examples/pubsub-streaming/subscriber-handler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def process_message(message):
1818
# Process the message here
1919
global counter
2020
counter += 1
21-
print(f'Processing message: {message.data()} from {message.topic()}...')
21+
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
2222
return TopicEventResponse('success')
2323

2424

examples/pubsub-streaming/subscriber.py

+20-24
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def process_message(message):
1919
global counter
2020
counter += 1
2121
# Process the message here
22-
print(f'Processing message: {message.data()} from {message.topic()}...')
22+
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
2323
return 'success'
2424

2525

@@ -36,36 +36,32 @@ def main():
3636
return
3737

3838
try:
39-
while counter < 5:
39+
for message in subscription:
40+
if message is None:
41+
print('No message received. The stream might have been cancelled.')
42+
continue
43+
4044
try:
41-
message = subscription.next_message()
42-
if message is None:
43-
print(
44-
'No message received within timeout period. '
45-
'The stream might have been cancelled.'
46-
)
47-
continue
48-
49-
except StreamInactiveError as e:
45+
response_status = process_message(message)
46+
47+
if response_status == 'success':
48+
subscription.respond_success(message)
49+
elif response_status == 'retry':
50+
subscription.respond_retry(message)
51+
elif response_status == 'drop':
52+
subscription.respond_drop(message)
53+
54+
if counter >= 5:
55+
break
56+
except StreamInactiveError:
5057
print('Stream is inactive. Retrying...')
5158
time.sleep(1)
5259
continue
53-
except StreamCancelledError as e:
60+
except StreamCancelledError:
5461
print('Stream was cancelled')
5562
break
5663
except Exception as e:
57-
print(f'Error occurred: {e}')
58-
pass
59-
60-
# Process the message
61-
response_status = process_message(message)
62-
63-
if response_status == 'success':
64-
subscription.respond_success(message)
65-
elif response_status == 'retry':
66-
subscription.respond_retry(message)
67-
elif response_status == 'drop':
68-
subscription.respond_drop(message)
64+
print(f'Error occurred during message processing: {e}')
6965

7066
finally:
7167
print('Closing subscription...')

0 commit comments

Comments
 (0)