@@ -42,11 +42,11 @@ use lightning::sign::EntropySource;
42
42
use lightning:: sign:: OutputSpender ;
43
43
use lightning:: util:: logger:: Logger ;
44
44
use lightning:: util:: persist:: {
45
- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY , CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
46
- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
47
- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
48
- SCORER_PERSISTENCE_KEY , SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
49
- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
45
+ KVStore , KVStoreSync , KVStoreSyncWrapper , CHANNEL_MANAGER_PERSISTENCE_KEY ,
46
+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE , CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
47
+ NETWORK_GRAPH_PERSISTENCE_KEY , NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
48
+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE , SCORER_PERSISTENCE_KEY ,
49
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE , SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
50
50
} ;
51
51
use lightning:: util:: sweep:: OutputSweeper ;
52
52
#[ cfg( feature = "std" ) ]
@@ -318,6 +318,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
318
318
true
319
319
}
320
320
321
+ macro_rules! maybe_await {
322
+ ( true , $e: expr) => {
323
+ $e. await
324
+ } ;
325
+ ( false , $e: expr) => {
326
+ $e
327
+ } ;
328
+ }
329
+
321
330
macro_rules! define_run_body {
322
331
(
323
332
$kv_store: ident,
@@ -327,7 +336,7 @@ macro_rules! define_run_body {
327
336
$peer_manager: ident, $gossip_sync: ident,
328
337
$process_sweeper: expr,
329
338
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
330
- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
339
+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async_persist : tt ,
331
340
) => { {
332
341
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
333
342
$channel_manager. get_cm( ) . timer_tick_occurred( ) ;
@@ -383,12 +392,12 @@ macro_rules! define_run_body {
383
392
384
393
if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
385
394
log_trace!( $logger, "Persisting ChannelManager..." ) ;
386
- $kv_store. write(
395
+ maybe_await! ( $async_persist , $kv_store. write(
387
396
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
388
397
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
389
398
CHANNEL_MANAGER_PERSISTENCE_KEY ,
390
399
& $channel_manager. get_cm( ) . encode( ) ,
391
- ) ?;
400
+ ) ) ?;
392
401
log_trace!( $logger, "Done persisting ChannelManager." ) ;
393
402
}
394
403
if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -449,12 +458,12 @@ macro_rules! define_run_body {
449
458
log_trace!( $logger, "Persisting network graph." ) ;
450
459
}
451
460
452
- if let Err ( e) = $kv_store. write(
461
+ if let Err ( e) = maybe_await! ( $async_persist , $kv_store. write(
453
462
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
454
463
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
455
464
NETWORK_GRAPH_PERSISTENCE_KEY ,
456
465
& network_graph. encode( ) ,
457
- ) {
466
+ ) ) {
458
467
log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
459
468
}
460
469
@@ -482,12 +491,12 @@ macro_rules! define_run_body {
482
491
} else {
483
492
log_trace!( $logger, "Persisting scorer" ) ;
484
493
}
485
- if let Err ( e) = $kv_store. write(
494
+ if let Err ( e) = maybe_await! ( $async_persist , $kv_store. write(
486
495
SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
487
496
SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
488
497
SCORER_PERSISTENCE_KEY ,
489
498
& scorer. encode( ) ,
490
- ) {
499
+ ) ) {
491
500
log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
492
501
}
493
502
}
@@ -510,31 +519,31 @@ macro_rules! define_run_body {
510
519
// After we exit, ensure we persist the ChannelManager one final time - this avoids
511
520
// some races where users quit while channel updates were in-flight, with
512
521
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
513
- $kv_store. write(
522
+ maybe_await! ( $async_persist , $kv_store. write(
514
523
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
515
524
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
516
525
CHANNEL_MANAGER_PERSISTENCE_KEY ,
517
526
& $channel_manager. get_cm( ) . encode( ) ,
518
- ) ?;
527
+ ) ) ?;
519
528
520
529
// Persist Scorer on exit
521
530
if let Some ( ref scorer) = $scorer {
522
- $kv_store. write(
531
+ maybe_await! ( $async_persist , $kv_store. write(
523
532
SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
524
533
SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
525
534
SCORER_PERSISTENCE_KEY ,
526
535
& scorer. encode( ) ,
527
- ) ?;
536
+ ) ) ?;
528
537
}
529
538
530
539
// Persist NetworkGraph on exit
531
540
if let Some ( network_graph) = $gossip_sync. network_graph( ) {
532
- $kv_store. write(
541
+ maybe_await! ( $async_persist , $kv_store. write(
533
542
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
534
543
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
535
544
NETWORK_GRAPH_PERSISTENCE_KEY ,
536
545
& network_graph. encode( ) ,
537
- ) ?;
546
+ ) ) ?;
538
547
}
539
548
540
549
Ok ( ( ) )
@@ -681,11 +690,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
681
690
/// ```
682
691
/// # use lightning::io;
683
692
/// # use lightning::events::ReplayEvent;
684
- /// # use lightning::util::sweep::OutputSweeper;
685
693
/// # use std::sync::{Arc, RwLock};
686
694
/// # use std::sync::atomic::{AtomicBool, Ordering};
687
695
/// # use std::time::SystemTime;
688
- /// # use lightning_background_processor::{process_events_async, GossipSync};
696
+ /// # use lightning_background_processor::{process_events_full_async, GossipSync};
697
+ /// # use core::future::Future;
698
+ /// # use core::pin::Pin;
689
699
/// # struct Logger {}
690
700
/// # impl lightning::util::logger::Logger for Logger {
691
701
/// # fn log(&self, _record: lightning::util::logger::Record) {}
@@ -697,6 +707,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697
707
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
698
708
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
699
709
/// # }
710
+ /// # struct Store {}
711
+ /// # impl lightning::util::persist::KVStore for Store {
712
+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
713
+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
714
+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
715
+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
716
+ /// # }
700
717
/// # struct EventHandler {}
701
718
/// # impl EventHandler {
702
719
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -715,7 +732,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
715
732
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
716
733
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
717
734
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
718
- /// #
735
+ /// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
736
+ ///
719
737
/// # struct Node<
720
738
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
721
739
/// # F: lightning::chain::Filter + Send + Sync + 'static,
@@ -731,10 +749,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
731
749
/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
732
750
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
733
751
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
734
- /// # persister: Arc<StoreSync >,
752
+ /// # persister: Arc<Store >,
735
753
/// # logger: Arc<Logger>,
736
754
/// # scorer: Arc<Scorer>,
737
- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O> >>,
755
+ /// # sweeper: Arc<OutputSweeper<B, D, FE, F, O >>,
738
756
/// # }
739
757
/// #
740
758
/// # async fn setup_background_processing<
@@ -780,7 +798,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
780
798
doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
781
799
) ]
782
800
#[ cfg_attr( not( feature = "std" ) , doc = " rt.block_on(async move {" ) ]
783
- /// process_events_async (
801
+ /// process_events_full_async (
784
802
/// background_persister,
785
803
/// |e| background_event_handler.handle_event(e),
786
804
/// background_chain_mon,
@@ -805,7 +823,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
805
823
#[ cfg_attr( feature = "std" , doc = " handle.await.unwrap()" ) ]
806
824
/// # }
807
825
///```
808
- pub async fn process_events_async <
826
+ pub async fn process_events_full_async <
809
827
' a ,
810
828
UL : ' static + Deref ,
811
829
CF : ' static + Deref ,
@@ -856,7 +874,7 @@ where
856
874
LM :: Target : ALiquidityManager ,
857
875
O :: Target : ' static + OutputSpender ,
858
876
D :: Target : ' static + ChangeDestinationSource ,
859
- K :: Target : ' static + KVStoreSync ,
877
+ K :: Target : ' static + KVStore ,
860
878
{
861
879
let mut should_break = false ;
862
880
let async_event_handler = |event| {
@@ -875,12 +893,15 @@ where
875
893
if let Some ( duration_since_epoch) = fetch_time ( ) {
876
894
if update_scorer ( scorer, & event, duration_since_epoch) {
877
895
log_trace ! ( logger, "Persisting scorer after update" ) ;
878
- if let Err ( e) = kv_store. write (
879
- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
880
- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
881
- SCORER_PERSISTENCE_KEY ,
882
- & scorer. encode ( ) ,
883
- ) {
896
+ if let Err ( e) = kv_store
897
+ . write (
898
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
899
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
900
+ SCORER_PERSISTENCE_KEY ,
901
+ & scorer. encode ( ) ,
902
+ )
903
+ . await
904
+ {
884
905
log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
885
906
// We opt not to abort early on persistence failure here as persisting
886
907
// the scorer is non-critical and we still hope that it will have
@@ -958,7 +979,82 @@ where
958
979
} ,
959
980
mobile_interruptable_platform,
960
981
fetch_time,
982
+ true ,
983
+ )
984
+ }
985
+
986
+ /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
987
+ /// synchronous background persistence.
988
+ pub async fn process_events_async <
989
+ UL : ' static + Deref ,
990
+ CF : ' static + Deref ,
991
+ T : ' static + Deref ,
992
+ F : ' static + Deref ,
993
+ G : ' static + Deref < Target = NetworkGraph < L > > ,
994
+ L : ' static + Deref + Send + Sync ,
995
+ P : ' static + Deref ,
996
+ EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
997
+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
998
+ ES : ' static + Deref + Send ,
999
+ M : ' static
1000
+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > >
1001
+ + Send
1002
+ + Sync ,
1003
+ CM : ' static + Deref + Send + Sync ,
1004
+ OM : ' static + Deref ,
1005
+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > ,
1006
+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
1007
+ PM : ' static + Deref ,
1008
+ LM : ' static + Deref ,
1009
+ D : ' static + Deref ,
1010
+ O : ' static + Deref ,
1011
+ K : ' static + Deref ,
1012
+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , KVStoreSyncWrapper < K > , L , O > > ,
1013
+ S : ' static + Deref < Target = SC > + Send + Sync ,
1014
+ SC : for < ' b > WriteableScore < ' b > ,
1015
+ SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
1016
+ Sleeper : Fn ( Duration ) -> SleepFuture ,
1017
+ FetchTime : Fn ( ) -> Option < Duration > ,
1018
+ > (
1019
+ kv_store : K , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
1020
+ onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
1021
+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
1022
+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
1023
+ ) -> Result < ( ) , lightning:: io:: Error >
1024
+ where
1025
+ UL :: Target : ' static + UtxoLookup ,
1026
+ CF :: Target : ' static + chain:: Filter ,
1027
+ T :: Target : ' static + BroadcasterInterface ,
1028
+ F :: Target : ' static + FeeEstimator ,
1029
+ L :: Target : ' static + Logger ,
1030
+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1031
+ ES :: Target : ' static + EntropySource ,
1032
+ CM :: Target : AChannelManager ,
1033
+ OM :: Target : AOnionMessenger ,
1034
+ PM :: Target : APeerManager ,
1035
+ LM :: Target : ALiquidityManager ,
1036
+ O :: Target : ' static + OutputSpender ,
1037
+ D :: Target : ' static + ChangeDestinationSource ,
1038
+ K :: Target : ' static + KVStoreSync ,
1039
+ {
1040
+ let kv_store = KVStoreSyncWrapper ( kv_store) ;
1041
+ process_events_full_async (
1042
+ kv_store,
1043
+ event_handler,
1044
+ chain_monitor,
1045
+ channel_manager,
1046
+ onion_messenger,
1047
+ gossip_sync,
1048
+ peer_manager,
1049
+ liquidity_manager,
1050
+ sweeper,
1051
+ logger,
1052
+ scorer,
1053
+ sleeper,
1054
+ mobile_interruptable_platform,
1055
+ fetch_time,
961
1056
)
1057
+ . await
962
1058
}
963
1059
964
1060
#[ cfg( feature = "std" ) ]
@@ -1138,6 +1234,7 @@ impl BackgroundProcessor {
1138
1234
. expect( "Time should be sometime after 1970" ) ,
1139
1235
)
1140
1236
} ,
1237
+ false ,
1141
1238
)
1142
1239
} ) ;
1143
1240
Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -1226,7 +1323,7 @@ mod tests {
1226
1323
use lightning:: types:: payment:: PaymentHash ;
1227
1324
use lightning:: util:: config:: UserConfig ;
1228
1325
use lightning:: util:: persist:: {
1229
- KVStoreSync , CHANNEL_MANAGER_PERSISTENCE_KEY ,
1326
+ KVStoreSync , KVStoreSyncWrapper , CHANNEL_MANAGER_PERSISTENCE_KEY ,
1230
1327
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
1231
1328
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
1232
1329
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
@@ -2151,12 +2248,13 @@ mod tests {
2151
2248
open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
2152
2249
2153
2250
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2154
- let persister = Arc :: new (
2251
+ let kv_store_sync = Arc :: new (
2155
2252
Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
2156
2253
) ;
2254
+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
2157
2255
2158
- let bp_future = super :: process_events_async (
2159
- persister ,
2256
+ let bp_future = super :: process_events_full_async (
2257
+ kv_store ,
2160
2258
|_: _ | async { Ok ( ( ) ) } ,
2161
2259
Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
2162
2260
Arc :: clone ( & nodes[ 0 ] . node ) ,
@@ -2659,11 +2757,13 @@ mod tests {
2659
2757
let ( _, nodes) =
2660
2758
create_nodes ( 2 , "test_not_pruning_network_graph_until_graph_sync_completion_async" ) ;
2661
2759
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2662
- let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2760
+ let kv_store_sync =
2761
+ Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
2762
+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
2663
2763
2664
2764
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2665
- let bp_future = super :: process_events_async (
2666
- persister ,
2765
+ let bp_future = super :: process_events_full_async (
2766
+ kv_store ,
2667
2767
|_: _ | async { Ok ( ( ) ) } ,
2668
2768
Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
2669
2769
Arc :: clone ( & nodes[ 0 ] . node ) ,
@@ -2875,12 +2975,13 @@ mod tests {
2875
2975
2876
2976
let ( _, nodes) = create_nodes ( 1 , "test_payment_path_scoring_async" ) ;
2877
2977
let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
2878
- let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
2978
+ let kv_store_sync = Arc :: new ( Persister :: new ( data_dir) ) ;
2979
+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( kv_store_sync) ) ;
2879
2980
2880
2981
let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2881
2982
2882
- let bp_future = super :: process_events_async (
2883
- persister ,
2983
+ let bp_future = super :: process_events_full_async (
2984
+ kv_store ,
2884
2985
event_handler,
2885
2986
Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
2886
2987
Arc :: clone ( & nodes[ 0 ] . node ) ,
0 commit comments