Skip to content

Commit e4a70b9

Browse files
committed
Ensure we call send_data when we need to pause/unpause reads
In the previous commit, we moved the `send_data` `resume_read` flag to also indicate that we should pause if its unset. This should work as we mostly only set the flag when we're sending but may cause us to fail to pause if we are blocked on gossip validation but `awaiting_write_event` wasn't set as we had previously failed to fully flush a buffer (which no longer implies read-pause). Here we make this logic much more robust by ensuring we always make at least one `send_data` call in `do_attempt_write_data` if we need to pause read (or unpause read).
1 parent b4f9786 commit e4a70b9

File tree

1 file changed

+20
-9
lines changed

1 file changed

+20
-9
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,7 @@ struct Peer {
781781
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
782782
gossip_broadcast_buffer: VecDeque<MessageBuf>,
783783
awaiting_write_event: bool,
784+
sent_pause_read: bool,
784785

785786
pending_read_buffer: Vec<u8>,
786787
pending_read_buffer_pos: usize,
@@ -1440,6 +1441,7 @@ where
14401441
pending_outbound_buffer_first_msg_offset: 0,
14411442
gossip_broadcast_buffer: VecDeque::new(),
14421443
awaiting_write_event: false,
1444+
sent_pause_read: false,
14431445

14441446
pending_read_buffer,
14451447
pending_read_buffer_pos: 0,
@@ -1500,6 +1502,7 @@ where
15001502
pending_outbound_buffer_first_msg_offset: 0,
15011503
gossip_broadcast_buffer: VecDeque::new(),
15021504
awaiting_write_event: false,
1505+
sent_pause_read: false,
15031506

15041507
pending_read_buffer,
15051508
pending_read_buffer_pos: 0,
@@ -1535,10 +1538,18 @@ where
15351538
}
15361539

15371540
fn do_attempt_write_data(
1538-
&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool,
1541+
&self, descriptor: &mut Descriptor, peer: &mut Peer, mut force_one_write: bool,
15391542
) {
1540-
let mut have_written = false;
1541-
while !peer.awaiting_write_event {
1543+
if !self.peer_should_read(peer) {
1544+
if !peer.sent_pause_read {
1545+
force_one_write = true;
1546+
}
1547+
} else {
1548+
if peer.sent_pause_read {
1549+
force_one_write = false;
1550+
}
1551+
}
1552+
while force_one_write || !peer.awaiting_write_event {
15421553
if peer.should_buffer_onion_message() {
15431554
if let Some((peer_node_id, _)) = peer.their_node_id {
15441555
let handler = &self.message_handler.onion_message_handler;
@@ -1606,20 +1617,20 @@ where
16061617
let should_read = self.peer_should_read(peer);
16071618
let next_buff = match peer.pending_outbound_buffer.front() {
16081619
None => {
1609-
if force_one_write && !have_written {
1610-
if should_read {
1611-
let data_sent = descriptor.send_data(&[], should_read);
1612-
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1613-
}
1620+
if force_one_write {
1621+
let data_sent = descriptor.send_data(&[], should_read);
1622+
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1623+
peer.sent_pause_read = !should_read;
16141624
}
16151625
return;
16161626
},
16171627
Some(buff) => buff,
16181628
};
1629+
force_one_write = false;
16191630

16201631
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
16211632
let data_sent = descriptor.send_data(pending, should_read);
1622-
have_written = true;
1633+
peer.sent_pause_read = !should_read;
16231634
peer.pending_outbound_buffer_first_msg_offset += data_sent;
16241635
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
16251636
peer.pending_outbound_buffer_first_msg_offset = 0;

0 commit comments

Comments
 (0)