Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions scylla/src/client/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ use futures::future::join_all;
use futures::future::try_join_all;
use itertools::Itertools;
use scylla_cql::frame::response::NonErrorResponseWithDeserializedMetadata;
use scylla_cql::frame::response::error::DbError;
use scylla_cql::serialize::batch::BatchValues;
use scylla_cql::serialize::row::{SerializeRow, SerializedValues};
use std::borrow::Borrow;
use std::future::Future;
use std::net::{IpAddr, SocketAddr};
use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
#[cfg(feature = "unstable-cloud")]
use tracing::warn;
use tracing::{Instrument, debug, error, trace, trace_span};
use tracing::{Instrument, debug, error, trace, trace_span, warn};
use uuid::Uuid;

pub(crate) const TABLET_CHANNEL_SIZE: usize = 8192;
Expand Down Expand Up @@ -2209,6 +2209,78 @@ impl Session {
self.await_schema_agreement_with_required_node(None).await
}

/// Decides if an error should result in await_schema_agreement stopping immediately,
/// or of its fine to try again (after schema agreement interval).
/// The errors that should stop immediately are non-transient ones, for which
/// there is no hope that a retry will succeed.
fn classify_schema_check_error(error: &SchemaAgreementError) -> ControlFlow<()> {
#[deny(clippy::wildcard_enum_match_arm)]
match error {
// Unexpected format (type, row count, frame type etc) or deserialization error of response.
// It should not happen, but if it did it indicates a serious issue.
SchemaAgreementError::SingleRowError(_)
| SchemaAgreementError::TracesEventsIntoRowsResultError(_) => ControlFlow::Break(()),

// Should not be possible - we create this error only after returning here.
// Let's not panic, but log a warning so that it gets noticed.
SchemaAgreementError::Timeout(_) => {
warn!("Unexpected schema agreement error type: {}", error);
ControlFlow::Break(())
}

// Definitely a transient error.
SchemaAgreementError::ConnectionPoolError(_)
| SchemaAgreementError::RequiredHostAbsent(_) => ControlFlow::Continue(()),

SchemaAgreementError::RequestError(request_attempt_error) => {
#[deny(clippy::wildcard_enum_match_arm)]
match request_attempt_error {
// No idea if those are transient or even possible.
// To be safe, treat them as transient for now.
RequestAttemptError::SerializationError(_)
| RequestAttemptError::CqlRequestSerialization(_)
| RequestAttemptError::UnableToAllocStreamId
| RequestAttemptError::BrokenConnectionError(_)
| RequestAttemptError::BodyExtensionsParseError(_)
| RequestAttemptError::CqlResultParseError(_)
| RequestAttemptError::CqlErrorParseError(_)
| RequestAttemptError::UnexpectedResponse(_)
| RequestAttemptError::RepreparedIdChanged { .. }
| RequestAttemptError::RepreparedIdMissingInBatch
| RequestAttemptError::NonfinishedPagingState => ControlFlow::Continue(()),

#[deny(clippy::wildcard_enum_match_arm)]
RequestAttemptError::DbError(db_error, _) => match db_error {
// Those errors should not happen, but if they did, something is
// really wrong. Let's return early.
DbError::SyntaxError
| DbError::Invalid
| DbError::AlreadyExists { .. }
| DbError::FunctionFailure { .. }
| DbError::AuthenticationError
| DbError::Unauthorized
| DbError::ConfigError
| DbError::TruncateError
| DbError::ProtocolError => ControlFlow::Break(()),

DbError::Unavailable { .. }
| DbError::Overloaded
| DbError::IsBootstrapping
| DbError::ReadTimeout { .. }
| DbError::WriteTimeout { .. }
| DbError::ReadFailure { .. }
| DbError::WriteFailure { .. }
| DbError::Unprepared { .. }
| DbError::ServerError
| DbError::RateLimitReached { .. }
| DbError::Other(_)
| _ => ControlFlow::Continue(()),
},
}
}
}
}

/// Awaits schema agreement among all reachable nodes.
///
/// Issues an agreement check each `Session::schema_agreement_interval`.
Expand All @@ -2225,30 +2297,50 @@ impl Session {
// Some(Ok(())): Last attempt successful, without agreement
// Some(Err(_)): Last attempt failed
let mut last_agreement_failure: Option<Result<(), SchemaAgreementError>> = None;
timeout(self.schema_agreement_timeout, async {
// The future passed to timeout returns either Ok(Uuid) if agreement was
// reached, or Err(SchemaAgreementError) if there was an error that should
// stop the waiting before timeout.
let agreement_result = timeout(self.schema_agreement_timeout, async {
loop {
let result = self
.check_schema_agreement_with_required_node(required_node)
.await;
match result {
Ok(Some(agreed_version)) => return agreed_version,
Ok(Some(agreed_version)) => return Ok(agreed_version),
Ok(None) => last_agreement_failure = Some(Ok(())),
Err(err) => last_agreement_failure = Some(Err(err)),
Err(err) => {
let decision = Self::classify_schema_check_error(&err);
match decision {
ControlFlow::Continue(_) => {
last_agreement_failure = Some(Err(err));
}
ControlFlow::Break(_) => return Err(err),
}
}
}
tokio::time::sleep(self.schema_agreement_interval).await;
}
})
.await
.map_err(|_| {
match last_agreement_failure {
// There were no finished attempts - the only error we can return is Timeout.
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
Some(Err(err)) => err,
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
.await;
match agreement_result {
Err(_timeout) => {
// Timeout occurred. Either all attempts returned possibly-transient errors,
// or just did not reach agreement in time.
let effective_error = match last_agreement_failure {
// There were no finished attempts - the only error we can return is Timeout.
None => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
// If the last finished attempt resulted in an error, this error will be more informative than Timeout.
Some(Err(err)) => err,
// This is the canonical case for timeout - last attempt finished successfully, but without agreement.
Some(Ok(())) => SchemaAgreementError::Timeout(self.schema_agreement_timeout),
};
Err(effective_error)
}
})
// Agreement encountered a non-transient error, we must return it.
Ok(Err(inner_error)) => Err(inner_error),
// Agreement successful
Ok(Ok(uuid)) => Ok(uuid),
}
}

/// Checks if all reachable nodes have the same schema version.
Expand Down
4 changes: 4 additions & 0 deletions scylla/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ pub enum SchemaAgreementError {
RequestError(#[from] RequestAttemptError),

/// Failed to convert schema version query result into rows result.
///
/// This variant should be named `SchemaVersionIntoRowsResultError`; the current name
/// is a copy-paste error. It will be renamed in 2.0.
//TODO(2.0): Rename to `SchemaVersionIntoRowsResultError`
#[error("Failed to convert schema version query result into rows result: {0}")]
TracesEventsIntoRowsResultError(IntoRowsResultError),

Expand Down
19 changes: 18 additions & 1 deletion scylla/tests/integration/session/schema_agreement.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;

use assert_matches::assert_matches;
use scylla::client::PoolSize;
Expand Down Expand Up @@ -73,13 +74,19 @@ async fn test_schema_await_with_unreachable_node() {
let session: Session = SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map.clone()))
// Let's try more often to prevent timeouts.
.schema_agreement_interval(Duration::from_millis(30))
// And also not make the test too long.
.schema_agreement_timeout(Duration::from_millis(600))
.build()
.await
.unwrap();

let host_ids = calculate_proxy_host_ids(&proxy_uris, &translation_map, &session);

{
tracing::info!("Sub test 1");

// Case 1: Paused node is a coordinator for DDL.
// DDL needs to fail.
let result = run_some_ddl_with_unreachable_node(
Expand All @@ -100,6 +107,8 @@ async fn test_schema_await_with_unreachable_node() {
}

{
tracing::info!("Sub test 2");

// Case 2: Paused node is NOT a coordinator for DDL.
// DDL should succeed, because auto schema agreement only needs available nodes to agree.
let result = run_some_ddl_with_unreachable_node(
Expand All @@ -113,6 +122,8 @@ async fn test_schema_await_with_unreachable_node() {
}

{
tracing::info!("Sub test 3");

// Case 3: Paused node is a coordinator for DDL, and is used by control connection.
// It is the same as case 1, but paused node is also control connection.
// DDL needs to fail.
Expand All @@ -134,6 +145,8 @@ async fn test_schema_await_with_unreachable_node() {
}

{
tracing::info!("Sub test 4");

// Case 4: Paused node is NOT a coordinator for DDL, but is used by control connection.
// It is the same as case 2, but paused node is also control connection.
// DDL should succeed, because auto schema agreement only needs available nodes to agree,
Expand Down Expand Up @@ -177,6 +190,10 @@ async fn test_schema_await_with_transient_failure() {
// Shard connections are created asynchronously, so it's hard to predict how many will be opened
// already when we check schema agreement.
.pool_size(PoolSize::PerHost(1.try_into().unwrap()))
// Let's try more often to prevent timeouts.
.schema_agreement_interval(Duration::from_millis(30))
// And also not make the test too long.
.schema_agreement_timeout(Duration::from_millis(300))
.build()
.await
.unwrap();
Expand All @@ -191,7 +208,7 @@ async fn test_schema_await_with_transient_failure() {
// Use error that would prevent DefaultRetryPolicy from retrying.
// I don't think it is used for those queries, but it's additional future-proofing
// for the test.
RequestReaction::forge_with_error(DbError::SyntaxError),
RequestReaction::forge_with_error(DbError::Overloaded),
)]);

// First, a sanity check for proxy rules.
Expand Down
Loading