Skip to content

Commit d79dc91

Browse files
authored
feat(core/protocols): eventstreams for schema serde (#1661)
1 parent 1cab1f5 commit d79dc91

24 files changed

+431
-74
lines changed

.changeset/shy-hornets-care.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/core": minor
3+
---
4+
5+
schema serde eventstreams implementation

packages/core/src/submodules/cbor/CborCodec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { NormalizedSchema } from "@smithy/core/schema";
22
import { generateIdempotencyToken, parseEpochTimestamp } from "@smithy/core/serde";
3-
import { Codec, Schema, SerdeFunctions, ShapeDeserializer, ShapeSerializer } from "@smithy/types";
3+
import type { Codec, Schema, SerdeFunctions, ShapeDeserializer, ShapeSerializer } from "@smithy/types";
44
import { fromBase64 } from "@smithy/util-base64";
55

66
import { cbor } from "./cbor";

packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { list, map, SCHEMA, struct } from "@smithy/core/schema";
22
import { HttpRequest, HttpResponse } from "@smithy/protocol-http";
3-
import { SchemaRef } from "@smithy/types";
3+
import type { SchemaRef } from "@smithy/types";
44
import { describe, expect, test as it } from "vitest";
55

66
import { cbor } from "./cbor";

packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
4343
): Promise<IHttpRequest> {
4444
const request = await super.serializeRequest(operationSchema, input, context);
4545
Object.assign(request.headers, {
46-
"content-type": "application/cbor",
46+
"content-type": this.getDefaultContentType(),
4747
"smithy-protocol": "rpc-v2-cbor",
48-
accept: "application/cbor",
48+
accept: this.getDefaultContentType(),
4949
});
5050
if (deref(operationSchema.input) === "unit") {
5151
delete request.body;
@@ -113,4 +113,8 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
113113

114114
throw exception;
115115
}
116+
117+
protected getDefaultContentType(): string {
118+
return "application/cbor";
119+
}
116120
}

packages/core/src/submodules/cbor/parseCborBody.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { collectBody } from "@smithy/core/protocols";
22
import { HttpRequest as __HttpRequest } from "@smithy/protocol-http";
3-
import { HeaderBag as __HeaderBag, HttpResponse, SerdeContext as __SerdeContext, SerdeContext } from "@smithy/types";
3+
import type {
4+
HeaderBag as __HeaderBag,
5+
HttpResponse,
6+
SerdeContext as __SerdeContext,
7+
SerdeContext,
8+
} from "@smithy/types";
49
import { calculateBodyLength } from "@smithy/util-body-length-browser";
510

611
import { cbor } from "./cbor";

packages/core/src/submodules/protocols/HttpBindingProtocol.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { map, op, SCHEMA, struct } from "@smithy/core/schema";
22
import { HttpResponse } from "@smithy/protocol-http";
3-
import {
3+
import type {
44
Codec,
55
CodecSettings,
66
HandlerExecutionContext,

packages/core/src/submodules/protocols/HttpBindingProtocol.ts

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { NormalizedSchema, SCHEMA } from "@smithy/core/schema";
22
import { splitEvery, splitHeader } from "@smithy/core/serde";
33
import { HttpRequest } from "@smithy/protocol-http";
4-
import {
4+
import type {
55
Endpoint,
66
EndpointBearer,
7-
EventStreamSerdeContext,
87
HandlerExecutionContext,
98
HttpRequest as IHttpRequest,
109
HttpResponse as IHttpResponse,
@@ -86,8 +85,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
8685
if (isStreaming) {
8786
const isEventStream = memberNs.isStructSchema();
8887
if (isEventStream) {
89-
// todo(schema)
90-
throw new Error("serialization of event streams is not yet implemented");
88+
if (input[memberName]) {
89+
payload = this.serializeEventStream({
90+
eventStream: input[memberName],
91+
unionSchema: memberNs,
92+
});
93+
}
9194
} else {
9295
// streaming blob body
9396
payload = inputMemberValue;
@@ -132,6 +135,9 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
132135
if (hasNonHttpBindingMember && input) {
133136
serializer.write(schema, input);
134137
payload = serializer.flush() as Uint8Array;
138+
139+
// Due to Smithy validation, we can assume that the members with no HTTP
140+
// bindings DO NOT contain an event stream.
135141
}
136142

137143
request.headers = headers;
@@ -221,14 +227,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
221227
dataObject[member] = dataFromBody[member];
222228
}
223229
}
230+
// Due to Smithy validation, we can assume that the members with no HTTP
231+
// bindings DO NOT contain an event stream.
224232
}
225233

226-
const output: Output = {
227-
$metadata: this.deserializeMetadata(response),
228-
...dataObject,
229-
};
230-
231-
return output;
234+
dataObject.$metadata = this.deserializeMetadata(response);
235+
return dataObject;
232236
}
233237

234238
/**
@@ -276,33 +280,13 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
276280
if (isStreaming) {
277281
const isEventStream = memberSchema.isStructSchema();
278282
if (isEventStream) {
279-
// streaming event stream (union)
280-
const context = this.serdeContext as unknown as EventStreamSerdeContext;
281-
if (!context.eventStreamMarshaller) {
282-
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
283-
}
284-
const memberSchemas = memberSchema.getMemberSchemas();
285-
dataObject[memberName] = context.eventStreamMarshaller.deserialize(response.body, async (event) => {
286-
const unionMember =
287-
Object.keys(event).find((key) => {
288-
return key !== "__type";
289-
}) ?? "";
290-
if (unionMember in memberSchemas) {
291-
const eventStreamSchema = memberSchemas[unionMember];
292-
return {
293-
[unionMember]: await deserializer.read(eventStreamSchema, event[unionMember].body),
294-
};
295-
} else {
296-
// todo(schema): This union convention is ignored by the event stream marshaller.
297-
// todo(schema): This should be returned to the user instead.
298-
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
299-
return {
300-
$unknown: event,
301-
};
302-
}
283+
// event stream (union)
284+
dataObject[memberName] = this.deserializeEventStream({
285+
response,
286+
unionSchema: memberSchema,
303287
});
304288
} else {
305-
// streaming blob body
289+
// data stream (blob)
306290
dataObject[memberName] = sdkStreamMixin(response.body);
307291
}
308292
} else if (response.body) {

packages/core/src/submodules/protocols/HttpProtocol.spec.ts

Lines changed: 152 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
import { map, SCHEMA, struct } from "@smithy/core/schema";
2-
import { HandlerExecutionContext, HttpResponse as IHttpResponse, Schema, SerdeFunctions } from "@smithy/types";
3-
import { describe, expect, test as it } from "vitest";
1+
import { map, NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema";
2+
import { HttpResponse } from "@smithy/protocol-http";
3+
import type { HandlerExecutionContext, HttpResponse as IHttpResponse, Schema, SerdeFunctions } from "@smithy/types";
4+
import { toUtf8 } from "@smithy/util-utf8";
5+
import { Readable } from "node:stream";
6+
import { describe, expect, test as it, vi } from "vitest";
47

58
import { HttpProtocol } from "./HttpProtocol";
69
import { FromStringShapeDeserializer } from "./serde/FromStringShapeDeserializer";
10+
import { ToStringShapeSerializer } from "./serde/ToStringShapeSerializer";
711

812
describe(HttpProtocol.name, () => {
913
it("ignores http bindings (only HttpBindingProtocol uses them)", async () => {
@@ -49,4 +53,149 @@ describe(HttpProtocol.name, () => {
4953
// headers were ignored
5054
});
5155
});
56+
57+
describe("event stream serde", () => {
58+
const impl = {
59+
serializer: new ToStringShapeSerializer({
60+
timestampFormat: { default: 7, useTrait: true },
61+
}),
62+
deserializer: new FromStringShapeDeserializer({
63+
httpBindings: true,
64+
timestampFormat: { default: 7, useTrait: true },
65+
}),
66+
getEventStreamMarshaller() {
67+
return this.serdeContext.eventStreamMarshaller;
68+
},
69+
serdeContext: {
70+
eventStreamMarshaller: {
71+
serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => {
72+
return Readable.from({
73+
async *[Symbol.asyncIterator]() {
74+
for await (const inputEvent of eventStream) {
75+
yield eventStreamSerializationFn(inputEvent);
76+
}
77+
},
78+
});
79+
}),
80+
deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => {
81+
return {
82+
async *[Symbol.asyncIterator]() {
83+
for await (const outputEvent of body) {
84+
yield eventStreamDeserializationFn(outputEvent);
85+
}
86+
},
87+
};
88+
}),
89+
},
90+
},
91+
getDefaultContentType() {
92+
return "unit/test";
93+
},
94+
};
95+
96+
const serializeEventStream = (HttpProtocol.prototype as any).serializeEventStream.bind(impl);
97+
const deserializeEventStream = (HttpProtocol.prototype as any).deserializeEventStream.bind(impl);
98+
99+
const eventStreamUnionSchema = struct(
100+
"ns",
101+
"EventStreamStructure",
102+
{ streaming: 1 },
103+
["A", "B", "C"],
104+
[struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])]
105+
);
106+
107+
it("serializes event streams", async () => {
108+
const eventStream = {
109+
async *[Symbol.asyncIterator]() {
110+
yield { A: { name: "a" } };
111+
yield { B: { name: "b" } };
112+
yield { C: { name: "c" } };
113+
yield { $unknown: ["D", { name: "d" }] };
114+
},
115+
};
116+
const unionSchema = NormalizedSchema.of(eventStreamUnionSchema);
117+
118+
const requestBody = serializeEventStream({
119+
eventStream,
120+
unionSchema,
121+
});
122+
123+
const collect = [];
124+
for await (const chunk of requestBody) {
125+
collect.push(chunk);
126+
}
127+
expect(
128+
collect.map((item) => {
129+
return {
130+
headers: item.headers,
131+
body: toUtf8(item.body).replace(/\s+/g, ""),
132+
};
133+
})
134+
).toEqual([
135+
{
136+
headers: {
137+
":event-type": { type: "string", value: "A" },
138+
":message-type": { type: "string", value: "event" },
139+
":content-type": { type: "string", value: "unit/test" },
140+
},
141+
body: `{"name":"a"}`,
142+
},
143+
{
144+
headers: {
145+
":event-type": { type: "string", value: "B" },
146+
":message-type": { type: "string", value: "event" },
147+
":content-type": { type: "string", value: "unit/test" },
148+
},
149+
body: `{"name":"b"}`,
150+
},
151+
{
152+
headers: {
153+
":event-type": { type: "string", value: "C" },
154+
":message-type": { type: "string", value: "event" },
155+
":content-type": { type: "string", value: "unit/test" },
156+
},
157+
body: `{"name":"c"}`,
158+
},
159+
{
160+
headers: {
161+
":event-type": { type: "string", value: "D" },
162+
":message-type": { type: "string", value: "event" },
163+
":content-type": { type: "string", value: "unit/test" },
164+
},
165+
body: `{"name":"d"}`,
166+
},
167+
]);
168+
});
169+
170+
it("deserializes event streams", async () => {
171+
const response = new HttpResponse({
172+
statusCode: 200,
173+
body: {
174+
async *[Symbol.asyncIterator]() {
175+
yield { A: { headers: {}, body: { name: "a" } } };
176+
yield { B: { headers: {}, body: { name: "b" } } };
177+
yield { C: { headers: {}, body: { name: "c" } } };
178+
yield { D: { headers: {}, body: { name: "d" } } };
179+
},
180+
},
181+
});
182+
const unionSchema = NormalizedSchema.of(eventStreamUnionSchema);
183+
184+
const asyncIterable = deserializeEventStream({
185+
response,
186+
unionSchema,
187+
});
188+
189+
const collect = [];
190+
for await (const event of asyncIterable) {
191+
collect.push(event);
192+
}
193+
expect(collect).toEqual([
194+
{ A: { name: "a" } },
195+
{ B: { name: "b" } },
196+
{ C: { name: "c" } },
197+
{ $unknown: { D: { headers: {}, body: { name: "d" } } } },
198+
]);
199+
});
200+
});
52201
});

0 commit comments

Comments
 (0)