@@ -2720,6 +2720,10 @@ pub struct ChannelManager<
2720
2720
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2721
2721
pending_events_processor: AtomicBool,
2722
2722
2723
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2724
+ /// [`Self::process_pending_htlc_forwards`].
2725
+ pending_htlc_forwards_processor: AtomicBool,
2726
+
2723
2727
/// If we are running during init (either directly during the deserialization method or in
2724
2728
/// block connection methods which run after deserialization but before normal operation) we
2725
2729
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3796,6 +3800,7 @@ where
3796
3800
3797
3801
pending_events: Mutex::new(VecDeque::new()),
3798
3802
pending_events_processor: AtomicBool::new(false),
3803
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3799
3804
pending_background_events: Mutex::new(Vec::new()),
3800
3805
total_consistency_lock: RwLock::new(()),
3801
3806
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6326,9 +6331,19 @@ where
6326
6331
///
6327
6332
/// Will regularly be called by the background processor.
6328
6333
pub fn process_pending_htlc_forwards(&self) {
6334
+ if self
6335
+ .pending_htlc_forwards_processor
6336
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6337
+ .is_err()
6338
+ {
6339
+ return;
6340
+ }
6341
+
6329
6342
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6330
6343
self.internal_process_pending_htlc_forwards()
6331
6344
});
6345
+
6346
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6332
6347
}
6333
6348
6334
6349
// Returns whether or not we need to re-persist.
@@ -16459,6 +16474,7 @@ where
16459
16474
16460
16475
pending_events: Mutex::new(pending_events_read),
16461
16476
pending_events_processor: AtomicBool::new(false),
16477
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16462
16478
pending_background_events: Mutex::new(pending_background_events),
16463
16479
total_consistency_lock: RwLock::new(()),
16464
16480
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments