Skip to content

Commit 5f4e122

Browse files
committed
feat: implement RFC 7464 JSON stream parsing
An initial, basic implementation that is functional but stricter than the RFC requires.
0 parents  commit 5f4e122

File tree

3 files changed

+324
-0
lines changed

3 files changed

+324
-0
lines changed

dev_deps.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export {
2+
assert,
3+
assertEquals,
4+
} from "https://deno.land/[email protected]/testing/asserts.ts";

mod.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/* This module implements the text format, described in [RFC 7464], to
2+
* represent streams of individual JSON objects (application/json-seq). See
3+
* [format_notes.md](format_notes.md)
4+
*
5+
* [RFC 7464]: https://datatracker.ietf.org/doc/html/rfc7464
6+
*/
7+
8+
export const JSON_SEQ_START = "\x1E";
9+
export const JSON_SEQ_END = "\n";
10+
11+
enum State {
12+
BEFORE_CHUNK_START,
13+
BEFORE_CHUNK_END,
14+
}
15+
16+
export function jsonSeqDelimiterTransformer(options?: {
17+
strict?: boolean;
18+
}): Transformer<string, string> {
19+
const strict = options?.strict === undefined ? true : options?.strict;
20+
let state: State = State.BEFORE_CHUNK_START;
21+
let unDelimitedChunks: string[] = [];
22+
23+
return {
24+
transform(chunk, controller) {
25+
while (chunk) {
26+
if (state === State.BEFORE_CHUNK_START) {
27+
const start = chunk.indexOf(JSON_SEQ_START);
28+
if (strict && start !== 0) {
29+
throw new Error(
30+
`leading content before chunk start: ${
31+
start < 0 ? chunk : chunk.substring(0, start)
32+
}`
33+
);
34+
}
35+
if (start < 0) {
36+
// ignore leading content before a chunk start
37+
return;
38+
}
39+
state = State.BEFORE_CHUNK_END;
40+
chunk = chunk.substring(start + 1);
41+
}
42+
const end = chunk.indexOf(JSON_SEQ_END);
43+
if (end < 0) {
44+
if (chunk) {
45+
unDelimitedChunks.push(chunk);
46+
}
47+
return;
48+
}
49+
const chunkTail = chunk.substring(0, end);
50+
unDelimitedChunks.push(chunkTail);
51+
controller.enqueue(unDelimitedChunks.join(""));
52+
unDelimitedChunks = [];
53+
54+
state = State.BEFORE_CHUNK_START;
55+
chunk = chunk.substring(end + 1);
56+
}
57+
},
58+
flush() {
59+
if (strict && unDelimitedChunks.length) {
60+
throw new Error(`end of stream before chunk end`);
61+
}
62+
},
63+
};
64+
}
65+
66+
export function stringToJSONTransformer(): Transformer<string, unknown> {
67+
return {
68+
transform(chunk, controller) {
69+
controller.enqueue(JSON.parse(chunk));
70+
},
71+
};
72+
}
73+
74+
export class JsonSequenceDecoderStream
75+
implements TransformStream<Uint8Array, unknown>
76+
{
77+
readonly readable: ReadableStream<unknown>;
78+
readonly writable: WritableStream<Uint8Array>;
79+
80+
constructor() {
81+
const decoder = new TextDecoderStream();
82+
this.readable = decoder.readable
83+
.pipeThrough(new TransformStream(jsonSeqDelimiterTransformer()))
84+
.pipeThrough(new TransformStream(stringToJSONTransformer()));
85+
this.writable = decoder.writable;
86+
}
87+
}

mod_test.ts

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import { assert, assertEquals } from "./dev_deps.ts";
2+
import {
3+
JSON_SEQ_END,
4+
JSON_SEQ_START,
5+
jsonSeqDelimiterTransformer,
6+
stringToJSONTransformer,
7+
} from "./mod.ts";
8+
import { readableStreamFromIterable } from "https://deno.land/[email protected]/streams/mod.ts";
9+
10+
const LARGE_CHUNK_SIZE = 1000033;
11+
12+
function chunkify(str: string, size: number): string[] {
13+
assert(size > 0);
14+
const chunks: string[] = [];
15+
for (let i = 0; i < str.length; i += size) {
16+
chunks.push(str.substring(i, Math.min(str.length, i + size)));
17+
}
18+
return chunks;
19+
}
20+
21+
Deno.test("chunkify", () => {
22+
assertEquals(chunkify("", 1), []);
23+
assertEquals(chunkify("", 10), []);
24+
assertEquals(chunkify("foo", 10), ["foo"]);
25+
assertEquals(chunkify("foo", 1), ["f", "o", "o"]);
26+
assertEquals(chunkify("foo", 2), ["fo", "o"]);
27+
});
28+
29+
function enlargen(chunk: string, length: number): string {
30+
return chunk.repeat(Math.ceil(length / chunk.length)).substring(0, length);
31+
}
32+
33+
Deno.test("enlargen", () => {
34+
assertEquals(enlargen("a", 3), "aaa");
35+
assertEquals(enlargen("abc", 5), "abcab");
36+
assertEquals(enlargen("abc", 1), "a");
37+
assertEquals(enlargen("abc", 2), "ab");
38+
assertEquals(enlargen("abc", 3), "abc");
39+
});
40+
41+
function jsonSeqChunk<Content extends string = string>(
42+
content: Content
43+
): `${typeof JSON_SEQ_START}${Content}${typeof JSON_SEQ_END}` {
44+
return `${JSON_SEQ_START}${content}${JSON_SEQ_END}`;
45+
}
46+
47+
async function assertStreamContainsChunks<T>(
48+
stream: ReadableStream<T>,
49+
chunks: ReadonlyArray<T>
50+
): Promise<void> {
51+
const reader = stream.getReader();
52+
for (const chunk of chunks) {
53+
const result = await reader.read();
54+
assertEquals(result, { done: false, value: chunk });
55+
}
56+
const result = await reader.read();
57+
assertEquals(result, { done: true, value: undefined });
58+
}
59+
60+
Deno.test("jsonSeqDelimiterTransformer() transforms empty stream", async () => {
61+
const emptyStream = readableStreamFromIterable([]);
62+
const jsonSeqChunkStream = emptyStream.pipeThrough(
63+
new TransformStream(jsonSeqDelimiterTransformer())
64+
);
65+
await assertStreamContainsChunks(jsonSeqChunkStream, []);
66+
});
67+
68+
Deno.test("jsonSeqDelimiterTransformer() transforms single chunk", async () => {
69+
const emptyStream = readableStreamFromIterable([jsonSeqChunk("content")]);
70+
const jsonSeqChunkStream = emptyStream.pipeThrough(
71+
new TransformStream(jsonSeqDelimiterTransformer())
72+
);
73+
await assertStreamContainsChunks(jsonSeqChunkStream, ["content"]);
74+
});
75+
76+
Deno.test(
77+
"jsonSeqDelimiterTransformer() transforms multiple chunks",
78+
async () => {
79+
const chunks = ["foo", "bar", "baz"];
80+
const emptyStream = readableStreamFromIterable(chunks.map(jsonSeqChunk));
81+
const jsonSeqChunkStream = emptyStream.pipeThrough(
82+
new TransformStream(jsonSeqDelimiterTransformer())
83+
);
84+
await assertStreamContainsChunks(jsonSeqChunkStream, chunks);
85+
}
86+
);
87+
88+
Deno.test("jsonSeqDelimiterTransformer() transforms large chunks", async () => {
89+
const chunks = ["foo", "bar", "baz"].map((c) =>
90+
enlargen(c, LARGE_CHUNK_SIZE)
91+
);
92+
93+
const emptyStream = readableStreamFromIterable(
94+
chunkify(chunks.map(jsonSeqChunk).join(""), 1013)
95+
);
96+
const jsonSeqChunkStream = emptyStream.pipeThrough(
97+
new TransformStream(jsonSeqDelimiterTransformer())
98+
);
99+
await assertStreamContainsChunks(jsonSeqChunkStream, chunks);
100+
});
101+
102+
const malformedStreams = [
103+
{
104+
name: "content before first chunk",
105+
chunks: [null, "foo", "bar", "baz"],
106+
streamContent: [
107+
"junk",
108+
jsonSeqChunk("foo"),
109+
jsonSeqChunk("bar"),
110+
jsonSeqChunk("baz"),
111+
].join(""),
112+
},
113+
{
114+
name: "content after last chunk",
115+
chunks: ["foo", "bar", null],
116+
streamContent: [jsonSeqChunk("foo"), jsonSeqChunk("bar"), "junk"].join(""),
117+
},
118+
{
119+
name: "content in between chunks",
120+
chunks: ["foo", null, "bar", "baz"],
121+
streamContent: [
122+
jsonSeqChunk("foo"),
123+
"junk",
124+
jsonSeqChunk("bar"),
125+
jsonSeqChunk("baz"),
126+
].join(""),
127+
},
128+
{
129+
name: "partial chunk at end",
130+
chunks: ["foo", "bar", null],
131+
streamContent: [
132+
jsonSeqChunk("foo"),
133+
jsonSeqChunk("bar"),
134+
`${JSON_SEQ_START}baz`,
135+
].join(""),
136+
},
137+
{
138+
name: "partial chunk at start",
139+
chunks: [null, "bar", "baz"],
140+
streamContent: [
141+
`foo${JSON_SEQ_END}`,
142+
jsonSeqChunk("bar"),
143+
jsonSeqChunk("baz"),
144+
].join(""),
145+
},
146+
];
147+
148+
const inputChunkSizes = [1, 4, LARGE_CHUNK_SIZE] as const;
149+
150+
for (const inputChunkSize of inputChunkSizes) {
151+
for (const { name, chunks, streamContent } of malformedStreams) {
152+
Deno.test(
153+
`jsonSeqDelimiterTransformer() rejects malformed streams with ${name} in strict mode (inputChunkSize: ${inputChunkSize})`,
154+
async () => {
155+
const emptyStream = readableStreamFromIterable(
156+
chunkify(streamContent, inputChunkSize)
157+
);
158+
const jsonSeqChunkStream = emptyStream.pipeThrough(
159+
new TransformStream(jsonSeqDelimiterTransformer())
160+
);
161+
const reader = jsonSeqChunkStream.getReader();
162+
for (const chunk of chunks) {
163+
if (chunk === null) {
164+
try {
165+
await reader.read();
166+
assert(false, "read() did not throw");
167+
} catch (_e) {
168+
return;
169+
}
170+
} else {
171+
const result = await reader.read();
172+
assertEquals(result, { done: false, value: chunk });
173+
}
174+
}
175+
}
176+
);
177+
178+
Deno.test(
179+
`jsonSeqDelimiterTransformer() accepts malformed streams with ${name} in non-strict mode (inputChunkSize: ${inputChunkSize})`,
180+
async () => {
181+
const emptyStream = readableStreamFromIterable(
182+
chunkify(streamContent, inputChunkSize)
183+
);
184+
const jsonSeqChunkStream = emptyStream.pipeThrough(
185+
new TransformStream(jsonSeqDelimiterTransformer({ strict: false }))
186+
);
187+
const resultChunks = chunks.filter(
188+
(c): c is string => typeof c === "string"
189+
);
190+
await assertStreamContainsChunks(jsonSeqChunkStream, resultChunks);
191+
}
192+
);
193+
}
194+
}
195+
196+
Deno.test("stringToJSONTransformer() parses chunks", async () => {
197+
const values = [{ an: "object" }, ["array"], true, false, null, 123, "foo"];
198+
const stream = readableStreamFromIterable(
199+
values.map((val) => JSON.stringify(val, undefined, 2))
200+
).pipeThrough(new TransformStream(stringToJSONTransformer()));
201+
202+
await assertStreamContainsChunks(stream, values);
203+
});
204+
205+
Deno.test(
206+
"stringToJSONTransformer() errors stream on invalid JSON",
207+
async () => {
208+
const stream = readableStreamFromIterable([
209+
'{"ok": true}',
210+
'{"invalid',
211+
]).pipeThrough(new TransformStream(stringToJSONTransformer()));
212+
213+
const reader = stream.getReader();
214+
assertEquals(await reader.read(), { done: false, value: { ok: true } });
215+
try {
216+
await reader.read();
217+
assert(false, "read() did not throw");
218+
} catch (e) {
219+
assert(e instanceof SyntaxError);
220+
}
221+
}
222+
);
223+
224+
Deno.test("async iterable", async () => {
225+
const values = [{}, [], true];
226+
const stream = readableStreamFromIterable(
227+
[{}, [], true].map((val) => JSON.stringify(val, undefined, 2))
228+
).pipeThrough(new TransformStream(stringToJSONTransformer()));
229+
230+
for await (const val of stream) {
231+
assertEquals(val, values.shift());
232+
}
233+
});

0 commit comments

Comments
 (0)