Skip to content

Commit e52665e

Browse files
author
Roman S. Borschel
committed
Fix regression w.r.t. reporting of dial errors.
PR [1440] introduced a regression w.r.t. the reporting of dial errors. In particular, if a connection attempt failed due to an invalid remote peer ID, any remaining addresses for the same peer would not be tried (intentional), but the dial failure would also not be reported to the behaviour, causing e.g. libp2p-kad queries to potentially stall. In hindsight, I figured it is better to preserve the previous behaviour and still try alternative addresses of the peer even after an invalid peer ID error on an earlier address. This matters in particular because, in the context of libp2p-kad, it is not uncommon for peers to report localhost addresses while the local node actually has e.g. an ipfs node running on that address, obviously with a different peer ID; this is the scenario causing frequent invalid peer ID (mismatch) errors when running the ipfs-kad example. This commit thus restores the previous behaviour w.r.t. trying all remaining addresses on invalid peer ID errors and makes sure `inject_dial_error` is always called when the last attempt failed. [1440]: #1440.
1 parent 96cd509 commit e52665e

File tree

8 files changed

+92
-109
lines changed

8 files changed

+92
-109
lines changed

.circleci/config.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ workflows:
77
- test
88
- test-wasm
99
- check-rustdoc-links
10+
- integration-test
1011

1112
jobs:
1213
test:
@@ -90,3 +91,24 @@ jobs:
9091
- ./target
9192
- /usr/local/cargo
9293
- /root/.cache/sccache
94+
95+
integration-test:
96+
docker:
97+
- image: rust
98+
- image: ipfs/go-ipfs
99+
steps:
100+
- checkout
101+
- restore_cache:
102+
key: integration-test-cache-{{ epoch }}
103+
- run:
104+
name: Print Rust version
105+
command: |
106+
rustc --version
107+
- run:
108+
command: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad
109+
- save_cache:
110+
key: integration-test-cache-{{ epoch }}
111+
paths:
112+
- "~/.cargo"
113+
- "./target"
114+

core/src/connection/error.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
2929
// TODO: Eventually this should also be a custom error?
3030
IO(io::Error),
3131

32+
/// The connection was dropped because the connection limit
33+
/// for a peer has been reached.
34+
ConnectionLimit(ConnectionLimit),
35+
3236
/// The connection handler produced an error.
3337
Handler(THandlerErr),
3438
}
@@ -44,6 +48,8 @@ where
4448
write!(f, "Connection error: I/O error: {}", err),
4549
ConnectionError::Handler(err) =>
4650
write!(f, "Connection error: Handler error: {}", err),
51+
ConnectionError::ConnectionLimit(l) =>
52+
write!(f, "Connection error: Connection limit: {}.", l)
4753
}
4854
}
4955
}
@@ -57,6 +63,7 @@ where
5763
match self {
5864
ConnectionError::IO(err) => Some(err),
5965
ConnectionError::Handler(err) => Some(err),
66+
ConnectionError::ConnectionLimit(..) => None,
6067
}
6168
}
6269
}
@@ -71,10 +78,6 @@ pub enum PendingConnectionError<TTransErr> {
7178
/// match the one that was expected or is otherwise invalid.
7279
InvalidPeerId,
7380

74-
/// The pending connection was successfully negotiated but dropped
75-
/// because the connection limit for a peer has been reached.
76-
ConnectionLimit(ConnectionLimit),
77-
7881
/// An I/O error occurred on the connection.
7982
// TODO: Eventually this should also be a custom error?
8083
IO(io::Error),
@@ -93,8 +96,6 @@ where
9396
write!(f, "Pending connection: Transport error: {}", err),
9497
PendingConnectionError::InvalidPeerId =>
9598
write!(f, "Pending connection: Invalid peer ID."),
96-
PendingConnectionError::ConnectionLimit(l) =>
97-
write!(f, "Pending connection: Connection limit: {}.", l)
9899
}
99100
}
100101
}
@@ -109,7 +110,6 @@ where
109110
PendingConnectionError::IO(err) => Some(err),
110111
PendingConnectionError::Transport(err) => Some(err),
111112
PendingConnectionError::InvalidPeerId => None,
112-
PendingConnectionError::ConnectionLimit(..) => None,
113113
}
114114
}
115115
}

core/src/connection/pool.rs

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
112112
error: PendingConnectionError<TTransErr>,
113113
/// The handler that was supposed to handle the connection,
114114
/// if the connection failed before the handler was consumed.
115-
handler: Option<THandler>,
115+
handler: THandler,
116116
/// The (expected) peer of the failed connection.
117117
peer: Option<TPeerId>,
118118
/// A reference to the pool that managed the connection.
@@ -222,6 +222,7 @@ where
222222
TOutEvent: Send + 'static,
223223
TMuxer: StreamMuxer + Send + Sync + 'static,
224224
TMuxer::OutboundSubstream: Send + 'static,
225+
TPeerId: Clone + Send + 'static,
225226
{
226227
let endpoint = info.to_connected_point();
227228
if let Some(limit) = self.limits.max_pending_incoming {
@@ -263,7 +264,7 @@ where
263264
TOutEvent: Send + 'static,
264265
TMuxer: StreamMuxer + Send + Sync + 'static,
265266
TMuxer::OutboundSubstream: Send + 'static,
266-
TPeerId: Clone,
267+
TPeerId: Clone + Send + 'static,
267268
{
268269
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
269270
let endpoint = info.to_connected_point();
@@ -298,14 +299,32 @@ where
298299
TOutEvent: Send + 'static,
299300
TMuxer: StreamMuxer + Send + Sync + 'static,
300301
TMuxer::OutboundSubstream: Send + 'static,
302+
TPeerId: Clone + Send + 'static,
301303
{
304+
// Validate the received peer ID as the last step of the pending connection
305+
// future, so that these errors can be raised before the `handler` is consumed
306+
// by the background task, which happens when this future resolves to an
307+
// "established" connection.
302308
let future = future.and_then({
303309
let endpoint = endpoint.clone();
310+
let expected_peer = peer.clone();
311+
let local_id = self.local_id.clone();
304312
move |(info, muxer)| {
313+
if let Some(peer) = expected_peer {
314+
if &peer != info.peer_id() {
315+
return future::err(PendingConnectionError::InvalidPeerId)
316+
}
317+
}
318+
319+
if &local_id == info.peer_id() {
320+
return future::err(PendingConnectionError::InvalidPeerId)
321+
}
322+
305323
let connected = Connected { info, endpoint };
306324
future::ready(Ok((connected, muxer)))
307325
}
308326
});
327+
309328
let id = self.manager.add_pending(future, handler);
310329
self.pending.insert(id, (endpoint, peer));
311330
id
@@ -536,7 +555,7 @@ where
536555
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
537556
> where
538557
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
539-
TPeerId: Clone,
558+
TPeerId: Clone + fmt::Debug,
540559
{
541560
loop {
542561
let item = match self.manager.poll(cx) {
@@ -551,7 +570,7 @@ where
551570
id,
552571
endpoint,
553572
error,
554-
handler: Some(handler),
573+
handler,
555574
peer,
556575
pool: self
557576
})
@@ -581,40 +600,22 @@ where
581600
.map_or(0, |conns| conns.len());
582601
if let Err(e) = self.limits.check_established(current) {
583602
let connected = entry.close();
584-
return Poll::Ready(PoolEvent::PendingConnectionError {
603+
let num_established = e.current;
604+
return Poll::Ready(PoolEvent::ConnectionError {
585605
id,
586-
endpoint: connected.endpoint,
587-
peer: Some(connected.info.peer_id().clone()),
588-
error: PendingConnectionError::ConnectionLimit(e),
606+
connected,
607+
error: ConnectionError::ConnectionLimit(e),
608+
num_established,
589609
pool: self,
590-
handler: None,
591610
})
592611
}
593-
// Check peer ID.
594-
if let Some(peer) = peer {
595-
if &peer != entry.connected().peer_id() {
596-
let connected = entry.close();
597-
return Poll::Ready(PoolEvent::PendingConnectionError {
598-
id,
599-
endpoint: connected.endpoint,
600-
peer: Some(connected.info.peer_id().clone()),
601-
error: PendingConnectionError::InvalidPeerId,
602-
pool: self,
603-
handler: None,
604-
})
612+
// Peer ID checks must already have happened. See `add_pending`.
613+
if cfg!(debug_assertions) {
614+
assert_ne!(&self.local_id, entry.connected().peer_id());
615+
if let Some(peer) = peer {
616+
assert_eq!(&peer, entry.connected().peer_id());
605617
}
606618
}
607-
if &self.local_id == entry.connected().peer_id() {
608-
let connected = entry.close();
609-
return Poll::Ready(PoolEvent::PendingConnectionError {
610-
id,
611-
endpoint: connected.endpoint,
612-
peer: Some(connected.info.peer_id().clone()),
613-
error: PendingConnectionError::InvalidPeerId,
614-
pool: self,
615-
handler: None,
616-
})
617-
}
618619
// Add the connection to the pool.
619620
let peer = entry.connected().peer_id().clone();
620621
let conns = self.established.entry(peer).or_default();

core/src/network.rs

Lines changed: 14 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ use std::{
5555
error,
5656
fmt,
5757
hash::Hash,
58-
num::NonZeroUsize,
5958
pin::Pin,
6059
task::{Context, Poll},
6160
};
@@ -331,7 +330,7 @@ where
331330
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
332331
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
333332
TConnInfo: Clone,
334-
TPeerId: AsRef<[u8]> + Send + 'static,
333+
TPeerId: fmt::Debug + Send + 'static,
335334
{
336335
// Poll the listener(s) for new connections.
337336
match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
@@ -383,7 +382,7 @@ where
383382
}
384383
Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
385384
let dialing = &mut self.dialing;
386-
let (next, event) = on_connection_failed(pool, dialing, id, endpoint, error, handler);
385+
let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
387386
if let Some(dial) = next {
388387
let transport = self.listeners.transport().clone();
389388
if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
@@ -496,13 +495,11 @@ where
496495
/// If the failed connection attempt was a dialing attempt and there
497496
/// are more addresses to try, new `DialingOpts` are returned.
498497
fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
499-
pool: &Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
500-
<THandler::Handler as ConnectionHandler>::Error, TConnInfo, TPeerId>,
501498
dialing: &mut FnvHashMap<TPeerId, peer::DialingAttempt>,
502499
id: ConnectionId,
503500
endpoint: ConnectedPoint,
504501
error: PendingConnectionError<TTrans::Error>,
505-
handler: Option<THandler>,
502+
handler: THandler,
506503
) -> (Option<DialingOpts<TPeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>)
507504
where
508505
TTrans: Transport,
@@ -518,41 +515,27 @@ where
518515

519516
if let Some(peer_id) = dialing_peer {
520517
// A pending outgoing connection to a known peer failed.
521-
let attempt = dialing.remove(&peer_id).expect("by (1)");
518+
let mut attempt = dialing.remove(&peer_id).expect("by (1)");
522519

523520
let num_remain = attempt.next.len();
524521
let failed_addr = attempt.current.clone();
525522

526-
let new_state = if pool.is_connected(&peer_id) {
527-
peer::PeerState::Connected
528-
} else if num_remain == 0 { // (2)
529-
peer::PeerState::Disconnected
530-
} else {
531-
peer::PeerState::Dialing {
532-
num_pending_addresses: NonZeroUsize::new(num_remain).expect("by (2)"),
533-
}
534-
};
535-
536523
let opts =
537-
if let Some(handler) = handler {
538-
if !attempt.next.is_empty() {
539-
let mut attempt = attempt;
540-
let next_attempt = attempt.next.remove(0);
541-
Some(DialingOpts {
542-
peer: peer_id.clone(),
543-
handler,
544-
address: next_attempt,
545-
remaining: attempt.next
546-
})
547-
} else {
548-
None
549-
}
524+
if num_remain > 0 {
525+
let next_attempt = attempt.next.remove(0);
526+
let opts = DialingOpts {
527+
peer: peer_id.clone(),
528+
handler,
529+
address: next_attempt,
530+
remaining: attempt.next
531+
};
532+
Some(opts)
550533
} else {
551534
None
552535
};
553536

554537
(opts, NetworkEvent::DialError {
555-
new_state,
538+
attempts_remaining: num_remain,
556539
peer_id,
557540
multiaddr: failed_addr,
558541
error,

core/src/network/event.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use crate::{
3939
pool::Pool,
4040
},
4141
muxing::StreamMuxer,
42-
network::peer::PeerState,
4342
transport::{Transport, TransportError},
4443
};
4544
use futures::prelude::*;
@@ -122,8 +121,8 @@ where
122121

123122
/// A dialing attempt to an address of a peer failed.
124123
DialError {
125-
/// New state of a peer.
126-
new_state: PeerState,
124+
/// The number of remaining dialing attempts.
125+
attempts_remaining: usize,
127126

128127
/// Id of the peer we were trying to dial.
129128
peer_id: TPeerId,
@@ -145,7 +144,7 @@ where
145144

146145
/// The handler that was passed to `dial()`, if the
147146
/// connection failed before the handler was consumed.
148-
handler: Option<THandler>,
147+
handler: THandler,
149148
},
150149

151150
/// An established connection produced an event.
@@ -219,9 +218,9 @@ where
219218
.field("error", error)
220219
.finish()
221220
}
222-
NetworkEvent::DialError { new_state, peer_id, multiaddr, error } => {
221+
NetworkEvent::DialError { attempts_remaining, peer_id, multiaddr, error } => {
223222
f.debug_struct("DialError")
224-
.field("new_state", new_state)
223+
.field("attempts_remaining", attempts_remaining)
225224
.field("peer_id", peer_id)
226225
.field("multiaddr", multiaddr)
227226
.field("error", error)

core/src/network/peer.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,9 @@ use std::{
4242
error,
4343
fmt,
4444
hash::Hash,
45-
num::NonZeroUsize,
4645
};
4746
use super::{Network, DialingOpts};
4847

49-
/// The state of a (remote) peer as seen by the local peer
50-
/// through a [`Network`].
51-
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
52-
pub enum PeerState {
53-
/// The [`Network`] is connected to the peer, i.e. has at least one
54-
/// established connection.
55-
Connected,
56-
/// We are currently trying to reach this peer.
57-
Dialing {
58-
/// Number of addresses we are trying to dial.
59-
num_pending_addresses: NonZeroUsize,
60-
},
61-
/// The [`Network`] is disconnected from the peer, i.e. has no
62-
/// established connection and no pending, outgoing connection.
63-
Disconnected,
64-
}
65-
6648
/// The possible representations of a peer in a [`Network`], as
6749
/// seen by the local node.
6850
pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>

0 commit comments

Comments
 (0)