Skip to content

Commit 9def17f

Browse files
committed
added argument to drain that provides the list of packets that are drained
1 parent e95f6ab commit 9def17f

File tree

7 files changed

+34
-25
lines changed

7 files changed

+34
-25
lines changed

packages/engine.io/lib/socket.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ export interface SendOptions {
1414

1515
type ReadyState = "opening" | "open" | "closing" | "closed";
1616

17-
type SendCallback = (transport: Transport) => void;
17+
type SendCallback = (
18+
transport: Transport,
19+
packets: Packet[] | undefined
20+
) => void;
1821

1922
export class Socket extends EventEmitter {
2023
/**
@@ -80,7 +83,7 @@ export class Socket extends EventEmitter {
8083
server: BaseServer,
8184
transport: Transport,
8285
req: EngineRequest,
83-
protocol: number,
86+
protocol: number
8487
) {
8588
super();
8689
this.id = id;
@@ -125,7 +128,7 @@ export class Socket extends EventEmitter {
125128
pingInterval: this.server.opts.pingInterval,
126129
pingTimeout: this.server.opts.pingTimeout,
127130
maxPayload: this.server.opts.maxHttpBufferSize,
128-
}),
131+
})
129132
);
130133

131134
if (this.server.opts.initialPacket) {
@@ -212,7 +215,7 @@ export class Socket extends EventEmitter {
212215
this.pingIntervalTimer = setTimeout(() => {
213216
debug(
214217
"writing ping packet - expecting pong within %sms",
215-
this.server.opts.pingTimeout,
218+
this.server.opts.pingTimeout
216219
);
217220
this.sendPacket("ping");
218221
this.resetPingTimeout();
@@ -233,7 +236,7 @@ export class Socket extends EventEmitter {
233236
},
234237
this.protocol === 3
235238
? this.server.opts.pingInterval + this.server.opts.pingTimeout
236-
: this.server.opts.pingTimeout,
239+
: this.server.opts.pingTimeout
237240
);
238241
}
239242

@@ -271,13 +274,13 @@ export class Socket extends EventEmitter {
271274
*
272275
* @private
273276
*/
274-
private onDrain() {
277+
private onDrain(packets: Packet[] | undefined) {
275278
if (this.sentCallbackFn.length > 0) {
276279
debug("executing batch send callback");
277280
const seqFn = this.sentCallbackFn.shift();
278281
if (seqFn) {
279282
for (let i = 0; i < seqFn.length; i++) {
280-
seqFn[i](this.transport);
283+
seqFn[i](this.transport, packets);
281284
}
282285
}
283286
}
@@ -293,7 +296,7 @@ export class Socket extends EventEmitter {
293296
debug(
294297
'might upgrade socket transport from "%s" to "%s"',
295298
this.transport.name,
296-
transport.name,
299+
transport.name
297300
);
298301

299302
this.upgrading = true;
@@ -468,7 +471,7 @@ export class Socket extends EventEmitter {
468471
type: PacketType,
469472
data?: RawData,
470473
options: SendOptions = {},
471-
callback?: SendCallback,
474+
callback?: SendCallback
472475
) {
473476
if ("function" === typeof options) {
474477
callback = options;
@@ -525,8 +528,8 @@ export class Socket extends EventEmitter {
525528
}
526529

527530
this.transport.send(wbuf);
528-
this.emit("drain");
529-
this.server.emit("drain", this);
531+
this.emit("drain", wbuf);
532+
this.server.emit("drain", this, wbuf);
530533
}
531534
}
532535

@@ -568,7 +571,7 @@ export class Socket extends EventEmitter {
568571
if (this.writeBuffer.length) {
569572
debug(
570573
"there are %d remaining packets in the buffer, waiting for the 'drain' event",
571-
this.writeBuffer.length,
574+
this.writeBuffer.length
572575
);
573576
this.once("drain", () => {
574577
debug("all packets have been sent, closing the transport");

packages/engine.io/lib/transport.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as parser_v3 from "./parser-v3/index";
44
import debugModule from "debug";
55
import type { IncomingMessage, ServerResponse } from "http";
66
import { Packet, RawData } from "engine.io-parser";
7+
import type * as Parser from "engine.io-parser";
78

89
const debug = debugModule("engine:transport");
910

@@ -69,7 +70,7 @@ export abstract class Transport extends EventEmitter {
6970
"readyState updated from %s to %s (%s)",
7071
this._readyState,
7172
state,
72-
this.name,
73+
this.name
7374
);
7475
this._readyState = state;
7576
}

packages/engine.io/lib/transports-uws/polling.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { createGzip, createDeflate } from "zlib";
33
import * as accepts from "accepts";
44
import debugModule from "debug";
55
import { HttpRequest, HttpResponse } from "uWebSockets.js";
6+
import { Packet } from "engine.io-parser";
67

78
const debug = debugModule("engine:polling");
89

@@ -253,7 +254,7 @@ export class Polling extends Transport {
253254
* @param {Object} packet
254255
* @private
255256
*/
256-
send(packets) {
257+
send(packets: Packet[]) {
257258
this.writable = false;
258259

259260
if (this.shouldClose) {
@@ -263,11 +264,11 @@ export class Polling extends Transport {
263264
this.shouldClose = null;
264265
}
265266

266-
const doWrite = (data) => {
267+
const doWrite = (data: string) => {
267268
const compress = packets.some((packet) => {
268269
return packet.options && packet.options.compress;
269270
});
270-
this.write(data, { compress });
271+
this.write(data, { compress, source: packets });
271272
};
272273

273274
if (this.protocol === 3) {
@@ -288,7 +289,7 @@ export class Polling extends Transport {
288289
debug('writing "%s"', data);
289290
this.doWrite(data, options, () => {
290291
this.req.cleanup();
291-
this.emit("drain");
292+
this.emit("drain", options.source);
292293
});
293294
}
294295

packages/engine.io/lib/transports-uws/websocket.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ export class WebSocket extends Transport {
5555
this.socket.send(data, isBinary, compress);
5656

5757
if (isLast) {
58-
this.emit("drain");
58+
this.emit("drain", packets);
5959
this.writable = true;
6060
this.emit("ready");
6161
}

packages/engine.io/lib/transports/polling.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export class Polling extends Transport {
229229
const compress = packets.some((packet) => {
230230
return packet.options && packet.options.compress;
231231
});
232-
this.write(data, { compress });
232+
this.write(data, { compress, source: packets });
233233
};
234234

235235
if (this.protocol === 3) {
@@ -250,7 +250,7 @@ export class Polling extends Transport {
250250
debug('writing "%s"', data);
251251
this.doWrite(data, options, () => {
252252
this.req.cleanup();
253-
this.emit("drain");
253+
this.emit("drain", options.source);
254254
});
255255
}
256256

packages/engine.io/lib/transports/websocket.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const debug = debugModule("engine:ws");
77
export class WebSocket extends Transport {
88
protected perMessageDeflate: any;
99
private socket: any;
10+
private currentPackets: Packet[] | undefined;
1011

1112
/**
1213
* WebSocket transport
@@ -44,6 +45,8 @@ export class WebSocket extends Transport {
4445
send(packets: Packet[]) {
4546
this.writable = false;
4647

48+
this.currentPackets = packets;
49+
4750
for (let i = 0; i < packets.length; i++) {
4851
const packet = packets[i];
4952
const isLast = i + 1 === packets.length;
@@ -54,13 +57,13 @@ export class WebSocket extends Transport {
5457
this.socket._sender.sendFrame(
5558
// @ts-ignore
5659
packet.options.wsPreEncodedFrame,
57-
isLast ? this._onSentLast : this._onSent,
60+
isLast ? this._onSentLast : this._onSent
5861
);
5962
} else {
6063
this.parser.encodePacket(
6164
packet,
6265
this.supportsBinary,
63-
isLast ? this._doSendLast : this._doSend,
66+
isLast ? this._doSendLast : this._doSend
6467
);
6568
}
6669
}
@@ -98,8 +101,9 @@ export class WebSocket extends Transport {
98101
if (err) {
99102
this.onError("write error", err.stack);
100103
} else {
101-
this.emit("drain");
104+
this.emit("drain", this.currentPackets);
102105
this.writable = true;
106+
this.currentPackets = undefined;
103107
this.emit("ready");
104108
}
105109
};

packages/engine.io/lib/transports/webtransport.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export class WebTransport extends Transport {
1313
constructor(
1414
private readonly session,
1515
stream,
16-
reader,
16+
reader
1717
) {
1818
super({ _query: { EIO: "4" } });
1919

@@ -60,7 +60,7 @@ export class WebTransport extends Transport {
6060
debug("error while writing: %s", e.message);
6161
}
6262

63-
this.emit("drain");
63+
this.emit("drain", packets);
6464
this.writable = true;
6565
this.emit("ready");
6666
}

0 commit comments

Comments
 (0)