Skip to content

Commit 3e53341

Browse files
Centralize event stream wrappers
1 parent 7c72f8f commit 3e53341

File tree

18 files changed

+346
-555
lines changed

18 files changed

+346
-555
lines changed

designs/event-streams.md

+47-45
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ async with publisher:
5353
publisher.send(FooEvent(foo="bar"))
5454
```
5555

56+
Protocol implementations will be responsible for creating publishers.
57+
5658
## Event Receivers
5759

5860
An `AsyncEventReceiver` is used to receive events from a service.
@@ -131,6 +133,8 @@ async for event in reciever:
131133
handle_event(event)
132134
```
133135

136+
Protocol implementations will be responsible for creating receivers.
137+
134138
### Errors
135139

136140
Event streams may define modeled errors that may be sent over the stream. These
@@ -169,38 +173,30 @@ are handled by the following classes:
169173
* `OutputEventStream` is returned when the operation only has an output stream.
170174

171175
```python
172-
class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):
173-
174-
input_stream: AsyncEventPublisher[I]
175-
176-
_output_stream: AsyncEventReceiver[O] | None = None
177-
_response: R | None = None
178-
179-
@property
180-
def output_stream(self) -> AsyncEventReceiver[O] | None:
181-
return self._output_stream
182-
183-
@output_stream.setter
184-
def output_stream(self, value: AsyncEventReceiver[O]) -> None:
185-
self._output_stream = value
186-
187-
@property
188-
def response(self) -> R | None:
189-
return self._response
190-
191-
@response.setter
192-
def response(self, value: R) -> None:
193-
self._response = value
194-
195-
async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]:
176+
class DuplexEventStream[
177+
IE: SerializeableShape,
178+
OE: DeserializeableShape,
179+
O: DeserializeableShape,
180+
]:
181+
182+
input_stream: EventPublisher[IE]
183+
output_stream: EventReceiver[OE] | None = None
184+
response: O | None = None
185+
186+
def __init__(
187+
self,
188+
*,
189+
input_stream: EventPublisher[IE],
190+
output_future: Future[tuple[O, EventReceiver[OE]]],
191+
) -> None:
192+
self.input_stream = input_stream
193+
self._output_future = output_future
194+
195+
async def await_output(self) -> tuple[O, EventReceiver[OE]]:
196196
...
197197

198198
async def close(self) -> None:
199-
if self.output_stream is None:
200-
_, self.output_stream = await self.await_output()
201-
202-
await self.input_stream.close()
203-
await self.output_stream.close()
199+
...
204200

205201
async def __aenter__(self) -> Self:
206202
return self
@@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco
209205
await self.close()
210206

211207

212-
class InputEventStream[I: SerializableShape, R](Protocol):
208+
class InputEventStream[IE: SerializeableShape, O]:
213209

214-
input_stream: AsyncEventPublisher[I]
210+
input_stream: EventPublisher[IE]
211+
response: O | None = None
215212

216-
_response: R | None = None
213+
def __init__(
214+
self,
215+
*,
216+
input_stream: EventPublisher[IE],
217+
output_future: Future[O],
218+
) -> None:
219+
self.input_stream = input_stream
220+
self._output_future = output_future
217221

218-
@property
219-
def response(self) -> R | None:
220-
return self._response
221-
222-
@response.setter
223-
def response(self, value: R) -> None:
224-
self._response = value
225-
226-
async def await_output(self) -> R:
222+
async def await_output(self) -> O:
227223
...
228224

229225
async def close(self) -> None:
@@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol):
236232
await self.close()
237233

238234

239-
class OutputEventStream[O: DeserializableShape, R](Protocol):
235+
class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]:
240236

241-
output_stream: AsyncEventReceiver[O]
242-
243-
response: R
237+
output_stream: EventReceiver[OE]
238+
response: O
239+
240+
def __init__(self, output_stream: EventReceiver[OE], output: O) -> None:
241+
self.output_stream = output_stream
242+
self.response = output
244243

245244
async def close(self) -> None:
246245
await self.output_stream.close()
@@ -290,6 +289,9 @@ with await client.output_operation() as stream:
290289
handle_event(event)
291290
```
292291

292+
All three output types are centrally located and will be constructed by filling
293+
in the relevant publishers and receivers from the protocol implementation.
294+
293295
## Event Structure
294296

295297
Event messages are structurally similar to HTTP messages. They consist of a map

packages/aws-event-stream/pyproject.toml

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
88
"smithy-core",
9-
"smithy-event-stream",
109
]
1110

1211
[build-system]

packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py

-60
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,17 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
54
import logging
65
from collections.abc import Callable
76

8-
from smithy_core.aio.interfaces import AsyncByteStream
97
from smithy_core.codecs import Codec
108
from smithy_core.deserializers import (
11-
DeserializeableShape,
129
ShapeDeserializer,
1310
SpecificShapeDeserializer,
1411
)
1512
from smithy_core.schemas import Schema
1613
from smithy_core.shapes import ShapeType
1714
from smithy_core.utils import expect_type
18-
from smithy_event_stream.aio.interfaces import AsyncEventReceiver
1915

2016
from ..events import HEADERS_DICT, Event
2117
from ..exceptions import EventError, UnmodeledEventError
@@ -31,62 +27,6 @@
3127
INITIAL_MESSAGE_TYPES = (INITIAL_REQUEST_EVENT_TYPE, INITIAL_RESPONSE_EVENT_TYPE)
3228

3329

34-
class AWSAsyncEventReceiver[E: DeserializeableShape](AsyncEventReceiver[E]):
35-
def __init__(
36-
self,
37-
payload_codec: Codec,
38-
source: AsyncByteStream,
39-
deserializer: Callable[[ShapeDeserializer], E],
40-
is_client_mode: bool = True,
41-
) -> None:
42-
self._payload_codec = payload_codec
43-
self._source = source
44-
self._is_client_mode = is_client_mode
45-
self._deserializer = deserializer
46-
self._closed = False
47-
48-
async def receive(self) -> E | None:
49-
if self._closed:
50-
return None
51-
52-
try:
53-
event = await Event.decode_async(self._source)
54-
except Exception as e:
55-
await self.close()
56-
if not isinstance(e, EventError):
57-
raise IOError("Failed to read from stream.") from e
58-
raise
59-
60-
if event is None:
61-
logger.debug("No event received from the source.")
62-
return None
63-
logger.debug("Received raw event: %s", event)
64-
65-
deserializer = EventDeserializer(
66-
event=event,
67-
payload_codec=self._payload_codec,
68-
is_client_mode=self._is_client_mode,
69-
)
70-
result = self._deserializer(deserializer)
71-
logger.debug("Successfully deserialized event: %s", result)
72-
if isinstance(getattr(result, "value"), Exception):
73-
raise result.value # type: ignore
74-
return result
75-
76-
async def close(self) -> None:
77-
if self._closed:
78-
return
79-
self._closed = True
80-
81-
if (close := getattr(self._source, "close", None)) is not None:
82-
if asyncio.iscoroutine(result := close()):
83-
await result
84-
85-
@property
86-
def closed(self) -> bool:
87-
return self._closed
88-
89-
9030
class EventDeserializer(SpecificShapeDeserializer):
9131
def __init__(
9232
self, event: Event, payload_codec: Codec, is_client_mode: bool = True

packages/aws-event-stream/src/aws_event_stream/_private/serializers.py

+1-60
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,20 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
54
import logging
6-
from collections.abc import Callable, Iterator
5+
from collections.abc import Iterator
76
from contextlib import contextmanager
87
from io import BytesIO
98
from typing import Never
109

11-
from smithy_core.aio.interfaces import AsyncWriter
1210
from smithy_core.codecs import Codec
13-
from smithy_core.exceptions import ExpectationNotMetException
1411
from smithy_core.schemas import Schema
1512
from smithy_core.serializers import (
1613
InterceptingSerializer,
17-
SerializeableShape,
1814
ShapeSerializer,
1915
SpecificShapeSerializer,
2016
)
2117
from smithy_core.shapes import ShapeType
22-
from smithy_event_stream.aio.interfaces import AsyncEventPublisher
2318

2419
from ..events import EventMessage, HEADER_VALUE, Short, Byte, Long
2520
from ..exceptions import InvalidHeaderValue
@@ -36,60 +31,6 @@
3631
_DEFAULT_BLOB_CONTENT_TYPE = "application/octet-stream"
3732

3833

39-
type Signer = Callable[[EventMessage], EventMessage]
40-
"""A function that takes an event message and signs it, and returns it signed."""
41-
42-
43-
class AWSAsyncEventPublisher[E: SerializeableShape](AsyncEventPublisher[E]):
44-
def __init__(
45-
self,
46-
payload_codec: Codec,
47-
async_writer: AsyncWriter,
48-
signer: Signer | None = None,
49-
is_client_mode: bool = True,
50-
):
51-
self._writer = async_writer
52-
self._signer = signer
53-
self._serializer = EventSerializer(
54-
payload_codec=payload_codec, is_client_mode=is_client_mode
55-
)
56-
self._closed = False
57-
58-
async def send(self, event: E) -> None:
59-
if self._closed:
60-
raise IOError("Attempted to write to closed stream.")
61-
logger.debug("Preparing to publish event: %s", event)
62-
event.serialize(self._serializer)
63-
result = self._serializer.get_result()
64-
if result is None:
65-
raise ExpectationNotMetException(
66-
"Expected an event message to be serialized, but was None."
67-
)
68-
if self._signer is not None:
69-
result = self._signer(result)
70-
71-
encoded_result = result.encode()
72-
try:
73-
logger.debug("Publishing serialized event: %s", result)
74-
await self._writer.write(encoded_result)
75-
except Exception as e:
76-
await self.close()
77-
raise IOError("Failed to write to stream.") from e
78-
79-
async def close(self) -> None:
80-
if self._closed:
81-
return
82-
self._closed = True
83-
84-
if (close := getattr(self._writer, "close", None)) is not None:
85-
if asyncio.iscoroutine(result := close()):
86-
await result
87-
88-
@property
89-
def closed(self) -> bool:
90-
return self._closed
91-
92-
9334
class EventSerializer(SpecificShapeSerializer):
9435
def __init__(
9536
self,

0 commit comments

Comments
 (0)