Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/client/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export type StdioServerParameters = {
* If not specified, the current working directory will be inherited.
*/
cwd?: string;

/**
* Maximum size of the read buffer in bytes. If a single message exceeds
* this size the transport will emit an error and close.
*
* Defaults to 10 MB.
*/
maxBufferSize?: number;
};

/**
Expand Down Expand Up @@ -91,7 +99,7 @@ export function getDefaultEnvironment(): Record<string, string> {
*/
export class StdioClientTransport implements Transport {
private _process?: ChildProcess;
private _readBuffer: ReadBuffer = new ReadBuffer();
private _readBuffer: ReadBuffer;
private _serverParams: StdioServerParameters;
private _stderrStream: PassThrough | null = null;

Expand All @@ -101,6 +109,7 @@ export class StdioClientTransport implements Transport {

constructor(server: StdioServerParameters) {
this._serverParams = server;
this._readBuffer = new ReadBuffer({ maxBufferSize: server.maxBufferSize });
if (server.stderr === 'pipe' || server.stderr === 'overlapped') {
this._stderrStream = new PassThrough();
}
Expand Down Expand Up @@ -148,8 +157,13 @@ export class StdioClientTransport implements Transport {
});

this._process.stdout?.on('data', chunk => {
this._readBuffer.append(chunk);
this.processReadBuffer();
try {
this._readBuffer.append(chunk);
this.processReadBuffer();
} catch (error) {
this.onerror?.(error as Error);
this.close().catch(() => {});
}
});

this._process.stdout?.on('error', error => {
Expand Down
26 changes: 21 additions & 5 deletions src/server/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,38 @@ import { Transport } from '../shared/transport.js';
* This transport is only available in Node.js environments.
*/
export class StdioServerTransport implements Transport {
private _readBuffer: ReadBuffer = new ReadBuffer();
private _readBuffer: ReadBuffer;
private _started = false;

constructor(
private _stdin: Readable = process.stdin,
private _stdout: Writable = process.stdout
) {}
private _stdout: Writable = process.stdout,
options?: {
/**
* Maximum size of the read buffer in bytes. If a single message exceeds
* this size the transport will emit an error and close.
*
* Defaults to 10 MB.
*/
maxBufferSize?: number;
}
) {
this._readBuffer = new ReadBuffer({ maxBufferSize: options?.maxBufferSize });
}

onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;

// Arrow functions to bind `this` properly, while maintaining function identity.
_ondata = (chunk: Buffer) => {
this._readBuffer.append(chunk);
this.processReadBuffer();
try {
this._readBuffer.append(chunk);
this.processReadBuffer();
} catch (error) {
this.onerror?.(error as Error);
this.close().catch(() => {});
}
};
_onerror = (error: Error) => {
this.onerror?.(error);
Expand Down
12 changes: 12 additions & 0 deletions src/shared/stdio.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
import { JSONRPCMessage, JSONRPCMessageSchema } from '../types.js';

export const STDIO_DEFAULT_MAX_BUFFER_SIZE = 10 * 1024 * 1024;

/**
* Buffers a continuous stdio stream into discrete JSON-RPC messages.
*/
export class ReadBuffer {
private _buffer?: Buffer;
private _maxBufferSize: number;

constructor(options?: { maxBufferSize?: number }) {
this._maxBufferSize = options?.maxBufferSize ?? STDIO_DEFAULT_MAX_BUFFER_SIZE;
}

append(chunk: Buffer): void {
const newSize = (this._buffer?.length ?? 0) + chunk.length;
if (newSize > this._maxBufferSize) {
this.clear();
throw new Error(`ReadBuffer exceeded maximum size of ${this._maxBufferSize} bytes`);
}
this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk;
Comment thread
claude[bot] marked this conversation as resolved.
}

Expand Down
41 changes: 41 additions & 0 deletions test/client/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,44 @@ test('should return child process pid', async () => {
await client.close();
expect(client.pid).toBeNull();
});

test('should respect custom maxBufferSize option', async () => {
const client = new StdioClientTransport({
command: 'node',
args: ['-e', 'process.stdout.write(Buffer.alloc(200, 0x41))'],
maxBufferSize: 100
});

const errorReceived = new Promise<Error>(resolve => {
client.onerror = resolve;
});
const closed = new Promise<void>(resolve => {
client.onclose = () => resolve();
});

await client.start();

const error = await errorReceived;
expect(error.message).toMatch(/ReadBuffer exceeded maximum size/);
await closed;
});

test('should fire onerror and close when ReadBuffer overflows', async () => {
const client = new StdioClientTransport({
command: 'node',
args: ['-e', 'process.stdout.write(Buffer.alloc(11 * 1024 * 1024, 0x41))']
});

const errorReceived = new Promise<Error>(resolve => {
client.onerror = resolve;
});
const closed = new Promise<void>(resolve => {
client.onclose = () => resolve();
});

await client.start();

const error = await errorReceived;
expect(error.message).toMatch(/ReadBuffer exceeded maximum size/);
await closed;
});
48 changes: 48 additions & 0 deletions test/server/stdio.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,51 @@ test('should read multiple messages', async () => {
await finished;
expect(readMessages).toEqual(messages);
});

test('should respect custom maxBufferSize option', async () => {
const server = new StdioServerTransport(input, output, { maxBufferSize: 100 });

let receivedError: Error | undefined;
server.onerror = err => {
receivedError = err;
};
let closeCount = 0;
server.onclose = () => {
closeCount++;
};

await server.start();

// Push 101 bytes without a newline — exceeds the 100-byte limit
input.push(Buffer.alloc(101, 0x41));

await new Promise(resolve => setTimeout(resolve, 10));

expect(receivedError?.message).toMatch(/ReadBuffer exceeded maximum size/);
expect(closeCount).toBe(1);
});

test('should fire onerror and close when ReadBuffer overflows', async () => {
const server = new StdioServerTransport(input, output);

let receivedError: Error | undefined;
server.onerror = err => {
receivedError = err;
};
let closeCount = 0;
server.onclose = () => {
closeCount++;
};

await server.start();

// Push data exceeding the default 10 MB limit without a newline
const chunk = Buffer.alloc(11 * 1024 * 1024, 0x41);
input.push(chunk);

// Allow the close() promise to settle
await new Promise(resolve => setTimeout(resolve, 10));

expect(receivedError?.message).toMatch(/ReadBuffer exceeded maximum size/);
expect(closeCount).toBe(1);
});
45 changes: 44 additions & 1 deletion test/shared/stdio.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { JSONRPCMessage } from '../../src/types.js';
import { ReadBuffer } from '../../src/shared/stdio.js';
import { STDIO_DEFAULT_MAX_BUFFER_SIZE, ReadBuffer } from '../../src/shared/stdio.js';

const testMessage: JSONRPCMessage = {
jsonrpc: '2.0',
Expand Down Expand Up @@ -33,3 +33,46 @@ test('should be reusable after clearing', () => {
readBuffer.append(Buffer.from('\n'));
expect(readBuffer.readMessage()).toEqual(testMessage);
});

describe('buffer size limit', () => {
test('should throw when buffer exceeds default max size', () => {
const readBuffer = new ReadBuffer();
const chunkSize = 1024 * 1024; // 1 MB
const chunk = Buffer.alloc(chunkSize);
const chunksToFill = Math.floor(STDIO_DEFAULT_MAX_BUFFER_SIZE / chunkSize);
for (let i = 0; i < chunksToFill; i++) {
readBuffer.append(chunk);
}
expect(() => readBuffer.append(chunk)).toThrow(/ReadBuffer exceeded maximum size/);
});

test('should throw when buffer exceeds custom max size', () => {
const readBuffer = new ReadBuffer({ maxBufferSize: 100 });
readBuffer.append(Buffer.alloc(50));
expect(() => readBuffer.append(Buffer.alloc(51))).toThrow(/ReadBuffer exceeded maximum size/);
});

test('should clear buffer before throwing on overflow', () => {
const readBuffer = new ReadBuffer({ maxBufferSize: 100 });
readBuffer.append(Buffer.alloc(50));
expect(() => readBuffer.append(Buffer.alloc(51))).toThrow();

// Buffer should be cleared — can append again
readBuffer.append(Buffer.alloc(50));
// And read messages normally
expect(readBuffer.readMessage()).toBeNull();
});

test('should allow appending up to exactly the max size', () => {
const readBuffer = new ReadBuffer({ maxBufferSize: 100 });
// Should not throw — exactly at limit
expect(() => readBuffer.append(Buffer.alloc(100))).not.toThrow();
});

test('should work with no options (backwards compatible)', () => {
const readBuffer = new ReadBuffer();
// Small append should always work
readBuffer.append(Buffer.from(JSON.stringify({ jsonrpc: '2.0', method: 'ping' }) + '\n'));
expect(readBuffer.readMessage()).not.toBeNull();
});
});
Loading