Skip to content

Commit e8c8e20

Browse files
Add DedicatedExecutor to FlightSQL Server (#247)
Add's a dedicated executor for running CPU bound work on the FlightSQL server. There is interest from the [DataFusion community](apache/datafusion#13274 (comment)) for this, it was already on our [roadmap](#197) and I think the DFT FlightSQL server is a great place to have a reference implementation. Initial inspiration and context can be found [here](https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/). Most of the initial implementation was copied from [here](https://github.com/influxdata/influxdb3_core/blob/6fcbb004232738d55655f32f4ad2385523d10696/executor/src/lib.rs) with some tweaks for our current setup. In particular we dont have metrics yet in the FlightSQL server implementation (but it is on the [roadmap](#210)) - I expect to do a follow on where metrics are integrated.
1 parent c1973b1 commit e8c8e20

File tree

13 files changed

+1100
-80
lines changed

13 files changed

+1100
-80
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ http-body = {version = "0.4.5" }
3232
itertools = "0.13.0"
3333
lazy_static = "1.4.0"
3434
log = "0.4.22"
35+
num_cpus = "1.16.0"
3536
object_store = { version = "0.10.2", features = ["aws"], optional = true }
37+
parking_lot = "0.12.3"
3638
pin-project-lite = {version = "0.2.14" }
3739
prost = "0.12.3"
3840
ratatui = "0.28.0"

f.csv

Lines changed: 0 additions & 2 deletions
This file was deleted.

src/args.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub struct DftArgs {
9191
#[clap(short = 'n', help = "Set the number of benchmark iterations to run")]
9292
pub benchmark_iterations: Option<usize>,
9393

94-
#[cfg(feature = "flightsql")]
94+
#[cfg(any(feature = "flightsql", feature = "experimental-flightsql-server"))]
9595
#[clap(long, help = "Set the host and port to be used for FlightSQL")]
9696
pub flightsql_host: Option<String>,
9797
}

src/config.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ pub struct ExecutionConfig {
172172
pub tui_batch_size: usize,
173173
#[serde(default = "default_flightsql_server_batch_size")]
174174
pub flightsql_server_batch_size: usize,
175+
#[serde(default = "default_dedicated_executor_enabled")]
176+
pub dedicated_executor_enabled: bool,
177+
#[serde(default = "default_dedicated_executor_threads")]
178+
pub dedicated_executor_threads: usize,
175179
}
176180

177181
fn default_ddl_path() -> Option<PathBuf> {
@@ -204,6 +208,18 @@ fn default_flightsql_server_batch_size() -> usize {
204208
8092
205209
}
206210

211+
fn default_dedicated_executor_enabled() -> bool {
212+
false
213+
}
214+
215+
fn default_dedicated_executor_threads() -> usize {
216+
// By default we slightly over provision CPUs. For example, if you have N CPUs available we
217+
// have N CPUs for the [`DedicatedExecutor`] and 1 for the main / IO runtime.
218+
//
219+
// Ref: https://github.com/datafusion-contrib/datafusion-dft/pull/247#discussion_r1848270250
220+
num_cpus::get()
221+
}
222+
207223
impl Default for ExecutionConfig {
208224
fn default() -> Self {
209225
Self {
@@ -213,6 +229,8 @@ impl Default for ExecutionConfig {
213229
cli_batch_size: default_cli_batch_size(),
214230
tui_batch_size: default_tui_batch_size(),
215231
flightsql_server_batch_size: default_flightsql_server_batch_size(),
232+
dedicated_executor_enabled: default_dedicated_executor_enabled(),
233+
dedicated_executor_threads: default_dedicated_executor_threads(),
216234
}
217235
}
218236
}

0 commit comments

Comments
 (0)