Skip to content

Commit 228f824

Browse files
addievoamydevs
authored andcommitted
feat(middleware): Enhance middleware with timeout and encapsulation features
- Import ClientRPCResponseResult and ClientRPCRequestParams from PK. - Implement timeoutMiddlewareServer and timeoutMiddlewareClient. - Integrate timeoutMiddleware into defaultMiddleware. - Fix Jest test issues. - Rename to RPCResponseResult and RPCRequestParams for clarity. - Perform lint fixes and Jest tests. fix: timeouts are now only propagated unidirectionally from the client to the server [ci-skip]
1 parent 8434c70 commit 228f824

File tree

6 files changed

+399
-80
lines changed

6 files changed

+399
-80
lines changed

src/middleware.ts

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import type {
44
JSONRPCResponse,
55
JSONRPCResponseResult,
66
MiddlewareFactory,
7+
JSONValue,
8+
JSONRPCRequestMetadata,
9+
JSONRPCResponseMetadata,
710
} from './types';
11+
import type { ContextTimed } from '@matrixai/contexts';
812
import { TransformStream } from 'stream/web';
913
import { JSONParser } from '@streamparser/json';
1014
import * as utils from './utils';
@@ -75,6 +79,80 @@ function jsonMessageToBinaryStream(): TransformStream<
7579
});
7680
}
7781

82+
function timeoutMiddlewareServer(
83+
ctx: ContextTimed,
84+
_cancel: (reason?: any) => void,
85+
_meta: Record<string, JSONValue> | undefined,
86+
) {
87+
const currentTimeout = ctx.timer.delay;
88+
// Flags for tracking if the first message has been processed
89+
let forwardFirst = true;
90+
return {
91+
forward: new TransformStream<
92+
JSONRPCRequest<JSONRPCRequestMetadata>,
93+
JSONRPCRequest<JSONRPCRequestMetadata>
94+
>({
95+
transform: (chunk, controller) => {
96+
controller.enqueue(chunk);
97+
if (forwardFirst) {
98+
forwardFirst = false;
99+
let clientTimeout = chunk.metadata?.timeout;
100+
if (clientTimeout === undefined) return;
101+
if (clientTimeout === null) clientTimeout = Infinity;
102+
if (clientTimeout < currentTimeout) ctx.timer.reset(clientTimeout);
103+
}
104+
},
105+
}),
106+
reverse: new TransformStream<
107+
JSONRPCResponse<JSONRPCResponseMetadata>,
108+
JSONRPCResponse<JSONRPCResponseMetadata>
109+
>({
110+
transform: (chunk, controller) => {
111+
// Passthrough chunk, no need for server to send ctx.timeout
112+
controller.enqueue(chunk);
113+
},
114+
}),
115+
};
116+
}
117+
118+
/**
119+
* This adds its own timeout to the forward metadata and updates it's timeout
120+
* based on the reverse metadata.
121+
* @param ctx
122+
* @param _cancel
123+
* @param _meta
124+
*/
125+
function timeoutMiddlewareClient(
126+
ctx: ContextTimed,
127+
_cancel: (reason?: any) => void,
128+
_meta: Record<string, JSONValue> | undefined,
129+
) {
130+
const currentTimeout = ctx.timer.delay;
131+
// Flags for tracking if the first message has been processed
132+
let forwardFirst = true;
133+
return {
134+
forward: new TransformStream<JSONRPCRequest, JSONRPCRequest>({
135+
transform: (chunk, controller) => {
136+
if (forwardFirst) {
137+
forwardFirst = false;
138+
if (chunk == null) chunk = { jsonrpc: '2.0', method: '' };
139+
if (chunk.metadata == null) chunk.metadata = {};
140+
(chunk.metadata as any).timeout = currentTimeout;
141+
}
142+
controller.enqueue(chunk);
143+
},
144+
}),
145+
reverse: new TransformStream<
146+
JSONRPCResponse<JSONRPCResponseMetadata>,
147+
JSONRPCResponse<JSONRPCResponseMetadata>
148+
>({
149+
transform: (chunk, controller) => {
150+
controller.enqueue(chunk); // Passthrough chunk, no need for client to set ctx.timeout
151+
},
152+
}),
153+
};
154+
}
155+
78156
/**
79157
* This function is a factory for creating a pass-through streamPair. It is used
80158
* as the default middleware for the middleware wrappers.
@@ -116,12 +194,14 @@ function defaultServerMiddlewareWrapper(
116194
>();
117195

118196
const middleMiddleware = middlewareFactory(ctx, cancel, meta);
197+
const timeoutMiddleware = timeoutMiddlewareServer(ctx, cancel, meta);
119198

120-
const forwardReadable = inputTransformStream.readable.pipeThrough(
121-
middleMiddleware.forward,
122-
); // Usual middleware here
199+
const forwardReadable = inputTransformStream.readable
200+
.pipeThrough(timeoutMiddleware.forward) // Timeout middleware here
201+
.pipeThrough(middleMiddleware.forward); // Usual middleware here
123202
const reverseReadable = outputTransformStream.readable
124203
.pipeThrough(middleMiddleware.reverse) // Usual middleware here
204+
.pipeThrough(timeoutMiddleware.reverse) // Timeout middleware here
125205
.pipeThrough(jsonMessageToBinaryStream());
126206

127207
return {
@@ -172,13 +252,15 @@ const defaultClientMiddlewareWrapper = (
172252
JSONRPCRequest
173253
>();
174254

255+
const timeoutMiddleware = timeoutMiddlewareClient(ctx, cancel, meta);
175256
const middleMiddleware = middleware(ctx, cancel, meta);
176257
const forwardReadable = inputTransformStream.readable
258+
.pipeThrough(timeoutMiddleware.forward)
177259
.pipeThrough(middleMiddleware.forward) // Usual middleware here
178260
.pipeThrough(jsonMessageToBinaryStream());
179-
const reverseReadable = outputTransformStream.readable.pipeThrough(
180-
middleMiddleware.reverse,
181-
); // Usual middleware here
261+
const reverseReadable = outputTransformStream.readable
262+
.pipeThrough(middleMiddleware.reverse)
263+
.pipeThrough(timeoutMiddleware.reverse); // Usual middleware here
182264

183265
return {
184266
forward: {
@@ -199,4 +281,6 @@ export {
199281
defaultMiddleware,
200282
defaultServerMiddlewareWrapper,
201283
defaultClientMiddlewareWrapper,
284+
timeoutMiddlewareClient,
285+
timeoutMiddlewareServer,
202286
};

src/types.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type JSONRPCRequestMessage<T extends JSONValue = JSONValue> = {
3838
* SHOULD NOT contain fractional parts [2]
3939
*/
4040
id: string | number | null;
41-
};
41+
} & JSONRPCRequestMetadata;
4242

4343
/**
4444
* This is the JSON RPC notification object. this is used for a request that
@@ -60,7 +60,7 @@ type JSONRPCRequestNotification<T extends JSONValue = JSONValue> = {
6060
* This member MAY be omitted.
6161
*/
6262
params?: T;
63-
};
63+
} & JSONRPCRequestMetadata;
6464

6565
/**
6666
* This is the JSON RPC response result object. It contains the response data for a
@@ -84,7 +84,7 @@ type JSONRPCResponseResult<T extends JSONValue = JSONValue> = {
8484
* it MUST be Null.
8585
*/
8686
id: string | number | null;
87-
};
87+
} & JSONRPCResponseMetadata;
8888

8989
/**
9090
* This is the JSON RPC response Error object. It contains any errors that have
@@ -110,6 +110,29 @@ type JSONRPCResponseError = {
110110
id: string | number | null;
111111
};
112112

113+
type ObjectEmpty = NonNullable<unknown>;
114+
115+
// Prevent overwriting the metadata type with `Omit<>`
116+
type JSONRPCRequestMetadata<T extends Record<string, JSONValue> = ObjectEmpty> =
117+
{
118+
metadata?: {
119+
[Key: string]: JSONValue;
120+
} & Partial<{
121+
timeout: number | null;
122+
}>;
123+
} & Omit<T, 'metadata'>;
124+
125+
// Prevent overwriting the metadata type with `Omit<>`
126+
type JSONRPCResponseMetadata<
127+
T extends Record<string, JSONValue> = ObjectEmpty,
128+
> = {
129+
metadata?: {
130+
[Key: string]: JSONValue;
131+
} & Partial<{
132+
timeout: number | null;
133+
}>;
134+
} & Omit<T, 'metadata'>;
135+
113136
/**
114137
* This is a JSON RPC error object, it encodes the error data for the JSONRPCResponseError object.
115138
*/
@@ -357,6 +380,8 @@ export type {
357380
JSONRPCRequestNotification,
358381
JSONRPCResponseResult,
359382
JSONRPCResponseError,
383+
JSONRPCRequestMetadata,
384+
JSONRPCResponseMetadata,
360385
JSONRPCError,
361386
JSONRPCRequest,
362387
JSONRPCResponse,

tests/RPC.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,62 @@ describe('RPC', () => {
870870
},
871871
{ numRuns: 1 },
872872
);
873+
test('RPC server times out using client timeout', async () => {
874+
// Setup server and client communication pairs
875+
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
876+
Uint8Array,
877+
Uint8Array
878+
>();
879+
const { p: ctxP, resolveP: resolveCtxP } = utils.promise<ContextTimed>();
880+
class TestMethod extends UnaryHandler {
881+
public handle = async (
882+
input: JSONValue,
883+
cancel: (reason?: any) => void,
884+
meta: Record<string, JSONValue> | undefined,
885+
ctx: ContextTimed,
886+
): Promise<JSONValue> => {
887+
const abortProm = utils.promise<never>();
888+
ctx.signal.addEventListener('abort', () => {
889+
resolveCtxP(ctx);
890+
abortProm.resolveP(ctx.signal.reason);
891+
});
892+
throw await abortProm.p;
893+
};
894+
}
895+
// Set up a client and server with matching timeout settings
896+
const rpcServer = new RPCServer({
897+
logger,
898+
idGen,
899+
handlerTimeoutTime: 150,
900+
});
901+
await rpcServer.start({
902+
manifest: {
903+
testMethod: new TestMethod({}),
904+
},
905+
});
906+
rpcServer.handleStream({
907+
...serverPair,
908+
cancel: () => {},
909+
});
873910

911+
const rpcClient = new RPCClient({
912+
manifest: {
913+
testMethod: new UnaryCaller(),
914+
},
915+
streamFactory: async () => {
916+
return {
917+
...clientPair,
918+
cancel: () => {},
919+
};
920+
},
921+
logger,
922+
idGen,
923+
});
924+
await expect(rpcClient.methods.testMethod({}, { timer: 100 })).toReject();
925+
await expect(ctxP).resolves.toHaveProperty(['timer', 'delay'], 100);
926+
927+
await rpcServer.stop({ force: true });
928+
});
874929
testProp(
875930
'RPC Serializes and Deserializes Error',
876931
[rpcTestUtils.errorArb(rpcTestUtils.errorArb())],

tests/RPCClient.test.ts

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,20 @@ describe(`${RPCClient.name}`, () => {
123123
}
124124
await writable.close();
125125

126-
const expectedMessages: Array<JSONRPCRequestMessage> = messages.map((v) => {
127-
const request: JSONRPCRequestMessage = {
126+
const expectedMessages: Array<JSONRPCRequestMessage> = messages.map(
127+
(v, i) => ({
128128
jsonrpc: '2.0',
129129
method: methodName,
130130
id: null,
131131
...(v.result === undefined ? {} : { params: v.result }),
132-
};
133-
return request;
134-
});
132+
...(i === 0 ? { metadata: { timeout: null } } : {}),
133+
}),
134+
);
135+
135136
const outputMessages = (await outputResult).map((v) =>
136137
JSON.parse(v.toString()),
137138
);
139+
138140
expect(outputMessages).toStrictEqual(expectedMessages);
139141
});
140142
testProp(
@@ -171,6 +173,9 @@ describe(`${RPCClient.name}`, () => {
171173
jsonrpc: '2.0',
172174
id: null,
173175
params,
176+
metadata: {
177+
timeout: null,
178+
},
174179
}),
175180
);
176181
},
@@ -207,14 +212,16 @@ describe(`${RPCClient.name}`, () => {
207212
}
208213
await writer.close();
209214
expect(await output).toStrictEqual(message.result);
210-
const expectedOutput = params.map((v) =>
215+
const expectedOutput = params.map((v, i) =>
211216
JSON.stringify({
212217
method: methodName,
213218
jsonrpc: '2.0',
214219
id: null,
215220
params: v,
221+
...(i === 0 ? { metadata: { timeout: null } } : {}),
216222
}),
217223
);
224+
218225
expect((await outputResult).map((v) => v.toString())).toStrictEqual(
219226
expectedOutput,
220227
);
@@ -249,6 +256,7 @@ describe(`${RPCClient.name}`, () => {
249256
jsonrpc: '2.0',
250257
id: null,
251258
params: params,
259+
metadata: { timeout: null },
252260
}),
253261
);
254262
},
@@ -423,19 +431,19 @@ describe(`${RPCClient.name}`, () => {
423431
}
424432

425433
const expectedMessages: Array<JSONRPCRequestMessage> = messages.map(
426-
() => {
427-
const request: JSONRPCRequestMessage = {
428-
jsonrpc: '2.0',
429-
method: methodName,
430-
id: null,
431-
params: 'one',
432-
};
433-
return request;
434-
},
434+
(_, i) => ({
435+
jsonrpc: '2.0',
436+
method: methodName,
437+
id: null,
438+
params: 'one',
439+
...(i === 0 ? { metadata: { timeout: null } } : {}),
440+
}),
435441
);
442+
436443
const outputMessages = (await outputResult).map((v) =>
437444
JSON.parse(v.toString()),
438445
);
446+
439447
expect(outputMessages).toStrictEqual(expectedMessages);
440448
},
441449
);
@@ -527,6 +535,7 @@ describe(`${RPCClient.name}`, () => {
527535
jsonrpc: '2.0',
528536
id: null,
529537
params,
538+
metadata: { timeout: null },
530539
}),
531540
);
532541
},
@@ -562,12 +571,13 @@ describe(`${RPCClient.name}`, () => {
562571
}
563572
expect(await output).toStrictEqual(message.result);
564573
await writer.close();
565-
const expectedOutput = params.map((v) =>
574+
const expectedOutput = params.map((v, i) =>
566575
JSON.stringify({
567576
method: 'client',
568577
jsonrpc: '2.0',
569578
id: null,
570579
params: v,
580+
...(i === 0 ? { metadata: { timeout: null } } : {}),
571581
}),
572582
);
573583
expect((await outputResult).map((v) => v.toString())).toStrictEqual(
@@ -603,6 +613,7 @@ describe(`${RPCClient.name}`, () => {
603613
jsonrpc: '2.0',
604614
id: null,
605615
params: params,
616+
metadata: { timeout: null },
606617
}),
607618
);
608619
},

0 commit comments

Comments
 (0)