Skip to content

Commit 96962f7

Browse files
authored
Merge pull request #354 from wprzytula/configurable-runtime
implement `cass_cluster_set_num_threads_io`
2 parents 59ba8a1 + a20c4f0 commit 96962f7

File tree

12 files changed

+545
-204
lines changed

12 files changed

+545
-204
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ SCYLLA_EXAMPLES_TO_RUN := \
153153
# execution_profiles <- unimplemented `cass_statement_set_keyspace()`
154154
# host_listener <- unimplemented `cass_cluster_set_host_listener_callback()`
155155
# logging <- unimplemented `cass_cluster_set_host_listener_callback()`
156-
# perf <- unimplemented `cass_cluster_set_num_threads_io()`, `cass_cluster_set_queue_size_io()`
156+
# perf <- unimplemented `cass_cluster_set_queue_size_io()`
157157
# schema_meta <- unimplemented multiple schema-related functions
158158
# cloud <- out of interest for us, not related to ScyllaDB
159159
endif

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ The driver inherits almost all the features of C/C++ and Rust drivers, such as:
263263
- cass_cluster_set_monitor_reporting_interval
264264
- cass_cluster_set_new_request_ratio
265265
- cass_cluster_set_no_compact
266-
- cass_cluster_set_num_threads_io
267266
- cass_cluster_set_pending_requests_high_water_mark
268267
- cass_cluster_set_pending_requests_low_water_mark
269268
- cass_cluster_set_prepare_on_all_hosts

examples/perf/perf.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ CassCluster* create_cluster(const char* hosts) {
137137
cass_cluster_set_credentials(cluster, "cassandra", "cassandra");
138138
cass_cluster_set_num_threads_io(cluster, NUM_IO_WORKER_THREADS);
139139
cass_cluster_set_queue_size_io(cluster, 10000);
140-
cass_cluster_set_core_connections_per_host(cluster, 1);
140+
cass_cluster_set_core_connections_per_shard(cluster, 1);
141141
return cluster;
142142
}
143143

include/cassandra.h

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1729,10 +1729,37 @@ cass_cluster_set_serial_consistency(CassCluster* cluster,
17291729
CassConsistency consistency);
17301730

17311731
/**
1732-
* Sets the number of IO threads. This is the number of threads
1733-
* that will handle query requests.
1734-
*
1735-
* <b>Default:</b> 1
1732+
* Sets the number of IO threads. This is the number of dedicated runtime threads
1733+
* that will resolve driver's futures, handling requests and other IO operations.
1734+
*
1735+
* If `num_threads` > 0 is given, the driver will create a dedicated thread pool
1736+
* with the specified number of threads. This is the recommended way to use the
1737+
* driver, as it allows the driver to execute tasks in parallel and utilize
1738+
* multiple CPU cores. Also, this makes the execution of futures eager and
1739+
* in-background, allowing the main thread to do whatever it wants concurrently
1740+
* with the futures.
1741+
*
1742+
* If 0 is specified, the `current_thread` tokio runtime will be used. This runtime
1743+
* has no dedicated worker threads, but instead uses the current thread to execute
1744+
* all tasks. This ensures the lowest possible overhead, may make sense for testing
1745+
* and debugging purposes, or for applications that do not require high concurrency.
1746+
* Also, single-CPU machines may benefit from this runtime, as operating on a single
1747+
* thread is usually faster than switching between multiple threads.
1748+
* **BEWARE:** the semantics of `CassFuture` when `current_thread` runtime is enabled
1749+
* are different. The futures will not start execution immediately when they are
1750+
* created, but only when some user thread awaits some future. That is, any thread
1751+
* that awaits a future will start the execution of all futures that are ready
1752+
* to be executed at that moment. This means that the only way to ensure that
1753+
* a future is executed is to await it. On the other hand, if one future is being
1754+
* awaited, then all other existing futures will be executed in the same thread
1755+
* until the awaited future is resolved.
1756+
* A notable example of code that is not compatible with `current_thread` runtime
1757+
* is the `callbacks` example in this codebase. This is because the main thread
1758+
* sets up callbacks and then blocks itself on a conditional variable. As no other
1759+
* thread exists that drives the futures, the callbacks will never be called
1760+
* and thus the program will hang.
1761+
*
1762+
* <b>Default:</b> Number of CPU cores available to the system.
17361763
*
17371764
* @public @memberof CassCluster
17381765
*

scylla-rust-wrapper/src/api.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pub mod cluster {
103103
// cass_cluster_set_new_request_ratio, UNIMPLEMENTED
104104
// cass_cluster_set_no_compact, UNIMPLEMENTED
105105
cass_cluster_set_no_speculative_execution_policy,
106-
// cass_cluster_set_num_threads_io, UNIMPLEMENTED
106+
cass_cluster_set_num_threads_io,
107107
cass_cluster_set_port,
108108
// cass_cluster_set_pending_requests_high_water_mark, UNIMPLEMENTED
109109
// cass_cluster_set_pending_requests_low_water_mark, UNIMPLEMENTED
@@ -984,7 +984,6 @@ pub mod integration_testing {
984984
cass_cluster_set_authenticator_callbacks,
985985
cass_cluster_set_cloud_secure_connection_bundle,
986986
cass_cluster_set_host_listener_callback,
987-
cass_cluster_set_num_threads_io,
988987
cass_cluster_set_queue_size_io,
989988
cass_cluster_set_cloud_secure_connection_bundle_n,
990989
cass_cluster_set_exponential_reconnect,

scylla-rust-wrapper/src/cluster.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::load_balancing::{
77
CassHostFilter, DcRestriction, LoadBalancingConfig, LoadBalancingKind,
88
};
99
use crate::retry_policy::CassRetryPolicy;
10+
use crate::runtime::RUNTIMES;
1011
use crate::ssl::CassSsl;
1112
use crate::timestamp_generator::CassTimestampGen;
1213
use crate::types::*;
@@ -81,8 +82,13 @@ const DEFAULT_SHARD_AWARE_LOCAL_PORT_RANGE: ShardAwarePortRange =
8182
const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver";
8283
const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION");
8384

84-
#[derive(Clone)]
8585
pub struct CassCluster {
86+
/// Number of threads in the tokio runtime thread pool.
87+
///
88+
/// Specified with `cass_cluster_set_num_threads_io`.
89+
/// If not set, the default tokio runtime is used.
90+
num_threads_io: Option<usize>,
91+
8692
session_builder: SessionBuilder,
8793
default_execution_profile_builder: ExecutionProfileBuilder,
8894
execution_profile_map: HashMap<ExecProfileName, CassExecProfile>,
@@ -100,6 +106,22 @@ pub struct CassCluster {
100106
}
101107

102108
impl CassCluster {
109+
/// Gets the runtime that has been set for the cluster.
110+
/// If no runtime has been set yet, it creates a default runtime
111+
/// and makes it cached in the global `Runtimes` instance.
112+
pub(crate) fn get_runtime(&self) -> Arc<tokio::runtime::Runtime> {
113+
let mut runtimes = RUNTIMES.lock().unwrap();
114+
115+
if let Some(num_threads_io) = self.num_threads_io {
116+
// If the number of threads is set, we create a runtime with that number of threads.
117+
runtimes.n_thread_runtime(num_threads_io)
118+
} else {
119+
// Otherwise, we use the default runtime.
120+
runtimes.default_runtime()
121+
}
122+
.unwrap_or_else(|err| panic!("Failed to create an async runtime: {err}"))
123+
}
124+
103125
pub(crate) fn execution_profile_map(&self) -> &HashMap<ExecProfileName, CassExecProfile> {
104126
&self.execution_profile_map
105127
}
@@ -310,6 +332,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> CassOwnedExclusivePtr<CassCluster
310332
};
311333

312334
BoxFFI::into_ptr(Box::new(CassCluster {
335+
num_threads_io: None,
336+
313337
session_builder: default_session_builder,
314338
port: 9042,
315339
contact_points: Vec::new(),
@@ -1531,6 +1555,21 @@ pub unsafe extern "C" fn cass_cluster_set_execution_profile_n(
15311555
CassError::CASS_OK
15321556
}
15331557

1558+
#[unsafe(no_mangle)]
1559+
pub unsafe extern "C" fn cass_cluster_set_num_threads_io(
1560+
cluster: CassBorrowedExclusivePtr<CassCluster, CMut>,
1561+
num_threads: cass_uint32_t,
1562+
) -> CassError {
1563+
let Some(cluster) = BoxFFI::as_mut_ref(cluster) else {
1564+
tracing::error!("Provided null cluster pointer to cass_cluster_set_num_threads_io!");
1565+
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
1566+
};
1567+
1568+
cluster.num_threads_io = Some(num_threads as usize);
1569+
1570+
CassError::CASS_OK
1571+
}
1572+
15341573
#[cfg(test)]
15351574
mod tests {
15361575
use crate::testing::{assert_cass_error_eq, setup_tracing};

0 commit comments

Comments
 (0)