Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions lib/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::auth::{ClientCertificate, ConnectionTLSConfig, MutualTLS};
use crate::errors::{Error, Result};
use backon::ExponentialBuilder;
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
use serde::{Deserialize, Deserializer, Serialize};
use std::path::Path;
Expand Down Expand Up @@ -67,6 +68,87 @@ pub struct LiveConfig {
pub(crate) fetch_size: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct BackoffConfig {
pub(crate) multiplier: Option<f32>,
pub(crate) min_delay_ms: Option<u64>,
pub(crate) max_delay_ms: Option<u64>,
pub(crate) total_delay_ms: Option<u64>,
}

impl Default for BackoffConfig {
fn default() -> Self {
BackoffConfig {
multiplier: Some(2.0),
min_delay_ms: Some(1),
max_delay_ms: Some(10000),
total_delay_ms: Some(60000),
}
}
}

impl BackoffConfig {
pub fn to_exponential_builder(&self) -> ExponentialBuilder {
ExponentialBuilder::new()
.with_jitter()
.with_factor(self.multiplier.unwrap_or(2.0))
.without_max_times()
.with_min_delay(std::time::Duration::from_millis(
self.min_delay_ms.unwrap_or(1),
))
.with_max_delay(std::time::Duration::from_millis(
self.max_delay_ms.unwrap_or(10_000),
))
.with_total_delay(Some(std::time::Duration::from_millis(
self.total_delay_ms.unwrap_or(60_000),
)))
}
}

#[derive(Default)]
pub struct BackoffConfigBuilder {
multiplier: Option<f32>,
min_delay_ms: Option<u64>,
max_delay_ms: Option<u64>,
total_delay_ms: Option<u64>,
}

#[allow(dead_code)]
impl BackoffConfigBuilder {
pub fn new() -> Self {
Self::default()
}

pub fn with_multiplier(mut self, multiplier: f32) -> Self {
self.multiplier = Some(multiplier);
self
}

pub fn with_min_delay_ms(mut self, min_delay_ms: u64) -> Self {
self.min_delay_ms = Some(min_delay_ms);
self
}

pub fn with_max_delay_ms(mut self, max_delay_ms: u64) -> Self {
self.max_delay_ms = Some(max_delay_ms);
self
}

pub fn with_total_delay_ms(mut self, max_total_delay_ms: Option<u64>) -> Self {
self.total_delay_ms = max_total_delay_ms;
self
}

pub fn build(self) -> BackoffConfig {
BackoffConfig {
multiplier: self.multiplier,
min_delay_ms: self.min_delay_ms,
max_delay_ms: self.max_delay_ms,
total_delay_ms: self.total_delay_ms,
}
}
}

/// The configuration used to connect to the database, see [`crate::Graph::connect`].
#[derive(Debug, Clone)]
pub struct Config {
Expand All @@ -77,6 +159,7 @@ pub struct Config {
pub(crate) db: Option<Database>,
pub(crate) fetch_size: usize,
pub(crate) tls_config: ConnectionTLSConfig,
pub(crate) backoff: Option<BackoffConfig>,
}

impl Config {
Expand All @@ -97,6 +180,7 @@ pub struct ConfigBuilder {
fetch_size: usize,
max_connections: usize,
tls_config: ConnectionTLSConfig,
backoff_config: Option<BackoffConfig>,
}

impl ConfigBuilder {
Expand Down Expand Up @@ -178,6 +262,11 @@ impl ConfigBuilder {
self
}

pub fn with_backoff(mut self, backoff: Option<BackoffConfig>) -> Self {
self.backoff_config = backoff;
self
}

pub fn build(self) -> Result<Config> {
if let (Some(uri), Some(user), Some(password)) = (self.uri, self.user, self.password) {
Ok(Config {
Expand All @@ -188,6 +277,7 @@ impl ConfigBuilder {
max_connections: self.max_connections,
db: self.db,
tls_config: self.tls_config,
backoff: self.backoff_config,
})
} else {
Err(Error::InvalidConfig)
Expand All @@ -205,6 +295,7 @@ impl Default for ConfigBuilder {
max_connections: DEFAULT_MAX_CONNECTIONS,
fetch_size: DEFAULT_FETCH_SIZE,
tls_config: ConnectionTLSConfig::None,
backoff_config: Some(BackoffConfig::default()),
}
}
}
Expand All @@ -222,6 +313,7 @@ mod tests {
.db("some_db")
.fetch_size(10)
.max_connections(5)
.with_backoff(None)
.build()
.unwrap();
assert_eq!(config.uri, "127.0.0.1:7687");
Expand All @@ -231,6 +323,7 @@ mod tests {
assert_eq!(config.fetch_size, 10);
assert_eq!(config.max_connections, 5);
assert_eq!(config.tls_config, ConnectionTLSConfig::None);
assert_eq!(config.backoff, None);
}

#[test]
Expand All @@ -248,6 +341,8 @@ mod tests {
assert_eq!(config.fetch_size, 200);
assert_eq!(config.max_connections, 16);
assert_eq!(config.tls_config, ConnectionTLSConfig::None);
assert!(config.backoff.is_some());
assert_eq!(config.backoff.as_ref().unwrap(), &BackoffConfig::default());
}

#[test]
Expand All @@ -257,6 +352,9 @@ mod tests {
.user("some_user")
.password("some_password")
.skip_ssl_validation()
.with_backoff(Some(
BackoffConfigBuilder::new().with_multiplier(2.0).build(),
))
.build()
.unwrap();
assert_eq!(config.uri, "127.0.0.1:7687");
Expand All @@ -266,6 +364,8 @@ mod tests {
assert_eq!(config.fetch_size, 200);
assert_eq!(config.max_connections, 16);
assert_eq!(config.tls_config, ConnectionTLSConfig::NoSSLValidation);
assert!(config.backoff.is_some());
assert_eq!(config.backoff.as_ref().unwrap().multiplier.unwrap(), 2.0);
}

#[test]
Expand Down
38 changes: 23 additions & 15 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl ConnectionPoolManager {
}
}

fn backoff(&self) -> ExponentialBuilder {
fn backoff(&self) -> Option<ExponentialBuilder> {
match self {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
Routed(manager) => manager.backoff(),
Expand Down Expand Up @@ -233,13 +233,17 @@ impl Graph {
) -> Result<RunResult> {
let query = query.into_retryable(db, operation, &self.pool, None);

let (query, result) = RetryableQuery::retry_run
.retry(self.pool.backoff())
.sleep(tokio::time::sleep)
.context(query)
.when(|e| matches!(e, Retry::Yes(_)))
.notify(Self::log_retry)
.await;
let (query, result) = if let Some(exponential_backoff) = self.pool.backoff() {
RetryableQuery::retry_run
.retry(exponential_backoff)
.sleep(tokio::time::sleep)
.context(query)
.when(|e| matches!(e, Retry::Yes(_)))
.notify(Self::log_retry)
.await
} else {
query.retry_run().await
};

match result {
Ok(result) => {
Expand Down Expand Up @@ -331,13 +335,17 @@ impl Graph {
) -> Result<DetachedRowStream> {
let query = query.into_retryable(db, operation, &self.pool, Some(self.config.fetch_size));

let (query, result) = RetryableQuery::retry_execute
.retry(self.pool.backoff())
.sleep(tokio::time::sleep)
.context(query)
.when(|e| matches!(e, Retry::Yes(_)))
.notify(Self::log_retry)
.await;
let (_, result) = if let Some(exponential_backoff) = self.pool.backoff() {
RetryableQuery::retry_execute
.retry(exponential_backoff)
.sleep(tokio::time::sleep)
.context(query)
.when(|e| matches!(e, Retry::Yes(_)))
.notify(Self::log_retry)
.await
} else {
query.retry_execute().await
};

result.map_err(Retry::into_inner)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ mod types;
mod version;

pub use crate::auth::ClientCertificate;
pub use crate::config::{Config, ConfigBuilder, Database};
pub use crate::config::{BackoffConfig, BackoffConfigBuilder, Config, ConfigBuilder, Database};
pub use crate::errors::{
Error, Neo4jClientErrorKind, Neo4jError, Neo4jErrorKind, Neo4jSecurityErrorKind, Result,
};
Expand Down
21 changes: 6 additions & 15 deletions lib/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::time::Duration;

use crate::auth::ConnectionTLSConfig;
use crate::config::BackoffConfig;
use crate::{
config::Config,
connection::{Connection, ConnectionInfo},
Expand All @@ -15,7 +14,7 @@ pub type ManagedConnection = Object<ConnectionManager>;

pub struct ConnectionManager {
info: ConnectionInfo,
backoff: ExponentialBuilder,
backoff: Option<ExponentialBuilder>,
}

impl ConnectionManager {
Expand All @@ -24,27 +23,18 @@ impl ConnectionManager {
user: &str,
password: &str,
tls_config: &ConnectionTLSConfig,
backoff_config: Option<&BackoffConfig>,
) -> Result<Self> {
let info = ConnectionInfo::new(uri, user, password, tls_config)?;
let backoff = backoff();
let backoff = backoff_config.map(|backoff_config| backoff_config.to_exponential_builder());
Ok(ConnectionManager { info, backoff })
}

pub fn backoff(&self) -> ExponentialBuilder {
pub fn backoff(&self) -> Option<ExponentialBuilder> {
self.backoff
}
}

pub(crate) fn backoff() -> ExponentialBuilder {
ExponentialBuilder::new()
.with_jitter()
.with_factor(2.0)
.without_max_times()
.with_min_delay(Duration::from_millis(1))
.with_max_delay(Duration::from_secs(10))
.with_total_delay(Some(Duration::from_secs(60)))
}

impl Manager for ConnectionManager {
type Type = Connection;
type Error = Error;
Expand All @@ -66,6 +56,7 @@ pub fn create_pool(config: &Config) -> Result<ConnectionPool> {
&config.user,
&config.password,
&config.tls_config,
config.backoff.as_ref(),
)?;
info!(
"creating connection pool for node {} with max size {}",
Expand Down
3 changes: 3 additions & 0 deletions lib/src/routing/connection_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async fn refresh_routing_table(
server.clone(),
create_pool(&Config {
uri,
backoff: None,
..config.clone()
})?,
);
Expand Down Expand Up @@ -418,6 +419,7 @@ mod tests {
db: None,
fetch_size: 200,
tls_config: ConnectionTLSConfig::None,
backoff: None,
};
let registry = Arc::new(ConnectionRegistry::default());
let ttl = refresh_all_routing_tables(
Expand Down Expand Up @@ -539,6 +541,7 @@ mod tests {
db: None,
fetch_size: 200,
tls_config: ConnectionTLSConfig::None,
backoff: None,
};
let registry = Arc::new(ConnectionRegistry::default());
// get registry for db1 amd refresh routing table
Expand Down
12 changes: 9 additions & 3 deletions lib/src/routing/routed_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ pub struct RoutedConnectionManager {
load_balancing_strategy: Arc<dyn LoadBalancingStrategy>,
connection_registry: Arc<ConnectionRegistry>,
bookmarks: Arc<Mutex<Vec<String>>>,
backoff: ExponentialBuilder,
backoff: Option<ExponentialBuilder>,
channel: Sender<RegistryCommand>,
}

const ROUTING_TABLE_MAX_WAIT_TIME_MS: i32 = 5000;

impl RoutedConnectionManager {
pub fn new(config: &Config, provider: Arc<dyn RoutingTableProvider>) -> Result<Self, Error> {
let backoff = crate::pool::backoff();
// backoff config should be set to None here, since the routing table updater will handle retries
// We could provide some configuration to "force" the retry mechanism in a clustered env,
// but for now we will turn it off
let backoff = config
.backoff
.clone()
.map(|config| config.to_exponential_builder());
let connection_registry = Arc::new(ConnectionRegistry::default());
let channel = start_background_updater(config, connection_registry.clone(), provider);
Ok(RoutedConnectionManager {
Expand Down Expand Up @@ -131,7 +137,7 @@ impl RoutedConnectionManager {
}
}

pub(crate) fn backoff(&self) -> ExponentialBuilder {
pub(crate) fn backoff(&self) -> Option<ExponentialBuilder> {
self.backoff
}

Expand Down
Loading