Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/client-core/src/cli_helpers/client_add_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure connection timeout is a propery of a GatewaySetup (I don't like connection_fd_callback being there either). but if it has to be there, maybe it should be wrapped into some sort of config (alongside that callback)?

};

let init_details =
Expand Down
1 change: 1 addition & 0 deletions common/client-core/src/cli_helpers/client_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
};

let init_details =
Expand Down
13 changes: 13 additions & 0 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::mpsc::Sender;
use url::Url;
Expand Down Expand Up @@ -230,6 +231,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {

#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,

derivation_material: Option<DerivationMaterial>,
}
Expand Down Expand Up @@ -258,6 +260,7 @@ where
setup_method: GatewaySetup::MustLoad { gateway_id: None },
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
derivation_material: None,
}
}
Expand Down Expand Up @@ -356,6 +359,11 @@ where
self
}

pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would it be easier for you if rather than setting connection timeout as a separate thing of the client, you put it as part of the base client config. gateway_connection section inside the DebugConfig sounds like the perfect place for it, wouldn't you say?

self.connect_timeout = Some(timeout);
self
}

// note: do **NOT** make this method public as its only valid usage is from within `start_base`
// because it relies on the crypto keys being already loaded
fn mix_address(details: &InitialisationResult) -> Recipient {
Expand Down Expand Up @@ -533,6 +541,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and then I guess (with my previous comment in mind), all of those extra changes could be avoided

shutdown_tracker: &ShutdownTracker,
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
where
Expand Down Expand Up @@ -577,6 +586,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_tracker.clone_shutdown_token(),
)
};
Expand Down Expand Up @@ -640,6 +650,7 @@ where
packet_router: PacketRouter,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
shutdown_tracker: &ShutdownTracker,
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
where
Expand Down Expand Up @@ -672,6 +683,7 @@ where
stats_reporter,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_tracker,
)
.await?;
Expand Down Expand Up @@ -1074,6 +1086,7 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
self.connect_timeout,
&shutdown_tracker.child_tracker(),
)
.await?;
Expand Down
2 changes: 2 additions & 0 deletions common/client-core/src/init/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,15 @@ pub(super) async fn register_with_gateway(
gateway_listener: Url,
our_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<RegistrationResult, ClientCoreError> {
let mut gateway_client = GatewayClient::new_init(
gateway_listener,
gateway_id,
our_identity.clone(),
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
);

gateway_client.establish_connection().await.map_err(|err| {
Expand Down
5 changes: 5 additions & 0 deletions common/client-core/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use nym_topology::node::RoutingNode;
use rand::rngs::OsRng;
use rand::{CryptoRng, RngCore};
use serde::Serialize;
use std::time::Duration;
#[cfg(unix)]
use std::{os::fd::RawFd, sync::Arc};

Expand Down Expand Up @@ -56,6 +57,7 @@ async fn setup_new_gateway<K, D>(
selection_specification: GatewaySelectionSpecification,
available_gateways: Vec<RoutingNode>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, ClientCoreError>
where
K: KeyStore,
Expand Down Expand Up @@ -117,6 +119,7 @@ where
our_identity,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
)
.await?;
(
Expand Down Expand Up @@ -213,6 +216,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
} => {
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
setup_new_gateway(
Expand All @@ -222,6 +226,7 @@ where
available_gateways,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
)
.await
}
Expand Down
6 changes: 6 additions & 0 deletions common/client-core/src/init/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::fmt::{Debug, Display};
#[cfg(unix)]
use std::os::fd::RawFd;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use url::Url;

Expand Down Expand Up @@ -214,6 +215,9 @@ pub enum GatewaySetup {
/// Callback useful for allowing initial connection to gateway
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,

/// Timeout for establishing connection
connect_timeout: Option<Duration>,
},

ReuseConnection {
Expand All @@ -239,6 +243,7 @@ impl Debug for GatewaySetup {
available_gateways,
#[cfg(unix)]
connection_fd_callback: _,
connect_timeout: _,
} => f
.debug_struct("GatewaySetup::New")
.field("specification", specification)
Expand Down Expand Up @@ -280,6 +285,7 @@ impl GatewaySetup {
available_gateways: vec![],
#[cfg(unix)]
connection_fd_callback: None,
connect_timeout: None,
}
}

Expand Down
12 changes: 11 additions & 1 deletion common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use url::Url;

#[cfg(unix)]
use std::os::fd::RawFd;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::sleep;

Expand Down Expand Up @@ -104,10 +105,13 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
// currently unused (but populated)
negotiated_protocol: Option<u8>,

// Callback on the fd as soon as the connection has been established
/// Callback on the fd as soon as the connection has been established
#[cfg(unix)]
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,

/// Maximum duration to wait for a connection to be established when set
connect_timeout: Option<Duration>,

/// Listen to shutdown messages and send notifications back to the task manager
shutdown_token: ShutdownToken,
}
Expand All @@ -124,6 +128,7 @@ impl<C, St> GatewayClient<C, St> {
bandwidth_controller: Option<BandwidthController<C, St>>,
stats_reporter: ClientStatsSender,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
shutdown_token: ShutdownToken,
) -> Self {
GatewayClient {
Expand All @@ -141,6 +146,7 @@ impl<C, St> GatewayClient<C, St> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_token,
}
}
Expand Down Expand Up @@ -208,6 +214,7 @@ impl<C, St> GatewayClient<C, St> {
&self.gateway_address,
#[cfg(unix)]
self.connection_fd_callback.clone(),
self.connect_timeout,
)
.await?;

Expand Down Expand Up @@ -1132,6 +1139,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
gateway_identity: ed25519::PublicKey,
local_identity: Arc<ed25519::KeyPair>,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Self {
log::trace!("Initialising gateway client");
use futures::channel::mpsc;
Expand All @@ -1158,6 +1166,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: None,
#[cfg(unix)]
connection_fd_callback,
connect_timeout,
shutdown_token,
}
}
Expand Down Expand Up @@ -1190,6 +1199,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
negotiated_protocol: self.negotiated_protocol,
#[cfg(unix)]
connection_fd_callback: self.connection_fd_callback,
connect_timeout: self.connect_timeout,
shutdown_token,
}
}
Expand Down
19 changes: 18 additions & 1 deletion common/client-libs/gateway-client/src/client/websockets.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::GatewayClientError;

use nym_http_api_client::HickoryDnsResolver;
use std::time::Duration;
#[cfg(unix)]
use std::{
os::fd::{AsRawFd, RawFd},
Expand All @@ -17,6 +18,7 @@ use std::net::SocketAddr;
pub(crate) async fn connect_async(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while going through the code, I've noticed we have identical connect_async function in common/client-core/src/init/websockets.rs (don't ask me why - I wish I knew). and actually when you go to where it is being called (the duplicate), it's already wrapped in a timeout - perhaps we could remove some duplication there?

endpoint: &str,
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
connect_timeout: Option<Duration>,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
use tokio::net::TcpSocket;

Expand Down Expand Up @@ -64,7 +66,22 @@ pub(crate) async fn connect_async(
callback.as_ref()(socket.as_raw_fd());
}

match socket.connect(sock_addr).await {
let connect_res = if let Some(connect_timeout) = connect_timeout {
match tokio::time::timeout(connect_timeout, socket.connect(sock_addr)).await {
Ok(res) => res,
Err(_elapsed) => {
stream = Err(GatewayClientError::NetworkConnectionTimeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't existing GatewayConnectionTimeout variant be more appropriate?

address: endpoint.to_owned(),
timeout: connect_timeout,
});
continue;
}
}
} else {
socket.connect(sock_addr).await
};

match connect_res {
Ok(s) => {
stream = Ok(s);
break;
Expand Down
4 changes: 4 additions & 0 deletions common/client-libs/gateway-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use nym_gateway_requests::registration::handshake::error::HandshakeError;
use nym_gateway_requests::{GatewayRequestsError, SimpleGatewayRequestsError};
use std::io;
use std::time::Duration;
use thiserror::Error;
use tungstenite::Error as WsError;

Expand Down Expand Up @@ -46,6 +47,9 @@ pub enum GatewayClientError {
source: Box<WsError>,
},

#[error("timeout when establishing connection: {address}, timeout: {timeout:?}")]
NetworkConnectionTimeout { address: String, timeout: Duration },

#[error("no socket address for endpoint: {address}")]
NoEndpointForConnection { address: String },

Expand Down
25 changes: 23 additions & 2 deletions common/wasm/client-core/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use nym_topology::{EpochRewardedSet, NymTopology, RoutingNode};
use nym_validator_client::client::IdentityKey;
use nym_validator_client::{nym_api::NymApiClientExt, UserAgent};
use rand::thread_rng;
use std::time::Duration;
use url::Url;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen_futures::future_to_promise;
Expand Down Expand Up @@ -127,6 +128,7 @@ pub async fn setup_gateway_wasm(
force_tls: bool,
chosen_gateway: Option<IdentityKey>,
gateways: Vec<RoutingNode>,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
// TODO: so much optimization and extra features could be added here, but that's for the future

Expand All @@ -144,6 +146,7 @@ pub async fn setup_gateway_wasm(
GatewaySetup::New {
specification: selection_spec,
available_gateways: gateways,
connect_timeout,
}
};

Expand All @@ -159,6 +162,7 @@ pub async fn setup_gateway_from_api(
nym_apis: &[Url],
minimum_performance: u8,
ignore_epoch_roles: bool,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
let gateways = gateways_for_init(
nym_apis,
Expand All @@ -168,7 +172,14 @@ pub async fn setup_gateway_from_api(
None,
)
.await?;
setup_gateway_wasm(client_store, force_tls, chosen_gateway, gateways).await
setup_gateway_wasm(
client_store,
force_tls,
chosen_gateway,
gateways,
connect_timeout,
)
.await
}

pub async fn current_gateways_wasm(
Expand All @@ -192,9 +203,17 @@ pub async fn setup_from_topology(
force_tls: bool,
topology: &NymTopology,
client_store: &ClientStorage,
connect_timeout: Option<Duration>,
) -> Result<InitialisationResult, WasmCoreError> {
let gateways = topology.entry_capable_nodes().cloned().collect::<Vec<_>>();
setup_gateway_wasm(client_store, force_tls, explicit_gateway, gateways).await
setup_gateway_wasm(
client_store,
force_tls,
explicit_gateway,
gateways,
connect_timeout,
)
.await
}

pub async fn generate_new_client_keys(store: &ClientStorage) -> Result<(), WasmCoreError> {
Expand All @@ -213,6 +232,7 @@ pub async fn add_gateway(
min_performance: u8,
ignore_epoch_roles: bool,
storage: &ClientStorage,
connect_timeout: Option<Duration>,
) -> Result<(), WasmCoreError> {
let selection_spec = GatewaySelectionSpecification::new(
preferred_gateway.clone(),
Expand Down Expand Up @@ -267,6 +287,7 @@ pub async fn add_gateway(
let gateway_setup = GatewaySetup::New {
specification: selection_spec,
available_gateways,
connect_timeout,
};

let init_details = setup_gateway(gateway_setup, storage, storage).await?;
Expand Down
Loading
Loading