Skip to content

Commit 7f7b701

Browse files
committed
Add connect timeout
1 parent 9856198 commit 7f7b701

File tree

15 files changed

+89
-5
lines changed

15 files changed

+89
-5
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client-core/src/cli_helpers/client_add_gateway.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ where
141141
available_gateways,
142142
#[cfg(unix)]
143143
connection_fd_callback: None,
144+
connect_timeout: None,
144145
};
145146

146147
let init_details =

common/client-core/src/cli_helpers/client_init.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ where
189189
available_gateways,
190190
#[cfg(unix)]
191191
connection_fd_callback: None,
192+
connect_timeout: None,
192193
};
193194

194195
let init_details =

common/client-core/src/client/base_client/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use std::fmt::Debug;
6464
use std::os::raw::c_int as RawFd;
6565
use std::path::Path;
6666
use std::sync::Arc;
67+
use std::time::Duration;
6768
use time::OffsetDateTime;
6869
use tokio::sync::mpsc::Sender;
6970
use tracing::*;
@@ -204,6 +205,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
204205

205206
#[cfg(unix)]
206207
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
208+
connect_timeout: Option<Duration>,
207209

208210
derivation_material: Option<DerivationMaterial>,
209211
}
@@ -230,6 +232,7 @@ where
230232
setup_method: GatewaySetup::MustLoad { gateway_id: None },
231233
#[cfg(unix)]
232234
connection_fd_callback: None,
235+
connect_timeout: None,
233236
derivation_material: None,
234237
}
235238
}
@@ -477,6 +480,7 @@ where
477480
packet_router: PacketRouter,
478481
stats_reporter: ClientStatsSender,
479482
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
483+
connect_timeout: Option<Duration>,
480484
shutdown_tracker: &ShutdownTracker,
481485
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
482486
where
@@ -521,6 +525,7 @@ where
521525
stats_reporter,
522526
#[cfg(unix)]
523527
connection_fd_callback,
528+
connect_timeout,
524529
shutdown_tracker.clone_shutdown_token(),
525530
)
526531
};
@@ -584,6 +589,7 @@ where
584589
packet_router: PacketRouter,
585590
stats_reporter: ClientStatsSender,
586591
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
592+
connect_timeout: Option<Duration>,
587593
shutdown_tracker: &ShutdownTracker,
588594
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
589595
where
@@ -616,6 +622,7 @@ where
616622
stats_reporter,
617623
#[cfg(unix)]
618624
connection_fd_callback,
625+
connect_timeout,
619626
shutdown_tracker,
620627
)
621628
.await?;
@@ -956,6 +963,7 @@ where
956963
stats_reporter.clone(),
957964
#[cfg(unix)]
958965
self.connection_fd_callback,
966+
self.connect_timeout,
959967
&shutdown_tracker.child_tracker(),
960968
)
961969
.await?;

common/client-core/src/init/helpers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,15 @@ pub(super) async fn register_with_gateway(
374374
gateway_listener: Url,
375375
our_identity: Arc<ed25519::KeyPair>,
376376
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
377+
connect_timeout: Option<Duration>,
377378
) -> Result<RegistrationResult, ClientCoreError> {
378379
let mut gateway_client = GatewayClient::new_init(
379380
gateway_listener,
380381
gateway_id,
381382
our_identity.clone(),
382383
#[cfg(unix)]
383384
connection_fd_callback,
385+
connect_timeout,
384386
);
385387

386388
gateway_client.establish_connection().await.map_err(|err| {

common/client-core/src/init/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use nym_topology::node::RoutingNode;
2323
use rand::rngs::OsRng;
2424
use rand::{CryptoRng, RngCore};
2525
use serde::Serialize;
26+
use std::time::Duration;
2627
#[cfg(unix)]
2728
use std::{os::fd::RawFd, sync::Arc};
2829

@@ -56,6 +57,7 @@ async fn setup_new_gateway<K, D>(
5657
selection_specification: GatewaySelectionSpecification,
5758
available_gateways: Vec<RoutingNode>,
5859
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
60+
connect_timeout: Option<Duration>,
5961
) -> Result<InitialisationResult, ClientCoreError>
6062
where
6163
K: KeyStore,
@@ -117,6 +119,7 @@ where
117119
our_identity,
118120
#[cfg(unix)]
119121
connection_fd_callback,
122+
connect_timeout,
120123
)
121124
.await?;
122125
(
@@ -213,6 +216,7 @@ where
213216
available_gateways,
214217
#[cfg(unix)]
215218
connection_fd_callback,
219+
connect_timeout,
216220
} => {
217221
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
218222
setup_new_gateway(
@@ -222,6 +226,7 @@ where
222226
available_gateways,
223227
#[cfg(unix)]
224228
connection_fd_callback,
229+
connect_timeout,
225230
)
226231
.await
227232
}

common/client-core/src/init/types.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::fmt::{Debug, Display};
2121
#[cfg(unix)]
2222
use std::os::fd::RawFd;
2323
use std::sync::Arc;
24+
use std::time::Duration;
2425
use time::OffsetDateTime;
2526
use url::Url;
2627

@@ -214,6 +215,9 @@ pub enum GatewaySetup {
214215
/// Callback useful for allowing initial connection to gateway
215216
#[cfg(unix)]
216217
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
218+
219+
/// Timeout for establishing connection
220+
connect_timeout: Option<Duration>,
217221
},
218222

219223
ReuseConnection {
@@ -239,6 +243,7 @@ impl Debug for GatewaySetup {
239243
available_gateways,
240244
#[cfg(unix)]
241245
connection_fd_callback: _,
246+
connect_timeout: _,
242247
} => f
243248
.debug_struct("GatewaySetup::New")
244249
.field("specification", specification)
@@ -280,6 +285,7 @@ impl GatewaySetup {
280285
available_gateways: vec![],
281286
#[cfg(unix)]
282287
connection_fd_callback: None,
288+
connect_timeout: None,
283289
}
284290
}
285291

common/client-libs/gateway-client/src/client/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use url::Url;
3838

3939
#[cfg(unix)]
4040
use std::os::fd::RawFd;
41+
use std::time::Duration;
4142
#[cfg(not(target_arch = "wasm32"))]
4243
use tokio::time::sleep;
4344

@@ -104,10 +105,13 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
104105
// currently unused (but populated)
105106
negotiated_protocol: Option<u8>,
106107

107-
// Callback on the fd as soon as the connection has been established
108+
/// Callback on the fd as soon as the connection has been established
108109
#[cfg(unix)]
109110
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
110111

112+
/// Maximum duration to wait for a connection to be established when set
113+
connect_timeout: Option<Duration>,
114+
111115
/// Listen to shutdown messages and send notifications back to the task manager
112116
shutdown_token: ShutdownToken,
113117
}
@@ -124,6 +128,7 @@ impl<C, St> GatewayClient<C, St> {
124128
bandwidth_controller: Option<BandwidthController<C, St>>,
125129
stats_reporter: ClientStatsSender,
126130
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
131+
connect_timeout: Option<Duration>,
127132
shutdown_token: ShutdownToken,
128133
) -> Self {
129134
GatewayClient {
@@ -141,6 +146,7 @@ impl<C, St> GatewayClient<C, St> {
141146
negotiated_protocol: None,
142147
#[cfg(unix)]
143148
connection_fd_callback,
149+
connect_timeout,
144150
shutdown_token,
145151
}
146152
}
@@ -208,6 +214,7 @@ impl<C, St> GatewayClient<C, St> {
208214
&self.gateway_address,
209215
#[cfg(unix)]
210216
self.connection_fd_callback.clone(),
217+
self.connect_timeout,
211218
)
212219
.await?;
213220

@@ -1132,6 +1139,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11321139
gateway_identity: ed25519::PublicKey,
11331140
local_identity: Arc<ed25519::KeyPair>,
11341141
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
1142+
connect_timeout: Option<Duration>,
11351143
) -> Self {
11361144
log::trace!("Initialising gateway client");
11371145
use futures::channel::mpsc;
@@ -1158,6 +1166,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11581166
negotiated_protocol: None,
11591167
#[cfg(unix)]
11601168
connection_fd_callback,
1169+
connect_timeout,
11611170
shutdown_token,
11621171
}
11631172
}
@@ -1190,6 +1199,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11901199
negotiated_protocol: self.negotiated_protocol,
11911200
#[cfg(unix)]
11921201
connection_fd_callback: self.connection_fd_callback,
1202+
connect_timeout: self.connect_timeout,
11931203
shutdown_token,
11941204
}
11951205
}

common/client-libs/gateway-client/src/client/websockets.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::error::GatewayClientError;
22

33
use nym_http_api_client::HickoryDnsResolver;
4+
use std::time::Duration;
45
#[cfg(unix)]
56
use std::{
67
os::fd::{AsRawFd, RawFd},
@@ -17,6 +18,7 @@ use std::net::SocketAddr;
1718
pub(crate) async fn connect_async(
1819
endpoint: &str,
1920
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
21+
connect_timeout: Option<Duration>,
2022
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
2123
use tokio::net::TcpSocket;
2224

@@ -64,7 +66,22 @@ pub(crate) async fn connect_async(
6466
callback.as_ref()(socket.as_raw_fd());
6567
}
6668

67-
match socket.connect(sock_addr).await {
69+
let connect_res = if let Some(connect_timeout) = connect_timeout {
70+
match tokio::time::timeout(connect_timeout, socket.connect(sock_addr)).await {
71+
Ok(res) => res,
72+
Err(_elapsed) => {
73+
stream = Err(GatewayClientError::NetworkConnectionTimeout {
74+
address: endpoint.to_owned(),
75+
timeout: connect_timeout,
76+
});
77+
continue;
78+
}
79+
}
80+
} else {
81+
socket.connect(sock_addr).await
82+
};
83+
84+
match connect_res {
6885
Ok(s) => {
6986
stream = Ok(s);
7087
break;

common/client-libs/gateway-client/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use nym_gateway_requests::registration::handshake::error::HandshakeError;
55
use nym_gateway_requests::{GatewayRequestsError, SimpleGatewayRequestsError};
66
use std::io;
7+
use std::time::Duration;
78
use thiserror::Error;
89
use tungstenite::Error as WsError;
910

@@ -46,6 +47,9 @@ pub enum GatewayClientError {
4647
source: Box<WsError>,
4748
},
4849

50+
#[error("timeout when establishing connection: {address}, timeout: {timeout:?}")]
51+
NetworkConnectionTimeout { address: String, timeout: Duration },
52+
4953
#[error("no socket address for endpoint: {address}")]
5054
NoEndpointForConnection { address: String },
5155

0 commit comments

Comments
 (0)