Skip to content

Commit 3db1ec1

Browse files
committed
identify: Move I/O from Networkehaviour,
reply to the Identification requests from the ConnectionHandler.
1 parent d703ba6 commit 3db1ec1

File tree

2 files changed

+67
-3
lines changed

2 files changed

+67
-3
lines changed

protocols/identify/src/behaviour.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ impl NetworkBehaviour for Behaviour {
261261
score: AddressScore::Finite(1),
262262
});
263263
}
264+
handler::Event::Identification(peer) => {
265+
self.events
266+
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
267+
peer_id: peer,
268+
}));
269+
}
264270
handler::Event::IdentificationPushed => {
265271
self.events
266272
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {

protocols/identify/src/handler.rs

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use libp2p_swarm::{
3636
};
3737
use log::warn;
3838
use smallvec::SmallVec;
39+
use std::collections::VecDeque;
3940
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
4041

4142
pub struct Proto {
@@ -64,6 +65,17 @@ impl IntoConnectionHandler for Proto {
6465
}
6566
}
6667

68+
/// A pending reply to an inbound identification request.
69+
enum Pending {
70+
/// The reply is queued for sending.
71+
Queued(Reply),
72+
/// The reply is being sent.
73+
Sending {
74+
peer: PeerId,
75+
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
76+
},
77+
}
78+
6779
/// A reply to an inbound identification request.
6880
#[derive(Debug)]
6981
pub struct Reply {
@@ -90,6 +102,9 @@ pub struct Handler {
90102
>; 4],
91103
>,
92104

105+
/// Pending replies to send.
106+
pending_replies: VecDeque<Pending>,
107+
93108
/// Future that fires when we need to identify the node again.
94109
trigger_next_identify: Delay,
95110

@@ -106,11 +121,13 @@ pub struct Handler {
106121
pub enum Event {
107122
/// We obtained identification information from the remote.
108123
Identified(Info),
124+
/// We replied to an identification request from the remote.
125+
Identification(PeerId),
109126
/// We actively pushed our identification information to the remote.
110127
IdentificationPushed,
111128
/// We received a request for identification.
112129
Identify(ReplySubstream<NegotiatedSubstream>),
113-
/// Failed to identify the remote.
130+
/// Failed to identify the remote, or to reply to an identification request.
114131
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
115132
}
116133

@@ -130,6 +147,7 @@ impl Handler {
130147
remote_peer_id,
131148
inbound_identify_push: Default::default(),
132149
events: SmallVec::new(),
150+
pending_replies: VecDeque::new(),
133151
trigger_next_identify: Delay::new(initial_delay),
134152
keep_alive: KeepAlive::Yes,
135153
interval,
@@ -231,8 +249,15 @@ impl ConnectionHandler for Handler {
231249
),
232250
});
233251
}
234-
InEvent::Identify(_) => {
235-
todo!()
252+
InEvent::Identify(reply) => {
253+
if !self.pending_replies.is_empty() {
254+
warn!(
255+
"New inbound identify request from {} while a previous one \
256+
is still pending. Queueing the new one.",
257+
reply.peer,
258+
);
259+
}
260+
self.pending_replies.push_back(Pending::Queued(reply));
236261
}
237262
}
238263
}
@@ -275,6 +300,39 @@ impl ConnectionHandler for Handler {
275300
}
276301
}
277302

303+
// Check for pending replies to send.
304+
if let Some(mut pending) = self.pending_replies.pop_front() {
305+
loop {
306+
match pending {
307+
Pending::Queued(Reply { peer, io, info }) => {
308+
let io = Box::pin(io.send(info));
309+
pending = Pending::Sending { peer, io };
310+
}
311+
Pending::Sending { peer, mut io } => {
312+
match Future::poll(Pin::new(&mut io), cx) {
313+
Poll::Pending => {
314+
self.pending_replies
315+
.push_front(Pending::Sending { peer, io });
316+
return Poll::Pending;
317+
}
318+
Poll::Ready(Ok(())) => {
319+
return Poll::Ready(ConnectionHandlerEvent::Custom(
320+
Event::Identification(peer),
321+
));
322+
}
323+
Poll::Ready(Err(err)) => {
324+
return Poll::Ready(ConnectionHandlerEvent::Custom(
325+
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
326+
libp2p_core::upgrade::UpgradeError::Apply(err),
327+
)),
328+
))
329+
}
330+
}
331+
}
332+
}
333+
}
334+
}
335+
278336
Poll::Pending
279337
}
280338

0 commit comments

Comments
 (0)