Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python - Add Pipeline Support #3374

Draft
wants to merge 8 commits into
base: pipelines-target-branch
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,7 @@ impl MultiplexedConnection {
}
}
}
// TODO: remove this when `raise_on_error` flag will be added
let value = result.and_then(|v| v.extract_error())?;
let value = result?;
match value {
Value::Array(mut values) => {
values.drain(..offset);
Expand Down
337 changes: 202 additions & 135 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs

Large diffs are not rendered by default.

651 changes: 608 additions & 43 deletions glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs

Large diffs are not rendered by default.

143 changes: 126 additions & 17 deletions glide-core/redis-rs/redis/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io;
use std::str::{from_utf8, Utf8Error};
use std::string::FromUtf8Error;

use crate::cluster_routing::Redirect;
use num_bigint::BigInt;
pub(crate) use std::collections::{HashMap, HashSet};
use std::ops::Deref;
Expand Down Expand Up @@ -80,7 +81,7 @@ pub enum NumericBehavior {
}

/// An enum of all error kinds.
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
#[derive(PartialEq, Eq, Copy, Clone, Debug, Display)]
#[non_exhaustive]
pub enum ErrorKind {
/// The server generated an invalid response.
Expand Down Expand Up @@ -160,7 +161,7 @@ pub enum ErrorKind {
UserOperationError,
}

#[derive(PartialEq, Debug, Clone, Display)]
#[derive(PartialEq, Debug, Clone, Display, Copy)]
pub enum ServerErrorKind {
ResponseError,
ExecAbortError,
Expand Down Expand Up @@ -215,6 +216,116 @@ impl ServerError {
ServerError::KnownError { detail, .. } => detail.as_ref().map(|str| str.as_str()),
}
}

/// Returns the error kind of the error.
pub fn kind(&self) -> ErrorKind {
match self {
ServerError::ExtensionError { .. } => ErrorKind::ExtensionError,
ServerError::KnownError { kind, .. } => (*kind).into(),
}
}

/// Appends the string representation of `other` to the existing `detail`.
/// If no detail exists, it simply sets it to `other`’s string.
pub fn append_detail(&mut self, other: &ServerError) {
// Convert the other error to a string representation.
let other_str = format!("{}", other);
match self {
ServerError::ExtensionError { detail, .. } | ServerError::KnownError { detail, .. } => {
if let Some(existing) = detail {
// Append with a separator.
existing.push_str("; ");
existing.push_str(&other_str);
} else {
*detail = Some(other_str);
}
}
};
}
}

macro_rules! map_error_kinds {
($($variant:ident),*) => {
impl From<ServerErrorKind> for ErrorKind {
fn from(kind: ServerErrorKind) -> Self {
match kind {
$(ServerErrorKind::$variant => ErrorKind::$variant,)*
}
}
}

impl From<ErrorKind> for Option<ServerErrorKind> {
fn from(kind: ErrorKind) -> Self {
match kind {
$(ErrorKind::$variant => Some(ServerErrorKind::$variant),)*
_ => None,
}
}
}
};
}

// Define mappings using the macro
map_error_kinds!(
ResponseError,
ExecAbortError,
BusyLoadingError,
NoScriptError,
Moved,
Ask,
TryAgain,
ClusterDown,
CrossSlot,
MasterDown,
ReadOnly,
NotBusy
);

impl From<RedisError> for ServerError {
fn from(redis_error: RedisError) -> Self {
match redis_error.repr {
ErrorRepr::ExtensionError(code, detail) => ServerError::ExtensionError {
code,
detail: Some(detail),
},
ErrorRepr::WithDescription(kind, desc) => {
if let Some(mapped) = kind.into() {
ServerError::KnownError {
kind: mapped,
detail: Some(desc.to_string()),
}
} else {
ServerError::ExtensionError {
code: kind.to_string(),
detail: Some(format!(
"Unhandled error kind: {:?} description {}",
kind, desc
)),
}
}
}
ErrorRepr::WithDescriptionAndDetail(kind, desc, detail) => {
if let Some(mapped) = kind.into() {
ServerError::KnownError {
kind: mapped,
detail: Some(format!("{} {}", desc, detail)),
}
} else {
ServerError::ExtensionError {
code: kind.to_string(),
detail: Some(format!(
"Unhandled error kind: {:?} with description {} and detail {}",
kind, desc, detail
)),
}
}
}
ErrorRepr::IoError(io_err) => ServerError::ExtensionError {
code: "IOERROR".into(),
detail: Some(io_err.to_string()),
},
}
}
}

impl From<tokio::time::error::Elapsed> for RedisError {
Expand All @@ -229,21 +340,8 @@ impl From<ServerError> for RedisError {
match value {
ServerError::ExtensionError { code, detail } => make_extension_error(code, detail),
ServerError::KnownError { kind, detail } => {
let desc = "An error was signalled by the server";
let kind = match kind {
ServerErrorKind::ResponseError => ErrorKind::ResponseError,
ServerErrorKind::ExecAbortError => ErrorKind::ExecAbortError,
ServerErrorKind::BusyLoadingError => ErrorKind::BusyLoadingError,
ServerErrorKind::NoScriptError => ErrorKind::NoScriptError,
ServerErrorKind::Moved => ErrorKind::Moved,
ServerErrorKind::Ask => ErrorKind::Ask,
ServerErrorKind::TryAgain => ErrorKind::TryAgain,
ServerErrorKind::ClusterDown => ErrorKind::ClusterDown,
ServerErrorKind::CrossSlot => ErrorKind::CrossSlot,
ServerErrorKind::MasterDown => ErrorKind::MasterDown,
ServerErrorKind::ReadOnly => ErrorKind::ReadOnly,
ServerErrorKind::NotBusy => ErrorKind::NotBusy,
};
let desc = "An error was signalled by the server:";
let kind: ErrorKind = kind.into();
match detail {
Some(detail) => RedisError::from((kind, desc, detail)),
None => RedisError::from((kind, desc)),
Expand Down Expand Up @@ -799,6 +897,7 @@ impl fmt::Debug for RedisError {
}
}

#[derive(PartialEq, Eq, Hash)]
pub(crate) enum RetryMethod {
Reconnect,
ReconnectAndRetry,
Expand Down Expand Up @@ -992,6 +1091,16 @@ impl RedisError {
Some((addr, slot_id))
}

/// Returns the redirect method for this error.
pub(crate) fn redirect(&self) -> Option<Redirect> {
let node = self.redirect_node()?;
match self.kind() {
ErrorKind::Ask => Some(Redirect::Ask(node.0.to_string())),
ErrorKind::Moved => Some(Redirect::Moved(node.0.to_string())),
_ => None,
}
}

/// Returns the extension error code.
///
/// This method should not be used because every time the redis library
Expand Down
Loading