@@ -156,15 +156,15 @@ where
156
156
entropy_source : ES ,
157
157
node_signer : NS ,
158
158
logger : L ,
159
- message_buffers : Mutex < HashMap < PublicKey , OnionMessageBuffer > > ,
159
+ message_recipients : Mutex < HashMap < PublicKey , OnionMessageRecipient > > ,
160
160
secp_ctx : Secp256k1 < secp256k1:: All > ,
161
161
message_router : MR ,
162
162
offers_handler : OMH ,
163
163
custom_handler : CMH ,
164
164
}
165
165
166
166
/// [`OnionMessage`]s buffered to be sent.
167
- enum OnionMessageBuffer {
167
+ enum OnionMessageRecipient {
168
168
/// Messages for a node connected as a peer.
169
169
ConnectedPeer ( VecDeque < OnionMessage > ) ,
170
170
@@ -173,31 +173,31 @@ enum OnionMessageBuffer {
173
173
PendingConnection ( VecDeque < OnionMessage > , Option < Vec < SocketAddress > > , usize ) ,
174
174
}
175
175
176
- impl OnionMessageBuffer {
176
+ impl OnionMessageRecipient {
177
177
fn pending_connection ( addresses : Vec < SocketAddress > ) -> Self {
178
178
Self :: PendingConnection ( VecDeque :: new ( ) , Some ( addresses) , 0 )
179
179
}
180
180
181
181
fn pending_messages ( & self ) -> & VecDeque < OnionMessage > {
182
182
match self {
183
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
184
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
183
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
184
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
185
185
}
186
186
}
187
187
188
188
fn enqueue_message ( & mut self , message : OnionMessage ) {
189
189
let pending_messages = match self {
190
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
191
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
190
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
191
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
192
192
} ;
193
193
194
194
pending_messages. push_back ( message) ;
195
195
}
196
196
197
197
fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
198
198
let pending_messages = match self {
199
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
200
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => {
199
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
200
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => {
201
201
debug_assert ! ( false ) ;
202
202
pending_messages
203
203
} ,
@@ -209,18 +209,18 @@ impl OnionMessageBuffer {
209
209
#[ cfg( test) ]
210
210
fn release_pending_messages ( & mut self ) -> VecDeque < OnionMessage > {
211
211
let pending_messages = match self {
212
- OnionMessageBuffer :: ConnectedPeer ( pending_messages) => pending_messages,
213
- OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) => pending_messages,
212
+ OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
213
+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
214
214
} ;
215
215
216
216
core:: mem:: take ( pending_messages)
217
217
}
218
218
219
219
fn mark_connected ( & mut self ) {
220
- if let OnionMessageBuffer :: PendingConnection ( pending_messages, _, _) = self {
220
+ if let OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) = self {
221
221
let mut new_pending_messages = VecDeque :: new ( ) ;
222
222
core:: mem:: swap ( pending_messages, & mut new_pending_messages) ;
223
- * self = OnionMessageBuffer :: ConnectedPeer ( new_pending_messages) ;
223
+ * self = OnionMessageRecipient :: ConnectedPeer ( new_pending_messages) ;
224
224
}
225
225
}
226
226
}
@@ -631,7 +631,7 @@ where
631
631
OnionMessenger {
632
632
entropy_source,
633
633
node_signer,
634
- message_buffers : Mutex :: new ( HashMap :: new ( ) ) ,
634
+ message_recipients : Mutex :: new ( HashMap :: new ( ) ) ,
635
635
secp_ctx,
636
636
logger,
637
637
message_router,
@@ -687,9 +687,9 @@ where
687
687
. get_node_id ( Recipient :: Node )
688
688
. map_err ( |_| SendError :: GetNodeIdFailed ) ?;
689
689
690
- let peers = self . message_buffers . lock ( ) . unwrap ( )
690
+ let peers = self . message_recipients . lock ( ) . unwrap ( )
691
691
. iter ( )
692
- . filter ( |( _, buffer ) | matches ! ( buffer , OnionMessageBuffer :: ConnectedPeer ( _) ) )
692
+ . filter ( |( _, recipient ) | matches ! ( recipient , OnionMessageRecipient :: ConnectedPeer ( _) ) )
693
693
. map ( |( node_id, _) | * node_id)
694
694
. collect ( ) ;
695
695
@@ -708,16 +708,16 @@ where
708
708
& self . entropy_source , & self . node_signer , & self . secp_ctx , path, contents, reply_path
709
709
) ?;
710
710
711
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
712
- if outbound_buffer_full ( & first_node_id, & message_buffers ) {
711
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
712
+ if outbound_buffer_full ( & first_node_id, & message_recipients ) {
713
713
return Err ( SendError :: BufferFull ) ;
714
714
}
715
715
716
- match message_buffers . entry ( first_node_id) {
716
+ match message_recipients . entry ( first_node_id) {
717
717
hash_map:: Entry :: Vacant ( e) => match addresses {
718
718
None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
719
719
Some ( addresses) => {
720
- e. insert ( OnionMessageBuffer :: pending_connection ( addresses) )
720
+ e. insert ( OnionMessageRecipient :: pending_connection ( addresses) )
721
721
. enqueue_message ( onion_message) ;
722
722
Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
723
723
} ,
@@ -755,18 +755,18 @@ where
755
755
756
756
#[ cfg( test) ]
757
757
pub ( super ) fn release_pending_msgs ( & self ) -> HashMap < PublicKey , VecDeque < OnionMessage > > {
758
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
758
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
759
759
let mut msgs = HashMap :: new ( ) ;
760
760
// We don't want to disconnect the peers by removing them entirely from the original map, so we
761
761
// release the pending message buffers individually.
762
- for ( peer_node_id , buffer ) in & mut * message_buffers {
763
- msgs. insert ( * peer_node_id , buffer . release_pending_messages ( ) ) ;
762
+ for ( node_id , recipient ) in & mut * message_recipients {
763
+ msgs. insert ( * node_id , recipient . release_pending_messages ( ) ) ;
764
764
}
765
765
msgs
766
766
}
767
767
}
768
768
769
- fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageBuffer > ) -> bool {
769
+ fn outbound_buffer_full ( peer_node_id : & PublicKey , buffer : & HashMap < PublicKey , OnionMessageRecipient > ) -> bool {
770
770
const MAX_TOTAL_BUFFER_SIZE : usize = ( 1 << 20 ) * 128 ;
771
771
const MAX_PER_PEER_BUFFER_SIZE : usize = ( 1 << 10 ) * 256 ;
772
772
let mut total_buffered_bytes = 0 ;
@@ -800,8 +800,8 @@ where
800
800
CMH :: Target : CustomOnionMessageHandler ,
801
801
{
802
802
fn process_pending_events < H : Deref > ( & self , handler : H ) where H :: Target : EventHandler {
803
- for ( node_id, recipient) in self . message_buffers . lock ( ) . unwrap ( ) . iter_mut ( ) {
804
- if let OnionMessageBuffer :: PendingConnection ( _, addresses, _) = recipient {
803
+ for ( node_id, recipient) in self . message_recipients . lock ( ) . unwrap ( ) . iter_mut ( ) {
804
+ if let OnionMessageRecipient :: PendingConnection ( _, addresses, _) = recipient {
805
805
if let Some ( addresses) = addresses. take ( ) {
806
806
handler. handle_event ( Event :: ConnectionNeeded { node_id : * node_id, addresses } ) ;
807
807
}
@@ -852,20 +852,20 @@ where
852
852
}
853
853
} ,
854
854
Ok ( PeeledOnion :: Forward ( next_node_id, onion_message) ) => {
855
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
856
- if outbound_buffer_full ( & next_node_id, & message_buffers ) {
855
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
856
+ if outbound_buffer_full ( & next_node_id, & message_recipients ) {
857
857
log_trace ! ( self . logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full" , next_node_id) ;
858
858
return
859
859
}
860
860
861
861
#[ cfg( fuzzing) ]
862
- message_buffers
862
+ message_recipients
863
863
. entry ( next_node_id)
864
- . or_insert_with ( || OnionMessageBuffer :: ConnectedPeer ( VecDeque :: new ( ) ) ) ;
864
+ . or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) ) ;
865
865
866
- match message_buffers . entry ( next_node_id) {
866
+ match message_recipients . entry ( next_node_id) {
867
867
hash_map:: Entry :: Occupied ( mut e) if matches ! (
868
- e. get( ) , OnionMessageBuffer :: ConnectedPeer ( ..)
868
+ e. get( ) , OnionMessageRecipient :: ConnectedPeer ( ..)
869
869
) => {
870
870
e. get_mut ( ) . enqueue_message ( onion_message) ;
871
871
log_trace ! ( self . logger, "Forwarding an onion message to peer {}" , next_node_id) ;
@@ -884,39 +884,39 @@ where
884
884
885
885
fn peer_connected ( & self , their_node_id : & PublicKey , init : & msgs:: Init , _inbound : bool ) -> Result < ( ) , ( ) > {
886
886
if init. features . supports_onion_messages ( ) {
887
- self . message_buffers . lock ( ) . unwrap ( )
887
+ self . message_recipients . lock ( ) . unwrap ( )
888
888
. entry ( * their_node_id)
889
- . or_insert_with ( || OnionMessageBuffer :: ConnectedPeer ( VecDeque :: new ( ) ) )
889
+ . or_insert_with ( || OnionMessageRecipient :: ConnectedPeer ( VecDeque :: new ( ) ) )
890
890
. mark_connected ( ) ;
891
891
} else {
892
- self . message_buffers . lock ( ) . unwrap ( ) . remove ( their_node_id) ;
892
+ self . message_recipients . lock ( ) . unwrap ( ) . remove ( their_node_id) ;
893
893
}
894
894
895
895
Ok ( ( ) )
896
896
}
897
897
898
898
fn peer_disconnected ( & self , their_node_id : & PublicKey ) {
899
- match self . message_buffers . lock ( ) . unwrap ( ) . remove ( their_node_id) {
900
- Some ( OnionMessageBuffer :: ConnectedPeer ( ..) ) => { } ,
899
+ match self . message_recipients . lock ( ) . unwrap ( ) . remove ( their_node_id) {
900
+ Some ( OnionMessageRecipient :: ConnectedPeer ( ..) ) => { } ,
901
901
_ => debug_assert ! ( false ) ,
902
902
}
903
903
}
904
904
905
905
fn timer_tick_occurred ( & self ) {
906
- let mut message_buffers = self . message_buffers . lock ( ) . unwrap ( ) ;
906
+ let mut message_recipients = self . message_recipients . lock ( ) . unwrap ( ) ;
907
907
908
908
// Drop any pending recipients since the last call to avoid retaining buffered messages for
909
909
// too long.
910
- message_buffers . retain ( |_, recipient| match recipient {
911
- OnionMessageBuffer :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
912
- OnionMessageBuffer :: PendingConnection ( _, Some ( _) , _) => true ,
910
+ message_recipients . retain ( |_, recipient| match recipient {
911
+ OnionMessageRecipient :: PendingConnection ( _, None , ticks) => * ticks < MAX_TIMER_TICKS ,
912
+ OnionMessageRecipient :: PendingConnection ( _, Some ( _) , _) => true ,
913
913
_ => true ,
914
914
} ) ;
915
915
916
916
// Increment a timer tick for pending recipients so that their buffered messages are dropped
917
917
// at MAX_TIMER_TICKS.
918
- for recipient in message_buffers . values_mut ( ) {
919
- if let OnionMessageBuffer :: PendingConnection ( _, None , ticks) = recipient {
918
+ for recipient in message_recipients . values_mut ( ) {
919
+ if let OnionMessageRecipient :: PendingConnection ( _, None , ticks) = recipient {
920
920
* ticks += 1 ;
921
921
}
922
922
}
@@ -960,7 +960,7 @@ where
960
960
) ;
961
961
}
962
962
963
- self . message_buffers . lock ( ) . unwrap ( )
963
+ self . message_recipients . lock ( ) . unwrap ( )
964
964
. get_mut ( & peer_node_id)
965
965
. and_then ( |buffer| buffer. dequeue_message ( ) )
966
966
}
0 commit comments