
Pipeline: add support for error handling #3359

Open · wants to merge 5 commits into base: pipelines-target-branch

Conversation

@shohamazon shohamazon commented Mar 12, 2025

This PR adds support for handling errors within a pipeline.
If an error occurs during pipeline execution, failed commands will be identified and retried based on their respective retry mechanisms.

For example, commands that fail with a MOVED error will be collected, the cluster topology will be updated accordingly, and the commands will be retried on the correct node.

This PR enhances error handling for pipeline execution in cluster mode.
Previously, all multi-key commands had to be mapped to the same slot, as cluster mode required key-based routing. However, with pipelines, we can now send commands involving different keys across multiple slots, allowing for more flexible command execution.

Why This Change Is Needed

Up until now, all multi-key commands in cluster mode had to be mapped to the same slot. With pipeline support, however, commands involving different keys can be executed together even if they belong to different slots. This introduces a challenge: because a sub-pipeline (the batch of commands sent to a specific node) is handled as a single unit, an error such as MOVED could affect the entire sub-pipeline even if only a single command within it caused the issue. As a result, the previous approach of retrying the whole sub-pipeline was incorrect, since unaffected commands were unnecessarily retried.

To address this, we now implement separate error handling specifically for pipelines, ensuring that only the failed commands are retried while maintaining correctness and efficiency.
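To make the mechanism concrete, here is a minimal, self-contained sketch of the per-command grouping described above. The type names, the string-based error matching, and the `RetryMethod` variants are illustrative stand-ins, not the crate's actual API:

```rust
use std::collections::HashMap;

// Stand-in types for illustration only; the real crate has richer
// `RetryMethod` and `ServerError` definitions and per-command bookkeeping.
#[derive(Debug, Hash, PartialEq, Eq)]
enum RetryMethod {
    MovedRedirect,
    RetryImmediately,
    NoRetry,
}

#[derive(Debug)]
struct ServerError(String);

// Pick a retry strategy from the error text (simplified: the real code
// inspects structured error kinds, not string prefixes).
fn retry_method_for(err: &ServerError) -> RetryMethod {
    if err.0.starts_with("MOVED") {
        RetryMethod::MovedRedirect
    } else if err.0.starts_with("TRYAGAIN") {
        RetryMethod::RetryImmediately
    } else {
        RetryMethod::NoRetry
    }
}

// Group only the failed commands (by their index in the pipeline) under the
// retry method that should handle them; successful commands are left untouched.
fn build_retry_map(
    results: Vec<(usize, Result<String, ServerError>)>,
) -> HashMap<RetryMethod, Vec<(usize, ServerError)>> {
    let mut retry_map: HashMap<RetryMethod, Vec<(usize, ServerError)>> = HashMap::new();
    for (index, result) in results {
        if let Err(err) = result {
            retry_map
                .entry(retry_method_for(&err))
                .or_default()
                .push((index, err));
        }
    }
    retry_map
}

fn main() {
    let results = vec![
        (0, Ok("OK".to_string())),
        (1, Err(ServerError("MOVED 1234 10.0.0.2:6379".to_string()))),
        (2, Ok("OK".to_string())),
    ];
    // Only command 1 ends up in the map, under `MovedRedirect`.
    println!("{:?}", build_retry_map(results));
}
```

Grouping by retry method is what lets a MOVED redirect update the topology once and re-route only the affected commands, while successful responses stay in place.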

Issue link

This Pull Request is linked to issue (URL): [REPLACE ME]

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

@shohamazon shohamazon self-assigned this Mar 12, 2025
@shohamazon shohamazon added the Core changes label (used to mark a PR with significant changes that should trigger the full test matrix) Mar 12, 2025
Signed-off-by: Shoham Elias <[email protected]>
@shohamazon shohamazon marked this pull request as ready for review March 12, 2025 12:17
@shohamazon shohamazon requested a review from a team as a code owner March 12, 2025 12:17
@shohamazon shohamazon requested a review from barshaul March 12, 2025 12:29
Comment on lines -586 to 587
let value = result.and_then(|v| v.extract_error())?;
let value = result?;
match value {
Value::Array(mut values) => {
Collaborator

match result {
    Ok(Value::Array(mut values)) => {
...

Collaborator Author

why is that? we can raise an error if value is a redis error (like a parsing problem that occurred)

@@ -614,6 +615,7 @@ enum CmdArg<C> {
count: usize,
route: InternalSingleNodeRouting<C>,
sub_pipeline: bool,
retry: u32,
Collaborator

Please enrich your documentation for retries.
As PR description says

For example, commands that fail with a MOVED error will be collected, the cluster topology will be updated accordingly, and the commands will be retried on the correct node.

How are other errors handled?

Consider posting detailed docs to the wiki page and/or to the feature docs.

Collaborator

Do you have a limit on retries? I think we should have one.

Collaborator Author

Yes, of course I do 🙃
And I added the MOVED example simply because it's the simplest of them all

"Received a single response for a pipeline with multiple commands.".to_string(),
)),
},
Ok(Ok(Response::ClusterScanResult(_, _))) => ServerError::KnownError {
Collaborator

I see that in this line, lolwut

Collaborator Author

🤣🤣

/// - `Err((OperationTarget, RedisError))` if a node-level or reception error occurs.
/// - **Ok**: A `HashMap<RetryMethod, Vec<RetryEntry>>` mapping each retry method to the list of commands that failed and
/// should be retried.
/// - **Err**: A tuple `(OperationTarget, RedisError)` if a node-level or reception error occurs while processing responses.
Collaborator

but it never returns Err

Collaborator Author

why not? I agree that I might not have to document this, but it's still returning a Result

match self {
ServerError::ExtensionError { .. } => ErrorKind::ExtensionError,
ServerError::KnownError { kind, .. } => match kind {
ServerErrorKind::ResponseError => ErrorKind::ResponseError,
Collaborator

You could probably have a macro to do this for you.
At least you can avoid duplicating code on lines 225-236 and 266-277

Collaborator Author

yeah I agree, I will fix it
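For reference, a minimal sketch of the kind of declarative macro suggested here, so the kind-to-kind mapping is written once and reused at both duplicated call sites. The enums are stand-ins, not the crate's real `ServerErrorKind`/`ErrorKind` definitions:

```rust
// Stand-in enums, not the crate's real definitions; `#[allow(dead_code)]`
// only silences warnings for variants this sketch never constructs.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum ServerErrorKind {
    ResponseError,
    Moved,
    Ask,
}

#[derive(Debug, Clone, Copy)]
enum ErrorKind {
    ResponseError,
    Moved,
    Ask,
}

// Expands one list of `ServerErrorKind => ErrorKind` pairs into a `match`,
// so both duplicated conversion blocks can call the same macro.
macro_rules! map_kind {
    ($kind:expr, { $($server:ident => $client:ident),+ $(,)? }) => {
        match $kind {
            $(ServerErrorKind::$server => ErrorKind::$client,)+
        }
    };
}

fn to_error_kind(kind: ServerErrorKind) -> ErrorKind {
    map_kind!(kind, {
        ResponseError => ResponseError,
        Moved => Moved,
        Ask => Ask,
    })
}

fn main() {
    // Prints `Moved`.
    println!("{:?}", to_error_kind(ServerErrorKind::Moved));
}
```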

Signed-off-by: Shoham Elias <[email protected]>
Signed-off-by: Shoham Elias <[email protected]>
/// - **Ok**: A `HashMap<RetryMethod, Vec<RetryEntry>>` mapping each retry method to the list of commands that failed and
/// should be retried.
/// - **Err**: A tuple `(OperationTarget, RedisError)` if a node-level or reception error occurs while processing responses.
#[allow(clippy::type_complexity)]
pub fn process_pipeline_responses(
Collaborator

try to remove pub

pub fn process_pipeline_responses(
pipeline_responses: &mut PipelineResponses,
responses: Vec<Result<RedisResult<Response>, RecvError>>,
addresses_and_indices: AddressAndIndices,
) -> Result<(), (OperationTarget, RedisError)> {
) -> Result<
HashMap<RetryMethod, Vec<((usize, Option<usize>), String, ServerError)>>,
Collaborator

retryMap
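The note above presumably suggests a type alias along the lines of `RetryMap`. A possible shape, shown with stub types so the sketch compiles on its own (in the crate these would be the existing `RetryMethod`, `ServerError`, `OperationTarget`, and `RedisError`):

```rust
use std::collections::HashMap;

// Stub types so the sketch compiles on its own; in the crate these are the
// existing `RetryMethod`, `ServerError`, `OperationTarget`, and `RedisError`.
#[derive(Debug, Hash, PartialEq, Eq)]
enum RetryMethod {
    RetryImmediately,
}
#[derive(Debug)]
struct ServerError;
#[allow(dead_code)]
#[derive(Debug)]
struct OperationTarget;
#[allow(dead_code)]
#[derive(Debug)]
struct RedisError;

// One entry per failed command: (command index, optional inner index within a
// multi-slot command), the address it was sent to, and the error it returned.
type RetryEntry = ((usize, Option<usize>), String, ServerError);
type RetryMap = HashMap<RetryMethod, Vec<RetryEntry>>;

// The return type above could then shrink to `Result<RetryMap, (OperationTarget,
// RedisError)>`, likely making `#[allow(clippy::type_complexity)]` unnecessary.
fn process_pipeline_responses_sketch() -> Result<RetryMap, (OperationTarget, RedisError)> {
    let mut map = RetryMap::new();
    map.entry(RetryMethod::RetryImmediately)
        .or_default()
        .push(((0, None), "10.0.0.2:6379".to_string(), ServerError));
    Ok(map)
}

fn main() {
    println!("{:?}", process_pipeline_responses_sketch());
}
```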

/// - **Ok**: A `HashMap<RetryMethod, Vec<RetryEntry>>` mapping each retry method to the list of commands that failed and
/// should be retried.
/// - **Err**: A tuple `(OperationTarget, RedisError)` if a node-level or reception error occurs while processing responses.
#[allow(clippy::type_complexity)]
Collaborator

Suggested change
#[allow(clippy::type_complexity)]

@@ -467,43 +523,552 @@ pub fn process_pipeline_responses(
address.clone(),
)?;
}
continue;
Collaborator

add doc

@@ -467,43 +523,552 @@ pub fn process_pipeline_responses(
address.clone(),
)?;
}
continue;
}
Ok(Err(err)) => err.into(),
Collaborator

add doc

Collaborator

remove under to be above Err(err)

continue;
}
Ok(Err(err)) => err.into(),
Ok(Ok(Response::Single(_))) => ServerError::KnownError {
Collaborator

doc

}
Ok(Err(err)) => err.into(),
Ok(Ok(Response::Single(_))) => ServerError::KnownError {
kind: (ServerErrorKind::ResponseError),
Collaborator

change to ExtensionError

Ok(retry_map) => {
// If there are no retriable errors, or we have reached the maximum number of retries, we're done
if retry_map.is_empty() || retry >= retry_params.number_of_retries {
break Ok(());
Collaborator

return

RetryMethod::NoRetry => {
// The server error was already added to the pipeline responses, so we can just continue.
}
RetryMethod::Reconnect | RetryMethod::ReconnectAndRetry => {
Collaborator

when we need to reconnect without retrying, use trigger_refresh_connection_tasks

where
C: Clone + ConnectionLike + Connect + Send + Sync + 'static,
{
let retry_params = core
Collaborator

create retry params with lower min and max timeouts, as we don't want to stall the whole pipeline due to a failed node/shard/slot migration
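A rough sketch of what pipeline-specific retry bounds could look like; the `PipelineRetryParams` struct and its fields are hypothetical, not the crate's existing retry configuration:

```rust
use std::cmp::min;
use std::time::Duration;

// Hypothetical pipeline-specific retry parameters with tighter bounds than the
// general cluster retry settings, so one failing node/shard/slot migration
// cannot stall the whole pipeline for long.
struct PipelineRetryParams {
    number_of_retries: u32,
    min_wait_ms: u64,
    max_wait_ms: u64,
}

impl PipelineRetryParams {
    // Exponential backoff, clamped to [min_wait_ms, max_wait_ms].
    fn wait_time(&self, retry: u32) -> Duration {
        let backoff = self.min_wait_ms.saturating_mul(1u64 << retry.min(16));
        Duration::from_millis(min(self.max_wait_ms, backoff.max(self.min_wait_ms)))
    }
}

fn main() {
    let params = PipelineRetryParams {
        number_of_retries: 3,
        min_wait_ms: 10,
        max_wait_ms: 250,
    };
    for retry in 0..params.number_of_retries {
        println!("retry {retry}: wait {:?}", params.wait_time(retry));
    }
}
```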


futures::future::join_all(futures).await;
}
_ => {}
Collaborator

add unreachable! with an explanatory message

})?;

// Search for the response policy based on the index.
let routing_info = if inner_index.is_some() {
Collaborator

write about the relation between inner_index and response policies

Collaborator

try to add all related logic under the if inner_index.is_some() section


// Search for the response policy based on the index.
let routing_info = if inner_index.is_some() {
response_policies.and_then(|vec| {
Collaborator

check if response_policies can be a HashMap, to save the compute
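An illustrative sketch of the suggested lookup structure: build a `HashMap` keyed by command index once, then do O(1) lookups instead of scanning a `Vec` per response. The `ResponsePolicy` variants here are placeholders:

```rust
use std::collections::HashMap;

// Placeholder policy type; the real code carries the crate's `ResponsePolicy`
// alongside the pipeline index of the command it applies to.
#[derive(Debug)]
enum ResponsePolicy {
    AllSucceeded,
    OneSucceeded,
}

fn main() {
    // (pipeline index, policy) pairs as they might arrive today in a Vec.
    let response_policies: Vec<(usize, ResponsePolicy)> = vec![
        (0, ResponsePolicy::AllSucceeded),
        (3, ResponsePolicy::OneSucceeded),
    ];

    // Build the map once, then look up by index while aggregating responses,
    // instead of scanning the Vec for every command.
    let by_index: HashMap<usize, ResponsePolicy> = response_policies.into_iter().collect();

    let inner_index = 3usize;
    if let Some(policy) = by_index.get(&inner_index) {
        println!("policy for command {inner_index}: {policy:?}");
    }
}
```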

// Return the transformed command using the extracted indices.
Ok(command_for_multi_slot_indices(cmd.as_ref(), indices.1.iter()).into())
} else {
// For non-multi-slot commands, simply return a clone.
Collaborator

not a clone

pipeline_responses,
index,
inner_index,
Value::ServerError(server_error),
Collaborator

append the error


// Attempt to retrieve a connection for the given address.
let connection = {
let lock = core.conn_lock.read().expect(MUTEX_READ_ERR);
Collaborator

use get_connection with ByAddress

pipeline_responses,
index,
inner_index,
Value::ServerError(server_error),
Collaborator

append?

/// - The address of the node where the command was originally sent.
/// - The `ServerError` that occurred.
/// * `pipeline_responses` - A mutable reference to the collection of pipeline responses.
async fn retry_commands<C>(
Collaborator

the function name suggests that commands are being retried here, but they aren't

let redis_error: RedisError = error.clone().into();
let redirect_info = redis_error
.redirect()
.ok_or_else(|| (OperationTarget::FanOut, error.clone().into()))?;
Collaborator

server error

Labels
Core changes: used to mark a PR with significant changes that should trigger the full test matrix.
3 participants