Skip to content

Commit d2d60a2

Browse files
committed
fix: fixed monitor usage in QUICConnection
* Related #48
1 parent 0d0977a commit d2d60a2

File tree

3 files changed

+50
-35
lines changed

3 files changed

+50
-35
lines changed

src/QUICConnection.ts

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import {
2424
import Logger from '@matrixai/logger';
2525
import { Timer } from '@matrixai/timer';
2626
import { context, timedCancellable } from '@matrixai/contexts/dist/decorators';
27+
import { withF } from '@matrixai/resources';
28+
import { utils as contextsUtils } from '@matrixai/contexts';
2729
import { buildQuicheConfig } from './config';
2830
import QUICStream from './QUICStream';
2931
import { quiche } from './native';
@@ -278,27 +280,15 @@ class QUICConnection extends EventTarget {
278280
};
279281
ctx.signal.addEventListener('abort', abortHandler);
280282
const connection = new this(args);
281-
const startProm = connection.start().then(async () => {
282-
// If this is a server connection, we need to process the packet that created it
283-
if (args.type === 'server') {
284-
await utils.withMonitor(
285-
undefined,
286-
connection.lockbox,
287-
RWLockWriter,
288-
async (mon) => {
289-
await mon.withF(connection.lockCode, async (mon) => {
290-
await connection.recv(args.data, args.remoteInfo, mon);
291-
await connection.send(mon);
292-
});
293-
},
294-
);
295-
}
296-
});
283+
const initialData =
284+
args.type === 'server'
285+
? { data: args.data, remoteInfo: args.remoteInfo }
286+
: undefined;
297287
// This ensures that TLS has been established and verified on both sides
298288
try {
299289
await Promise.race([
300290
Promise.all([
301-
startProm,
291+
connection.start(initialData),
302292
connection.establishedP,
303293
connection.secureEstablishedP,
304294
]),
@@ -471,12 +461,24 @@ class QUICConnection extends EventTarget {
471461

472462
/**
473463
* This will set up the connection initiate sending
464+
* @param initialData - If the connection is server initiated then the data that initiated it needs to be provided here
474465
*/
475-
public async start(): Promise<void> {
466+
public async start(initialData?: {
467+
data: Uint8Array;
468+
remoteInfo: RemoteInfo;
469+
}): Promise<void> {
476470
this.logger.info(`Start ${this.constructor.name}`);
477471
// Set the connection up
478472
this.socket.connectionMap.set(this.connectionId, this);
479-
await this.send();
473+
await withF(
474+
[contextsUtils.monitor(this.lockbox, RWLockWriter)],
475+
async ([mon]) => {
476+
if (initialData != null) {
477+
await this.recv(initialData.data, initialData.remoteInfo, mon);
478+
}
479+
await this.send(mon);
480+
},
481+
);
480482
this.logger.info(`Started ${this.constructor.name}`);
481483
}
482484

@@ -665,13 +667,15 @@ class QUICConnection extends EventTarget {
665667
public async recv(
666668
data: Uint8Array,
667669
remoteInfo: RemoteInfo,
668-
mon: Monitor<RWLockWriter>,
670+
mon?: Monitor<RWLockWriter>,
669671
): Promise<void> {
670-
if (!mon.isLocked(this.lockCode)) {
671-
return mon.withF(this.lockCode, async (mon) => {
672-
return this.recv(data, remoteInfo, mon);
673-
});
674-
}
672+
await utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
673+
if (!mon.isLocked(this.lockCode)) {
674+
return mon.withF(this.lockCode, async (mon) => {
675+
return this.recv(data, remoteInfo, mon);
676+
});
677+
}
678+
});
675679

676680
try {
677681
// The remote information may be changed on each receive

src/QUICSocket.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { running } from '@matrixai/async-init';
88
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
99
import { RWLockWriter } from '@matrixai/async-locks';
1010
import { status } from '@matrixai/async-init/dist/utils';
11+
import { withF } from '@matrixai/resources';
12+
import { utils as contextsUtils } from '@matrixai/contexts';
1113
import QUICConnectionId from './QUICConnectionId';
1214
import QUICConnectionMap from './QUICConnectionMap';
1315
import { quiche } from './native';
@@ -108,11 +110,9 @@ class QUICSocket extends EventTarget {
108110
// Acquire the conn lock, this ensures mutual exclusion
109111
// for state changes on the internal connection
110112
try {
111-
await utils.withMonitor(
112-
undefined,
113-
connection.lockbox,
114-
RWLockWriter,
115-
async (mon) => {
113+
await withF(
114+
[contextsUtils.monitor(connection.lockbox, RWLockWriter)],
115+
async ([mon]) => {
116116
await mon.withF(connection.lockCode, async (mon) => {
117117
// Even if we are `stopping`, the `quiche` library says we need to
118118
// continue processing any packets.

src/utils.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import type {
99
} from './types';
1010
import type { Connection } from './native';
1111
import type { LockBox, RWLockWriter } from '@matrixai/async-locks';
12+
import type { Monitor } from '@matrixai/async-locks';
1213
import dns from 'dns';
1314
import { IPv4, IPv6, Validator } from 'ip-num';
14-
import { Monitor } from '@matrixai/async-locks';
15+
import { withF } from '@matrixai/resources';
16+
import { utils as contextsUtils } from '@matrixai/contexts';
1517
import QUICConnectionId from './QUICConnectionId';
1618
import * as errors from './errors';
1719

@@ -464,17 +466,26 @@ function streamStats(
464466
`;
465467
}
466468

469+
/**
470+
* Used as a clean way to create a new monitor if it doesn't exist, otherwise uses the existing one.
471+
*/
467472
async function withMonitor<T>(
468473
mon: Monitor<RWLockWriter> | undefined,
469474
lockBox: LockBox<RWLockWriter>,
470475
lockConstructor: { new (): RWLockWriter },
471476
fun: (mon: Monitor<RWLockWriter>) => Promise<T>,
472477
locksPending?: Map<string, { count: number }>,
473478
): Promise<T> {
474-
const _mon = mon ?? new Monitor(lockBox, lockConstructor, locksPending);
475-
const result = await fun(_mon);
476-
if (mon != null) await _mon.unlockAll();
477-
return result;
479+
if (mon == null) {
480+
return await withF(
481+
[contextsUtils.monitor(lockBox, lockConstructor, locksPending)],
482+
async ([mon]) => {
483+
return await fun(mon);
484+
},
485+
);
486+
} else {
487+
return await fun(mon);
488+
}
478489
}
479490

480491
export {

0 commit comments

Comments
 (0)