Skip to content

Commit 0cce75e

Browse files
committed
Schema agreement: Precise error handling logic
This changes the logic of schema agreement to allow some error to end the process immediately, without waiting until the timeout. For now the error clasification is not done with a lot of effort, just to have something that doesn't treat transient errors as non-transient, but also classify some errors as non-transient.
1 parent 884ca70 commit 0cce75e

File tree

1 file changed

+108
-16
lines changed

1 file changed

+108
-16
lines changed

scylla/src/client/session.rs

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ use futures::future::join_all;
4646
use futures::future::try_join_all;
4747
use itertools::Itertools;
4848
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
49+
use scylla_cql::frame::response::error::DbError;
4950
use scylla_cql::serialize::batch::BatchValues;
5051
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
5152
use std::borrow::Borrow;
5253
use std::future::Future;
5354
use std::net::{IpAddr, SocketAddr};
5455
use std::num::NonZeroU32;
56+
use std::ops::ControlFlow;
5557
use std::sync::Arc;
5658
use std::time::Duration;
5759
use tokio::time::timeout;
58-
#[cfg(feature = "unstable-cloud")]
59-
use tracing::warn;
60-
use tracing::{Instrument, debug, error, trace, trace_span};
60+
use tracing::{Instrument, debug, error, trace, trace_span, warn};
6161
use uuid::Uuid;
6262

6363
pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
@@ -2209,6 +2209,78 @@ impl Session {
22092209
self.await_schema_agreement_with_required_node(None).await
22102210
}
22112211

2212+
/// Decides if an error should result in await_schema_agreement stopping immediately,
2213+
/// or of its fine to try again (after schema agreement interval).
2214+
/// The errors that should stop immediately are non-transient ones, for which
2215+
/// there is no hope that a retry will succeed.
2216+
fn classify_schema_check_error(error: &SchemaAgreementError) -> ControlFlow<()> {
2217+
#[deny(clippy::wildcard_enum_match_arm)]
2218+
match error {
2219+
// Unexpected format (type, row count, frame type etc) or deserialization error of response.
2220+
// It should not happen, but if it did it indicates a serious issue.
2221+
SchemaAgreementError::SingleRowError(_)
2222+
| SchemaAgreementError::TracesEventsIntoRowsResultError(_) => ControlFlow::Break(()),
2223+
2224+
// Should not be possible - we create this error only after returning here.
2225+
// Let's not panic, but log a warning so that it gets noticed.
2226+
SchemaAgreementError::Timeout(_) => {
2227+
warn!("Unexpected schema agreement error type: {}", error);
2228+
ControlFlow::Break(())
2229+
}
2230+
2231+
// Definitely a transient error.
2232+
SchemaAgreementError::ConnectionPoolError(_)
2233+
| SchemaAgreementError::RequiredHostAbsent(_) => ControlFlow::Continue(()),
2234+
2235+
SchemaAgreementError::RequestError(request_attempt_error) => {
2236+
#[deny(clippy::wildcard_enum_match_arm)]
2237+
match request_attempt_error {
2238+
// No idea if those are transient or even possible.
2239+
// To be safe, treat them as transient for now.
2240+
RequestAttemptError::SerializationError(_)
2241+
| RequestAttemptError::CqlRequestSerialization(_)
2242+
| RequestAttemptError::UnableToAllocStreamId
2243+
| RequestAttemptError::BrokenConnectionError(_)
2244+
| RequestAttemptError::BodyExtensionsParseError(_)
2245+
| RequestAttemptError::CqlResultParseError(_)
2246+
| RequestAttemptError::CqlErrorParseError(_)
2247+
| RequestAttemptError::UnexpectedResponse(_)
2248+
| RequestAttemptError::RepreparedIdChanged { .. }
2249+
| RequestAttemptError::RepreparedIdMissingInBatch
2250+
| RequestAttemptError::NonfinishedPagingState => ControlFlow::Continue(()),
2251+
2252+
#[deny(clippy::wildcard_enum_match_arm)]
2253+
RequestAttemptError::DbError(db_error, _) => match db_error {
2254+
// Those errors should not happen, but if they did, something is
2255+
// really wrong. Let's return early.
2256+
DbError::SyntaxError
2257+
| DbError::Invalid
2258+
| DbError::AlreadyExists { .. }
2259+
| DbError::FunctionFailure { .. }
2260+
| DbError::AuthenticationError
2261+
| DbError::Unauthorized
2262+
| DbError::ConfigError
2263+
| DbError::TruncateError
2264+
| DbError::ProtocolError => ControlFlow::Break(()),
2265+
2266+
DbError::Unavailable { .. }
2267+
| DbError::Overloaded
2268+
| DbError::IsBootstrapping
2269+
| DbError::ReadTimeout { .. }
2270+
| DbError::WriteTimeout { .. }
2271+
| DbError::ReadFailure { .. }
2272+
| DbError::WriteFailure { .. }
2273+
| DbError::Unprepared { .. }
2274+
| DbError::ServerError
2275+
| DbError::RateLimitReached { .. }
2276+
| DbError::Other(_)
2277+
| _ => ControlFlow::Continue(()),
2278+
},
2279+
}
2280+
}
2281+
}
2282+
}
2283+
22122284
/// Awaits schema agreement among all reachable nodes.
22132285
///
22142286
/// Issues an agreement check each `Session::schema_agreement_interval`.
@@ -2225,30 +2297,50 @@ impl Session {
22252297
// Some(Ok(())): Last attempt successful, without agreement
22262298
// Some(Err(_)): Last attempt failed
22272299
let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
2228-
timeout(self.schema_agreement_timeout, async {
2300+
// The future passed to timeout returns either Ok(Uuid) if agreement was
2301+
// reached, or Err(SchemaAgreementError) if there was an error that should
2302+
// stop the waiting before timeout.
2303+
let agreement_result = timeout(self.schema_agreement_timeout, async {
22292304
loop {
22302305
let result = self
22312306
.check_schema_agreement_with_required_node(required_node)
22322307
.await;
22332308
match result {
2234-
Ok(Some(agreed_version)) => return agreed_version,
2309+
Ok(Some(agreed_version)) => return Ok(agreed_version),
22352310
Ok(None) => last_agreement_failure = Some(Ok(())),
2236-
Err(err) => last_agreement_failure = Some(Err(err)),
2311+
Err(err) => {
2312+
let decision = Self::classify_schema_check_error(&err);
2313+
match decision {
2314+
ControlFlow::Continue(_) => {
2315+
last_agreement_failure = Some(Err(err));
2316+
}
2317+
ControlFlow::Break(_) => return Err(err),
2318+
}
2319+
}
22372320
}
22382321
tokio::time::sleep(self.schema_agreement_interval).await;
22392322
}
22402323
})
2241-
.await
2242-
.map_err(|_| {
2243-
match last_agreement_failure {
2244-
// There were no finished attempts - the only error we can return is Timeout.
2245-
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2246-
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2247-
Some(Err(err)) => err,
2248-
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2249-
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2324+
.await;
2325+
match agreement_result {
2326+
Err(_timeout) => {
2327+
// Timeout occurred. Either all attempts returned possibly-transient errors,
2328+
// or just did not reach agreement in time.
2329+
let effective_error = match last_agreement_failure {
2330+
// There were no finished attempts - the only error we can return is Timeout.
2331+
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2332+
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
2333+
Some(Err(err)) => err,
2334+
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
2335+
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
2336+
};
2337+
Err(effective_error)
22502338
}
2251-
})
2339+
// Agreement encountered a non-transient error, we must return it.
2340+
Ok(Err(inner_error)) => Err(inner_error),
2341+
// Agreement successful
2342+
Ok(Ok(uuid)) => Ok(uuid),
2343+
}
22522344
}
22532345

22542346
/// Checks if all reachable nodes have the same schema version.

0 commit comments

Comments
 (0)