Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Refactor: Optimize client request handling to use two async tasks per client #3263

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 196 additions & 110 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::response;
use crate::response::Response;
use bytes::Bytes;
use directories::BaseDirs;
use futures::{future::poll_fn, stream::FuturesUnordered, StreamExt};
use logger_core::{log_debug, log_error, log_info, log_trace, log_warn};
use once_cell::sync::Lazy;
use protobuf::{Chars, Message};
Expand All @@ -25,13 +26,13 @@ use std::collections::HashSet;
use std::ptr::from_mut;
use std::rc::Rc;
use std::sync::RwLock;
use std::{env, str};
use std::{env, fmt, str};
use std::{io, thread};
use thiserror::Error;
use tokio::net::{UnixListener, UnixStream};
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task;
use tokio_util::task::LocalPoolHandle;
Expand Down Expand Up @@ -459,112 +460,96 @@ fn get_route(
}
}

fn handle_request(request: CommandRequest, mut client: Client, writer: Rc<Writer>) {
task::spawn_local(async move {
let mut updated_inflight_counter = true;
let client_clone = client.clone();
async fn handle_request(request: CommandRequest, mut client: Client, writer: Rc<Writer>) {
let mut updated_inflight_counter = true;
let client_clone = client.clone();

let result = match client.reserve_inflight_request() {
false => {
updated_inflight_counter = false;
Err(ClientUsageError::User(
"Reached maximum inflight requests".to_string(),
))
}
true => match request.command {
Some(action) => match action {
command_request::Command::ClusterScan(cluster_scan_command) => {
cluster_scan(cluster_scan_command, client).await
}
command_request::Command::SingleCommand(command) => {
match get_redis_command(&command) {
Ok(cmd) => match get_route(request.route.0, Some(&cmd)) {
Ok(routes) => send_command(cmd, client, routes).await,
Err(e) => Err(e),
},
let result = match client.reserve_inflight_request() {
false => {
updated_inflight_counter = false;
Err(ClientUsageError::User(
"Reached maximum inflight requests".to_string(),
))
}
true => match request.command {
Some(action) => match action {
command_request::Command::ClusterScan(cluster_scan_command) => {
cluster_scan(cluster_scan_command, client).await
}
command_request::Command::SingleCommand(command) => {
match get_redis_command(&command) {
Ok(cmd) => match get_route(request.route.0, Some(&cmd)) {
Ok(routes) => send_command(cmd, client, routes).await,
Err(e) => Err(e),
}
},
Err(e) => Err(e),
}
command_request::Command::Transaction(transaction) => {
match get_route(request.route.0, None) {
Ok(routes) => send_transaction(transaction, &mut client, routes).await,
Err(e) => Err(e),
}
}
command_request::Command::Transaction(transaction) => {
match get_route(request.route.0, None) {
Ok(routes) => send_transaction(transaction, &mut client, routes).await,
Err(e) => Err(e),
}
command_request::Command::ScriptInvocation(script) => {
match get_route(request.route.0, None) {
Ok(routes) => {
invoke_script(
script.hash,
Some(script.keys),
Some(script.args),
client,
routes,
)
.await
}
Err(e) => Err(e),
}
command_request::Command::ScriptInvocation(script) => {
match get_route(request.route.0, None) {
Ok(routes) => {
invoke_script(
script.hash,
Some(script.keys),
Some(script.args),
client,
routes,
)
.await
}
Err(e) => Err(e),
}
command_request::Command::ScriptInvocationPointers(script) => {
let keys = script
.keys_pointer
.map(|pointer| *unsafe { Box::from_raw(pointer as *mut Vec<Bytes>) });
let args = script
.args_pointer
.map(|pointer| *unsafe { Box::from_raw(pointer as *mut Vec<Bytes>) });
match get_route(request.route.0, None) {
Ok(routes) => {
invoke_script(script.hash, keys, args, client, routes).await
}
Err(e) => Err(e),
}
}
command_request::Command::ScriptInvocationPointers(script) => {
let keys = script
.keys_pointer
.map(|pointer| *unsafe { Box::from_raw(pointer as *mut Vec<Bytes>) });
let args = script
.args_pointer
.map(|pointer| *unsafe { Box::from_raw(pointer as *mut Vec<Bytes>) });
match get_route(request.route.0, None) {
Ok(routes) => invoke_script(script.hash, keys, args, client, routes).await,
Err(e) => Err(e),
}
command_request::Command::UpdateConnectionPassword(
update_connection_password_command,
) => client
.update_connection_password(
update_connection_password_command
.password
.map(|chars| chars.to_string()),
update_connection_password_command.immediate_auth,
)
.await
.map_err(|err| err.into()),
},
None => {
log_debug(
"received error",
format!(
"Received empty request for callback {}",
request.callback_idx
),
);
Err(ClientUsageError::Internal(
"Received empty request".to_string(),
))
}
command_request::Command::UpdateConnectionPassword(
update_connection_password_command,
) => client
.update_connection_password(
update_connection_password_command
.password
.map(|chars| chars.to_string()),
update_connection_password_command.immediate_auth,
)
.await
.map_err(|err| err.into()),
},
};

if updated_inflight_counter {
client_clone.release_inflight_request();
}

let _res = write_result(result, request.callback_idx, &writer).await;
});
}
None => {
log_debug(
"received error",
format!(
"Received empty request for callback {}",
request.callback_idx
),
);
Err(ClientUsageError::Internal(
"Received empty request".to_string(),
))
}
},
};

async fn handle_requests(
received_requests: Vec<CommandRequest>,
client: &Client,
writer: &Rc<Writer>,
) {
for request in received_requests {
handle_request(request, client.clone(), writer.clone());
if updated_inflight_counter {
client_clone.release_inflight_request();
}
// Yield to ensure that the subtasks aren't starved.
task::yield_now().await;

let _res = write_result(result, request.callback_idx, &writer).await;
}

pub fn close_socket(socket_path: &String) {
Expand Down Expand Up @@ -605,18 +590,25 @@ async fn wait_for_connection_configuration_and_create_client(
}
}

async fn read_values_loop(
/// Listens for new requests on the socket, parses them, and forwards them to the request processor.
///
/// # Arguments:
/// - `client_listener`: The client's socket listener responsible for receiving incoming requests.
/// - `processor_channel`: A sender channel used to forward the parsed requests for processing.
async fn client_reader_loop(
mut client_listener: UnixStreamListener,
client: &Client,
writer: Rc<Writer>,
processor_channel: Sender<Vec<CommandRequest>>,
) -> ClosingReason {
loop {
match client_listener.next_values().await {
Closed(reason) => {
return reason;
}
ReceivedValues(received_requests) => {
handle_requests(received_requests, client, &writer).await;
if let Err(_err) = processor_channel.send(received_requests).await {
// Failed to send requests because the processor task was unexpectedly closed
return ClosingReason::ClientRequestProcessorClosed;
}
}
}
}
Expand Down Expand Up @@ -651,6 +643,43 @@ async fn push_manager_loop(mut push_rx: mpsc::UnboundedReceiver<PushInfo>, write
}
}

// Process all incoming requests received from the socket for this client. This task would be responsible for two things:
// 1. Listening on the channel for new requests and pushing them into the futures queue
// 2. Processing the futures queue by polling the queue to let the futures progress and removing completed futures
// This task will be closed when the channel to send requests through will be closed or when aborted by the caller.
async fn request_processor_loop(
client: Client,
writer: Rc<Writer>,
mut requests_channel: Receiver<Vec<CommandRequest>>,
) {
let mut futures_queue = FuturesUnordered::new();
loop {
tokio::select! {
// Handle new incoming requests from the channel
Some(requests) = requests_channel.recv() => {
requests
.into_iter()
.map(|request| handle_request(request, client.clone(), writer.clone()))
.for_each(|future| futures_queue.push(future));
}

// Poll the futures queue and process the next completed future.
Some(_) = poll_fn(|cx| futures_queue.poll_next_unpin(cx)) => {},

// Exit the loop if both the channel and queue are empty
else => {
if futures_queue.is_empty() {
log_debug(
"request processor",
"Client channel is closed, and no more tasks are pending. Shutting down the request processor."
);
break;
}
}
}
}
}

async fn listen_on_client_stream(socket: UnixStream) {
let socket = Rc::new(socket);
// Spawn a new task to listen on this client's stream
Expand Down Expand Up @@ -707,24 +736,48 @@ async fn listen_on_client_stream(socket: UnixStream) {
}
};
log_info("connection", "new connection started");
// Each client has two dedicated tasks:
// 1. `client_reader_loop`: Listens on the socket, receives new requests, parses them,
// and forwards them to the second task.
// 2. `request_processor_loop`: Manages a futures queue by receiving requests from the channel,
// adding them to the queue, and continuously polling them until completion.
let (requests_sender, requests_receiver) = mpsc::channel::<Vec<CommandRequest>>(100);
let cloned_writer = writer.clone();
let request_processor = task::spawn_local(request_processor_loop(
client.clone(),
cloned_writer,
requests_receiver,
));
tokio::select! {
reader_closing = read_values_loop(client_listener, &client, writer.clone()) => {
if let ClosingReason::UnhandledError(err) = reader_closing {
let _res = write_closing_error(ClosingError{err_message: err.to_string()}, u32::MAX, &writer, "client closing").await;
reader_closing = client_reader_loop(client_listener, requests_sender) => {
// Write the closing reason back to the wrapper
if reader_closing.is_error() {
if let Err(err) = write_closing_error(
ClosingError{err_message: reader_closing.to_string()},
u32::MAX,
&writer,
"client closing"
).await {
log_warn(
"client closing",
format!("Failed to write the closing error: {err:?}")
);
}
};
log_trace("client closing", "reader closed");
},
writer_closing = receiver.recv() => {
if let Some(ClosingReason::UnhandledError(err)) = writer_closing {
log_error("client closing", format!("Writer closed with error: {err}"));
} else {
log_trace("client closing", "writer closed");
}
},
match writer_closing {
Some(closing_reason) if closing_reason.is_error() => {
log_error("client closing", format!("Writer closed with error: {closing_reason}"));
},
_ => log_trace("client closing", "writer closed")
}},
_ = push_manager_loop(push_rx, writer.clone()) => {
log_trace("client closing", "push manager closed");
}
}
request_processor.abort();
log_trace("client closing", "closing connection");
}

Expand All @@ -733,10 +786,43 @@ async fn listen_on_client_stream(socket: UnixStream) {
pub enum ClosingReason {
/// The socket was closed. This is the expected way that the listener should be closed.
ReadSocketClosed,
/// The client's request processor task was unexpectedly closed.
ClientRequestProcessorClosed,
/// The listener encounter an error it couldn't handle.
UnhandledError(RedisError),
}

impl ClosingReason {
/// Returns `true` if the closing reason was due to an error, otherwise `false`.
pub(crate) fn is_error(&self) -> bool {
match self {
ClosingReason::ReadSocketClosed => false, // Expected closure, not an error
ClosingReason::ClientRequestProcessorClosed => true, // Unexpected closure, treated as an error
ClosingReason::UnhandledError(_) => true, // Error encountered
}
}
}

// Implement Display for ClosingReason
impl fmt::Display for ClosingReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ClosingReason::ReadSocketClosed => {
write!(f, "The socket was closed")
}
ClosingReason::ClientRequestProcessorClosed => {
write!(
f,
"The client's request processor has been unexpectedly closed."
)
}
ClosingReason::UnhandledError(err) => {
write!(f, "Unhandled error encountered: {}", err)
}
}
}
}

impl From<io::Error> for ClosingReason {
fn from(error: io::Error) -> Self {
UnhandledError(error.into())
Expand Down
Loading