Skip to content

Commit d703ba6

Browse files
committed
Identify: start moving I/O from NetworkBehaviour,
instead of responding pending replies from NetworkBehaviour, send them back to ConnectionHandler. ConnectionHandler for now just receives them, it's implementation of the responding will come next.
1 parent cff7c4a commit d703ba6

File tree

2 files changed

+58
-81
lines changed

2 files changed

+58
-81
lines changed

protocols/identify/src/behaviour.rs

Lines changed: 27 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use crate::handler::{self, Proto, Push};
21+
use crate::handler::{self, InEvent, Proto, Reply};
2222
use crate::protocol::{Info, ReplySubstream, UpgradeError};
23-
use futures::prelude::*;
2423
use libp2p_core::{
2524
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
2625
};
@@ -35,7 +34,6 @@ use std::num::NonZeroUsize;
3534
use std::{
3635
collections::{HashMap, HashSet, VecDeque},
3736
iter::FromIterator,
38-
pin::Pin,
3937
task::Context,
4038
task::Poll,
4139
time::Duration,
@@ -51,8 +49,8 @@ pub struct Behaviour {
5149
config: Config,
5250
/// For each peer we're connected to, the observed address to send back to it.
5351
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
54-
/// Pending replies to send.
55-
pending_replies: VecDeque<Reply>,
52+
/// Pending requests to respond.
53+
requests: VecDeque<Request>,
5654
/// Pending events to be emitted when polled.
5755
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
5856
/// Peers to which an active push with current information about
@@ -63,18 +61,10 @@ pub struct Behaviour {
6361
}
6462

6563
/// A pending reply to an inbound identification request.
66-
enum Reply {
67-
/// The reply is queued for sending.
68-
Queued {
69-
peer: PeerId,
70-
io: ReplySubstream<NegotiatedSubstream>,
71-
observed: Multiaddr,
72-
},
73-
/// The reply is being sent.
74-
Sending {
75-
peer: PeerId,
76-
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
77-
},
64+
struct Request {
65+
peer: PeerId,
66+
io: ReplySubstream<NegotiatedSubstream>,
67+
observed: Multiaddr,
7868
}
7969

8070
/// Configuration for the [`identify::Behaviour`](Behaviour).
@@ -184,7 +174,7 @@ impl Behaviour {
184174
Self {
185175
config,
186176
connected: HashMap::new(),
187-
pending_replies: VecDeque::new(),
177+
requests: VecDeque::new(),
188178
events: VecDeque::new(),
189179
pending_push: HashSet::new(),
190180
discovered_peers,
@@ -287,7 +277,7 @@ impl NetworkBehaviour for Behaviour {
287277
with an established connection and calling `NetworkBehaviour::on_event` \
288278
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
289279
);
290-
self.pending_replies.push_back(Reply::Queued {
280+
self.requests.push_back(Request {
291281
peer: peer_id,
292282
io: sender,
293283
observed: observed.clone(),
@@ -305,7 +295,7 @@ impl NetworkBehaviour for Behaviour {
305295

306296
fn poll(
307297
&mut self,
308-
cx: &mut Context<'_>,
298+
_cx: &mut Context<'_>,
309299
params: &mut impl PollParameters,
310300
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
311301
if let Some(event) = self.events.pop_front() {
@@ -333,7 +323,7 @@ impl NetworkBehaviour for Behaviour {
333323
observed_addr,
334324
};
335325

336-
(*peer, Push(info))
326+
(*peer, InEvent::Push(info))
337327
})
338328
});
339329

@@ -346,55 +336,21 @@ impl NetworkBehaviour for Behaviour {
346336
});
347337
}
348338

349-
// Check for pending replies to send.
350-
if let Some(r) = self.pending_replies.pop_front() {
351-
let mut sending = 0;
352-
let to_send = self.pending_replies.len() + 1;
353-
let mut reply = Some(r);
354-
loop {
355-
match reply {
356-
Some(Reply::Queued { peer, io, observed }) => {
357-
let info = Info {
358-
listen_addrs: listen_addrs(params),
359-
protocols: supported_protocols(params),
360-
public_key: self.config.local_public_key.clone(),
361-
protocol_version: self.config.protocol_version.clone(),
362-
agent_version: self.config.agent_version.clone(),
363-
observed_addr: observed,
364-
};
365-
let io = Box::pin(io.send(info));
366-
reply = Some(Reply::Sending { peer, io });
367-
}
368-
Some(Reply::Sending { peer, mut io }) => {
369-
sending += 1;
370-
match Future::poll(Pin::new(&mut io), cx) {
371-
Poll::Ready(Ok(())) => {
372-
let event = Event::Sent { peer_id: peer };
373-
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
374-
}
375-
Poll::Pending => {
376-
self.pending_replies.push_back(Reply::Sending { peer, io });
377-
if sending == to_send {
378-
// All remaining futures are NotReady
379-
break;
380-
} else {
381-
reply = self.pending_replies.pop_front();
382-
}
383-
}
384-
Poll::Ready(Err(err)) => {
385-
let event = Event::Error {
386-
peer_id: peer,
387-
error: ConnectionHandlerUpgrErr::Upgrade(
388-
libp2p_core::upgrade::UpgradeError::Apply(err),
389-
),
390-
};
391-
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
392-
}
393-
}
394-
}
395-
None => unreachable!(),
396-
}
397-
}
339+
// Check for pending requests to send back to the handler for reply.
340+
if let Some(Request { peer, io, observed }) = self.requests.pop_front() {
341+
let info = Info {
342+
listen_addrs: listen_addrs(params),
343+
protocols: supported_protocols(params),
344+
public_key: self.config.local_public_key.clone(),
345+
protocol_version: self.config.protocol_version.clone(),
346+
agent_version: self.config.agent_version.clone(),
347+
observed_addr: observed,
348+
};
349+
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
350+
peer_id: peer,
351+
handler: NotifyHandler::Any,
352+
event: InEvent::Identify(Reply { peer, info, io }),
353+
});
398354
}
399355

400356
Poll::Pending
@@ -557,6 +513,7 @@ impl PeerCache {
557513
mod tests {
558514
use super::*;
559515
use futures::pin_mut;
516+
use futures::prelude::*;
560517
use libp2p::mplex::MplexConfig;
561518
use libp2p::noise;
562519
use libp2p::tcp;

protocols/identify/src/handler.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ impl IntoConnectionHandler for Proto {
6464
}
6565
}
6666

67+
/// A reply to an inbound identification request.
68+
#[derive(Debug)]
69+
pub struct Reply {
70+
pub peer: PeerId,
71+
pub io: ReplySubstream<NegotiatedSubstream>,
72+
pub info: Info,
73+
}
74+
6775
/// Protocol handler for sending and receiving identification requests.
6876
///
6977
/// Outbound requests are sent periodically. The handler performs expects
@@ -106,9 +114,14 @@ pub enum Event {
106114
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
107115
}
108116

109-
/// Identifying information of the local node that is pushed to a remote.
110117
#[derive(Debug)]
111-
pub struct Push(pub Info);
118+
#[allow(clippy::large_enum_variant)]
119+
pub enum InEvent {
120+
/// Identifying information of the local node that is pushed to a remote.
121+
Push(Info),
122+
/// Identifying information requested from this node.
123+
Identify(Reply),
124+
}
112125

113126
impl Handler {
114127
/// Creates a new `Handler`.
@@ -195,7 +208,7 @@ impl Handler {
195208
}
196209

197210
impl ConnectionHandler for Handler {
198-
type InEvent = Push;
211+
type InEvent = InEvent;
199212
type OutEvent = Event;
200213
type Error = io::Error;
201214
type InboundProtocol = SelectUpgrade<Protocol, PushProtocol<InboundPush>>;
@@ -207,14 +220,21 @@ impl ConnectionHandler for Handler {
207220
SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ())
208221
}
209222

210-
fn on_behaviour_event(&mut self, Push(push): Self::InEvent) {
211-
self.events
212-
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
213-
protocol: SubstreamProtocol::new(
214-
EitherUpgrade::B(PushProtocol::outbound(push)),
215-
(),
216-
),
217-
});
223+
fn on_behaviour_event(&mut self, event: Self::InEvent) {
224+
match event {
225+
InEvent::Push(push) => {
226+
self.events
227+
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
228+
protocol: SubstreamProtocol::new(
229+
EitherUpgrade::B(PushProtocol::outbound(push)),
230+
(),
231+
),
232+
});
233+
}
234+
InEvent::Identify(_) => {
235+
todo!()
236+
}
237+
}
218238
}
219239

220240
fn connection_keep_alive(&self) -> KeepAlive {

0 commit comments

Comments
 (0)