14
14
* limitations under the License.
15
15
*/
16
16
17
- import { Observable , of , OperatorFunction , Subject } from 'rxjs' ;
18
- import { catchError , filter , flatMap , map , mergeMap , share , switchMap } from 'rxjs/operators' ;
17
+ import { Observable , of , Subject } from 'rxjs' ;
18
+ import { catchError , filter , map , mergeMap , share , switchMap } from 'rxjs/operators' ;
19
19
import { BlockInfoDTO } from 'symbol-openapi-typescript-fetch-client' ;
20
20
import * as WebSocket from 'ws' ;
21
21
import { UnresolvedAddress } from '../model' ;
22
22
import { Address } from '../model/account/Address' ;
23
23
import { PublicAccount } from '../model/account/PublicAccount' ;
24
24
import { FinalizedBlock } from '../model/blockchain/FinalizedBlock' ;
25
25
import { NewBlock } from '../model/blockchain/NewBlock' ;
26
- import { NamespaceName } from '../model/namespace/NamespaceName' ;
27
26
import { AggregateTransaction } from '../model/transaction/AggregateTransaction' ;
28
27
import { CosignatureSignedTransaction } from '../model/transaction/CosignatureSignedTransaction' ;
29
28
import { Deadline } from '../model/transaction/Deadline' ;
@@ -335,24 +334,25 @@ export class Listener implements IListener {
335
334
transactionHash ?: string ,
336
335
subscribeMultisig = false ,
337
336
) : Observable < T > {
338
- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
339
- mergeMap ( ( address : Address ) => {
340
- return this . subscribeWithMultig ( address , channel , subscribeMultisig ) . pipe (
341
- switchMap ( ( subscribers ) => {
342
- return this . messageSubject . asObservable ( ) . pipe (
343
- filter ( ( listenerMessage ) => listenerMessage . channelName === channel ) ,
344
- filter ( ( listenerMessage ) => listenerMessage . message instanceof Transaction ) ,
345
- switchMap ( ( _ ) => {
346
- const transactionObservable = of ( _ . message as T ) . pipe (
347
- filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
348
- ) ;
349
- if ( subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) {
350
- return transactionObservable ;
351
- } else {
352
- return transactionObservable . pipe ( this . filterByNotifyAccount ( address ) ) ;
353
- }
354
- } ) ,
337
+ return this . subscribeWithMultig ( unresolvedAddress , channel , subscribeMultisig ) . pipe (
338
+ switchMap ( ( subscribers ) => {
339
+ return this . messageSubject . asObservable ( ) . pipe (
340
+ filter ( ( listenerMessage ) => listenerMessage . channelName === channel ) ,
341
+ filter ( ( listenerMessage ) => listenerMessage . message instanceof Transaction ) ,
342
+ switchMap ( ( _ ) => {
343
+ const transactionObservable = of ( _ . message as T ) . pipe (
344
+ filter ( ( transaction ) => this . filterHash ( transaction , transactionHash ) ) ,
355
345
) ;
346
+ if ( subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) {
347
+ return transactionObservable ;
348
+ } else {
349
+ return transactionObservable . pipe (
350
+ filter (
351
+ ( transaction ) =>
352
+ transaction . isSigned ( unresolvedAddress ) || transaction . shouldNotifyAccount ( unresolvedAddress ) ,
353
+ ) ,
354
+ ) ;
355
+ }
356
356
} ) ,
357
357
) ;
358
358
} ) ,
@@ -414,18 +414,14 @@ export class Listener implements IListener {
414
414
transactionHash : string | undefined ,
415
415
subscribeMultisig = false ,
416
416
) : Observable < string > {
417
- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
418
- mergeMap ( ( address : Address ) => {
419
- return this . subscribeWithMultig ( address , channel , subscribeMultisig ) . pipe (
420
- switchMap ( ( subscribers ) => {
421
- return this . messageSubject . asObservable ( ) . pipe (
422
- filter ( ( _ ) => _ . channelName === channel ) ,
423
- filter ( ( _ ) => typeof _ . message === 'string' ) ,
424
- filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
425
- map ( ( _ ) => _ . message as string ) ,
426
- filter ( ( _ ) => ! transactionHash || _ . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
427
- ) ;
428
- } ) ,
417
+ return this . subscribeWithMultig ( unresolvedAddress , channel , subscribeMultisig ) . pipe (
418
+ switchMap ( ( subscribers ) => {
419
+ return this . messageSubject . asObservable ( ) . pipe (
420
+ filter ( ( _ ) => _ . channelName === channel ) ,
421
+ filter ( ( _ ) => typeof _ . message === 'string' ) ,
422
+ filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
423
+ map ( ( _ ) => _ . message as string ) ,
424
+ filter ( ( _ ) => ! transactionHash || _ . toUpperCase ( ) == transactionHash . toUpperCase ( ) ) ,
429
425
) ;
430
426
} ) ,
431
427
) ;
@@ -469,39 +465,6 @@ export class Listener implements IListener {
469
465
}
470
466
}
471
467
472
- /**
473
- * It filters a transaction by address using the aliases.
474
- *
475
- * This method delegates the rest loading as much as possible. It tries to filter by signer first.
476
- *
477
- * Note: this filter performs one extra rest call and it should be down in the pipeline.
478
- *
479
- * @param address the address.
480
- * @return an observable filter.
481
- */
482
- private filterByNotifyAccount < T extends Transaction > ( address : Address ) : OperatorFunction < T , T > {
483
- return ( transactionObservable ) : Observable < T > => {
484
- return transactionObservable . pipe (
485
- flatMap ( ( transaction ) => {
486
- if ( transaction . isSigned ( address ) ) {
487
- return of ( transaction ) ;
488
- }
489
- const namespaceIdsObservable = this . namespaceRepository . getAccountsNames ( [ address ] ) . pipe (
490
- map ( ( names ) => {
491
- return ( [ ] as NamespaceName [ ] )
492
- . concat ( ...Array . from ( names . map ( ( accountName ) => accountName . names ) ) )
493
- . map ( ( name ) => name . namespaceId ) ;
494
- } ) ,
495
- ) ;
496
- return namespaceIdsObservable . pipe (
497
- filter ( ( namespaceIds ) => transaction . shouldNotifyAccount ( address , namespaceIds ) ) ,
498
- map ( ( ) => transaction ) ,
499
- ) ;
500
- } ) ,
501
- ) ;
502
- } ;
503
- }
504
-
505
468
/**
506
469
* Returns an observable stream of {@link CosignatureSignedTransaction} for specific address.
507
470
* Each time a cosigner signs a transaction the address initialized,
@@ -512,17 +475,13 @@ export class Listener implements IListener {
512
475
* @return an observable stream of {@link CosignatureSignedTransaction}
513
476
*/
514
477
public cosignatureAdded ( unresolvedAddress : UnresolvedAddress , subscribeMultisig = false ) : Observable < CosignatureSignedTransaction > {
515
- return this . getResolvedAddress ( unresolvedAddress ) . pipe (
516
- mergeMap ( ( address : Address ) => {
517
- return this . subscribeWithMultig ( address , ListenerChannelName . cosignature , subscribeMultisig ) . pipe (
518
- switchMap ( ( subscribers ) => {
519
- return this . messageSubject . asObservable ( ) . pipe (
520
- filter ( ( _ ) => _ . channelName . toUpperCase ( ) === ListenerChannelName . cosignature . toUpperCase ( ) ) ,
521
- filter ( ( _ ) => _ . message instanceof CosignatureSignedTransaction ) ,
522
- filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
523
- map ( ( _ ) => _ . message as CosignatureSignedTransaction ) ,
524
- ) ;
525
- } ) ,
478
+ return this . subscribeWithMultig ( unresolvedAddress , ListenerChannelName . cosignature , subscribeMultisig ) . pipe (
479
+ switchMap ( ( subscribers ) => {
480
+ return this . messageSubject . asObservable ( ) . pipe (
481
+ filter ( ( _ ) => _ . channelName . toUpperCase ( ) === ListenerChannelName . cosignature . toUpperCase ( ) ) ,
482
+ filter ( ( _ ) => _ . message instanceof CosignatureSignedTransaction ) ,
483
+ filter ( ( _ ) => subscribers . includes ( _ . channelParam . toUpperCase ( ) ) ) ,
484
+ map ( ( _ ) => _ . message as CosignatureSignedTransaction ) ,
526
485
) ;
527
486
} ) ,
528
487
) ;
@@ -604,22 +563,26 @@ export class Listener implements IListener {
604
563
* @param multisig subscribe multisig account
605
564
* @returns {string[] }
606
565
*/
607
- private subscribeWithMultig ( cosigner : Address , channel : ListenerChannelName , multisig = false ) : Observable < string [ ] > {
566
+ private subscribeWithMultig ( cosigner : UnresolvedAddress , channel : ListenerChannelName , multisig = false ) : Observable < string [ ] > {
608
567
if ( ! multisig ) {
609
568
this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
610
569
return of ( [ cosigner . plain ( ) ] ) ;
611
570
}
612
- return this . multisigRepository ! . getMultisigAccountInfo ( cosigner ) . pipe (
613
- map ( ( multisigInfo ) => {
614
- const subscribers = [ cosigner ] . concat ( multisigInfo . multisigAddresses ) ;
615
- subscribers . forEach ( ( m ) => {
616
- this . subscribeTo ( `${ channel . toString ( ) } /${ m . plain ( ) } ` ) ;
617
- } ) ;
618
- return subscribers . map ( ( m ) => m . plain ( ) ) ;
619
- } ) ,
620
- catchError ( ( ) => {
621
- this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
622
- return of ( [ cosigner . plain ( ) ] ) ;
571
+ return this . getResolvedAddress ( cosigner ) . pipe (
572
+ mergeMap ( ( address : Address ) => {
573
+ return this . multisigRepository ! . getMultisigAccountInfo ( address ) . pipe (
574
+ map ( ( multisigInfo ) => {
575
+ const subscribers = [ cosigner ] . concat ( multisigInfo . multisigAddresses ) ;
576
+ subscribers . forEach ( ( m ) => {
577
+ this . subscribeTo ( `${ channel . toString ( ) } /${ m . plain ( ) } ` ) ;
578
+ } ) ;
579
+ return subscribers . map ( ( m ) => m . plain ( ) ) ;
580
+ } ) ,
581
+ catchError ( ( ) => {
582
+ this . subscribeTo ( `${ channel . toString ( ) } /${ cosigner . plain ( ) } ` ) ;
583
+ return of ( [ cosigner . plain ( ) ] ) ;
584
+ } ) ,
585
+ ) ;
623
586
} ) ,
624
587
) ;
625
588
}
0 commit comments