Skip to content

Commit c622d61

Browse files
authored
Merge pull request #330 from wprzytula/fix-serial-consistency-handling
Fix serial consistency handling
2 parents ebf4200 + f846baa commit c622d61

File tree

8 files changed

+337
-113
lines changed

8 files changed

+337
-113
lines changed

Makefile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,7 @@ CASSANDRA_NO_VALGRIND_TEST_FILTER := $(subst ${SPACE},${EMPTY},AsyncTests.Integr
121121
endif
122122

123123
ifndef CCM_COMMIT_ID
124-
# TODO: change it back to master/next when https://github.com/scylladb/scylla-ccm/issues/646 is fixed.
125-
export CCM_COMMIT_ID := 5392dd68
124+
export CCM_COMMIT_ID := master
126125
endif
127126

128127
ifndef SCYLLA_VERSION

scylla-rust-wrapper/src/batch.rs

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@ use crate::argconv::{
33
FFI, FromBox,
44
};
55
use crate::cass_error::CassError;
6-
use crate::cass_types::CassConsistency;
7-
use crate::cass_types::{CassBatchType, make_batch_type};
6+
use crate::cass_types::{CassBatchType, CassConsistency, make_batch_type};
7+
use crate::config_value::MaybeUnsetConfig;
88
use crate::exec_profile::PerStatementExecProfile;
99
use crate::retry_policy::CassRetryPolicy;
1010
use crate::statement::{BoundStatement, CassStatement};
1111
use crate::types::*;
1212
use crate::value::CassCqlValue;
1313
use scylla::statement::batch::Batch;
14+
use scylla::statement::{Consistency, SerialConsistency};
1415
use scylla::value::MaybeUnset;
15-
use std::convert::TryInto;
1616
use std::sync::Arc;
1717

1818
pub struct CassBatch {
@@ -65,14 +65,41 @@ pub unsafe extern "C" fn cass_batch_set_consistency(
6565
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
6666
};
6767

68-
let consistency = match consistency.try_into().ok() {
69-
Some(c) => c,
70-
None => return CassError::CASS_ERROR_LIB_BAD_PARAMS,
68+
let Ok(maybe_set_consistency) = MaybeUnsetConfig::<Consistency>::from_c_value(consistency)
69+
else {
70+
// Invalid consistency value provided.
71+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
7172
};
72-
Arc::make_mut(&mut batch.state)
73-
.batch
74-
.set_consistency(consistency);
7573

74+
match maybe_set_consistency {
75+
MaybeUnsetConfig::Unset => {
76+
// The correct semantics for `CASS_CONSISTENCY_UNKNOWN` is to
77+
// make batch not have any opinion at all about consistency.
78+
// Then, the default from the cluster/execution profile should be used.
79+
// Unfortunately, the Rust Driver does not support
80+
// "unsetting" consistency from a batch at the moment.
81+
//
82+
// FIXME: Implement unsetting consistency in the Rust Driver.
83+
// Then, fix this code.
84+
//
85+
// For now, we will throw an error in order to warn the user
86+
// about this limitation.
87+
tracing::warn!(
88+
"Passed `CASS_CONSISTENCY_UNKNOWN` to `cass_batch_set_consistency`. \
89+
This is not supported by the CPP Rust Driver yet: once you set some consistency \
90+
on a batch, you cannot unset it. This limitation will be fixed in the future. \
91+
As a workaround, you can refrain from setting consistency on a batch, which \
92+
will make the driver use the consistency set on execution profile or cluster level."
93+
);
94+
95+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
96+
}
97+
MaybeUnsetConfig::Set(consistency) => {
98+
Arc::make_mut(&mut batch.state)
99+
.batch
100+
.set_consistency(consistency);
101+
}
102+
};
76103
CassError::CASS_OK
77104
}
78105

@@ -86,13 +113,48 @@ pub unsafe extern "C" fn cass_batch_set_serial_consistency(
86113
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
87114
};
88115

89-
let serial_consistency = match serial_consistency.try_into().ok() {
90-
Some(c) => c,
91-
None => return CassError::CASS_ERROR_LIB_BAD_PARAMS,
116+
// cpp-driver doesn't validate passed value in any way.
117+
// If it is an incorrect serial-consistency value then it will be set
118+
// and sent as-is.
119+
// Before adapting the driver to Rust Driver 0.12 this code
120+
// set serial consistency if a user passed correct value and set it to
121+
// None otherwise.
122+
// I think that failing explicitly is a better idea, so I decided to return
123+
// an error.
124+
let Ok(maybe_set_serial_consistency) =
125+
MaybeUnsetConfig::<Option<SerialConsistency>>::from_c_value(serial_consistency)
126+
else {
127+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
128+
};
129+
130+
match maybe_set_serial_consistency {
131+
MaybeUnsetConfig::Unset => {
132+
// The correct semantics for `CASS_CONSISTENCY_UNKNOWN` is to
133+
// make batch not have any opinion at all about serial consistency.
134+
// Then, the default from the cluster/execution profile should be used.
135+
// Unfortunately, the Rust Driver does not support
136+
// "unsetting" serial consistency from a batch at the moment.
137+
//
138+
// FIXME: Implement unsetting serial consistency in the Rust Driver.
139+
// Then, fix this code.
140+
//
141+
// For now, we will throw an error in order to warn the user
142+
// about this limitation.
143+
tracing::warn!(
144+
"Passed `CASS_CONSISTENCY_UNKNOWN` to `cass_batch_set_serial_consistency`. \
145+
This is not supported by the CPP Rust Driver yet: once you set some serial consistency \
146+
on a batch, you cannot unset it. This limitation will be fixed in the future. \
147+
As a workaround, you can refrain from setting serial consistency on a batch, which \
148+
will make the driver use the serial consistency set on execution profile or cluster level."
149+
);
150+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
151+
}
152+
MaybeUnsetConfig::Set(serial_consistency) => {
153+
Arc::make_mut(&mut batch.state)
154+
.batch
155+
.set_serial_consistency(serial_consistency);
156+
}
92157
};
93-
Arc::make_mut(&mut batch.state)
94-
.batch
95-
.set_serial_consistency(Some(serial_consistency));
96158

97159
CassError::CASS_OK
98160
}

scylla-rust-wrapper/src/cass_types.rs

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use crate::cass_error::CassError;
33
use crate::types::*;
44
use scylla::cluster::metadata::{CollectionType, NativeType};
55
use scylla::frame::response::result::ColumnType;
6-
use scylla::frame::types::{Consistency, SerialConsistency};
76
use scylla::statement::batch::BatchType;
87
use std::cell::UnsafeCell;
9-
use std::convert::TryFrom;
108
use std::os::raw::c_char;
119
use std::sync::Arc;
1210

@@ -894,39 +892,6 @@ pub unsafe extern "C" fn cass_data_type_add_sub_value_type_by_name_n(
894892
}
895893
}
896894

897-
impl TryFrom<CassConsistency> for Consistency {
898-
type Error = ();
899-
900-
fn try_from(c: CassConsistency) -> Result<Consistency, Self::Error> {
901-
match c {
902-
CassConsistency::CASS_CONSISTENCY_ANY => Ok(Consistency::Any),
903-
CassConsistency::CASS_CONSISTENCY_ONE => Ok(Consistency::One),
904-
CassConsistency::CASS_CONSISTENCY_TWO => Ok(Consistency::Two),
905-
CassConsistency::CASS_CONSISTENCY_THREE => Ok(Consistency::Three),
906-
CassConsistency::CASS_CONSISTENCY_QUORUM => Ok(Consistency::Quorum),
907-
CassConsistency::CASS_CONSISTENCY_ALL => Ok(Consistency::All),
908-
CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM => Ok(Consistency::LocalQuorum),
909-
CassConsistency::CASS_CONSISTENCY_EACH_QUORUM => Ok(Consistency::EachQuorum),
910-
CassConsistency::CASS_CONSISTENCY_LOCAL_ONE => Ok(Consistency::LocalOne),
911-
CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => Ok(Consistency::LocalSerial),
912-
CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(Consistency::Serial),
913-
_ => Err(()),
914-
}
915-
}
916-
}
917-
918-
impl TryFrom<CassConsistency> for SerialConsistency {
919-
type Error = ();
920-
921-
fn try_from(serial: CassConsistency) -> Result<SerialConsistency, Self::Error> {
922-
match serial {
923-
CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(SerialConsistency::Serial),
924-
CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => Ok(SerialConsistency::LocalSerial),
925-
_ => Err(()),
926-
}
927-
}
928-
}
929-
930895
pub(crate) fn make_batch_type(type_: CassBatchType) -> Option<BatchType> {
931896
match type_ {
932897
CassBatchType::CASS_BATCH_TYPE_LOGGED => Some(BatchType::Logged),

scylla-rust-wrapper/src/cluster.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::argconv::*;
22
use crate::cass_error::CassError;
33
use crate::cass_types::CassConsistency;
4+
use crate::config_value::MaybeUnsetConfig;
45
use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder_modify};
56
use crate::future::CassFuture;
67
use crate::load_balancing::{
@@ -39,6 +40,8 @@ use crate::cass_compression_types::CassCompressionType;
3940
// According to `cassandra.h` the defaults for
4041
// - consistency for statements is LOCAL_ONE,
4142
const DEFAULT_CONSISTENCY: Consistency = Consistency::LocalOne;
43+
// - serial consistency for statements is ANY, which corresponds to None in Rust Driver.
44+
const DEFAULT_SERIAL_CONSISTENCY: Option<SerialConsistency> = None;
4245
// - request client timeout is 12000 millis,
4346
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(12000);
4447
// - fetching schema metadata is true
@@ -173,6 +176,7 @@ pub(crate) fn build_session_builder(
173176
pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster, CMut> {
174177
let default_execution_profile_builder = ExecutionProfileBuilder::default()
175178
.consistency(DEFAULT_CONSISTENCY)
179+
.serial_consistency(DEFAULT_SERIAL_CONSISTENCY)
176180
.request_timeout(Some(DEFAULT_REQUEST_TIMEOUT));
177181

178182
// Default config options - according to cassandra.h
@@ -1410,14 +1414,24 @@ pub unsafe extern "C" fn cass_cluster_set_consistency(
14101414
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
14111415
};
14121416

1413-
let consistency: Consistency = match consistency.try_into() {
1414-
Ok(c) => c,
1415-
Err(_) => return CassError::CASS_ERROR_LIB_BAD_PARAMS,
1417+
let Ok(maybe_set_consistency) = MaybeUnsetConfig::<Consistency>::from_c_value(consistency)
1418+
else {
1419+
// Invalid consistency value provided.
1420+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
14161421
};
14171422

1418-
exec_profile_builder_modify(&mut cluster.default_execution_profile_builder, |builder| {
1419-
builder.consistency(consistency)
1420-
});
1423+
match maybe_set_consistency {
1424+
MaybeUnsetConfig::Unset => {
1425+
// `CASS_CONSISTENCY_UNKNOWN` is not supported in the cluster settings.
1426+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
1427+
}
1428+
MaybeUnsetConfig::Set(consistency) => {
1429+
exec_profile_builder_modify(
1430+
&mut cluster.default_execution_profile_builder,
1431+
|builder| builder.consistency(consistency),
1432+
);
1433+
}
1434+
}
14211435

14221436
CassError::CASS_OK
14231437
}
@@ -1432,14 +1446,25 @@ pub unsafe extern "C" fn cass_cluster_set_serial_consistency(
14321446
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
14331447
};
14341448

1435-
let serial_consistency: SerialConsistency = match serial_consistency.try_into() {
1436-
Ok(c) => c,
1437-
Err(_) => return CassError::CASS_ERROR_LIB_BAD_PARAMS,
1449+
let Ok(maybe_set_serial_consistency) =
1450+
MaybeUnsetConfig::<Option<SerialConsistency>>::from_c_value(serial_consistency)
1451+
else {
1452+
// Invalid serial consistency value provided.
1453+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
14381454
};
14391455

1440-
exec_profile_builder_modify(&mut cluster.default_execution_profile_builder, |builder| {
1441-
builder.serial_consistency(Some(serial_consistency))
1442-
});
1456+
match maybe_set_serial_consistency {
1457+
MaybeUnsetConfig::Unset => {
1458+
// `CASS_CONSISTENCY_UNKNOWN` is not supported in the cluster settings.
1459+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
1460+
}
1461+
MaybeUnsetConfig::Set(serial_consistency) => {
1462+
exec_profile_builder_modify(
1463+
&mut cluster.default_execution_profile_builder,
1464+
|builder| builder.serial_consistency(serial_consistency),
1465+
);
1466+
}
1467+
}
14431468

14441469
CassError::CASS_OK
14451470
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
use scylla::statement::{Consistency, SerialConsistency};
2+
3+
use crate::cass_types::CassConsistency;
4+
5+
/// Represents a configuration value that may or may not be set.
6+
/// If a configuration value is unset, it means that the default value
7+
/// should be used.
8+
pub(crate) enum MaybeUnsetConfig<T> {
9+
Unset,
10+
Set(T),
11+
}
12+
13+
/// Represents types that can be converted from a C value have the special unset value.
14+
/// This is used to handle cases where a configuration value may not be set,
15+
/// allowing the driver to clearly distinguish between an unset value and a set value.
16+
pub(crate) trait MaybeUnsetConfigValue: Sized {
17+
type CValue;
18+
type Error;
19+
20+
/// Checks if the given C value is considered unset.
21+
fn is_unset(cvalue: &Self::CValue) -> bool;
22+
23+
/// Converts a maybe unset C value to a Rust value, returning an error if the value
24+
/// is invalid.
25+
fn from_c_value(cvalue: Self::CValue) -> Result<MaybeUnsetConfig<Self>, Self::Error> {
26+
if Self::is_unset(&cvalue) {
27+
Ok(MaybeUnsetConfig::Unset)
28+
} else {
29+
let rust_value = Self::from_set_c_value(cvalue)?;
30+
Ok(MaybeUnsetConfig::Set(rust_value))
31+
}
32+
}
33+
34+
/// Converts a **set** C value to a Rust value, returning an error if the value
35+
/// is invalid or if the value is unset.
36+
fn from_set_c_value(cvalue: Self::CValue) -> Result<Self, Self::Error>;
37+
}
38+
39+
impl<T: MaybeUnsetConfigValue> MaybeUnsetConfig<T> {
40+
/// Converts a maybe unset C value to a Rust value, returning an error if the value
41+
/// is invalid.
42+
pub(crate) fn from_c_value(cvalue: T::CValue) -> Result<Self, T::Error> {
43+
<T as MaybeUnsetConfigValue>::from_c_value(cvalue)
44+
}
45+
}
46+
47+
impl MaybeUnsetConfigValue for Consistency {
48+
type CValue = CassConsistency;
49+
type Error = ();
50+
51+
fn is_unset(cvalue: &Self::CValue) -> bool {
52+
*cvalue == CassConsistency::CASS_CONSISTENCY_UNKNOWN
53+
}
54+
55+
fn from_set_c_value(cvalue: Self::CValue) -> Result<Self, Self::Error> {
56+
match cvalue {
57+
CassConsistency::CASS_CONSISTENCY_ANY => Ok(Consistency::Any),
58+
CassConsistency::CASS_CONSISTENCY_ONE => Ok(Consistency::One),
59+
CassConsistency::CASS_CONSISTENCY_TWO => Ok(Consistency::Two),
60+
CassConsistency::CASS_CONSISTENCY_THREE => Ok(Consistency::Three),
61+
CassConsistency::CASS_CONSISTENCY_QUORUM => Ok(Consistency::Quorum),
62+
CassConsistency::CASS_CONSISTENCY_ALL => Ok(Consistency::All),
63+
CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM => Ok(Consistency::LocalQuorum),
64+
CassConsistency::CASS_CONSISTENCY_EACH_QUORUM => Ok(Consistency::EachQuorum),
65+
CassConsistency::CASS_CONSISTENCY_LOCAL_ONE => Ok(Consistency::LocalOne),
66+
CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => Ok(Consistency::LocalSerial),
67+
CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(Consistency::Serial),
68+
_ => Err(()),
69+
}
70+
}
71+
}
72+
73+
impl MaybeUnsetConfigValue for Option<SerialConsistency> {
74+
type CValue = CassConsistency;
75+
type Error = ();
76+
77+
fn is_unset(cvalue: &Self::CValue) -> bool {
78+
*cvalue == CassConsistency::CASS_CONSISTENCY_UNKNOWN
79+
}
80+
81+
fn from_set_c_value(cvalue: Self::CValue) -> Result<Self, Self::Error> {
82+
match cvalue {
83+
CassConsistency::CASS_CONSISTENCY_ANY => {
84+
// This is in line with the CPP Driver: if 0 is passed (which is Consistency::Any),
85+
// then serial consistency is not set:
86+
// ```c++
87+
// if (callback->serial_consistency() != 0) {
88+
// flags |= CASS_QUERY_FLAG_SERIAL_CONSISTENCY;
89+
// }
90+
// ```
91+
Ok(None)
92+
}
93+
CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => {
94+
Ok(Some(SerialConsistency::LocalSerial))
95+
}
96+
CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(Some(SerialConsistency::Serial)),
97+
_ => Err(()),
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)