Skip to content

Commit 6ac2ec3

Browse files
authored
Merge pull request #197 from muzarski/use-cluster-lbp-for-exec-profiles-by-default
exec_profile: Use cluster lbp for exec profiles by default
2 parents 8238086 + dea239a commit 6ac2ec3

File tree

3 files changed

+53
-24
lines changed

3 files changed

+53
-24
lines changed

scylla-rust-wrapper/src/cluster.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,27 @@ const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");
5656
pub(crate) struct LoadBalancingConfig {
5757
pub(crate) token_awareness_enabled: bool,
5858
pub(crate) token_aware_shuffling_replicas_enabled: bool,
59-
pub(crate) dc_awareness: Option<DcAwareness>,
59+
pub(crate) load_balancing_kind: Option<LoadBalancingKind>,
6060
pub(crate) latency_awareness_enabled: bool,
6161
pub(crate) latency_awareness_builder: LatencyAwarenessBuilder,
6262
}
6363
impl LoadBalancingConfig {
6464
// This is `async` to prevent running this function from beyond tokio context,
6565
// as it results in panic due to DefaultPolicyBuilder::build() spawning a tokio task.
6666
pub(crate) async fn build(self) -> Arc<dyn LoadBalancingPolicy> {
67+
let load_balancing_kind = self
68+
.load_balancing_kind
69+
// Round robin is chosen by default for cluster wide LBP.
70+
.unwrap_or(LoadBalancingKind::RoundRobin);
71+
6772
let mut builder = DefaultPolicyBuilder::new().token_aware(self.token_awareness_enabled);
6873
if self.token_awareness_enabled {
6974
// Cpp-driver enables shuffling replicas only if token aware routing is enabled.
7075
builder =
7176
builder.enable_shuffling_replicas(self.token_aware_shuffling_replicas_enabled);
7277
}
73-
if let Some(dc_awareness) = self.dc_awareness.as_ref() {
74-
builder = builder
75-
.prefer_datacenter(dc_awareness.local_dc.clone())
76-
.permit_dc_failover(true)
78+
if let LoadBalancingKind::DcAware { local_dc } = load_balancing_kind {
79+
builder = builder.prefer_datacenter(local_dc).permit_dc_failover(true)
7780
}
7881
if self.latency_awareness_enabled {
7982
builder = builder.latency_awareness(self.latency_awareness_builder);
@@ -86,16 +89,17 @@ impl Default for LoadBalancingConfig {
8689
Self {
8790
token_awareness_enabled: true,
8891
token_aware_shuffling_replicas_enabled: true,
89-
dc_awareness: None,
92+
load_balancing_kind: None,
9093
latency_awareness_enabled: false,
9194
latency_awareness_builder: Default::default(),
9295
}
9396
}
9497
}
9598

9699
#[derive(Clone, Debug)]
97-
pub(crate) struct DcAwareness {
98-
pub(crate) local_dc: String,
100+
pub(crate) enum LoadBalancingKind {
101+
RoundRobin,
102+
DcAware { local_dc: String },
99103
}
100104

101105
#[derive(Clone)]
@@ -486,7 +490,7 @@ pub unsafe extern "C" fn cass_cluster_set_credentials_n(
486490
#[no_mangle]
487491
pub unsafe extern "C" fn cass_cluster_set_load_balance_round_robin(cluster_raw: *mut CassCluster) {
488492
let cluster = ptr_to_ref_mut(cluster_raw);
489-
cluster.load_balancing_config.dc_awareness = None;
493+
cluster.load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::RoundRobin);
490494
}
491495

492496
#[no_mangle]
@@ -525,7 +529,7 @@ pub(crate) unsafe fn set_load_balance_dc_aware_n(
525529
.unwrap()
526530
.to_string();
527531

528-
load_balancing_config.dc_awareness = Some(DcAwareness { local_dc });
532+
load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::DcAware { local_dc });
529533

530534
CassError::CASS_OK
531535
}
@@ -880,7 +884,7 @@ mod tests {
880884
/* Test valid configurations */
881885
let cluster = ptr_to_ref(cluster_raw);
882886
{
883-
assert_matches!(cluster.load_balancing_config.dc_awareness, None);
887+
assert_matches!(cluster.load_balancing_config.load_balancing_kind, None);
884888
assert!(cluster.load_balancing_config.token_awareness_enabled);
885889
assert!(!cluster.load_balancing_config.latency_awareness_enabled);
886890
}
@@ -907,8 +911,13 @@ mod tests {
907911
40,
908912
);
909913

910-
let dc_awareness = cluster.load_balancing_config.dc_awareness.as_ref().unwrap();
911-
assert_eq!(dc_awareness.local_dc, "eu");
914+
let load_balancing_kind = &cluster.load_balancing_config.load_balancing_kind;
915+
match load_balancing_kind {
916+
Some(LoadBalancingKind::DcAware { local_dc }) => {
917+
assert_eq!(local_dc, "eu")
918+
}
919+
_ => panic!("Expected preferred dc"),
920+
}
912921
assert!(!cluster.load_balancing_config.token_awareness_enabled);
913922
assert!(cluster.load_balancing_config.latency_awareness_enabled);
914923
}

scylla-rust-wrapper/src/exec_profile.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::argconv::{free_boxed, ptr_to_cstr_n, ptr_to_ref, ptr_to_ref_mut, strl
1717
use crate::batch::CassBatch;
1818
use crate::cass_error::CassError;
1919
use crate::cass_types::CassConsistency;
20-
use crate::cluster::{set_load_balance_dc_aware_n, LoadBalancingConfig};
20+
use crate::cluster::{set_load_balance_dc_aware_n, LoadBalancingConfig, LoadBalancingKind};
2121
use crate::retry_policy::CassRetryPolicy;
2222
use crate::retry_policy::RetryPolicy::{
2323
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
@@ -42,10 +42,19 @@ impl CassExecProfile {
4242
}
4343
}
4444

45-
pub(crate) async fn build(self) -> ExecutionProfile {
46-
self.inner
47-
.load_balancing_policy(self.load_balancing_config.build().await)
48-
.build()
45+
pub(crate) async fn build(
46+
self,
47+
cluster_default_profile: &ExecutionProfile,
48+
) -> ExecutionProfile {
49+
let load_balacing = if self.load_balancing_config.load_balancing_kind.is_some() {
50+
self.load_balancing_config.build().await
51+
} else {
52+
// If load balancing config does not have LB kind defined,
53+
// we make use of cluster's LBP.
54+
cluster_default_profile.get_load_balancing_policy().clone()
55+
};
56+
57+
self.inner.load_balancing_policy(load_balacing).build()
4958
}
5059
}
5160

@@ -353,7 +362,7 @@ pub unsafe extern "C" fn cass_execution_profile_set_load_balance_round_robin(
353362
profile: *mut CassExecProfile,
354363
) -> CassError {
355364
let profile_builder = ptr_to_ref_mut(profile);
356-
profile_builder.load_balancing_config.dc_awareness = None;
365+
profile_builder.load_balancing_config.load_balancing_kind = Some(LoadBalancingKind::RoundRobin);
357366

358367
CassError::CASS_OK
359368
}
@@ -473,7 +482,7 @@ mod tests {
473482
/* Test valid configurations */
474483
let profile = ptr_to_ref(profile_raw);
475484
{
476-
assert_matches!(profile.load_balancing_config.dc_awareness, None);
485+
assert_matches!(profile.load_balancing_config.load_balancing_kind, None);
477486
assert!(profile.load_balancing_config.token_awareness_enabled);
478487
assert!(!profile.load_balancing_config.latency_awareness_enabled);
479488
}
@@ -500,8 +509,13 @@ mod tests {
500509
40,
501510
);
502511

503-
let dc_awareness = profile.load_balancing_config.dc_awareness.as_ref().unwrap();
504-
assert_eq!(dc_awareness.local_dc, "eu");
512+
let load_balancing_kind = &profile.load_balancing_config.load_balancing_kind;
513+
match load_balancing_kind {
514+
Some(LoadBalancingKind::DcAware { local_dc }) => {
515+
assert_eq!(local_dc, "eu")
516+
}
517+
_ => panic!("Expected preferred dc"),
518+
}
505519
assert!(!profile.load_balancing_config.token_awareness_enabled);
506520
assert!(profile.load_balancing_config.latency_awareness_enabled);
507521
}

scylla-rust-wrapper/src/session.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,18 @@ impl CassSessionInner {
109109
"Already connecting, closing, or connected".msg(),
110110
));
111111
}
112+
113+
let mut session_builder = session_builder_fut.await;
114+
let default_profile = session_builder
115+
.config
116+
.default_execution_profile_handle
117+
.to_profile();
118+
112119
let mut exec_profile_map = HashMap::with_capacity(exec_profile_builder_map.len());
113120
for (name, builder) in exec_profile_builder_map {
114-
exec_profile_map.insert(name, builder.build().await.into_handle());
121+
exec_profile_map.insert(name, builder.build(&default_profile).await.into_handle());
115122
}
116123

117-
let mut session_builder = session_builder_fut.await;
118124
if let Some(keyspace) = keyspace {
119125
session_builder = session_builder.use_keyspace(keyspace, false);
120126
}

0 commit comments

Comments
 (0)