@@ -2720,6 +2720,10 @@ pub struct ChannelManager<
2720
2720
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2721
2721
pending_events_processor: AtomicBool,
2722
2722
2723
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2724
+ /// [`Self::process_pending_htlc_forwards`].
2725
+ pending_htlc_forwards_processor: AtomicBool,
2726
+
2723
2727
/// If we are running during init (either directly during the deserialization method or in
2724
2728
/// block connection methods which run after deserialization but before normal operation) we
2725
2729
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3796,6 +3800,7 @@ where
3796
3800
3797
3801
pending_events: Mutex::new(VecDeque::new()),
3798
3802
pending_events_processor: AtomicBool::new(false),
3803
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3799
3804
pending_background_events: Mutex::new(Vec::new()),
3800
3805
total_consistency_lock: RwLock::new(()),
3801
3806
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6329,9 +6334,19 @@ where
6329
6334
/// Users implementing their own background processing logic should call this in irregular,
6330
6335
/// randomly-distributed intervals.
6331
6336
pub fn process_pending_htlc_forwards(&self) {
6337
+ if self
6338
+ .pending_htlc_forwards_processor
6339
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6340
+ .is_err()
6341
+ {
6342
+ return;
6343
+ }
6344
+
6332
6345
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6333
6346
self.internal_process_pending_htlc_forwards()
6334
6347
});
6348
+
6349
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6335
6350
}
6336
6351
6337
6352
// Returns whether or not we need to re-persist.
@@ -16462,6 +16477,7 @@ where
16462
16477
16463
16478
pending_events: Mutex::new(pending_events_read),
16464
16479
pending_events_processor: AtomicBool::new(false),
16480
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16465
16481
pending_background_events: Mutex::new(pending_background_events),
16466
16482
total_consistency_lock: RwLock::new(()),
16467
16483
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments