Skip to content

Commit 21fa511

Browse files
authoredFeb 26, 2025
Benchmark query cancellation (apache#14818)
Connects to apache#14036. This benchmark loads multiple files into an in-memory object store, starts a datafusion query in a new tokio runtime, lets the query run for an amount of time, cancels the query, and measures how long it takes to drop the tokio runtime. This demonstrates datafusion is likely not yielding often enough to allow for timely query cancellation and freeing up of all resources.
1 parent f51cd6e commit 21fa511

File tree

6 files changed

+368
-17
lines changed

6 files changed

+368
-17
lines changed
 

‎Cargo.lock

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎benchmarks/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,16 @@ env_logger = { workspace = true }
4242
futures = { workspace = true }
4343
log = { workspace = true }
4444
mimalloc = { version = "0.1", optional = true, default-features = false }
45+
object_store = { workspace = true }
4546
parquet = { workspace = true, default-features = true }
47+
rand = { workspace = true }
4648
serde = { version = "1.0.218", features = ["derive"] }
4749
serde_json = { workspace = true }
4850
snmalloc-rs = { version = "0.3", optional = true }
4951
structopt = { version = "0.3", default-features = false }
5052
test-utils = { path = "../test-utils/", version = "0.1.0" }
5153
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
54+
tokio-util = { version = "0.7.4" }
5255

5356
[dev-dependencies]
5457
datafusion-proto = { workspace = true }

‎benchmarks/bench.sh

+21-8
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ tpch: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), s
7373
tpch_mem: TPCH inspired benchmark on Scale Factor (SF) 1 (~1GB), query from memory
7474
tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), single parquet file per table, hash join
7575
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
76+
cancellation: How long cancelling a query takes
7677
parquet: Benchmark of parquet reader's filtering speed
7778
sort: Benchmark of sorting speed
7879
sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset
@@ -232,6 +233,7 @@ main() {
232233
run_tpch_mem "1"
233234
run_tpch "10"
234235
run_tpch_mem "10"
236+
run_cancellation
235237
run_parquet
236238
run_sort
237239
run_clickbench_1
@@ -255,6 +257,9 @@ main() {
255257
tpch_mem10)
256258
run_tpch_mem "10"
257259
;;
260+
cancellation)
261+
run_cancellation
262+
;;
258263
parquet)
259264
run_parquet
260265
;;
@@ -397,6 +402,14 @@ run_tpch_mem() {
397402
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}"
398403
}
399404

405+
# Runs the cancellation benchmark
406+
run_cancellation() {
407+
RESULTS_FILE="${RESULTS_DIR}/cancellation.json"
408+
echo "RESULTS_FILE: ${RESULTS_FILE}"
409+
echo "Running cancellation benchmark..."
410+
$CARGO_COMMAND --bin dfbench -- cancellation --iterations 5 --path "${DATA_DIR}/cancellation" -o "${RESULTS_FILE}"
411+
}
412+
400413
# Runs the parquet filter benchmark
401414
run_parquet() {
402415
RESULTS_FILE="${RESULTS_DIR}/parquet.json"
@@ -490,9 +503,9 @@ data_imdb() {
490503
local imdb_temp_gz="${imdb_dir}/imdb.tgz"
491504
local imdb_url="https://event.cwi.nl/da/job/imdb.tgz"
492505

493-
# imdb has 21 files, we just separate them into 3 groups for better readability
506+
# imdb has 21 files, we just separate them into 3 groups for better readability
494507
local first_required_files=(
495-
"aka_name.parquet"
508+
"aka_name.parquet"
496509
"aka_title.parquet"
497510
"cast_info.parquet"
498511
"char_name.parquet"
@@ -539,13 +552,13 @@ data_imdb() {
539552
if [ "$convert_needed" = true ]; then
540553
# Expected size of the dataset
541554
expected_size="1263193115" # 1.18 GB
542-
555+
543556
echo -n "Looking for imdb.tgz... "
544557
if [ -f "${imdb_temp_gz}" ]; then
545558
echo "found"
546559
echo -n "Checking size... "
547560
OUTPUT_SIZE=$(wc -c "${imdb_temp_gz}" 2>/dev/null | awk '{print $1}' || true)
548-
561+
549562
#Checking the size of the existing file
550563
if [ "${OUTPUT_SIZE}" = "${expected_size}" ]; then
551564
# Existing file is of the expected size, no need for download
@@ -559,7 +572,7 @@ data_imdb() {
559572

560573
# Download the dataset
561574
curl -o "${imdb_temp_gz}" "${imdb_url}"
562-
575+
563576
# Size check of the installed file
564577
DOWNLOADED_SIZE=$(wc -c "${imdb_temp_gz}" | awk '{print $1}')
565578
if [ "${DOWNLOADED_SIZE}" != "${expected_size}" ]; then
@@ -591,7 +604,7 @@ data_imdb() {
591604
# Runs the imdb benchmark
592605
run_imdb() {
593606
IMDB_DIR="${DATA_DIR}/imdb"
594-
607+
595608
RESULTS_FILE="${RESULTS_DIR}/imdb.json"
596609
echo "RESULTS_FILE: ${RESULTS_FILE}"
597610
echo "Running imdb benchmark..."
@@ -726,9 +739,9 @@ run_external_aggr() {
726739
echo "Running external aggregation benchmark..."
727740

728741
# Only parquet is supported.
729-
# Since per-operator memory limit is calculated as (total-memory-limit /
742+
# Since per-operator memory limit is calculated as (total-memory-limit /
730743
# number-of-partitions), and by default `--partitions` is set to number of
731-
# CPU cores, we set a constant number of partitions to prevent this
744+
# CPU cores, we set a constant number of partitions to prevent this
732745
# benchmark to fail on some machines.
733746
$CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
734747
}

‎benchmarks/src/bin/dfbench.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,21 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

3636
use datafusion_benchmarks::{
37-
clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch,
37+
cancellation, clickbench, h2o, imdb, parquet_filter, sort, sort_tpch, tpch,
3838
};
3939

4040
#[derive(Debug, StructOpt)]
4141
#[structopt(about = "benchmark command")]
4242
enum Options {
43-
Tpch(tpch::RunOpt),
44-
TpchConvert(tpch::ConvertOpt),
43+
Cancellation(cancellation::RunOpt),
4544
Clickbench(clickbench::RunOpt),
45+
H2o(h2o::RunOpt),
46+
Imdb(imdb::RunOpt),
4647
ParquetFilter(parquet_filter::RunOpt),
4748
Sort(sort::RunOpt),
4849
SortTpch(sort_tpch::RunOpt),
49-
Imdb(imdb::RunOpt),
50-
H2o(h2o::RunOpt),
50+
Tpch(tpch::RunOpt),
51+
TpchConvert(tpch::ConvertOpt),
5152
}
5253

5354
// Main benchmark runner entrypoint
@@ -56,13 +57,14 @@ pub async fn main() -> Result<()> {
5657
env_logger::init();
5758

5859
match Options::from_args() {
59-
Options::Tpch(opt) => opt.run().await,
60-
Options::TpchConvert(opt) => opt.run().await,
60+
Options::Cancellation(opt) => opt.run().await,
6161
Options::Clickbench(opt) => opt.run().await,
62+
Options::H2o(opt) => opt.run().await,
63+
Options::Imdb(opt) => opt.run().await,
6264
Options::ParquetFilter(opt) => opt.run().await,
6365
Options::Sort(opt) => opt.run().await,
6466
Options::SortTpch(opt) => opt.run().await,
65-
Options::Imdb(opt) => opt.run().await,
66-
Options::H2o(opt) => opt.run().await,
67+
Options::Tpch(opt) => opt.run().await,
68+
Options::TpchConvert(opt) => opt.run().await,
6769
}
6870
}

0 commit comments

Comments
 (0)