Skip to content

Commit 1eb8761

Browse files
authored
Remove duplicte notification (#768)
* Remove duplicte notification * on listener close, capture the unsolicited close event
1 parent ab5eeb3 commit 1eb8761

File tree

1 file changed

+23
-2
lines changed

1 file changed

+23
-2
lines changed

src/infrastructure/Listener.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import { Observable, of, Subject } from 'rxjs';
18-
import { catchError, filter, map, mergeMap, share, switchMap } from 'rxjs/operators';
18+
import { catchError, distinctUntilChanged, filter, map, mergeMap, share, switchMap } from 'rxjs/operators';
1919
import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client';
2020
import * as WebSocket from 'ws';
2121
import { UnresolvedAddress } from '../model';
@@ -74,6 +74,8 @@ export class Listener implements IListener {
7474
*/
7575
private uid: string;
7676

77+
private SIGINT = false;
78+
7779
/**
7880
* Constructor
7981
* @param url - Listener websocket server url. default: rest-gateway's url with ''/ws'' suffix. (e.g. http://localhost:3000/ws).
@@ -120,6 +122,18 @@ export class Listener implements IListener {
120122
this.webSocket.onerror = (err: Error): void => {
121123
reject(err);
122124
};
125+
this.webSocket.onclose = (closeEvent?: any): void => {
126+
if (this.SIGINT) {
127+
return;
128+
}
129+
if (closeEvent) {
130+
reject({
131+
client: this.uid,
132+
code: closeEvent.code,
133+
reason: closeEvent.reason,
134+
});
135+
}
136+
};
123137
this.webSocket.onmessage = (msg: any): void => {
124138
const message = JSON.parse(msg.data as string);
125139
this.handleMessage(message, resolve);
@@ -232,6 +246,7 @@ export class Listener implements IListener {
232246
this.webSocket &&
233247
(this.webSocket.readyState === this.webSocket.OPEN || this.webSocket.readyState === this.webSocket.CONNECTING)
234248
) {
249+
this.SIGINT = true;
235250
this.webSocket.close();
236251
}
237252
}
@@ -339,6 +354,11 @@ export class Listener implements IListener {
339354
return this.messageSubject.asObservable().pipe(
340355
filter((listenerMessage) => listenerMessage.channelName === channel),
341356
filter((listenerMessage) => listenerMessage.message instanceof Transaction),
357+
distinctUntilChanged((prev, curr) => {
358+
const currentHash = (curr.message as Transaction).transactionInfo!.hash;
359+
const previousHash = (prev.message as Transaction).transactionInfo!.hash;
360+
return (currentHash && previousHash && previousHash === currentHash) || !currentHash || !previousHash;
361+
}),
342362
switchMap((_) => {
343363
const transactionObservable = of(_.message as T).pipe(
344364
filter((transaction) => this.filterHash(transaction, transactionHash)),
@@ -421,7 +441,8 @@ export class Listener implements IListener {
421441
filter((_) => typeof _.message === 'string'),
422442
filter((_) => subscribers.includes(_.channelParam.toUpperCase())),
423443
map((_) => _.message as string),
424-
filter((_) => !transactionHash || _.toUpperCase() == transactionHash.toUpperCase()),
444+
filter((_) => !transactionHash || _.toUpperCase() === transactionHash.toUpperCase()),
445+
distinctUntilChanged(),
425446
);
426447
}),
427448
);

0 commit comments

Comments
 (0)