1313# limitations under the License.
1414
1515import asyncio
16- from typing import Any , AsyncIterator , Callable , Iterable , TYPE_CHECKING
16+ from typing import Any , AsyncIterator , Callable
1717
1818from google .api_core import exceptions
1919from google .cloud .storage ._experimental .asyncio .retry .base_strategy import (
2020 _BaseResumptionStrategy ,
2121)
22-
23- if TYPE_CHECKING :
24- from google .api_core .retry_async import AsyncRetry
25-
26-
2722class _BidiStreamRetryManager :
2823 """Manages the generic retry loop for a bidi streaming operation."""
29-
3024 def __init__ (
3125 self ,
3226 strategy : _BaseResumptionStrategy ,
33- stream_opener : Callable [[ Iterable [ Any ], Any ] , AsyncIterator [Any ]],
27+ stream_opener : Callable [... , AsyncIterator [Any ]],
3428 ):
3529 """Initializes the retry manager.
3630 Args:
@@ -40,39 +34,28 @@ def __init__(
4034 """
4135 self ._strategy = strategy
4236 self ._stream_opener = stream_opener
43-
44- async def execute (self , initial_state : Any , retry_policy : "AsyncRetry" ):
37+ async def execute (self , initial_state : Any , retry_policy ):
4538 """
4639 Executes the bidi operation with the configured retry policy.
4740 Args:
4841 initial_state: An object containing all state for the operation.
49- retry_policy: The `google.api_core.retry_async .AsyncRetry` object to
42+ retry_policy: The `google.api_core.retry .AsyncRetry` object to
5043 govern the retry behavior for this specific operation.
5144 """
5245 state = initial_state
5346
54- def on_error (e : Exception ):
55- """The single point of recovery logic."""
56- self ._strategy .recover_state_on_failure (e , state )
57-
5847 async def attempt ():
59- """The core operation to be retried."""
6048 requests = self ._strategy .generate_requests (state )
6149 stream = self ._stream_opener (requests , state )
62- async for response in stream :
63- self ._strategy .update_state_from_response (response , state )
64-
65- # Correctly create a new retry instance with the on_error handler.
66- retry_with_error_handler = type (retry_policy )(
67- predicate = retry_policy ._predicate ,
68- initial = retry_policy ._initial ,
69- maximum = retry_policy ._maximum ,
70- multiplier = retry_policy ._multiplier ,
71- deadline = retry_policy ._deadline ,
72- on_error = on_error ,
73- )
50+ try :
51+ async for response in stream :
52+ self ._strategy .update_state_from_response (response , state )
53+ return
54+ except Exception as e :
55+ if retry_policy ._predicate (e ):
56+ await self ._strategy .recover_state_on_failure (e , state )
57+ raise e
7458
75- wrapped_attempt = retry_with_error_handler (attempt )
59+ wrapped_attempt = retry_policy (attempt )
7660
77- # Execute the operation with retry.
7861 await wrapped_attempt ()
0 commit comments