Skip to content

Commit 0c8ed93

Browse files
committed
Schema agreement: Precise error handling logic
This changes the logic of schema agreement to allow some error to end the process immediately, without waiting until the timeout. For now the error clasification is not done with a lot of effort, just to have something that doesn't treat transient errors as non-transient, but also classify some errors as non-transient.
1 parent 4feb673 commit 0c8ed93

File tree

1 file changed

+104
-16
lines changed

1 file changed

+104
-16
lines changed

scylla/src/client/session.rs

Lines changed: 104 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ use futures::future::join_all;
4646
use futures::future::try_join_all;
4747
use itertools::Itertools;
4848
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
49+
use scylla_cql::frame::response::error::DbError;
4950
use scylla_cql::serialize::batch::BatchValues;
5051
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
5152
use std::borrow::Borrow;
5253
use std::future::Future;
5354
use std::net::{IpAddr, SocketAddr};
5455
use std::num::NonZeroU32;
56+
use std::ops::ControlFlow;
5557
use std::sync::Arc;
5658
use std::time::Duration;
5759
use tokio::time::timeout;
58-
#[cfg(feature = "unstable-cloud")]
59-
use tracing::warn;
60-
use tracing::{Instrument, debug, error, trace, trace_span};
60+
use tracing::{Instrument, debug, error, trace, trace_span, warn};
6161
use uuid::Uuid;
6262

6363
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
@@ -2209,6 +2209,74 @@ impl Session {
22092209
self.await_schema_agreement_with_required_node(None).await
22102210
}
22112211

2212+
fn classify_schema_check_error(error: &SchemaAgreementError) -> ControlFlow<()> {
2213+
#[deny(clippy::wildcard_enum_match_arm)]
2214+
match error {
2215+
// Unexpected format (type, row count, frame type etc) or deserialization error of response.
2216+
// It should not happen, but if it did it indicates a serious issue.
2217+
SchemaAgreementError::SingleRowError(_)
2218+
| SchemaAgreementError::TracesEventsIntoRowsResultError(_) => ControlFlow::Break(()),
2219+
2220+
// Should not be possible - we create this error only after returning here.
2221+
// Let's not panic, but log a warning so that it gets noticed.
2222+
SchemaAgreementError::Timeout(_) => {
2223+
warn!("Unexpected schema agreement error type: {}", error);
2224+
ControlFlow::Break(())
2225+
}
2226+
2227+
// Definitely a transient error.
2228+
SchemaAgreementError::ConnectionPoolError(_)
2229+
| SchemaAgreementError::RequiredHostAbsent(_) => ControlFlow::Continue(()),
2230+
2231+
SchemaAgreementError::RequestError(request_attempt_error) => {
2232+
#[deny(clippy::wildcard_enum_match_arm)]
2233+
match request_attempt_error {
2234+
// No idea if those are transient or even possible.
2235+
// To be safe, treat them as transient for now.
2236+
RequestAttemptError::SerializationError(_)
2237+
| RequestAttemptError::CqlRequestSerialization(_)
2238+
| RequestAttemptError::UnableToAllocStreamId
2239+
| RequestAttemptError::BrokenConnectionError(_)
2240+
| RequestAttemptError::BodyExtensionsParseError(_)
2241+
| RequestAttemptError::CqlResultParseError(_)
2242+
| RequestAttemptError::CqlErrorParseError(_)
2243+
| RequestAttemptError::UnexpectedResponse(_)
2244+
| RequestAttemptError::RepreparedIdChanged { .. }
2245+
| RequestAttemptError::RepreparedIdMissingInBatch
2246+
| RequestAttemptError::NonfinishedPagingState => ControlFlow::Continue(()),
2247+
2248+
#[deny(clippy::wildcard_enum_match_arm)]
2249+
RequestAttemptError::DbError(db_error, _) => match db_error {
2250+
// Those errors should not happen, but if they did, something is
2251+
// really wrong. Let's return early.
2252+
DbError::SyntaxError
2253+
| DbError::Invalid
2254+
| DbError::AlreadyExists { .. }
2255+
| DbError::FunctionFailure { .. }
2256+
| DbError::AuthenticationError
2257+
| DbError::Unauthorized
2258+
| DbError::ConfigError
2259+
| DbError::TruncateError
2260+
| DbError::ProtocolError => ControlFlow::Break(()),
2261+
2262+
DbError::Unavailable { .. }
2263+
| DbError::Overloaded
2264+
| DbError::IsBootstrapping
2265+
| DbError::ReadTimeout { .. }
2266+
| DbError::WriteTimeout { .. }
2267+
| DbError::ReadFailure { .. }
2268+
| DbError::WriteFailure { .. }
2269+
| DbError::Unprepared { .. }
2270+
| DbError::ServerError
2271+
| DbError::RateLimitReached { .. }
2272+
| DbError::Other(_)
2273+
| _ => ControlFlow::Continue(()),
2274+
},
2275+
}
2276+
}
2277+
}
2278+
}
2279+
22122280
/// Awaits schema agreement among all reachable nodes.
22132281
///
22142282
/// Issues an agreement check each `Session::schema_agreement_interval`.
@@ -2225,30 +2293,50 @@ impl Session {
22252293
// Some(Ok(())): Last attempt successful, without agreement
22262294
// Some(Err(_)): Last attempt failed
22272295
let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
2228-
timeout(self.schema_agreement_timeout, async {
2296+
// The future passed to timeout returns either Ok(Uuid) if agreement was
2297+
// reached, or Err(SchemaAgreementError) if there was an error that should
2298+
// stop the waiting before timeout.
2299+
let agreement_result = timeout(self.schema_agreement_timeout, async {
22292300
loop {
22302301
let result = self
22312302
.check_schema_agreement_with_required_node(required_node)
22322303
.await;
22332304
match result {
2234-
Ok(Some(agreed_version)) => return agreed_version,
2305+
Ok(Some(agreed_version)) => return Ok(agreed_version),
22352306
Ok(None) => last_agreement_failure = Some(Ok(())),
2236-
Err(err) => last_agreement_failure = Some(Err(err)),
2307+
Err(err) => {
2308+
let decision = Self::classify_schema_check_error(&err);
2309+
match decision {
2310+
ControlFlow::Continue(_) => {
2311+
last_agreement_failure = Some(Err(err));
2312+
}
2313+
ControlFlow::Break(_) => return Err(err),
2314+
}
2315+
}
22372316
}
22382317
tokio::time::sleep(self.schema_agreement_interval).await;
22392318
}
22402319
})
2241-
.await
2242-
.map_err(|_| {
2243-
match last_agreement_failure {
2244-
// There were no finished attempts - the only error we can return is Timeout.
2245-
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2246-
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2247-
Some(Err(err)) => err,
2248-
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2249-
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2320+
.await;
2321+
match agreement_result {
2322+
Err(_timeout) => {
2323+
// Timeout occured. Either all attempts returned possibly-transient errors,
2324+
// or just did not reach agreement in time.
2325+
let effective_error = match last_agreement_failure {
2326+
// There were no finished attempts - the only error we can return is Timeout.
2327+
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2328+
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2329+
Some(Err(err)) => err,
2330+
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2331+
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2332+
};
2333+
Err(effective_error)
22502334
}
2251-
})
2335+
// Agreement encountered a non-transient error, we must return it.
2336+
Ok(Err(inner_error)) => Err(inner_error),
2337+
// Agreement successfull
2338+
Ok(Ok(uuid)) => Ok(uuid),
2339+
}
22522340
}
22532341

22542342
/// Checks if all reachable nodes have the same schema version.

0 commit comments

Comments
 (0)