7
7
#![ deny( private_intra_doc_links) ]
8
8
9
9
#![ deny( missing_docs) ]
10
- #![ deny( unsafe_code) ]
10
+ #![ cfg_attr ( not ( feature = "futures" ) , deny( unsafe_code) ) ]
11
11
12
12
#![ cfg_attr( docsrs, feature( doc_auto_cfg) ) ]
13
13
@@ -26,18 +26,20 @@ use lightning::chain;
26
26
use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
27
27
use lightning:: chain:: chainmonitor:: { ChainMonitor , Persist } ;
28
28
use lightning:: chain:: keysinterface:: { EntropySource , NodeSigner , SignerProvider } ;
29
+ use lightning:: events:: { Event , PathFailure } ;
30
+ #[ cfg( feature = "std" ) ]
31
+ use lightning:: events:: { EventHandler , EventsProvider } ;
29
32
use lightning:: ln:: channelmanager:: ChannelManager ;
30
33
use lightning:: ln:: msgs:: { ChannelMessageHandler , OnionMessageHandler , RoutingMessageHandler } ;
31
34
use lightning:: ln:: peer_handler:: { CustomMessageHandler , PeerManager , SocketDescriptor } ;
32
35
use lightning:: routing:: gossip:: { NetworkGraph , P2PGossipSync } ;
33
36
use lightning:: routing:: utxo:: UtxoLookup ;
34
37
use lightning:: routing:: router:: Router ;
35
38
use lightning:: routing:: scoring:: { Score , WriteableScore } ;
36
- use lightning:: util:: events:: { Event , PathFailure } ;
37
- #[ cfg( feature = "std" ) ]
38
- use lightning:: util:: events:: { EventHandler , EventsProvider } ;
39
39
use lightning:: util:: logger:: Logger ;
40
40
use lightning:: util:: persist:: Persister ;
41
+ #[ cfg( feature = "std" ) ]
42
+ use lightning:: util:: wakers:: Sleeper ;
41
43
use lightning_rapid_gossip_sync:: RapidGossipSync ;
42
44
43
45
use core:: ops:: Deref ;
@@ -52,8 +54,6 @@ use std::thread::{self, JoinHandle};
52
54
#[ cfg( feature = "std" ) ]
53
55
use std:: time:: Instant ;
54
56
55
- #[ cfg( feature = "futures" ) ]
56
- use futures_util:: { select_biased, future:: FutureExt , task} ;
57
57
#[ cfg( not( feature = "std" ) ) ]
58
58
use alloc:: vec:: Vec ;
59
59
@@ -80,7 +80,7 @@ use alloc::vec::Vec;
80
80
/// unilateral chain closure fees are at risk.
81
81
///
82
82
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83
- /// [`Event`]: lightning::util:: events::Event
83
+ /// [`Event`]: lightning::events::Event
84
84
#[ cfg( feature = "std" ) ]
85
85
#[ must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown." ]
86
86
pub struct BackgroundProcessor {
@@ -116,6 +116,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
116
116
#[ cfg( test) ]
117
117
const FIRST_NETWORK_PRUNE_TIMER : u64 = 1 ;
118
118
119
+ #[ cfg( feature = "futures" ) ]
120
+ /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
121
+ const fn min_u64 ( a : u64 , b : u64 ) -> u64 { if a < b { a } else { b } }
122
+ #[ cfg( feature = "futures" ) ]
123
+ const FASTEST_TIMER : u64 = min_u64 ( min_u64 ( FRESHNESS_TIMER , PING_TIMER ) ,
124
+ min_u64 ( SCORER_PERSIST_TIMER , FIRST_NETWORK_PRUNE_TIMER ) ) ;
125
+
119
126
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
120
127
pub enum GossipSync <
121
128
P : Deref < Target = P2PGossipSync < G , U , L > > ,
@@ -258,7 +265,8 @@ macro_rules! define_run_body {
258
265
( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
259
266
$channel_manager: ident, $process_channel_manager_events: expr,
260
267
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
261
- $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
268
+ $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
269
+ $check_slow_await: expr)
262
270
=> { {
263
271
log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
264
272
$channel_manager. timer_tick_occurred( ) ;
@@ -288,9 +296,10 @@ macro_rules! define_run_body {
288
296
289
297
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
290
298
// see `await_start`'s use below.
291
- let mut await_start = $get_timer( 1 ) ;
299
+ let mut await_start = None ;
300
+ if $check_slow_await { await_start = Some ( $get_timer( 1 ) ) ; }
292
301
let updates_available = $await;
293
- let await_slow = $ timer_elapsed( & mut await_start, 1 ) ;
302
+ let await_slow = if $check_slow_await { $ timer_elapsed( & mut await_start. unwrap ( ) , 1 ) } else { false } ;
294
303
295
304
if updates_available {
296
305
log_trace!( $logger, "Persisting ChannelManager..." ) ;
@@ -384,6 +393,59 @@ macro_rules! define_run_body {
384
393
} }
385
394
}
386
395
396
+ #[ cfg( feature = "futures" ) ]
397
+ pub ( crate ) mod futures_util {
398
+ use core:: future:: Future ;
399
+ use core:: task:: { Poll , Waker , RawWaker , RawWakerVTable } ;
400
+ use core:: pin:: Pin ;
401
+ use core:: marker:: Unpin ;
402
+ pub ( crate ) struct Selector <
403
+ A : Future < Output =( ) > + Unpin , B : Future < Output =( ) > + Unpin , C : Future < Output =bool > + Unpin
404
+ > {
405
+ pub a : A ,
406
+ pub b : B ,
407
+ pub c : C ,
408
+ }
409
+ pub ( crate ) enum SelectorOutput {
410
+ A , B , C ( bool ) ,
411
+ }
412
+
413
+ impl <
414
+ A : Future < Output =( ) > + Unpin , B : Future < Output =( ) > + Unpin , C : Future < Output =bool > + Unpin
415
+ > Future for Selector < A , B , C > {
416
+ type Output = SelectorOutput ;
417
+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut core:: task:: Context < ' _ > ) -> Poll < SelectorOutput > {
418
+ match Pin :: new ( & mut self . a ) . poll ( ctx) {
419
+ Poll :: Ready ( ( ) ) => { return Poll :: Ready ( SelectorOutput :: A ) ; } ,
420
+ Poll :: Pending => { } ,
421
+ }
422
+ match Pin :: new ( & mut self . b ) . poll ( ctx) {
423
+ Poll :: Ready ( ( ) ) => { return Poll :: Ready ( SelectorOutput :: B ) ; } ,
424
+ Poll :: Pending => { } ,
425
+ }
426
+ match Pin :: new ( & mut self . c ) . poll ( ctx) {
427
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: C ( res) ) ; } ,
428
+ Poll :: Pending => { } ,
429
+ }
430
+ Poll :: Pending
431
+ }
432
+ }
433
+
434
+ // If we want to poll a future without an async context to figure out if it has completed or
435
+ // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
436
+ // but sadly there's a good bit of boilerplate here.
437
+ fn dummy_waker_clone ( _: * const ( ) ) -> RawWaker { RawWaker :: new ( core:: ptr:: null ( ) , & DUMMY_WAKER_VTABLE ) }
438
+ fn dummy_waker_action ( _: * const ( ) ) { }
439
+
440
+ const DUMMY_WAKER_VTABLE : RawWakerVTable = RawWakerVTable :: new (
441
+ dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action) ;
442
+ pub ( crate ) fn dummy_waker ( ) -> Waker { unsafe { Waker :: from_raw ( RawWaker :: new ( core:: ptr:: null ( ) , & DUMMY_WAKER_VTABLE ) ) } }
443
+ }
444
+ #[ cfg( feature = "futures" ) ]
445
+ use futures_util:: { Selector , SelectorOutput , dummy_waker} ;
446
+ #[ cfg( feature = "futures" ) ]
447
+ use core:: task;
448
+
387
449
/// Processes background events in a future.
388
450
///
389
451
/// `sleeper` should return a future which completes in the given amount of time and returns a
@@ -396,6 +458,11 @@ macro_rules! define_run_body {
396
458
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
397
459
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
398
460
/// manually instead.
461
+ ///
462
+ /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
463
+ /// mobile device, where we may need to check for interruption of the application regularly. If you
464
+ /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
465
+ /// are hundreds or thousands of simultaneous process calls running.
399
466
#[ cfg( feature = "futures" ) ]
400
467
pub async fn process_events_async <
401
468
' a ,
@@ -431,7 +498,7 @@ pub async fn process_events_async<
431
498
> (
432
499
persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
433
500
gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
434
- sleeper : Sleeper ,
501
+ sleeper : Sleeper , mobile_interruptable_platform : bool ,
435
502
) -> Result < ( ) , lightning:: io:: Error >
436
503
where
437
504
UL :: Target : ' static + UtxoLookup ,
@@ -470,19 +537,25 @@ where
470
537
chain_monitor, chain_monitor. process_pending_events_async( async_event_handler) . await ,
471
538
channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
472
539
gossip_sync, peer_manager, logger, scorer, should_break, {
473
- select_biased! {
474
- _ = channel_manager. get_persistable_update_future( ) . fuse( ) => true ,
475
- exit = sleeper( Duration :: from_millis( 100 ) ) . fuse( ) => {
540
+ let fut = Selector {
541
+ a: channel_manager. get_persistable_update_future( ) ,
542
+ b: chain_monitor. get_update_future( ) ,
543
+ c: sleeper( if mobile_interruptable_platform { Duration :: from_millis( 100 ) } else { Duration :: from_secs( FASTEST_TIMER ) } ) ,
544
+ } ;
545
+ match fut. await {
546
+ SelectorOutput :: A => true ,
547
+ SelectorOutput :: B => false ,
548
+ SelectorOutput :: C ( exit) => {
476
549
should_break = exit;
477
550
false
478
551
}
479
552
}
480
553
} , |t| sleeper( Duration :: from_secs( t) ) ,
481
554
|fut: & mut SleepFuture , _| {
482
- let mut waker = task :: noop_waker ( ) ;
555
+ let mut waker = dummy_waker ( ) ;
483
556
let mut ctx = task:: Context :: from_waker( & mut waker) ;
484
557
core:: pin:: Pin :: new( fut) . poll( & mut ctx) . is_ready( )
485
- } )
558
+ } , mobile_interruptable_platform )
486
559
}
487
560
488
561
#[ cfg( feature = "std" ) ]
@@ -597,8 +670,11 @@ impl BackgroundProcessor {
597
670
define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
598
671
channel_manager, channel_manager. process_pending_events( & event_handler) ,
599
672
gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
600
- channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) ,
601
- |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur)
673
+ Sleeper :: from_two_futures(
674
+ channel_manager. get_persistable_update_future( ) ,
675
+ chain_monitor. get_update_future( )
676
+ ) . wait_timeout( Duration :: from_millis( 100 ) ) ,
677
+ |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false )
602
678
} ) ;
603
679
Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
604
680
}
@@ -663,7 +739,8 @@ mod tests {
663
739
use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
664
740
use lightning:: chain:: keysinterface:: { InMemorySigner , KeysManager } ;
665
741
use lightning:: chain:: transaction:: OutPoint ;
666
- use lightning:: get_event_msg;
742
+ use lightning:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent } ;
743
+ use lightning:: { get_event_msg, get_event} ;
667
744
use lightning:: ln:: PaymentHash ;
668
745
use lightning:: ln:: channelmanager;
669
746
use lightning:: ln:: channelmanager:: { BREAKDOWN_TIMEOUT , ChainParameters , MIN_CLTV_EXPIRY_DELTA , PaymentId } ;
@@ -674,7 +751,6 @@ mod tests {
674
751
use lightning:: routing:: router:: { DefaultRouter , RouteHop } ;
675
752
use lightning:: routing:: scoring:: { ChannelUsage , Score } ;
676
753
use lightning:: util:: config:: UserConfig ;
677
- use lightning:: util:: events:: { Event , PathFailure , MessageSendEventsProvider , MessageSendEvent } ;
678
754
use lightning:: util:: ser:: Writeable ;
679
755
use lightning:: util:: test_utils;
680
756
use lightning:: util:: persist:: KVStorePersister ;
@@ -1012,7 +1088,10 @@ mod tests {
1012
1088
( $node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => { {
1013
1089
$node_a. node. funding_transaction_generated( & $temporary_channel_id, & $node_b. node. get_our_node_id( ) , $tx. clone( ) ) . unwrap( ) ;
1014
1090
$node_b. node. handle_funding_created( & $node_a. node. get_our_node_id( ) , & get_event_msg!( $node_a, MessageSendEvent :: SendFundingCreated , $node_b. node. get_our_node_id( ) ) ) ;
1091
+ get_event!( $node_b, Event :: ChannelPending ) ;
1092
+
1015
1093
$node_a. node. handle_funding_signed( & $node_b. node. get_our_node_id( ) , & get_event_msg!( $node_b, MessageSendEvent :: SendFundingSigned , $node_a. node. get_our_node_id( ) ) ) ;
1094
+ get_event!( $node_a, Event :: ChannelPending ) ;
1016
1095
} }
1017
1096
}
1018
1097
0 commit comments