Skip to content

Commit f10f0fa

Browse files
authored
fix (provider-utils): improve event source stream parsing performance (#5351)
1 parent 93bd899 commit f10f0fa

9 files changed

+340
-19
lines changed

.changeset/gorgeous-pandas-wink.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@ai-sdk/provider-utils': patch
3+
'ai': patch
4+
---
5+
6+
fix (provider-utils): improve event source stream parsing performance

packages/ai/core/tool/mcp/mcp-sse-transport.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { EventSourceParserStream } from 'eventsource-parser/stream';
1+
import { createEventSourceParserStream } from '@ai-sdk/provider-utils';
22
import { MCPClientError } from '../../../errors';
33
import { JSONRPCMessage, JSONRPCMessageSchema } from './json-rpc-message';
44
import { MCPTransport } from './mcp-transport';
@@ -55,7 +55,7 @@ export class SseMCPTransport implements MCPTransport {
5555

5656
const stream = response.body
5757
.pipeThrough(new TextDecoderStream())
58-
.pipeThrough(new EventSourceParserStream());
58+
.pipeThrough(createEventSourceParserStream());
5959

6060
const reader = stream.getReader();
6161

packages/ai/package.json

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
"@ai-sdk/react": "1.2.0",
7171
"@ai-sdk/ui-utils": "1.2.0",
7272
"@opentelemetry/api": "1.9.0",
73-
"eventsource-parser": "^3.0.0",
7473
"jsondiffpatch": "0.6.0"
7574
},
7675
"devDependencies": {

packages/provider-utils/package.json

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
},
3939
"dependencies": {
4040
"@ai-sdk/provider": "1.1.0",
41-
"eventsource-parser": "^3.0.0",
4241
"nanoid": "^3.3.8",
4342
"secure-json-parse": "^2.7.0"
4443
},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import { createEventSourceParserStream } from './event-source-parser-stream';
2+
import { convertReadableStreamToArray } from './test';
3+
4+
describe('EventSourceParserStream', () => {
  /**
   * Pushes every input chunk through a fresh parser stream and resolves with
   * all chunks emitted on the readable side.
   *
   * The writes and the close are queued without being awaited; the results
   * are gathered afterwards by draining the readable side.
   */
  const parseStream = async (inputChunks: string[]) => {
    const parser = createEventSourceParserStream();
    const writer = parser.writable.getWriter();

    inputChunks.forEach(chunk => {
      writer.write(chunk);
    });
    writer.close();

    return convertReadableStreamToArray(parser.readable);
  };

  // One entry per test: the test name, the chunks written to the parser,
  // and the events expected on the readable side.
  const cases: Array<{
    name: string;
    chunks: string[];
    expected: unknown[];
  }> = [
    {
      name: 'should parse simple data events',
      chunks: ['data: hello\n\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should parse events with types',
      chunks: ['event: message\ndata: hello\n\n'],
      expected: [{ event: 'message', data: 'hello' }],
    },
    {
      name: 'should handle multiple events in one chunk',
      chunks: ['data: one\n\ndata: two\n\n'],
      expected: [{ data: 'one' }, { data: 'two' }],
    },
    {
      name: 'should handle events split across chunks',
      chunks: ['data: hel', 'lo\n\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should handle events and data split across chunks',
      chunks: ['event: mess', 'age\ndata: hello\n\n'],
      expected: [{ event: 'message', data: 'hello' }],
    },
    {
      name: 'should handle CRLF line endings',
      chunks: ['data: hello\r\n\r\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should handle empty data lines',
      chunks: ['data: \n', 'data: hello\n\n'],
      expected: [{ data: '\nhello' }],
    },
    {
      name: 'should carry event type to next data line',
      chunks: ['event: message\ndata: one\n\nevent: alert\ndata: two\n\n'],
      expected: [
        { event: 'message', data: 'one' },
        { event: 'alert', data: 'two' },
      ],
    },
    {
      name: 'should reset event type after emitting data',
      chunks: ['event: message\ndata: hello\n\ndata: world\n\n'],
      expected: [{ event: 'message', data: 'hello' }, { data: 'world' }],
    },
    {
      name: 'should handle incomplete events at the end of stream',
      chunks: ['data: hello\n\ndata:'],
      expected: [{ data: 'hello' }, { data: '' }],
    },
    {
      name: 'should handle multi-line data',
      chunks: ['data: line1\ndata: line2\ndata: line3\n\n'],
      expected: [{ data: 'line1\nline2\nline3' }],
    },
    {
      name: 'should handle id field',
      chunks: ['id: 123\ndata: hello\n\n'],
      expected: [{ data: 'hello', id: '123' }],
    },
    {
      name: 'should persist id across events',
      chunks: ['id: 123\ndata: first\n\ndata: second\n\n'],
      expected: [
        { data: 'first', id: '123' },
        { data: 'second', id: '123' },
      ],
    },
    {
      name: 'should update id when a new one is received',
      chunks: ['id: 123\ndata: first\n\nid: 456\ndata: second\n\n'],
      expected: [
        { data: 'first', id: '123' },
        { data: 'second', id: '456' },
      ],
    },
    {
      name: 'should handle retry field',
      chunks: ['retry: 5000\ndata: hello\n\n'],
      expected: [{ data: 'hello', retry: 5000 }],
    },
    {
      name: 'should ignore invalid retry values',
      chunks: ['retry: invalid\ndata: hello\n\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should ignore comment lines',
      chunks: [': this is a comment\ndata: hello\n\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should handle fields with no value',
      chunks: ['event\ndata: hello\n\n'],
      expected: [{ event: '', data: 'hello' }],
    },
    {
      name: 'should ignore unrecognized fields with no colon',
      chunks: ['eventmessage\ndata: hello\n\n'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should handle multiple blank lines between events',
      chunks: ['data: first\n\n\n\ndata: second\n\n'],
      expected: [{ data: 'first' }, { data: 'second' }],
    },
    {
      name: 'should emit event at end of stream even without final newline',
      chunks: ['data: hello'],
      expected: [{ data: 'hello' }],
    },
    {
      name: 'should correctly handle a complete event source stream example',
      chunks: [
        'event: update\n',
        'id: 1\n',
        'data: {"message": "First update"}\n',
        '\n',
        ': this is a comment\n',
        'event: alert\n',
        'id: 2\n',
        'retry: 10000\n',
        'data: line1\n',
        'data: line2\n',
        '\n',
        'data: standalone message\n',
        '\n',
      ],
      expected: [
        {
          event: 'update',
          data: '{"message": "First update"}',
          id: '1',
        },
        {
          event: 'alert',
          data: 'line1\nline2',
          id: '2',
          retry: 10000,
        },
        {
          data: 'standalone message',
          id: '2',
        },
      ],
    },
  ];

  for (const { name, chunks, expected } of cases) {
    it(name, async () => {
      expect(await parseStream(chunks)).toEqual(expected);
    });
  }
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
 * One parsed server-sent event, as emitted by the stream returned from
 * `createEventSourceParserStream`.
 *
 * - `event`: value of the `event` field for this event, or `undefined` when
 *   no `event` field was seen since the previous dispatch.
 * - `data`: the values of all `data` fields of the event, joined with `\n`.
 * - `id`: the most recently received `id` field value; it is kept across
 *   events until a new `id` field replaces it.
 * - `retry`: integer value of the `retry` field; left undefined when the
 *   field is absent or is not a number.
 */
export type EventSourceChunk = {
2+
event: string | undefined;
3+
data: string;
4+
id?: string;
5+
retry?: number;
6+
};
7+
8+
/**
 * Creates a `TransformStream` that turns decoded server-sent-event text into
 * `EventSourceChunk` objects.
 *
 * Input chunks may be split at arbitrary positions (in the middle of a line,
 * or between the CR and LF of a CRLF line ending); the parser buffers the
 * unfinished line and continues with the next chunk.
 *
 * Behavior:
 * - An empty line dispatches the pending event, but only if at least one
 *   `data` field was received for it.
 * - Lines starting with `:` are comments and are ignored.
 * - `event` and `retry` are reset after each dispatched event; the last
 *   `id` is kept for subsequent events.
 * - When the stream ends, the buffered partial line is parsed and any
 *   pending event is dispatched, even without a terminating blank line.
 *
 * @returns a transform stream from `string` chunks to `EventSourceChunk`s.
 */
export function createEventSourceParserStream(): TransformStream<
  string,
  EventSourceChunk
> {
  let buffer = '';
  let event: string | undefined = undefined;
  let data: string[] = [];
  let lastEventId: string | undefined = undefined;
  let retry: number | undefined = undefined;

  // True when the previous non-empty chunk ended with CR. If the next chunk
  // starts with LF, that LF is the second half of a CRLF line ending and
  // must not be interpreted as an additional (empty) line.
  let skipLeadingLineFeed = false;

  // The retry field is only valid when it consists of ASCII digits.
  const asciiDigits = /^\d+$/;

  function parseLine(
    line: string,
    controller: TransformStreamDefaultController<EventSourceChunk>,
  ) {
    // Empty line means dispatch the event
    if (line === '') {
      dispatchEvent(controller);
      return;
    }

    // Comments start with colon
    if (line.startsWith(':')) {
      return;
    }

    // Field parsing
    const colonIndex = line.indexOf(':');
    if (colonIndex === -1) {
      // field with no value: the whole line is the field name
      handleField(line, '');
      return;
    }

    const field = line.slice(0, colonIndex);
    // If there's a space after the colon, it should be ignored
    // (only a single space is removed).
    const valueStart = colonIndex + 1;
    const value =
      valueStart < line.length && line[valueStart] === ' '
        ? line.slice(valueStart + 1)
        : line.slice(valueStart);

    handleField(field, value);
  }

  function dispatchEvent(
    controller: TransformStreamDefaultController<EventSourceChunk>,
  ) {
    // Events without any data field are never emitted.
    if (data.length > 0) {
      controller.enqueue({
        event,
        data: data.join('\n'),
        id: lastEventId,
        retry,
      });

      // Reset data but keep lastEventId as per spec
      data = [];
      event = undefined;
      retry = undefined;
    }
  }

  function handleField(field: string, value: string) {
    // Unknown field names fall through the switch and are ignored.
    switch (field) {
      case 'event':
        event = value;
        break;
      case 'data':
        data.push(value);
        break;
      case 'id':
        // Per the SSE specification, an id containing U+0000 NULL is ignored.
        if (!value.includes('\u0000')) {
          lastEventId = value;
        }
        break;
      case 'retry':
        // Per the SSE specification, the value must consist of only ASCII
        // digits; parseInt alone would also accept e.g. "10abc" or "-5".
        if (asciiDigits.test(value)) {
          retry = parseInt(value, 10);
        }
        break;
    }
  }

  return new TransformStream<string, EventSourceChunk>({
    transform(chunk, controller) {
      let text = chunk;

      // Complete a CRLF pair that was split across two chunks.
      if (skipLeadingLineFeed && text.length > 0) {
        if (text.startsWith('\n')) {
          text = text.slice(1);
        }
        skipLeadingLineFeed = false;
      }

      if (text.length === 0) {
        return; // nothing to scan; buffer and parser state stay unchanged
      }

      skipLeadingLineFeed = text.endsWith('\r');

      const { lines, incompleteLine } = splitLines(buffer, text);

      buffer = incompleteLine;

      // using for loop for performance
      for (let i = 0; i < lines.length; i++) {
        parseLine(lines[i], controller);
      }
    },

    flush(controller) {
      // Treat whatever is left in the buffer as a final line, then emit the
      // pending event even though no terminating blank line was received.
      parseLine(buffer, controller);
      dispatchEvent(controller);
    },
  });
}
105+
106+
/**
 * Splits `chunk` into complete lines, continuing the unfinished line left
 * over from a previous call.
 *
 * performance: the already scanned `buffer` is passed in separately so that
 * it is not scanned again; only `chunk` is searched for line endings.
 *
 * LF, CR and CRLF are each treated as a single line ending. A CR that is the
 * very last character of `chunk` ends the line immediately; this function
 * keeps no state between calls, so it cannot tell whether the next chunk
 * begins with the LF of the same CRLF pair.
 *
 * @param buffer - unfinished line from earlier input (contains no line ending)
 * @param chunk - newly received text
 * @returns `lines`: all lines completed by `chunk`, without their line
 *   endings; `incompleteLine`: the trailing text after the last line ending.
 */
function splitLines(buffer: string, chunk: string) {
  const lines: Array<string> = [];
  let currentLine = buffer;

  // using for loop for performance
  for (let i = 0; i < chunk.length; ) {
    const char = chunk[i++];

    // order is performance-optimized
    if (char === '\n') {
      // Standalone LF
      lines.push(currentLine);
      currentLine = '';
    } else if (char === '\r') {
      lines.push(currentLine);
      currentLine = '';

      // `i` was already advanced past the CR, so chunk[i] is the character
      // that directly follows it.
      if (chunk[i] === '\n') {
        i++; // CRLF case: Skip the LF character
      }
    } else {
      currentLine += char;
    }
  }

  return { lines, incompleteLine: currentLine };
}

packages/provider-utils/src/index.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
export * from './combine-headers';
22
export { convertAsyncIteratorToReadableStream } from './convert-async-iterator-to-readable-stream';
33
export * from './delay';
4+
export { createEventSourceParserStream } from './event-source-parser-stream';
5+
export type { EventSourceChunk } from './event-source-parser-stream';
46
export * from './extract-response-headers';
57
export * from './fetch-function';
68
export { createIdGenerator, generateId } from './generate-id';
9+
export type { IDGenerator } from './generate-id';
710
export * from './get-error-message';
811
export * from './get-from-api';
912
export * from './is-abort-error';
1013
export * from './load-api-key';
1114
export { loadOptionalSetting } from './load-optional-setting';
1215
export { loadSetting } from './load-setting';
1316
export * from './parse-json';
17+
export { parseProviderOptions } from './parse-provider-options';
1418
export * from './post-to-api';
1519
export * from './remove-undefined-entries';
1620
export * from './resolve';
@@ -20,7 +24,5 @@ export * from './validate-types';
2024
export * from './validator';
2125
export * from './without-trailing-slash';
2226

23-
export type { IDGenerator } from './generate-id';
24-
export { parseProviderOptions } from './parse-provider-options';
2527
export type { CoreToolCall, ToolCall } from './types/tool-call';
2628
export type { CoreToolResult, ToolResult } from './types/tool-result';

0 commit comments

Comments
 (0)