Skip to content

Commit 2d26679

Browse files
authored
Merge pull request #2723 from jkczyz/2023-11-direct-connect
Direct connect for `OnionMessage` sending
2 parents 6199406 + 0b83116 commit 2d26679

File tree

7 files changed

+611
-229
lines changed

7 files changed

+611
-229
lines changed

fuzz/src/onion_message.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl MessageRouter for TestMessageRouter {
7979
Ok(OnionMessagePath {
8080
intermediate_nodes: vec![],
8181
destination,
82+
addresses: None,
8283
})
8384
}
8485
}
@@ -269,7 +270,9 @@ mod tests {
269270
"Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)"
270271
.to_string())), Some(&1));
271272
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
272-
"Sending onion message: TestCustomMessage".to_string())), Some(&1));
273+
"Constructing onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1));
274+
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
275+
"Buffered onion message when responding to Custom onion message with path_id None".to_string())), Some(&1));
273276
}
274277

275278
let two_unblinded_hops_om = "\

lightning-background-processor/src/lib.rs

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure};
3030
#[cfg(feature = "std")]
3131
use lightning::events::{EventHandler, EventsProvider};
3232
use lightning::ln::channelmanager::ChannelManager;
33+
use lightning::ln::msgs::OnionMessageHandler;
3334
use lightning::ln::peer_handler::APeerManager;
3435
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
3536
use lightning::routing::utxo::UtxoLookup;
@@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30;
104105
#[cfg(test)]
105106
const PING_TIMER: u64 = 1;
106107

108+
#[cfg(not(test))]
109+
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
110+
#[cfg(test)]
111+
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
112+
107113
/// Prune the network graph of stale entries hourly.
108114
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
109115

@@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
270276
}
271277

272278
macro_rules! define_run_body {
273-
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
274-
$channel_manager: ident, $process_channel_manager_events: expr,
275-
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
276-
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
277-
$check_slow_await: expr)
278-
=> { {
279+
(
280+
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
281+
$channel_manager: ident, $process_channel_manager_events: expr,
282+
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
283+
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
284+
$timer_elapsed: expr, $check_slow_await: expr
285+
) => { {
279286
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
280287
$channel_manager.timer_tick_occurred();
281288
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
282289
$chain_monitor.rebroadcast_pending_claims();
283290

284291
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
292+
let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
285293
let mut last_ping_call = $get_timer(PING_TIMER);
286294
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
287295
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
@@ -291,6 +299,7 @@ macro_rules! define_run_body {
291299
loop {
292300
$process_channel_manager_events;
293301
$process_chain_monitor_events;
302+
$process_onion_message_handler_events;
294303

295304
// Note that the PeerManager::process_events may block on ChannelManager's locks,
296305
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -334,6 +343,11 @@ macro_rules! define_run_body {
334343
$channel_manager.timer_tick_occurred();
335344
last_freshness_call = $get_timer(FRESHNESS_TIMER);
336345
}
346+
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
347+
log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
348+
$peer_manager.onion_message_handler().timer_tick_occurred();
349+
last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
350+
}
337351
if await_slow {
338352
// On various platforms, we may be starved of CPU cycles for several reasons.
339353
// E.g. on iOS, if we've been in the background, we will be entirely paused.
@@ -603,8 +617,7 @@ pub async fn process_events_async<
603617
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
604618
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
605619
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
606-
APM: APeerManager + Send + Sync,
607-
PM: 'static + Deref<Target = APM> + Send + Sync,
620+
PM: 'static + Deref + Send + Sync,
608621
S: 'static + Deref<Target = SC> + Send + Sync,
609622
SC: for<'b> WriteableScore<'b>,
610623
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -627,6 +640,7 @@ where
627640
L::Target: 'static + Logger,
628641
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
629642
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
643+
PM::Target: APeerManager + Send + Sync,
630644
{
631645
let mut should_break = false;
632646
let async_event_handler = |event| {
@@ -650,10 +664,12 @@ where
650664
event_handler(event).await;
651665
}
652666
};
653-
define_run_body!(persister,
654-
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
667+
define_run_body!(
668+
persister, chain_monitor,
669+
chain_monitor.process_pending_events_async(async_event_handler).await,
655670
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
656-
gossip_sync, peer_manager, logger, scorer, should_break, {
671+
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
672+
gossip_sync, logger, scorer, should_break, {
657673
let fut = Selector {
658674
a: channel_manager.get_event_or_persistence_needed_future(),
659675
b: chain_monitor.get_update_future(),
@@ -673,7 +689,29 @@ where
673689
task::Poll::Ready(exit) => { should_break = exit; true },
674690
task::Poll::Pending => false,
675691
}
676-
}, mobile_interruptable_platform)
692+
}, mobile_interruptable_platform
693+
)
694+
}
695+
696+
#[cfg(feature = "futures")]
697+
async fn process_onion_message_handler_events_async<
698+
EventHandlerFuture: core::future::Future<Output = ()>,
699+
EventHandler: Fn(Event) -> EventHandlerFuture,
700+
PM: 'static + Deref + Send + Sync,
701+
>(
702+
peer_manager: &PM, handler: EventHandler
703+
)
704+
where
705+
PM::Target: APeerManager + Send + Sync,
706+
{
707+
use lightning::events::EventsProvider;
708+
709+
let events = core::cell::RefCell::new(Vec::new());
710+
peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
711+
712+
for event in events.into_inner() {
713+
handler(event).await
714+
}
677715
}
678716

679717
#[cfg(feature = "std")]
@@ -742,8 +780,7 @@ impl BackgroundProcessor {
742780
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
743781
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
744782
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
745-
APM: APeerManager + Send + Sync,
746-
PM: 'static + Deref<Target = APM> + Send + Sync,
783+
PM: 'static + Deref + Send + Sync,
747784
S: 'static + Deref<Target = SC> + Send + Sync,
748785
SC: for <'b> WriteableScore<'b>,
749786
>(
@@ -763,6 +800,7 @@ impl BackgroundProcessor {
763800
L::Target: 'static + Logger,
764801
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
765802
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
803+
PM::Target: APeerManager + Send + Sync,
766804
{
767805
let stop_thread = Arc::new(AtomicBool::new(false));
768806
let stop_thread_clone = stop_thread.clone();
@@ -782,14 +820,18 @@ impl BackgroundProcessor {
782820
}
783821
event_handler.handle_event(event);
784822
};
785-
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
823+
define_run_body!(
824+
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
786825
channel_manager, channel_manager.process_pending_events(&event_handler),
787-
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
826+
peer_manager,
827+
peer_manager.onion_message_handler().process_pending_events(&event_handler),
828+
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
788829
{ Sleeper::from_two_futures(
789830
channel_manager.get_event_or_persistence_needed_future(),
790831
chain_monitor.get_update_future()
791832
).wait_timeout(Duration::from_millis(100)); },
792-
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
833+
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
834+
)
793835
});
794836
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
795837
}
@@ -1362,9 +1404,11 @@ mod tests {
13621404

13631405
#[test]
13641406
fn test_timer_tick_called() {
1365-
// Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1366-
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
1367-
// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
1407+
// Test that:
1408+
// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1409+
// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1410+
// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1411+
// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
13681412
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
13691413
let data_dir = nodes[0].kv_store.get_data_dir();
13701414
let persister = Arc::new(Persister::new(data_dir));
@@ -1375,9 +1419,11 @@ mod tests {
13751419
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
13761420
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
13771421
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1422+
let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
13781423
if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
13791424
log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
1380-
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() {
1425+
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
1426+
log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
13811427
break
13821428
}
13831429
}

lightning/src/events/mod.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,25 @@ pub enum Event {
530530
/// serialized prior to LDK version 0.0.117.
531531
sender_intended_total_msat: Option<u64>,
532532
},
533+
/// Indicates that a peer connection with a node is needed in order to send an [`OnionMessage`].
534+
///
535+
/// Typically, this happens when a [`MessageRouter`] is unable to find a complete path to a
536+
/// [`Destination`]. Once a connection is established, any messages buffered by an
537+
/// [`OnionMessageHandler`] may be sent.
538+
///
539+
/// This event will not be generated for onion message forwards; only for sends including
540+
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
541+
///
542+
/// [`OnionMessage`]: msgs::OnionMessage
543+
/// [`MessageRouter`]: crate::onion_message::MessageRouter
544+
/// [`Destination`]: crate::onion_message::Destination
545+
/// [`OnionMessageHandler`]: crate::ln::msgs::OnionMessageHandler
546+
ConnectionNeeded {
547+
/// The node id for the node needing a connection.
548+
node_id: PublicKey,
549+
/// Sockets for connecting to the node.
550+
addresses: Vec<msgs::SocketAddress>,
551+
},
533552
/// Indicates a request for an invoice failed to yield a response in a reasonable amount of time
534553
/// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an
535554
/// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed.
@@ -1190,6 +1209,10 @@ impl Writeable for Event {
11901209
(0, payment_id, required),
11911210
})
11921211
},
1212+
&Event::ConnectionNeeded { .. } => {
1213+
35u8.write(writer)?;
1214+
// Never write ConnectionNeeded events as buffered onion messages aren't serialized.
1215+
},
11931216
// Note that, going forward, all new events must only write data inside of
11941217
// `write_tlv_fields`. Versions 0.0.101+ will ignore odd-numbered events that write
11951218
// data via `write_tlv_fields`.
@@ -1200,8 +1223,7 @@ impl Writeable for Event {
12001223
impl MaybeReadable for Event {
12011224
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
12021225
match Readable::read(reader)? {
1203-
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events,
1204-
// unlike all other events, thus we return immediately here.
1226+
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events.
12051227
0u8 => Ok(None),
12061228
1u8 => {
12071229
let f = || {
@@ -1588,6 +1610,8 @@ impl MaybeReadable for Event {
15881610
};
15891611
f()
15901612
},
1613+
// Note that we do not write a length-prefixed TLV for ConnectionNeeded events.
1614+
35u8 => Ok(None),
15911615
// Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue.
15921616
// Version 0.0.100 failed to properly ignore odd types, possibly resulting in corrupt
15931617
// reads.

lightning/src/ln/msgs.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use core::fmt::Display;
5252
use crate::io::{self, Cursor, Read};
5353
use crate::io_extras::read_to_end;
5454

55-
use crate::events::MessageSendEventsProvider;
55+
use crate::events::{EventsProvider, MessageSendEventsProvider};
5656
use crate::util::chacha20poly1305rfc::ChaChaPolyReadAdapter;
5757
use crate::util::logger;
5858
use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize};
@@ -1631,7 +1631,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
16311631
}
16321632

16331633
/// A handler for received [`OnionMessage`]s and for providing generated ones to send.
1634-
pub trait OnionMessageHandler {
1634+
pub trait OnionMessageHandler: EventsProvider {
16351635
/// Handle an incoming `onion_message` message from the given peer.
16361636
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
16371637

@@ -1650,6 +1650,10 @@ pub trait OnionMessageHandler {
16501650
/// drop and refuse to forward onion messages to this peer.
16511651
fn peer_disconnected(&self, their_node_id: &PublicKey);
16521652

1653+
/// Performs actions that should happen roughly every ten seconds after startup. Allows handlers
1654+
/// to drop any buffered onion messages intended for prospective peers.
1655+
fn timer_tick_occurred(&self);
1656+
16531657
// Handler information:
16541658
/// Gets the node feature flags which this handler itself supports. All available handlers are
16551659
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]

lightning/src/ln/peer_handler.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash;
1919
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
2020

2121
use crate::sign::{KeysManager, NodeSigner, Recipient};
22-
use crate::events::{MessageSendEvent, MessageSendEventsProvider};
22+
use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
2323
use crate::ln::ChannelId;
2424
use crate::ln::features::{InitFeatures, NodeFeatures};
2525
use crate::ln::msgs;
@@ -89,6 +89,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
8989
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
9090
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
9191
pub struct IgnoringMessageHandler{}
92+
impl EventsProvider for IgnoringMessageHandler {
93+
fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
94+
}
9295
impl MessageSendEventsProvider for IgnoringMessageHandler {
9396
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
9497
}
@@ -115,6 +118,7 @@ impl OnionMessageHandler for IgnoringMessageHandler {
115118
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
116119
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
117120
fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
121+
fn timer_tick_occurred(&self) {}
118122
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
119123
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
120124
InitFeatures::empty()
@@ -680,6 +684,8 @@ pub trait APeerManager {
680684
type NS: Deref<Target=Self::NST>;
681685
/// Gets a reference to the underlying [`PeerManager`].
682686
fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
687+
/// Returns the peer manager's [`OnionMessageHandler`].
688+
fn onion_message_handler(&self) -> &Self::OMT;
683689
}
684690

685691
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
@@ -705,6 +711,9 @@ APeerManager for PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> where
705711
type NST = <NS as Deref>::Target;
706712
type NS = NS;
707713
fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
714+
fn onion_message_handler(&self) -> &Self::OMT {
715+
self.message_handler.onion_message_handler.deref()
716+
}
708717
}
709718

710719
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls

0 commit comments

Comments
 (0)