Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix client trying to use “occupied file” for socket. #3200

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmarks/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ authors = ["Valkey GLIDE Maintainers"]
[dependencies]
tokio = { version = "1", features = ["macros", "time", "rt-multi-thread"] }
glide-core = { path = "../../glide-core" }
logger_core = {path = "../../logger_core"}
logger_core = { path = "../../logger_core" }
redis = { path = "../../glide-core/redis-rs/redis", features = ["aio"] }
futures = "0.3.28"
rand = "0.8.5"
rand = "0.9"
itoa = "1.0.6"
clap = { version = "4.3.8", features = ["derive"] }
chrono = "0.4.26"
serde_json = "1.0.99"
statistical = "1.0.0"
tikv-jemallocator = "0.5.4"
tikv-jemallocator = "0.6"

[profile.release]
debug = true
Expand Down
16 changes: 8 additions & 8 deletions benchmarks/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ static GLOBAL: Jemalloc = Jemalloc;
use clap::Parser;
use futures::{self, future::join_all, stream, StreamExt};
use glide_core::client::{Client, ConnectionRequest, NodeAddress, TlsMode};
use rand::{thread_rng, Rng};
use rand::{distr::Alphanumeric, rng, Rng};
use serde_json::Value;
use std::{
cmp::max,
Expand Down Expand Up @@ -210,8 +210,8 @@ fn calculate_latencies(values: &[Duration], prefix: &str) -> HashMap<String, Val
}

fn generate_random_string(length: usize) -> String {
rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
rand::rng()
.sample_iter(Alphanumeric)
.take(length)
.map(char::from)
.collect()
Expand Down Expand Up @@ -280,19 +280,19 @@ async fn perform_operation(
data_size: usize,
) -> ChosenAction {
let mut cmd = redis::Cmd::new();
let action = if rand::thread_rng().gen_bool(PROB_GET) {
if rand::thread_rng().gen_bool(PROB_GET_EXISTING_KEY) {
let action = if rand::rng().random_bool(PROB_GET) {
if rand::rng().random_bool(PROB_GET_EXISTING_KEY) {
cmd.arg("GET")
.arg(buffer.format(thread_rng().gen_range(0..SIZE_SET_KEYSPACE)));
.arg(buffer.format(rng().random_range(0..SIZE_SET_KEYSPACE)));
ChosenAction::GetExisting
} else {
cmd.arg("GET")
.arg(buffer.format(thread_rng().gen_range(SIZE_SET_KEYSPACE..SIZE_GET_KEYSPACE)));
.arg(buffer.format(rng().random_range(SIZE_SET_KEYSPACE..SIZE_GET_KEYSPACE)));
ChosenAction::GetNonExisting
}
} else {
cmd.arg("SET")
.arg(buffer.format(thread_rng().gen_range(0..SIZE_SET_KEYSPACE)))
.arg(buffer.format(rng().random_range(0..SIZE_SET_KEYSPACE)))
.arg(generate_random_string(data_size));
ChosenAction::Set
};
Expand Down
8 changes: 2 additions & 6 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ db-urls = ["https://github.com/rustsec/advisory-db"]
yanked = "deny"
# A list of advisory IDs to ignore. Note that ignored advisories will still
# output a note when they are encountered.
ignore = [
# suppress this validation until #3226 not resolved
# https://github.com/valkey-io/valkey-glide/issues/3226
"RUSTSEC-2025-0007",
]
ignore = []
# Threshold for security vulnerabilities, any vulnerability with a CVSS score
# lower than the range specified will be ignored. Note that ignored advisories
# will still output a note when they are encountered.
Expand Down Expand Up @@ -60,7 +56,7 @@ allow = [
"ISC",
"OpenSSL",
"MPL-2.0",
"Unicode-3.0"
"Unicode-3.0",
]
# The confidence threshold for detecting a license from license text.
# The higher the value, the more closely the license text must be to the
Expand Down
38 changes: 19 additions & 19 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ authors = ["Valkey GLIDE Maintainers"]

[dependencies]
bytes = "1"
futures = "^0.3"
futures = "0.3"
redis = { path = "./redis-rs/redis", features = [
"aio",
"tokio-comp",
Expand All @@ -21,27 +21,27 @@ redis = { path = "./redis-rs/redis", features = [
telemetrylib = { path = "./telemetry" }
tokio = { version = "1", features = ["macros", "time"] }
logger_core = { path = "../logger_core" }
dispose = "0.5.0"
tokio-util = { version = "^0.7", features = ["rt"], optional = true }
num_cpus = { version = "^1.15", optional = true }
tokio-retry2 = {version = "0.5", features = ["jitter"]}
dispose = "0.5"
tokio-util = { version = "0.7", features = ["rt"], optional = true }
num_cpus = { version = "1", optional = true }
tokio-retry2 = { version = "0.5", features = ["jitter"] }

protobuf = { version = "3", features = [
"bytes",
"with-bytes",
], optional = true }
integer-encoding = { version = "4.0.0", optional = true }
thiserror = "1"
rand = { version = "0.8.5" }
futures-intrusive = "0.5.0"
directories = { version = "5.0", optional = true }
once_cell = "1.18.0"
sha1_smol = "1.0.0"
nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
integer-encoding = { version = "4", optional = true }
thiserror = "2"
rand = "0.9"
futures-intrusive = "0.5"
directories = { version = "6", optional = true }
once_cell = "1"
sha1_smol = "1"
nanoid = "0.4"
async-trait = { version = "0.1" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "6.3"
versions = "7"

[features]
proto = ["protobuf"]
Expand All @@ -57,12 +57,12 @@ standalone_heartbeat = []
[dev-dependencies]
rsevents = "0.3.1"
socket2 = "^0.5"
tempfile = "3.3.0"
rstest = "^0.23"
tempfile = "3.17"
rstest = "^0.24"
serial_test = "3"
criterion = { version = "^0.5", features = ["html_reports", "async_tokio"] }
which = "6"
ctor = "0.2.2"
which = "7"
ctor = "0.4"
redis = { path = "./redis-rs/redis", features = ["tls-rustls-insecure"] }
iai-callgrind = "0.14"
tokio = { version = "1", features = ["rt-multi-thread"] }
Expand Down
4 changes: 2 additions & 2 deletions glide-core/benches/rotating_buffer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use glide_core::{
};
use integer_encoding::VarInt;
use protobuf::Message;
use rand::{distributions::Alphanumeric, Rng};
use rand::{distr::Alphanumeric, Rng};

fn benchmark(
c: &mut Criterion,
Expand Down Expand Up @@ -102,7 +102,7 @@ fn benchmark_split_data(
}

fn generate_random_string(length: usize) -> bytes::Bytes {
let s: String = rand::thread_rng()
let s: String = rand::rng()
.sample_iter(&Alphanumeric)
.take(length)
.map(char::from)
Expand Down
24 changes: 12 additions & 12 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ ryu = "1.0"
itoa = "1.0"

# Strum is a set of macros and traits for working with enums and strings easier in Rust.
strum = "0.26"
strum_macros = "0.26"
strum = "0.27"
strum_macros = "0.27"

# This is a dependency that already exists in url
percent-encoding = "2.1"
Expand Down Expand Up @@ -58,7 +58,7 @@ r2d2 = { version = "0.8.8", optional = true }

# Only needed for cluster
crc16 = { version = "0.4", optional = true }
rand = { version = "0.8", optional = true }
rand = { version = "0.9", optional = true }

# Only needed for async cluster
dashmap = { version = "6.0", optional = true }
Expand All @@ -73,10 +73,10 @@ native-tls = { version = "0.2", optional = true }
tokio-native-tls = { version = "0.3", optional = true }

# Only needed for rustls
rustls = { version = "0.22", optional = true }
rustls = { version = "0.23.23", optional = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we are locked into a specific minor version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually tried to do the opposite everywhere, mistake

webpki-roots = { version = "0.26", optional = true }
rustls-native-certs = { version = "0.7", optional = true }
tokio-rustls = { version = "0.25", optional = true }
rustls-native-certs = { version = "0.8", optional = true }
tokio-rustls = { version = "0.26", optional = true }
rustls-pemfile = { version = "2", optional = true }
rustls-pki-types = { version = "1", optional = true }

Expand Down Expand Up @@ -160,13 +160,13 @@ disable-client-setinfo = []
tls = ["tls-native-tls"] # use "tls-native-tls" instead

[dev-dependencies]
rand = "0.8"
rand = "0.9"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo, we should just use major version, e.g:

rand = "0"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When its not stable yet, minor can be breaking as well, that's why under one i left the minor

socket2 = "0.5"
assert_approx_eq = "1.0"
fnv = "1.0.5"
futures = "0.3"
futures-time = "3"
criterion = "0.4"
criterion = "0.5"
partial-io = { version = "0.5", features = ["tokio", "quickcheck1"] }
quickcheck = "1.0.3"
tokio = { version = "1", features = [
Expand All @@ -175,13 +175,13 @@ tokio = { version = "1", features = [
"rt-multi-thread",
"time",
] }
tempfile = "=3.6.0"
tempfile = "3.17"
once_cell = "1"
anyhow = "1"
sscanf = "0.4.1"
serial_test = "^2"
versions = "6.3"
which = "7.0.1"
serial_test = "^3"
versions = "7"
which = "7"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }

[[test]]
Expand Down
6 changes: 3 additions & 3 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::{
cluster_routing::{Redirect, Route, RoutingInfo},
IntoConnectionInfo, PushInfo,
};
use rand::{seq::IteratorRandom, thread_rng};
use rand::{rng, seq::IteratorRandom};
use std::cell::RefCell;
use std::collections::HashSet;
use std::str::FromStr;
Expand Down Expand Up @@ -372,7 +372,7 @@ where

fn create_new_slots(&self) -> RedisResult<SlotMap> {
let mut connections = self.connections.borrow_mut();
let mut rng = thread_rng();
let mut rng = rng();
let len = connections.len();
let samples = connections.iter_mut().choose_multiple(&mut rng, len);
let mut result = Err(RedisError::from((
Expand Down Expand Up @@ -959,7 +959,7 @@ fn get_random_connection<C: ConnectionLike + Connect + Sized>(
) -> (String, &mut C) {
let addr = connections
.keys()
.choose(&mut thread_rng())
.choose(&mut rand::rng())
.expect("Connections is empty")
.to_string();
let con = connections.get_mut(&addr).expect("Connections is empty");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ where
(!self.connection_map.is_empty()).then_some({
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.choose_multiple(&mut rand::rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
Expand All @@ -657,7 +657,7 @@ where
// Increase the total number of connections by the number of connections managed by `node`
Telemetry::incr_total_connections(node.connections_count());

if let Some(old_conn) = self.connection_map.insert(address.clone(), node) {
if let Some(old_conn) = self.connection_map.insert(String::from(&address), node) {
// We are replacing a node. Reduce the counter by the number of connections managed by
// the old connection
Telemetry::decr_total_connections(old_conn.connections_count());
Expand Down
4 changes: 2 additions & 2 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl RetryParams {
let clamped_wait = base_wait
.min(self.max_wait_time)
.max(self.min_wait_time + 1);
let jittered_wait = rand::thread_rng().gen_range(self.min_wait_time..clamped_wait);
let jittered_wait = rand::rng().random_range(self.min_wait_time..clamped_wait);
Duration::from_millis(jittered_wait)
}
}
Expand Down Expand Up @@ -121,7 +121,7 @@ impl SlotsRefreshRateLimit {
pub(crate) fn wait_duration(&self) -> Duration {
let duration_jitter = match self.max_jitter_milli {
0 => Duration::from_millis(0),
_ => Duration::from_millis(rand::thread_rng().gen_range(0..self.max_jitter_milli)),
_ => Duration::from_millis(rand::rng().random_range(0..self.max_jitter_milli)),
};
self.interval_duration.add(duration_jitter)
}
Expand Down
4 changes: 2 additions & 2 deletions glide-core/redis-rs/redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,8 @@ impl Route {

/// Choose a random slot from `0..SLOT_SIZE` (excluding)
fn random_slot() -> u16 {
let mut rng = rand::thread_rng();
rng.gen_range(0..crate::cluster_topology::SLOT_SIZE)
let mut rng = rand::rng();
rng.random_range(0..crate::cluster_topology::SLOT_SIZE)
}

#[cfg(test)]
Expand Down
20 changes: 17 additions & 3 deletions glide-core/redis-rs/redis/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,22 @@ pub(crate) fn create_rustls_config(
not(feature = "tls-native-tls"),
not(feature = "tls-rustls-webpki-roots")
))]
for cert in load_native_certs()? {
root_store.add(cert)?;
let native_certs = load_native_certs();
#[cfg(all(
feature = "tls-rustls",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support both versions? if so, why?

not(feature = "tls-native-tls"),
not(feature = "tls-rustls-webpki-roots")
))]
if native_certs.errors.is_empty() {
native_certs
.certs
.iter()
.try_for_each(|der| root_store.add(der.to_owned()))?;
} else {
fail!((
ErrorKind::InvalidClientConfig,
"Unable to load native certificates"
));
}

let config = rustls::ClientConfig::builder();
Expand Down Expand Up @@ -939,7 +953,7 @@ pub(crate) fn create_rustls_config(
config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertificateVerification {
supported: rustls::crypto::ring::default_provider()
supported: rustls::crypto::aws_lc_rs::default_provider()
.signature_verification_algorithms,
}));

Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/sentinel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ fn is_replica_valid(replica_info: &HashMap<String, String>) -> bool {

/// Generates a random value in the 0..max range.
fn random_replica_index(max: NonZeroUsize) -> usize {
rand::thread_rng().gen_range(0..max.into())
rand::rng().random_range(0..max.into())
}

fn try_connect_to_first_replica(
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ mod test_cluster_scan_async {
) -> RoutingInfo {
let mut cluster_conn = cluster.async_connection(None).await;
let distribution_clone = slot_distribution.clone();
let index_of_random_node = rand::thread_rng().gen_range(0..slot_distribution.len());
let index_of_random_node = rand::rng().random_range(0..slot_distribution.len());
let random_node = distribution_clone.get(index_of_random_node).unwrap();
let random_node_route_info = RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: random_node.1.clone(),
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
use_try_shorthand = true
edition = "2018"
edition = "2024"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this break on older versions of Ubuntu, for example 22.04 LTS? (I am not sure which version of Rust they provide, but this might be a concern)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also use 2021 if 2018 does not work with our current code base

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break for usage or break for development?

3 changes: 1 addition & 2 deletions glide-core/src/client/standalone_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::retry_strategies::RetryStrategy;
use futures::{future, stream, StreamExt};
use logger_core::log_debug;
use logger_core::log_warn;
use rand::Rng;
use redis::aio::ConnectionLike;
use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, RoutingInfo};
use redis::{PushInfo, RedisError, RedisResult, Value};
Expand Down Expand Up @@ -129,7 +128,7 @@ impl StandaloneClient {
let tls_mode = connection_request.tls_mode;
let node_count = connection_request.addresses.len();
// randomize pubsub nodes, maybe a batter option is to always use the primary
let pubsub_node_index = rand::thread_rng().gen_range(0..node_count);
let pubsub_node_index = rand::random_range(0..node_count);
let pubsub_addr = &connection_request.addresses[pubsub_node_index];
let discover_az = matches!(
connection_request.read_from,
Expand Down
Loading
Loading