1
1
use crate :: { http_retry:: Client , number_or_string, AccountDetails , AccountSettings , NodeStore } ;
2
2
use bytes:: Bytes ;
3
- use futures:: { future:: join_all, TryFutureExt } ;
3
+ use futures:: { future:: join_all, Future , FutureExt , StreamExt , TryFutureExt } ;
4
4
use interledger_btp:: { connect_to_service_account, BtpAccount , BtpOutgoingService } ;
5
5
use interledger_ccp:: { CcpRoutingAccount , Mode , RouteControlRequest , RoutingRelation } ;
6
6
use interledger_http:: { deserialize_json, error:: * , HttpAccount , HttpStore } ;
@@ -13,7 +13,7 @@ use interledger_service::{
13
13
use interledger_service_util:: { BalanceStore , ExchangeRateStore } ;
14
14
use interledger_settlement:: core:: types:: SettlementAccount ;
15
15
use interledger_spsp:: { pay, SpspResponder } ;
16
- use interledger_stream:: StreamNotificationsStore ;
16
+ use interledger_stream:: { PaymentNotification , StreamNotificationsStore } ;
17
17
use log:: { debug, error, trace} ;
18
18
use secrecy:: { ExposeSecret , SecretString } ;
19
19
use serde:: { Deserialize , Serialize } ;
@@ -345,20 +345,9 @@ where
345
345
. and ( warp:: path:: end ( ) )
346
346
. and ( warp:: ws ( ) )
347
347
. and ( with_store. clone ( ) )
348
- . map ( |_id : Uuid , ws : warp:: ws:: Ws , _store : S | {
349
- ws. on_upgrade ( move |_ws : warp:: ws:: WebSocket | {
350
- async {
351
- // TODO: Implement this.
352
- unimplemented ! ( )
353
- // let (tx, rx) = futures::channel::mpsc::unbounded::<PaymentNotification>();
354
- // store.add_payment_notification_subscription(id, tx);
355
- // rx.map_err(|_| -> warp::Error { unreachable!("unbounded rx never errors") })
356
- // .map(|notification| {
357
- // warp::ws::Message::text(serde_json::to_string(¬ification).unwrap())
358
- // })
359
- // .map(|_| ())
360
- // .map_err(|err| error!("Error forwarding notifications to websocket: {:?}", err))
361
- }
348
+ . map ( |id : Uuid , ws : warp:: ws:: Ws , store : S | {
349
+ ws. on_upgrade ( move |ws : warp:: ws:: WebSocket | {
350
+ notify_user ( ws, id, store) . map ( |result| result. unwrap ( ) )
362
351
} )
363
352
} )
364
353
. boxed ( ) ;
@@ -474,11 +463,37 @@ where
474
463
. or ( get_account)
475
464
. or ( get_account_balance)
476
465
. or ( put_account_settings)
477
- . or ( incoming_payment_notifications) // Commented out until tungenstite ws support is added
466
+ . or ( incoming_payment_notifications)
478
467
. or ( post_payments)
479
468
. boxed ( )
480
469
}
481
470
471
+ fn notify_user (
472
+ socket : warp:: ws:: WebSocket ,
473
+ id : Uuid ,
474
+ store : impl StreamNotificationsStore ,
475
+ ) -> impl Future < Output = Result < ( ) , ( ) > > {
476
+ let ( tx, rx) = futures:: channel:: mpsc:: unbounded :: < PaymentNotification > ( ) ;
477
+ // the client is now subscribed
478
+ store. add_payment_notification_subscription ( id, tx) ;
479
+
480
+ // Anytime something is written to tx, it will reach rx
481
+ // and get converted to a warp::ws::Message
482
+ let rx = rx. map ( |notification : PaymentNotification | {
483
+ let msg = warp:: ws:: Message :: text ( serde_json:: to_string ( & notification) . unwrap ( ) ) ;
484
+ Ok ( msg)
485
+ } ) ;
486
+
487
+ // Then it gets forwarded to the client
488
+ rx. forward ( socket)
489
+ . map ( |result| {
490
+ if let Err ( e) = result {
491
+ eprintln ! ( "websocket send error: {}" , e) ;
492
+ }
493
+ } )
494
+ . then ( futures:: future:: ok)
495
+ }
496
+
482
497
async fn get_address_from_parent_and_update_routes < O , A , S > (
483
498
mut service : O ,
484
499
parent : A ,
0 commit comments