
Commit dc76ec1

Introduce new optional scheduler, using Morsel-driven Parallelism + rayon (#2199) (#2226)
* Morsel-driven Parallelism using rayon (#2199)
* Fix LIFO spawn ordering
* Further docs for ExecutionPipeline
* Deduplicate concurrent wakes
* Add license headers
* Sort Cargo.toml
* Revert accidental change to ParquetExec
* Handle wakeups triggered by other threads
* Use SeqCst memory ordering
* Review feedback
* Add panic handler
* Cleanup structs; add test of tokio interoperation
* Review feedback
* Use BatchPartitioner; cleanup error handling
* Clarify shutdown characteristics
* Fix racy test_panic
* Don't overload Query nomenclature
* Rename QueryResults to ExecutionResults
* Further review feedback
* Merge scheduler into datafusion/core
* Review feedback
* Fix partitioned execution
* Format
* Format Cargo.toml
* Fix doc link
1 parent e8ba45c commit dc76ec1
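
For orientation before the diffs: the new opt-in `scheduler` module exposes a `Scheduler` that owns a rayon thread pool and executes a physical plan with morsel-driven parallelism, handing results back as a stream. The sketch below is distilled from the benchmark changes in this commit; the file name, query, and thread count are illustrative, error handling is elided, and it assumes DataFusion is built with the `scheduler` feature.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::scheduler::Scheduler;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let partitions = 4;
    let context = SessionContext::with_config(
        SessionConfig::new().with_target_partitions(partitions),
    );
    // Illustrative input; the benchmark below registers a generated
    // parquet file under the name "t" in the same way.
    context
        .register_parquet("t", "data.parquet", Default::default())
        .await?;

    // One rayon worker per target partition, mirroring the benchmark.
    let scheduler = Scheduler::new(partitions);

    let frame = context.sql("SELECT * FROM t LIMIT 10").await?;
    let plan = frame.create_physical_plan().await?;

    // Execution runs on the scheduler's pool; the returned stream is
    // polled like any other RecordBatch stream.
    let mut stream = scheduler.schedule(plan, context.task_ctx())?;
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```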

File tree: 11 files changed (+1905, -19 lines)


CONTRIBUTING.md
Lines changed: 1 addition & 1 deletion

````diff
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
 cargo bench --bench parquet_query_sql
 ```
 
-These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
+These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/core/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
 
 If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.
````

Cargo.toml
Lines changed: 1 addition & 1 deletion

```diff
@@ -17,8 +17,8 @@
 
 [workspace]
 members = [
-    "datafusion/core",
     "datafusion/common",
+    "datafusion/core",
     "datafusion/expr",
     "datafusion/jit",
     "datafusion/physical-expr",
```

datafusion/core/Cargo.toml
Lines changed: 7 additions & 2 deletions

```diff
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
 readme = "../README.md"
 authors = ["Apache Arrow <[email protected]>"]
 license = "Apache-2.0"
-keywords = [ "arrow", "query", "sql" ]
+keywords = ["arrow", "query", "sql"]
 include = [
     "benches/*.rs",
     "src/**/*.rs",
@@ -50,6 +50,8 @@ pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions"]
 # Used to enable row format experiment
 row = ["datafusion-row"]
+# Used to enable scheduler
+scheduler = ["rayon"]
 simd = ["arrow/simd"]
 unicode_expressions = ["datafusion-physical-expr/regex_expressions"]
 
@@ -75,9 +77,10 @@ ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "13", features = ["arrow"] }
 paste = "^1.0"
-pin-project-lite= "^0.2.7"
+pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.16", optional = true }
 rand = "0.8"
+rayon = { version = "1.5", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 sqlparser = "0.16"
 tempfile = "3"
@@ -88,6 +91,7 @@ uuid = { version = "1.0", features = ["v4"] }
 [dev-dependencies]
 criterion = "0.3"
 doc-comment = "0.3"
+env_logger = "0.9"
 fuzz-utils = { path = "fuzz-utils" }
 
 [[bench]]
@@ -121,6 +125,7 @@ name = "physical_plan"
 [[bench]]
 harness = false
 name = "parquet_query_sql"
+required-features = ["scheduler"]
 
 [[bench]]
 harness = false
```

datafusion/core/benches/parquet_query_sql.rs
Lines changed: 50 additions & 15 deletions

```diff
@@ -24,7 +24,9 @@ use arrow::datatypes::{
 };
 use arrow::record_batch::RecordBatch;
 use criterion::{criterion_group, criterion_main, Criterion};
-use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::scheduler::Scheduler;
+use futures::stream::StreamExt;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{WriterProperties, WriterVersion};
 use rand::distributions::uniform::SampleUniform;
@@ -37,7 +39,6 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio_stream::StreamExt;
 
 /// The number of batches to write
 const NUM_BATCHES: usize = 2048;
@@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
     assert!(Path::new(&file_path).exists(), "path not found");
     println!("Using parquet file {}", file_path);
 
-    let context = SessionContext::new();
+    let partitions = 4;
+    let config = SessionConfig::new().with_target_partitions(partitions);
+    let context = SessionContext::with_config(config);
 
-    let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
-    rt.block_on(context.register_parquet(
-        "t",
-        file_path.as_str(),
-        ParquetReadOptions::default(),
-    ))
-    .unwrap();
+    let scheduler = Scheduler::new(partitions);
+
+    let local_rt = tokio::runtime::Builder::new_current_thread()
+        .build()
+        .unwrap();
+
+    let query_rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(partitions)
+        .build()
+        .unwrap();
+
+    local_rt
+        .block_on(context.register_parquet("t", file_path.as_str(), Default::default()))
+        .unwrap();
 
     // We read the queries from a file so they can be changed without recompiling the benchmark
     let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +230,42 @@
             continue;
         }
 
-        let query = query.as_str();
-        c.bench_function(query, |b| {
+        c.bench_function(&format!("tokio: {}", query), |b| {
             b.iter(|| {
+                let query = query.clone();
                 let context = context.clone();
-                rt.block_on(async move {
-                    let query = context.sql(query).await.unwrap();
+                let (sender, mut receiver) = futures::channel::mpsc::unbounded();
+
+                // Spawn work to a separate tokio thread pool
+                query_rt.spawn(async move {
+                    let query = context.sql(&query).await.unwrap();
                     let mut stream = query.execute_stream().await.unwrap();
-                    while criterion::black_box(stream.next().await).is_some() {}
+
+                    while let Some(next) = stream.next().await {
+                        sender.unbounded_send(next).unwrap();
+                    }
+                });
+
+                local_rt.block_on(async {
+                    while receiver.next().await.transpose().unwrap().is_some() {}
                 })
             });
         });
+
+        c.bench_function(&format!("scheduled: {}", query), |b| {
+            b.iter(|| {
+                let query = query.clone();
+                let context = context.clone();
+
+                local_rt.block_on(async {
+                    let query = context.sql(&query).await.unwrap();
+                    let plan = query.create_physical_plan().await.unwrap();
+                    let mut stream =
+                        scheduler.schedule(plan, context.task_ctx()).unwrap();
+                    while stream.next().await.transpose().unwrap().is_some() {}
+                });
+            });
+        });
     }
 
     // Temporary file must outlive the benchmarks, it is deleted when dropped
```
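
One design point worth noting in the hunk above: the `tokio:` baseline deliberately splits work across two runtimes, so the criterion measurement thread (driving a current-thread runtime) only drains a channel while the query itself runs on a separate multi-threaded pool. A stripped-down sketch of that pattern, with an integer stream standing in for record batches (all names here are illustrative, not part of the commit):

```rust
use futures::StreamExt;

fn main() {
    // Multi-threaded pool that performs the actual work.
    let query_rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .build()
        .unwrap();

    // Current-thread runtime that only consumes results.
    let local_rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();

    let (sender, mut receiver) = futures::channel::mpsc::unbounded();

    // Producer side: runs entirely on the worker pool.
    query_rt.spawn(async move {
        for i in 0..10u32 {
            sender.unbounded_send(i).unwrap();
        }
        // Dropping the sender closes the channel, ending the consumer loop.
    });

    // Consumer side: blocks the benchmark thread, isolated from the pool.
    local_rt.block_on(async {
        while let Some(v) = receiver.next().await {
            println!("received {}", v);
        }
    });
}
```

This keeps the worker pool's scheduling noise off the measured thread, presumably so the `tokio:` and `scheduled:` cases are driven from the same place and can be compared fairly.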

datafusion/core/src/lib.rs
Lines changed: 2 additions & 0 deletions

```diff
@@ -218,6 +218,8 @@ pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod prelude;
 pub mod scalar;
+#[cfg(feature = "scheduler")]
+pub mod scheduler;
 pub mod sql;
 pub mod variable;
```
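
Since the module only exists behind the flag, downstream code that wants to remain buildable either way can mirror the same gate. A hypothetical sketch (assumes the consuming crate forwards a `scheduler` feature to datafusion; the helper name is made up):

```rust
// Compiles only when datafusion is built with its `scheduler` feature;
// without it, `datafusion::scheduler` does not exist.
#[cfg(feature = "scheduler")]
pub fn make_scheduler(workers: usize) -> datafusion::scheduler::Scheduler {
    datafusion::scheduler::Scheduler::new(workers)
}
```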
