Skip to content

Commit b877415

Browse files
committed
Update network streams (performance improvement).
1 parent cd3bd0e commit b877415

18 files changed

+147
-156
lines changed
File renamed without changes.

src/hook/session-factory-hook.ts

+27-41
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
*/
66

77
import { SessionConfig } from '../config';
8-
import { LocoSession, PacketResData, SessionFactory } from '../network/request-session';
9-
import { DefaultRes, DefaultReq, AsyncCommandResult } from '../request';
10-
import { LocoPacket } from '../packet';
8+
import { ConnectionSession, PacketResData, SessionFactory } from '../network/request-session';
9+
import { DefaultReq, AsyncCommandResult, DefaultRes } from '../request';
10+
import { BiStream } from '../stream';
1111

1212
/**
1313
* Hook incoming datas
@@ -24,11 +24,6 @@ export interface SessionHook {
2424
*/
2525
onRequest: (method: string, data: DefaultReq) => void;
2626

27-
/**
28-
* Hook loco packet
29-
*/
30-
onSendPacket: (packet: LocoPacket) => void;
31-
3227
onClose(): () => void;
3328

3429
}
@@ -41,60 +36,51 @@ export class HookedSessionFactory implements SessionFactory {
4136

4237
}
4338

44-
async createSession(config: SessionConfig): AsyncCommandResult<LocoSession> {
45-
const sessionRes = await this._factory.createSession(config);
39+
async connect(config: SessionConfig): AsyncCommandResult<ConnectionSession> {
40+
const sessionRes = await this._factory.connect(config);
4641
if (!sessionRes.success) return sessionRes;
4742

48-
return { status: sessionRes.status, success: true, result: new HookedLocoSession(sessionRes.result, this._hook) };
43+
return { status: sessionRes.status, success: true, result: new InspectSession(sessionRes.result, this._hook) };
4944
}
5045
}
5146

52-
/**
53-
* Hook loco session
54-
*/
55-
export class HookedLocoSession implements LocoSession {
56-
constructor(private _session: LocoSession, public hook: Partial<SessionHook> = {}) {
47+
export class InspectSession implements ConnectionSession {
48+
49+
constructor(
50+
private _session: ConnectionSession,
51+
private _hook: Partial<SessionHook> = {}
52+
) {
5753

5854
}
5955

56+
get stream(): BiStream {
57+
return this._session.stream;
58+
}
59+
60+
request<T = DefaultRes>(method: string, data: DefaultReq): Promise<DefaultRes & T> {
61+
if (this._hook.onRequest) this._hook.onRequest(method, data);
62+
63+
return this._session.request(method, data);
64+
}
65+
6066
listen(): AsyncIterableIterator<PacketResData> {
61-
const hook = this.hook;
6267
const iterator = this._session.listen();
6368

6469
return {
65-
[Symbol.asyncIterator]() {
70+
[Symbol.asyncIterator](): AsyncIterableIterator<PacketResData> {
6671
return this;
6772
},
6873

69-
async next(): Promise<IteratorResult<PacketResData>> {
74+
next: async (): Promise<IteratorResult<PacketResData>> => {
7075
const next = await iterator.next();
7176

72-
if (!next.done && hook.onData) {
73-
const { method, data, push } = next.value;
74-
75-
hook.onData(method, data, push);
77+
if (!next.done && this._hook.onData) {
78+
this._hook.onData(next.value.method, next.value.data, next.value.push);
7679
}
7780

7881
return next;
79-
},
82+
}
8083
};
8184
}
8285

83-
request<T = DefaultRes>(method: string, data: DefaultReq): Promise<DefaultRes & T> {
84-
if (this.hook.onRequest) this.hook.onRequest(method, data);
85-
86-
return this._session.request(method, data);
87-
}
88-
89-
sendPacket(packet: LocoPacket): Promise<LocoPacket> {
90-
if (this.hook.onSendPacket) this.hook.onSendPacket(packet);
91-
92-
return this._session.sendPacket(packet);
93-
}
94-
95-
close(): void {
96-
if (this.hook.onClose) this.hook.onClose();
97-
98-
this._session.close();
99-
}
10086
}

src/hook/stream-hook.ts

+8-4
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,18 @@ export interface StreamHook {
1010

1111
/**
1212
* Hook data write
13-
* @param data
13+
*
14+
* @param data Write data
1415
*/
1516
onWrite(data: Uint8Array): void;
1617

1718
/**
1819
* Hook data read
19-
* @param buf
20+
*
21+
* @param buf Read buffer
22+
* @param read Read size
2023
*/
21-
onRead(buf: Uint8Array): void;
24+
onRead(buf: Uint8Array, read: number | null): void;
2225

2326
onClose(): void;
2427

@@ -41,7 +44,8 @@ export class HookedStream implements BiStream {
4144

4245
async read(buffer: Uint8Array): Promise<number | null> {
4346
const read = await this._stream.read(buffer);
44-
if (this.hook.onRead) this.hook.onRead(buffer);
47+
48+
if (this.hook.onRead) this.hook.onRead(buffer, read);
4549

4650
return read;
4751
}

src/network/loco-packet-codec.ts

+7-27
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,24 @@ export class LocoPacketCodec {
2121
return this._stream;
2222
}
2323

24-
send(packet: LocoPacket): Promise<number> {
24+
write(packet: LocoPacket): Promise<number> {
2525
const packetBuffer = new ArrayBuffer(22 + packet.data[1].byteLength);
2626
const packetArray = new Uint8Array(packetBuffer);
27-
const namebuffer = new Uint8Array(11);
2827
const view = new DataView(packetBuffer);
2928

3029
view.setUint32(0, packet.header.id, true);
3130
view.setUint16(4, packet.header.status & 0xffff, true);
32-
view.setUint8(17, packet.data[0] & 0xff);
33-
view.setUint32(18, packet.data[1].byteLength, true);
3431

35-
const nameLen = Math.min(packet.header.method.length, 11);
36-
const nameList: number[] = [];
37-
for (let i = 0; i < nameLen; i++) {
38-
const code = packet.header.method.charCodeAt(i);
32+
for (let i = 0; i < 11; i++) {
33+
const code = packet.header.method.charCodeAt(i) || 0;
3934

4035
if (code > 0xff) throw new Error('Invalid ASCII code at method string');
41-
nameList.push(code);
36+
packetArray[6 + i] = code;
4237
}
43-
namebuffer.set(nameList, 0);
4438

45-
packetArray.set(namebuffer, 6);
39+
view.setUint8(17, packet.data[0] & 0xff);
40+
view.setUint32(18, packet.data[1].byteLength, true);
41+
4642
packetArray.set(packet.data[1], 22);
4743

4844
return this._stream.write(packetArray);
@@ -71,20 +67,4 @@ export class LocoPacketCodec {
7167
data: [dataType, data],
7268
};
7369
}
74-
75-
iterate(): AsyncIterableIterator<LocoPacket> {
76-
return {
77-
[Symbol.asyncIterator](): AsyncIterableIterator<LocoPacket> {
78-
return this;
79-
},
80-
81-
next: async(): Promise<IteratorResult<LocoPacket>> => {
82-
if (this._stream.ended) return { done: true, value: null }
83-
84-
const read = await this.read();
85-
if (!read) return { done: true, value: null };
86-
return { done: false, value: read };
87-
},
88-
};
89-
}
9070
}

src/network/loco-packet-dispatcher.ts

+26-21
Original file line numberDiff line numberDiff line change
@@ -45,42 +45,47 @@ export class LocoPacketDispatcher {
4545
this._packetMap.set(packet.header.id, [resolve, reject]);
4646
});
4747

48-
await this._codec.send(packet);
48+
await this._codec.write(packet);
4949

5050
return promise;
5151
}
5252

5353
/**
54-
* Listen and process incoming packets.
54+
* Read one packet and process it.
55+
*/
56+
async read(): Promise<PacketRes | undefined> {
57+
const packet = await this._codec.read();
58+
if (!packet) return;
59+
60+
if (this._packetMap.has(packet.header.id)) {
61+
const resolver = this._packetMap.get(packet.header.id);
62+
if (resolver) {
63+
resolver[0](packet);
64+
this._packetMap.delete(packet.header.id);
65+
}
66+
return { push: false, packet };
67+
} else {
68+
return { push: true, packet };
69+
}
70+
}
71+
72+
/**
73+
* Listen and read incoming packets.
5574
*
5675
* @return {AsyncIterableIterator<PacketRes>}
5776
*/
5877
listen(): AsyncIterableIterator<PacketRes> {
59-
const packetMap = this._packetMap;
60-
const iterator = this._codec.iterate();
61-
6278
return {
6379
[Symbol.asyncIterator]() {
6480
return this;
6581
},
6682

67-
async next(): Promise<IteratorResult<PacketRes>> {
68-
const next = await iterator.next();
69-
70-
if (next.done) return { done: true, value: null };
71-
72-
const packet = next.value;
83+
next: async (): Promise<IteratorResult<PacketRes>> => {
84+
const read = await this.read();
7385

74-
if (packetMap.has(packet.header.id)) {
75-
const resolver = packetMap.get(packet.header.id);
76-
if (resolver) {
77-
resolver[0](packet);
78-
packetMap.delete(packet.header.id);
79-
}
80-
return { done: false, value: { push: false, packet } };
81-
} else {
82-
return { done: false, value: { push: true, packet } };
83-
}
86+
if (!read) return { done: true, value: null };
87+
88+
return { done: false, value: read };
8489
},
8590
};
8691
}

src/network/loco-secure-layer.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class LocoSecureLayer implements BiStream {
4848
buffer.set(data.subarray(0, readSize), 0);
4949

5050
if (data.byteLength > buffer.byteLength) {
51-
this._dataChunks.append(data.slice(buffer.byteLength));
51+
this._dataChunks.append(data.subarray(buffer.byteLength));
5252
}
5353

5454
return readSize;

src/network/packet-assembler.ts

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export class PacketAssembler<T, R> {
4343
* Deconstruct LocoPacket into data.
4444
* This method can throw error if the type is not supported by codec.
4545
*
46+
* @template R
4647
* @param {LocoPacket} packet
4748
* @return {R}
4849
*/

src/network/request-session.ts

+25-31
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import { SessionConfig } from '../config';
88
import { AsyncCommandResult, DefaultReq, DefaultRes } from '../request';
9-
import { BsonDataCodec, LocoPacket } from '../packet';
9+
import { BsonDataCodec } from '../packet';
1010
import { LocoPacketDispatcher } from './loco-packet-dispatcher';
1111
import { PacketAssembler } from './packet-assembler';
1212
import { BiStream } from '../stream';
@@ -23,37 +23,38 @@ export interface CommandSession {
2323

2424
}
2525

26-
export interface PacketResData {
26+
export interface ConnectionSession extends CommandSession {
2727

28-
method: string;
29-
data: DefaultRes;
30-
push: boolean;
31-
32-
}
28+
/**
29+
* Connection stream
30+
*/
31+
readonly stream: BiStream;
3332

34-
export interface LocoSession extends CommandSession {
33+
/**
34+
* Listen incoming packets
35+
*/
36+
listen(): AsyncIterableIterator<PacketResData>;
3537

36-
listen(): AsyncIterable<PacketResData> & AsyncIterator<PacketResData>;
38+
}
3739

38-
sendPacket(packet: LocoPacket): Promise<LocoPacket>;
40+
export interface PacketResData {
3941

40-
close(): void;
42+
method: string;
43+
data: DefaultRes;
44+
push: boolean;
4145

4246
}
4347

4448
/**
45-
* Create LocoSession using configuration.
49+
* Create connection using configuration.
4650
*/
4751
export interface SessionFactory {
4852

49-
createSession(config: SessionConfig): AsyncCommandResult<LocoSession>;
53+
connect(config: SessionConfig): AsyncCommandResult<ConnectionSession>;
5054

5155
}
5256

53-
/**
54-
* Holds current loco session.
55-
*/
56-
export class DefaultLocoSession implements LocoSession {
57+
export class LocoSession implements ConnectionSession {
5758
private _assembler: PacketAssembler<DefaultReq, DefaultRes>;
5859
private _dispatcher: LocoPacketDispatcher;
5960

@@ -62,12 +63,16 @@ export class DefaultLocoSession implements LocoSession {
6263
this._dispatcher = new LocoPacketDispatcher(stream);
6364
}
6465

65-
listen(): { [Symbol.asyncIterator](): AsyncIterator<PacketResData>, next(): Promise<IteratorResult<PacketResData>> } {
66+
get stream(): BiStream {
67+
return this._dispatcher.stream;
68+
}
69+
70+
listen(): AsyncIterableIterator<PacketResData> {
6671
const iterator = this._dispatcher.listen();
6772
const assembler = this._assembler;
6873

6974
return {
70-
[Symbol.asyncIterator](): AsyncIterator<PacketResData> {
75+
[Symbol.asyncIterator](): AsyncIterableIterator<PacketResData> {
7176
return this;
7277
},
7378

@@ -78,23 +83,12 @@ export class DefaultLocoSession implements LocoSession {
7883
const { push, packet } = next.value;
7984

8085
return { done: false, value: { push, method: packet.header.method, data: assembler.deconstruct(packet) } };
81-
},
86+
}
8287
};
8388
}
8489

8590
async request<T = DefaultRes>(method: string, data: DefaultReq): Promise<DefaultRes & T> {
8691
const res = await this._dispatcher.sendPacket(this._assembler.construct(method, data));
8792
return this._assembler.deconstruct(res) as DefaultRes & T;
8893
}
89-
90-
sendPacket(packet: LocoPacket): Promise<LocoPacket> {
91-
return this._dispatcher.sendPacket(packet);
92-
}
93-
94-
/**
95-
* Close session
96-
*/
97-
close(): void {
98-
this._dispatcher.stream.close();
99-
}
10094
}

0 commit comments

Comments
 (0)