Skip to content

Commit ec8d068

Browse files
author
avifenesh
committed
Fix issue with socket folders being held by non existing process and bump dependencies
Signed-off-by: avifenesh <[email protected]>
1 parent 4de9ae4 commit ec8d068

File tree

6 files changed

+82
-63
lines changed

6 files changed

+82
-63
lines changed

Diff for: glide-core/Cargo.toml

+18-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ authors = ["Valkey GLIDE Maintainers"]
99

1010
[dependencies]
1111
bytes = "1"
12-
futures = "^0.3"
12+
futures = "0.3"
1313
redis = { path = "./redis-rs/redis", features = [
1414
"aio",
1515
"tokio-comp",
@@ -21,27 +21,27 @@ redis = { path = "./redis-rs/redis", features = [
2121
telemetrylib = { path = "./telemetry" }
2222
tokio = { version = "1", features = ["macros", "time"] }
2323
logger_core = { path = "../logger_core" }
24-
dispose = "0.5.0"
25-
tokio-util = { version = "^0.7", features = ["rt"], optional = true }
26-
num_cpus = { version = "^1.15", optional = true }
27-
tokio-retry2 = {version = "0.5", features = ["jitter"]}
24+
dispose = "0.5"
25+
tokio-util = { version = "0.7", features = ["rt"], optional = true }
26+
num_cpus = { version = "1", optional = true }
27+
tokio-retry2 = { version = "0.5", features = ["jitter"] }
2828

2929
protobuf = { version = "3", features = [
3030
"bytes",
3131
"with-bytes",
3232
], optional = true }
33-
integer-encoding = { version = "4.0.0", optional = true }
34-
thiserror = "1"
35-
rand = { version = "0.8.5" }
36-
futures-intrusive = "0.5.0"
37-
directories = { version = "5.0", optional = true }
38-
once_cell = "1.18.0"
39-
sha1_smol = "1.0.0"
40-
nanoid = "0.4.0"
41-
async-trait = { version = "0.1.24" }
33+
integer-encoding = { version = "4", optional = true }
34+
thiserror = "2"
35+
rand = "0.9"
36+
futures-intrusive = "0.5"
37+
directories = { version = "6", optional = true }
38+
once_cell = "1"
39+
sha1_smol = "1"
40+
nanoid = "0.4"
41+
async-trait = { version = "0.1" }
4242
serde_json = "1"
4343
serde = { version = "1", features = ["derive"] }
44-
versions = "6.3"
44+
versions = "6"
4545

4646
[features]
4747
socket-layer = [
@@ -57,11 +57,11 @@ standalone_heartbeat = []
5757
rsevents = "0.3.1"
5858
socket2 = "^0.5"
5959
tempfile = "3.3.0"
60-
rstest = "^0.23"
60+
rstest = "^0.24"
6161
serial_test = "3"
6262
criterion = { version = "^0.5", features = ["html_reports", "async_tokio"] }
63-
which = "6"
64-
ctor = "0.2.2"
63+
which = "7"
64+
ctor = "0.3"
6565
redis = { path = "./redis-rs/redis", features = ["tls-rustls-insecure"] }
6666
iai-callgrind = "0.14"
6767
tokio = { version = "1", features = ["rt-multi-thread"] }

Diff for: glide-core/benches/rotating_buffer_benchmark.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use glide_core::{
1111
};
1212
use integer_encoding::VarInt;
1313
use protobuf::Message;
14-
use rand::{distributions::Alphanumeric, Rng};
14+
use rand::{distr::Alphanumeric, Rng};
1515

1616
fn benchmark(
1717
c: &mut Criterion,
@@ -102,7 +102,7 @@ fn benchmark_split_data(
102102
}
103103

104104
fn generate_random_string(length: usize) -> bytes::Bytes {
105-
let s: String = rand::thread_rng()
105+
let s: String = rand::rng()
106106
.sample_iter(&Alphanumeric)
107107
.take(length)
108108
.map(char::from)

Diff for: glide-core/src/client/standalone_client.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::retry_strategies::RetryStrategy;
99
use futures::{future, stream, StreamExt};
1010
use logger_core::log_debug;
1111
use logger_core::log_warn;
12-
use rand::Rng;
1312
use redis::aio::ConnectionLike;
1413
use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, RoutingInfo};
1514
use redis::{PushInfo, RedisError, RedisResult, Value};
@@ -125,7 +124,7 @@ impl StandaloneClient {
125124
let tls_mode = connection_request.tls_mode;
126125
let node_count = connection_request.addresses.len();
127126
// randomize pubsub nodes, maybe a batter option is to always use the primary
128-
let pubsub_node_index = rand::thread_rng().gen_range(0..node_count);
127+
let pubsub_node_index = rand::random_range(0..node_count);
129128
let pubsub_addr = &connection_request.addresses[pubsub_node_index];
130129
let discover_az = matches!(
131130
connection_request.read_from,

Diff for: glide-core/src/rotating_buffer.rs

+13-14
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,17 @@ impl RotatingBuffer {
2929
let start_pos = prev_position + bytes_read;
3030
if (start_pos + request_len as usize) > buffer_len {
3131
break;
32-
} else {
33-
match T::parse_from_tokio_bytes(
34-
&buffer.slice(start_pos..start_pos + request_len as usize),
35-
) {
36-
Ok(request) => {
37-
prev_position += request_len as usize + bytes_read;
38-
results.push(request);
39-
}
40-
Err(err) => {
41-
log_error("parse input", format!("Failed to parse request: {err}"));
42-
return Err(err.into());
43-
}
32+
}
33+
match T::parse_from_tokio_bytes(
34+
&buffer.slice(start_pos..start_pos + request_len as usize),
35+
) {
36+
Ok(request) => {
37+
prev_position += request_len as usize + bytes_read;
38+
results.push(request);
39+
}
40+
Err(err) => {
41+
log_error("parse input", format!("Failed to parse request: {err}"));
42+
return Err(err.into());
4443
}
4544
}
4645
} else {
@@ -68,7 +67,7 @@ mod tests {
6867
use crate::command_request::{command, command_request};
6968
use crate::command_request::{Command, CommandRequest, RequestType};
7069
use bytes::BufMut;
71-
use rand::{distributions::Alphanumeric, Rng};
70+
use rand::{distr::Alphanumeric, Rng};
7271
use rstest::rstest;
7372

7473
fn write_length(buffer: &mut BytesMut, length: u32) {
@@ -161,7 +160,7 @@ mod tests {
161160
}
162161

163162
fn generate_random_string(length: usize) -> String {
164-
rand::thread_rng()
163+
rand::rng()
165164
.sample_iter(&Alphanumeric)
166165
.take(length)
167166
.map(char::from)

Diff for: glide-core/src/socket_listener.rs

+46-25
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::errors::{error_message, error_type, RequestErrorType};
1111
use crate::response;
1212
use crate::response::Response;
1313
use bytes::Bytes;
14-
use directories::BaseDirs;
1514
use logger_core::{log_debug, log_error, log_info, log_trace, log_warn};
1615
use once_cell::sync::Lazy;
1716
use protobuf::{Chars, Message};
@@ -22,6 +21,7 @@ use redis::cluster_routing::{ResponsePolicy, Routable};
2221
use redis::{ClusterScanArgs, Cmd, PushInfo, RedisError, ScanStateRC, Value};
2322
use std::cell::Cell;
2423
use std::collections::HashSet;
24+
use std::path::PathBuf;
2525
use std::ptr::from_mut;
2626
use std::rc::Rc;
2727
use std::sync::RwLock;
@@ -39,7 +39,9 @@ use ClosingReason::*;
3939
use PipeListeningResult::*;
4040

4141
/// The socket file name
42-
const SOCKET_FILE_NAME: &str = "glide-socket";
42+
const SOCKET_FILE_NAME: &str = "glide-socket.soc";
43+
/// The socket folder
44+
const SOCKET_FOLDER: &str = "glide";
4345

4446
/// The maximum length of a request's arguments to be passed as a vector of
4547
/// strings instead of a pointer
@@ -569,7 +571,7 @@ async fn handle_requests(
569571

570572
pub fn close_socket(socket_path: &String) {
571573
log_info("close_socket", format!("closing socket at {socket_path}"));
572-
let _ = std::fs::remove_file(socket_path);
574+
remove_socket_dir();
573575
}
574576

575577
async fn create_client(
@@ -783,29 +785,39 @@ struct ClosingError {
783785
/// Get the socket full path.
784786
/// The socket file name will contain the process ID and will try to be saved into the user's runtime directory
785787
/// (e.g. /run/user/1000) in Unix systems. If the runtime dir isn't found, the socket file will be saved to the temp dir.
786-
/// For Windows, the socket file will be saved to %AppData%\Local.
787788
pub fn get_socket_path_from_name(socket_name: String) -> String {
788-
let base_dirs = BaseDirs::new().expect("Failed to create BaseDirs");
789-
let tmp_dir;
790-
let folder = if cfg!(windows) {
791-
base_dirs.data_local_dir()
792-
} else {
793-
base_dirs.runtime_dir().unwrap_or({
794-
tmp_dir = env::temp_dir();
795-
tmp_dir.as_path()
796-
})
797-
};
798-
folder
789+
get_socket_dir()
799790
.join(socket_name)
800791
.into_os_string()
801792
.into_string()
802-
.expect("Couldn't create socket path")
793+
.expect("Failed to create socket path from name")
794+
}
795+
796+
/// Get the socket directory path.
797+
fn get_socket_dir() -> PathBuf {
798+
// Use XDG_RUNTIME_DIR if available, else fallback to temp directory
799+
if let Ok(runtime_dir) = env::var("XDG_RUNTIME_DIR") {
800+
PathBuf::from(runtime_dir)
801+
.join(SOCKET_FOLDER)
802+
.join(std::process::id().to_string())
803+
} else {
804+
env::temp_dir()
805+
.join(SOCKET_FOLDER)
806+
.join(std::process::id().to_string())
807+
}
808+
}
809+
810+
/// Remove socket dir of the process
811+
pub fn remove_socket_dir() {
812+
let socket_dir = get_socket_dir();
813+
if socket_dir.exists() {
814+
let _ = std::fs::remove_dir_all(socket_dir);
815+
}
803816
}
804817

805818
/// Get the socket path as a string
806819
pub fn get_socket_path() -> String {
807-
let socket_name = format!("{}-{}", SOCKET_FILE_NAME, std::process::id());
808-
get_socket_path_from_name(socket_name)
820+
get_socket_path_from_name(SOCKET_FILE_NAME.to_string())
809821
}
810822

811823
/// This function is exposed only for the sake of testing with a nonstandard `socket_path`.
@@ -820,7 +832,10 @@ pub fn start_socket_listener_internal<InitCallback>(
820832
static INITIALIZED_SOCKETS: Lazy<RwLock<HashSet<String>>> =
821833
Lazy::new(|| RwLock::new(HashSet::new()));
822834

823-
let socket_path = socket_path.unwrap_or_else(get_socket_path);
835+
let socket_path = match socket_path {
836+
Some(path) => path,
837+
None => get_socket_path(),
838+
};
824839

825840
{
826841
// Optimize for already initialized
@@ -841,7 +856,6 @@ pub fn start_socket_listener_internal<InitCallback>(
841856
init_callback(Ok(socket_path.clone()));
842857
return;
843858
}
844-
845859
let (tx, rx) = std::sync::mpsc::channel();
846860
let socket_path_cloned = socket_path.clone();
847861
let init_callback_cloned = init_callback.clone();
@@ -860,8 +874,16 @@ pub fn start_socket_listener_internal<InitCallback>(
860874
}
861875
Ok(runtime) => runtime,
862876
};
863-
864877
runtime.block_on(async move {
878+
// Clean up any leftover from previous runs and create socket dir
879+
remove_socket_dir();
880+
if let Err(err) = std::fs::create_dir_all(get_socket_dir()) {
881+
log_error(
882+
"listen_on_socket",
883+
format!("Failed to create socket directory: {err}"),
884+
);
885+
}
886+
865887
let listener_socket = match UnixListener::bind(socket_path_cloned.clone()) {
866888
Err(err) => {
867889
log_error(
@@ -874,8 +896,6 @@ pub fn start_socket_listener_internal<InitCallback>(
874896
};
875897

876898
// Signal initialization is successful.
877-
// IMPORTANT:
878-
// tx.send() must be called before init_callback_cloned() to ensure runtimes, such as Python, can properly complete the main function
879899
let _ = tx.send(true);
880900
init_callback_cloned(Ok(socket_path_cloned.clone()));
881901

@@ -900,7 +920,9 @@ pub fn start_socket_listener_internal<InitCallback>(
900920
drop(listener_socket);
901921
let _ = std::fs::remove_file(socket_path_cloned.clone());
902922

903-
// no more listening on socket - update the sockets db
923+
// Clean the entire process-id socket directory on close
924+
remove_socket_dir();
925+
904926
let mut sockets_write_guard = INITIALIZED_SOCKETS
905927
.write()
906928
.expect("Failed to acquire sockets db write guard");
@@ -917,7 +939,6 @@ pub fn start_socket_listener_internal<InitCallback>(
917939
})
918940
.expect("Thread spawn failed. Cannot report error because callback was moved.");
919941

920-
// wait for thread initialization signaling, callback invocation is done in the thread
921942
let _ = rx.recv().map(|res| {
922943
if res {
923944
sockets_write_guard.insert(socket_path);

Diff for: glide-core/tests/utilities/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use glide_core::{
77
connection_request::{self, AuthenticationInfo, NodeAddress, ProtocolVersion},
88
};
99
use once_cell::sync::Lazy;
10-
use rand::{distributions::Alphanumeric, Rng};
10+
use rand::{distr::Alphanumeric, Rng};
1111
use redis::{
1212
cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo},
1313
ConnectionAddr, GlideConnectionOptions, PushInfo, RedisConnectionInfo, RedisResult, Value,
@@ -517,7 +517,7 @@ pub fn get_address_info(address: &ConnectionAddr) -> NodeAddress {
517517
}
518518

519519
pub fn generate_random_string(length: usize) -> String {
520-
rand::thread_rng()
520+
rand::rng()
521521
.sample_iter(&Alphanumeric)
522522
.take(length)
523523
.map(char::from)

0 commit comments

Comments
 (0)