
Commit 65bb5b9

Only pause read in PeerManager send_data not read_event
We recently ran into a race condition on macOS where `read_event` would return `Ok(true)` (implying reads should be paused), but calls to `send_data` which flushed the buffer completed before the `read_event` caller was able to set the read-pause flag. This should be fairly rare, but not unheard of: the `pause_read` flag in `read_event` is calculated before handling the last message, so there's some time between when it's calculated and when it's returned. However, that has to race with multiple calls to `send_data` to send all the pending messages, all of which have to complete before the `read_event` return happens. We've (as far as I recall) never hit this in prod, but a benchmark HTLC-flood test managed to hit it somewhat reliably within a few minutes on macOS and when a synthetic few-ms sleep was added to each message-handling call.

Ultimately we can't fix this with the current API (though we could make it more rare). Thus, here, we stick to a single "stream" of pause-read events from `PeerManager` to user code via `send_data` calls, dropping the read-pause flag return from `read_event` entirely.

Technically this adds a risk that someone can flood us with enough messages, fast enough, to bloat our outbound buffer for a peer before `PeerManager::process_events` gets called and can flush the pause flag via `send_data` calls to all descriptors. This isn't ideal, but it should still be relatively hard to do, as `process_events` calls are pretty quick and should be triggered immediately after each `read_event` call completes.
1 parent 9d9fb79 commit 65bb5b9
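
For downstream I/O drivers, the practical upshot is that the pause/resume signal now arrives only through the boolean flag on `SocketDescriptor::send_data`, while `read_event` simply reports success or failure. Below is a minimal sketch of a descriptor honoring that contract; `MyDescriptor`, its `read_paused` field, and the way bytes are handed to the socket are assumptions for illustration, not code from this commit.

use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use lightning::ln::peer_handler::SocketDescriptor;

// Illustrative descriptor for a custom I/O driver (hypothetical type).
#[derive(Clone)]
struct MyDescriptor {
    id: u64,
    read_paused: Arc<AtomicBool>,
}

impl PartialEq for MyDescriptor {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id
    }
}
impl Eq for MyDescriptor {}
impl Hash for MyDescriptor {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state)
    }
}

impl SocketDescriptor for MyDescriptor {
    fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
        // After this commit, `continue_read` is the only pause/resume signal:
        // the driver stops calling `PeerManager::read_event` while it is false
        // and resumes once a later call sets it back to true. Calls may arrive
        // with empty `data` purely to update this flag.
        self.read_paused.store(!continue_read, Ordering::Release);
        // ... hand `data` to the socket here; claim everything for brevity.
        data.len()
    }

    fn disconnect_socket(&mut self) {
        // ... close the underlying socket and drop any queued bytes.
    }
}

The driver's read path then just checks `read_paused` before feeding more bytes into `read_event`, as the lightning-net-tokio change below does.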

2 files changed: 41 additions & 52 deletions


lightning-net-tokio/src/lib.rs

Lines changed: 4 additions & 6 deletions
@@ -214,13 +214,8 @@ impl Connection {
         Ok(len) => {
             let read_res =
                 peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
-            let mut us_lock = us.lock().unwrap();
             match read_res {
-                Ok(pause_read) => {
-                    if pause_read {
-                        us_lock.read_paused = true;
-                    }
-                },
+                Ok(()) => {},
                 Err(_) => break Disconnect::CloseConnection,
             }
         },
@@ -521,7 +516,10 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
             // before we get here, so we ignore any failures to wake it up.
             us.read_paused = false;
             let _ = us.read_waker.try_send(());
+        } else if !resume_read {
+            us.read_paused = true;
         }
+
         if data.is_empty() {
             return 0;
         }
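
The context lines above show that the tokio driver already gates its read loop on `read_paused` plus a `read_waker` channel, so once `send_data` becomes the sole writer of that flag the loop shape barely changes. Below is a simplified, stand-alone sketch of that shape; the names and the closure standing in for the `read_event`/`process_events` calls are assumptions for illustration, not the actual lightning-net-tokio code.

use std::io::ErrorKind;
use std::sync::{Arc, Mutex};

use tokio::net::TcpStream;
use tokio::sync::mpsc;

// Invented driver state; lightning-net-tokio's real struct carries more.
struct ConnState {
    read_paused: bool,
}

async fn read_loop<F: FnMut(&[u8]) -> Result<(), ()>>(
    stream: TcpStream, us: Arc<Mutex<ConnState>>, mut read_waker: mpsc::Receiver<()>,
    mut push_to_peer_manager: F,
) {
    let mut buf = [0u8; 4096];
    loop {
        if us.lock().unwrap().read_paused {
            // Paused: wait until a send_data(.., true) call wakes us back up.
            if read_waker.recv().await.is_none() {
                break;
            }
            continue;
        }
        if stream.readable().await.is_err() {
            break;
        }
        match stream.try_read(&mut buf) {
            Ok(0) => break, // peer closed the connection
            Ok(len) => {
                // The closure stands in for PeerManager::read_event followed by
                // process_events; read_event now returns Ok(()) or an error,
                // never a pause flag, so nothing here touches read_paused.
                if push_to_peer_manager(&buf[..len]).is_err() {
                    break;
                }
            },
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => continue,
            Err(_) => break,
        }
    }
}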

lightning/src/ln/peer_handler.rs

Lines changed: 37 additions & 46 deletions
@@ -632,16 +632,15 @@ pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
     ///
     /// If the returned size is smaller than `data.len()`, a
     /// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
-    /// written. Additionally, until a `send_data` event completes fully, no further
-    /// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
-    /// prevent denial-of-service issues, you should not read or buffer any data from the socket
-    /// until then.
+    /// written.
     ///
-    /// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
-    /// (indicating that read events should be paused to prevent DoS in the send buffer),
-    /// `resume_read` may be set indicating that read events on this descriptor should resume. A
-    /// `resume_read` of false carries no meaning, and should not cause any action.
-    fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
+    /// If `continue_read` is *not* set, further [`PeerManager::read_event`] calls should be
+    /// avoided until another call is made with it set. This allows us to pause read if there are
+    /// too many outgoing messages queued for a peer to avoid DoS issues where a peer fills our
+    /// buffer by sending us messages that need response without reading the responses.
+    ///
+    /// Note that calls may be made with an empty `data` to update the `continue_read` flag.
+    fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize;
     /// Disconnect the socket pointed to by this SocketDescriptor.
     ///
     /// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
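
The updated documentation above leaves two obligations on implementors: a return value smaller than `data.len()` must be followed by a `PeerManager::write_buffer_space_avail` call once the socket can accept more bytes, and calls with empty `data` exist solely to carry `continue_read`. A minimal sketch of a `send_data` body covering both cases, using an invented `Conn` struct over a nonblocking `std::net::TcpStream` (an assumption for illustration, not code from this commit):

use std::io::{ErrorKind, Write};
use std::net::TcpStream;

// Invented connection state for this sketch.
struct Conn {
    stream: TcpStream, // assumed to have had set_nonblocking(true) called
    read_paused: bool,
    // When true, the driver owes PeerManager::write_buffer_space_avail the
    // next time the socket reports it is writable again.
    needs_buffer_space_notify: bool,
}

impl Conn {
    fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
        // Record the read gate first; empty-data calls exist only for this.
        self.read_paused = !continue_read;
        if data.is_empty() {
            return 0;
        }
        match self.stream.write(data) {
            Ok(written) => {
                if written < data.len() {
                    // Partial write: report buffer space to PeerManager later.
                    self.needs_buffer_space_notify = true;
                }
                written
            },
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                self.needs_buffer_space_notify = true;
                0
            },
            Err(_) => 0,
        }
    }
}
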
@@ -1664,7 +1663,7 @@ where
             Some(peer_mutex) => {
                 let mut peer = peer_mutex.lock().unwrap();
                 peer.awaiting_write_event = false;
-                self.do_attempt_write_data(descriptor, &mut peer, false);
+                self.do_attempt_write_data(descriptor, &mut peer, true);
             },
         };
         Ok(())
@@ -1676,11 +1675,9 @@ where
     ///
     /// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
     /// Thus, however, you should call [`process_events`] after any `read_event` to generate
-    /// [`send_data`] calls to handle responses.
-    ///
-    /// If `Ok(true)` is returned, further read_events should not be triggered until a
-    /// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
-    /// send buffer).
+    /// [`send_data`] calls to handle responses. This is also important to give [`send_data`] calls
+    /// a chance to pause reads if too many messages have been queued in response allowing a peer
+    /// to bloat our memory.
     ///
     /// In order to avoid processing too many messages at once per peer, `data` should be on the
     /// order of 4KiB.
@@ -1689,7 +1686,7 @@ where
     /// [`process_events`]: PeerManager::process_events
     pub fn read_event(
         &self, peer_descriptor: &mut Descriptor, data: &[u8],
-    ) -> Result<bool, PeerHandleError> {
+    ) -> Result<(), PeerHandleError> {
         match self.do_read_event(peer_descriptor, data) {
             Ok(res) => Ok(res),
             Err(e) => {
@@ -1718,8 +1715,7 @@ where
 
     fn do_read_event(
         &self, peer_descriptor: &mut Descriptor, data: &[u8],
-    ) -> Result<bool, PeerHandleError> {
-        let mut pause_read = false;
+    ) -> Result<(), PeerHandleError> {
         let peers = self.peers.read().unwrap();
         let mut msgs_to_forward = Vec::new();
         let mut peer_node_id = None;
@@ -1994,7 +1990,6 @@ where
                     },
                 }
             }
-            pause_read = !self.peer_should_read(peer);
 
             if let Some(message) = msg_to_handle {
                 match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -2027,7 +2022,7 @@ where
             );
         }
 
-        Ok(pause_read)
+        Ok(())
     }
 
     /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
@@ -3939,12 +3934,8 @@ mod tests {
 
     fn try_establish_connection<'a>(
         peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
-    ) -> (
-        FileDescriptor,
-        FileDescriptor,
-        Result<bool, PeerHandleError>,
-        Result<bool, PeerHandleError>,
-    ) {
+    ) -> (FileDescriptor, FileDescriptor, Result<(), PeerHandleError>, Result<(), PeerHandleError>)
+    {
         let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
         let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };
 
@@ -3958,11 +3949,11 @@ mod tests {
         let initial_data =
             peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
         peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-        assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+        peer_a.read_event(&mut fd_a, &initial_data).unwrap();
         peer_a.process_events();
 
         let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+        peer_b.read_event(&mut fd_b, &a_data).unwrap();
 
         peer_b.process_events();
         let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -3989,8 +3980,8 @@ mod tests {
 
         let (fd_a, fd_b, a_refused, b_refused) = try_establish_connection(peer_a, peer_b);
 
-        assert_eq!(a_refused.unwrap(), false);
-        assert_eq!(b_refused.unwrap(), false);
+        a_refused.unwrap();
+        b_refused.unwrap();
 
         assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().counterparty_node_id, id_b);
         assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().socket_address, Some(addr_b));
@@ -4113,11 +4104,11 @@ mod tests {
         let initial_data =
             peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
         peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-        assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+        peer_a.read_event(&mut fd_a, &initial_data).unwrap();
         peer_a.process_events();
 
         let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+        peer_b.read_event(&mut fd_b, &a_data).unwrap();
 
         peer_b.process_events();
         let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -4144,11 +4135,11 @@ mod tests {
         let initial_data =
             peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
         peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-        assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+        peer_a.read_event(&mut fd_a, &initial_data).unwrap();
        peer_a.process_events();
 
         let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+        peer_b.read_event(&mut fd_b, &a_data).unwrap();
 
         peer_b.process_events();
         let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -4220,7 +4211,7 @@ mod tests {
         peers[0].process_events();
 
         let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+        peers[1].read_event(&mut fd_b, &a_data).unwrap();
     }
 
     #[test]
@@ -4240,13 +4231,13 @@ mod tests {
         let mut dup_encryptor =
             PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
         let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
-        assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false);
+        peers[0].read_event(&mut fd_dup, &initial_data).unwrap();
         peers[0].process_events();
 
         let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
         let (act_three, _) =
             dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
-        assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false);
+        peers[0].read_event(&mut fd_dup, &act_three).unwrap();
 
         let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
         let msg_bytes = dup_encryptor.encrypt_message(&not_init_msg);
@@ -4504,10 +4495,10 @@ mod tests {
             assert_eq!(peers_len, 1);
         }
 
-        assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
+        peers[0].read_event(&mut fd_a, &initial_data).unwrap();
         peers[0].process_events();
         let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+        peers[1].read_event(&mut fd_b, &a_data).unwrap();
         peers[1].process_events();
 
         // ...but if we get a second timer tick, we should disconnect the peer
@@ -4557,11 +4548,11 @@ mod tests {
         let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
         peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
 
-        assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false);
+        peer_a.read_event(&mut fd_a, &act_one).unwrap();
         peer_a.process_events();
 
         let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
-        assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false);
+        peer_b.read_event(&mut fd_b, &act_two).unwrap();
         peer_b.process_events();
 
         // Calling this here triggers the race on inbound connections.
@@ -4575,7 +4566,7 @@ mod tests {
             assert!(!handshake_complete);
         }
 
-        assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false);
+        peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap();
         peer_a.process_events();
 
         {
@@ -4595,7 +4586,7 @@ mod tests {
            assert!(!handshake_complete);
         }
 
-        assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false);
+        peer_b.read_event(&mut fd_b, &init_a).unwrap();
         peer_b.process_events();
 
         {
@@ -4632,7 +4623,7 @@ mod tests {
            peer_a.process_events();
            let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
            assert!(!msg.is_empty());
-           assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false);
+           peer_b.read_event(&mut fd_b, &msg).unwrap();
            peer_b.process_events();
         };
 
@@ -4675,12 +4666,12 @@ mod tests {
 
            let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
            if !msg.is_empty() {
-               assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
+               peers[1].read_event(&mut fd_b, &msg).unwrap();
               continue;
            }
            let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
            if !msg.is_empty() {
-               assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
+               peers[0].read_event(&mut fd_a, &msg).unwrap();
               continue;
            }
            break;

0 commit comments
