Skip to content

Commit 9b5a534

Browse files
authored
Merge pull request #49 from MatrixAI/feat-monitor-fixes
Feat monitor fixes
2 parents 7cb9fd4 + 48d42fb commit 9b5a534

File tree

9 files changed

+111
-139
lines changed

9 files changed

+111
-139
lines changed

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
"@matrixai/async-cancellable": "^1.1.1",
4040
"@matrixai/async-init": "^1.8.4",
4141
"@matrixai/async-locks": "^4.0.0",
42-
"@matrixai/contexts": "^1.1.0",
42+
"@matrixai/contexts": "^1.2.0",
4343
"@matrixai/errors": "^1.1.7",
4444
"@matrixai/logger": "^3.1.0",
4545
"@matrixai/resources": "^1.1.5",

src/QUICClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { quiche } from './native';
1616
import * as utils from './utils';
1717
import * as errors from './errors';
1818
import * as events from './events';
19-
import { clientDefault } from './config';
19+
import { clientDefault, minIdleTimeout } from './config';
2020
import QUICSocket from './QUICSocket';
2121
import QUICConnection from './QUICConnection';
2222
import QUICConnectionId from './QUICConnectionId';
@@ -87,7 +87,7 @@ class QUICClient extends EventTarget {
8787
},
8888
ctx?: Partial<ContextTimedInput>,
8989
): PromiseCancellable<QUICClient>;
90-
@timedCancellable(true, Infinity, errors.ErrorQUICClientCreateTimeOut)
90+
@timedCancellable(true, minIdleTimeout, errors.ErrorQUICClientCreateTimeOut)
9191
public static async createQUICClient(
9292
{
9393
host,

src/QUICConnection.ts

Lines changed: 56 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ import {
2424
import Logger from '@matrixai/logger';
2525
import { Timer } from '@matrixai/timer';
2626
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
27-
import { buildQuicheConfig } from './config';
27+
import { withF } from '@matrixai/resources';
28+
import { utils as contextsUtils } from '@matrixai/contexts';
29+
import { buildQuicheConfig, minIdleTimeout } from './config';
2830
import QUICStream from './QUICStream';
2931
import { quiche } from './native';
3032
import * as events from './events';
@@ -235,7 +237,11 @@ class QUICConnection extends EventTarget {
235237
},
236238
ctx?: Partial<ContextTimedInput>,
237239
): PromiseCancellable<QUICConnection>;
238-
@timedCancellable(true, Infinity, errors.ErrorQUICConnectionStartTimeOut)
240+
@timedCancellable(
241+
true,
242+
minIdleTimeout,
243+
errors.ErrorQUICConnectionStartTimeOut,
244+
)
239245
public static async createQUICConnection(
240246
args:
241247
| {
@@ -265,40 +271,20 @@ class QUICConnection extends EventTarget {
265271
},
266272
@context ctx: ContextTimed,
267273
): Promise<QUICConnection> {
268-
const timeoutTime = ctx.timer.getTimeout();
269-
if (timeoutTime !== Infinity && timeoutTime >= args.config.maxIdleTimeout) {
270-
throw new errors.ErrorQUICConnectionInvalidConfig(
271-
'connection timeout timer must be strictly less than maxIdleTimeout',
272-
);
273-
}
274274
ctx.signal.throwIfAborted();
275275
const abortProm = promise<never>();
276276
const abortHandler = () => {
277277
abortProm.rejectP(ctx.signal.reason);
278278
};
279279
ctx.signal.addEventListener('abort', abortHandler);
280280
const connection = new this(args);
281-
const startProm = connection.start().then(async () => {
282-
// If this is a server connection, we need to process the packet that created it
283-
if (args.type === 'server') {
284-
await utils.withMonitor(
285-
undefined,
286-
connection.lockbox,
287-
RWLockWriter,
288-
async (mon) => {
289-
await mon.withF(connection.lockCode, async (mon) => {
290-
await connection.recv(args.data, args.remoteInfo, mon);
291-
await connection.send(mon);
292-
});
293-
},
294-
);
295-
}
296-
});
281+
// If it's a server connection we want to pass the initial packet
282+
const data = args.type === 'server' ? args.data : undefined;
297283
// This ensures that TLS has been established and verified on both sides
298284
try {
299285
await Promise.race([
300286
Promise.all([
301-
startProm,
287+
connection.start(data),
302288
connection.establishedP,
303289
connection.secureEstablishedP,
304290
]),
@@ -471,12 +457,25 @@ class QUICConnection extends EventTarget {
471457

472458
/**
473459
* This will set up the connection initiate sending
460+
* @param data - the initial packet that triggered the creation of the connection.
474461
*/
475-
public async start(): Promise<void> {
462+
public async start(data?: Uint8Array): Promise<void> {
476463
this.logger.info(`Start ${this.constructor.name}`);
477464
// Set the connection up
478465
this.socket.connectionMap.set(this.connectionId, this);
479-
await this.send();
466+
await withF(
467+
[contextsUtils.monitor(this.lockbox, RWLockWriter)],
468+
async ([mon]) => {
469+
if (data != null) {
470+
await this.recv(
471+
data,
472+
{ host: this._remoteHost, port: this._remotePort },
473+
mon,
474+
);
475+
}
476+
await this.send(mon);
477+
},
478+
);
480479
this.logger.info(`Started ${this.constructor.name}`);
481480
}
482481

@@ -535,7 +534,7 @@ class QUICConnection extends EventTarget {
535534
this.stopKeepAliveIntervalTimer();
536535

537536
// Trigger closing connection in the background and await close later.
538-
void utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
537+
void this.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
539538
await mon.withF(this.lockCode, async (mon) => {
540539
// If this is already closed, then `Done` will be thrown
541540
// Otherwise it can send `CONNECTION_CLOSE` frame
@@ -665,13 +664,15 @@ class QUICConnection extends EventTarget {
665664
public async recv(
666665
data: Uint8Array,
667666
remoteInfo: RemoteInfo,
668-
mon: Monitor<RWLockWriter>,
667+
mon?: Monitor<RWLockWriter>,
669668
): Promise<void> {
670-
if (!mon.isLocked(this.lockCode)) {
671-
return mon.withF(this.lockCode, async (mon) => {
672-
return this.recv(data, remoteInfo, mon);
673-
});
674-
}
669+
await this.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
670+
if (!mon.isLocked(this.lockCode)) {
671+
return mon.withF(this.lockCode, async (mon) => {
672+
return this.recv(data, remoteInfo, mon);
673+
});
674+
}
675+
});
675676

676677
try {
677678
// The remote information may be changed on each receive
@@ -775,7 +776,7 @@ class QUICConnection extends EventTarget {
775776
* @internal
776777
*/
777778
public async send(mon?: Monitor<RWLockWriter>): Promise<void> {
778-
await utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
779+
await this.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
779780
if (!mon.isLocked(this.lockCode)) {
780781
return mon.withF(this.lockCode, async (mon) => {
781782
return this.send(mon);
@@ -1083,6 +1084,26 @@ class QUICConnection extends EventTarget {
10831084
return quicStream;
10841085
});
10851086
}
1087+
1088+
/**
1089+
* Used as a clean way to create a new monitor if it doesn't exist, otherwise uses the existing one.
1090+
*/
1091+
protected async withMonitor<T>(
1092+
mon: Monitor<RWLockWriter> | undefined,
1093+
lockBox: LockBox<RWLockWriter>,
1094+
lockConstructor: { new (): RWLockWriter },
1095+
f: (mon: Monitor<RWLockWriter>) => Promise<T>,
1096+
locksPending?: Map<string, { count: number }>,
1097+
): Promise<T> {
1098+
if (mon == null) {
1099+
return await withF(
1100+
[contextsUtils.monitor(lockBox, lockConstructor, locksPending)],
1101+
([mon]) => f(mon),
1102+
);
1103+
} else {
1104+
return f(mon);
1105+
}
1106+
}
10861107
}
10871108

10881109
export default QUICConnection;

src/QUICServer.ts

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class QUICServer extends EventTarget {
5353
protected codeToReason: StreamCodeToReason | undefined;
5454
protected verifyCallback: VerifyCallback | undefined;
5555
protected connectionMap: QUICConnectionMap;
56+
protected minIdleTimeout: number | undefined;
5657
// Used to track address string for logging ONLY
5758
protected address: string;
5859

@@ -109,6 +110,7 @@ class QUICServer extends EventTarget {
109110
reasonToCode,
110111
codeToReason,
111112
verifyCallback,
113+
minIdleTimeout,
112114
logger,
113115
}: {
114116
crypto: {
@@ -124,6 +126,7 @@ class QUICServer extends EventTarget {
124126
reasonToCode?: StreamReasonToCode;
125127
codeToReason?: StreamCodeToReason;
126128
verifyCallback?: VerifyCallback;
129+
minIdleTimeout?: number;
127130
logger?: Logger;
128131
}) {
129132
super();
@@ -151,6 +154,7 @@ class QUICServer extends EventTarget {
151154
this.reasonToCode = reasonToCode;
152155
this.codeToReason = codeToReason;
153156
this.verifyCallback = verifyCallback;
157+
this.minIdleTimeout = minIdleTimeout;
154158
}
155159

156160
@ready(new errors.ErrorQUICServerNotRunning())
@@ -355,23 +359,28 @@ class QUICServer extends EventTarget {
355359
`Accepting new connection from QUIC packet from ${remoteInfo.host}:${remoteInfo.port}`,
356360
);
357361
const clientConnRef = Buffer.from(header.scid).toString('hex').slice(32);
358-
const connectionProm = QUICConnection.createQUICConnection({
359-
type: 'server',
360-
scid: newScid,
361-
dcid: dcidOriginal,
362-
socket: this.socket,
363-
remoteInfo,
364-
data,
365-
config: this.config,
366-
reasonToCode: this.reasonToCode,
367-
codeToReason: this.codeToReason,
368-
verifyCallback: this.verifyCallback,
369-
logger: this.logger.getChild(
370-
`${QUICConnection.name} ${scid.toString().slice(32)}-${clientConnRef}`,
371-
),
372-
});
362+
let connection: QUICConnection;
373363
try {
374-
await connectionProm;
364+
connection = await QUICConnection.createQUICConnection(
365+
{
366+
type: 'server',
367+
scid: newScid,
368+
dcid: dcidOriginal,
369+
socket: this.socket,
370+
remoteInfo,
371+
data,
372+
config: this.config,
373+
reasonToCode: this.reasonToCode,
374+
codeToReason: this.codeToReason,
375+
verifyCallback: this.verifyCallback,
376+
logger: this.logger.getChild(
377+
`${QUICConnection.name} ${scid
378+
.toString()
379+
.slice(32)}-${clientConnRef}`,
380+
),
381+
},
382+
{ timer: this.minIdleTimeout },
383+
);
375384
} catch (e) {
376385
// Ignoring any errors here as a failure to connect
377386
this.dispatchEvent(
@@ -383,7 +392,6 @@ class QUICServer extends EventTarget {
383392
);
384393
return;
385394
}
386-
const connection = await connectionProm;
387395
// Handling connection events
388396
connection.addEventListener(
389397
'connectionError',

src/QUICSocket.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { running } from '@matrixai/async-init';
88
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
99
import { RWLockWriter } from '@matrixai/async-locks';
1010
import { status } from '@matrixai/async-init/dist/utils';
11+
import { withF } from '@matrixai/resources';
12+
import { utils as contextsUtils } from '@matrixai/contexts';
1113
import QUICConnectionId from './QUICConnectionId';
1214
import QUICConnectionMap from './QUICConnectionMap';
1315
import { quiche } from './native';
@@ -108,11 +110,9 @@ class QUICSocket extends EventTarget {
108110
// Acquire the conn lock, this ensures mutual exclusion
109111
// for state changes on the internal connection
110112
try {
111-
await utils.withMonitor(
112-
undefined,
113-
connection.lockbox,
114-
RWLockWriter,
115-
async (mon) => {
113+
await withF(
114+
[contextsUtils.monitor(connection.lockbox, RWLockWriter)],
115+
async ([mon]) => {
116116
await mon.withF(connection.lockCode, async (mon) => {
117117
// Even if we are `stopping`, the `quiche` library says we need to
118118
// continue processing any packets.

src/config.ts

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,26 @@ const sigalgs = [
2323
'ed25519',
2424
].join(':');
2525

26+
/**
27+
* Usually we would create separate timeouts for connecting vs idling.
28+
* Unfortunately quiche only has 1 config option that controls both.
29+
* And it is not possible to mutate this option after connecting.
30+
* Therefore, this option is just a way to set a shorter connecting timeout
31+
* compared to the idling timeout.
32+
* If this is the larger than the `maxIdleTimeout` (remember that `0` is `Infinity`) for `maxIdleTimeout`, then this has no effect.
33+
* This only has an effect if this is set to a number less than `maxIdleTimeout`.
34+
* Thus, it is the "minimum boundary" of the timeout during connecting.
35+
* While the `maxIdleTimeout` is still the "maximum boundary" during connecting.
36+
*/
37+
const minIdleTimeout = Infinity;
38+
2639
const clientDefault: QUICConfig = {
2740
sigalgs,
2841
verifyPeer: true,
2942
verifyAllowFail: false,
3043
grease: true,
31-
maxIdleTimeout: 1 * 60 * 1000,
44+
keepAliveIntervalTime: undefined,
45+
maxIdleTimeout: 0,
3246
maxRecvUdpPayloadSize: quiche.MAX_DATAGRAM_SIZE, // 65527
3347
maxSendUdpPayloadSize: quiche.MIN_CLIENT_INITIAL_LEN, // 1200,
3448
initialMaxData: 10 * 1024 * 1024,
@@ -48,7 +62,8 @@ const serverDefault: QUICConfig = {
4862
verifyPeer: false,
4963
verifyAllowFail: false,
5064
grease: true,
51-
maxIdleTimeout: 1 * 60 * 1000,
65+
keepAliveIntervalTime: undefined,
66+
maxIdleTimeout: 0,
5267
maxRecvUdpPayloadSize: quiche.MAX_DATAGRAM_SIZE, // 65527
5368
maxSendUdpPayloadSize: quiche.MIN_CLIENT_INITIAL_LEN, // 1200
5469
initialMaxData: 10 * 1024 * 1024,
@@ -188,4 +203,4 @@ function buildQuicheConfig(config: QUICConfig): QuicheConfig {
188203
return quicheConfig;
189204
}
190205

191-
export { clientDefault, serverDefault, buildQuicheConfig };
206+
export { minIdleTimeout, clientDefault, serverDefault, buildQuicheConfig };

0 commit comments

Comments
 (0)