Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ struct Peer<'a> {
peers_connected: &'a RefCell<[bool; 256]>,
}
impl<'a> SocketDescriptor for Peer<'a> {
fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
data.len()
}
fn disconnect_socket(&mut self) {
Expand Down Expand Up @@ -695,7 +695,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
}
let mut peer = Peer { id: peer_id, peers_connected: &peers };
match loss_detector.handler.read_event(&mut peer, get_slice!(get_slice!(1)[0])) {
Ok(res) => assert!(!res),
Ok(()) => {},
Err(_) => {
peers.borrow_mut()[peer_id as usize] = false;
},
Expand Down
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
Expand Down Expand Up @@ -1878,7 +1878,7 @@ mod tests {
#[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor {}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
0
}

Expand Down
27 changes: 8 additions & 19 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,8 @@ impl Connection {
Ok(len) => {
let read_res =
peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
let mut us_lock = us.lock().unwrap();
match read_res {
Ok(pause_read) => {
if pause_read {
us_lock.read_paused = true;
}
},
Ok(()) => {},
Err(_) => break Disconnect::CloseConnection,
}
},
Expand Down Expand Up @@ -533,7 +528,7 @@ impl SocketDescriptor {
}
}
impl peer_handler::SocketDescriptor for SocketDescriptor {
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
// there's room in the kernel buffer, or otherwise create a new Waker with a
// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
Expand All @@ -544,13 +539,16 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
return 0;
}

if resume_read && us.read_paused {
let read_was_paused = us.read_paused;
us.read_paused = !continue_read;

if continue_read && read_was_paused {
// The schedule_read future may go to lock up but end up getting woken up by there
// being more room in the write buffer, dropping the other end of this Sender
// before we get here, so we ignore any failures to wake it up.
us.read_paused = false;
let _ = us.read_waker.try_send(());
}

if data.is_empty() {
return 0;
}
Expand All @@ -576,16 +574,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
}
},
task::Poll::Ready(Err(_)) => return written_len,
task::Poll::Pending => {
// We're queued up for a write event now, but we need to make sure we also
// pause read given we're now waiting on the remote end to ACK (and in
// accordance with the send_data() docs).
us.read_paused = true;
// Further, to avoid any current pending read causing a `read_event` call, wake
// up the read_waker and restart its loop.
let _ = us.read_waker.try_send(());
return written_len;
},
task::Poll::Pending => return written_len,
}
}
}
Expand Down
Loading
Loading