Skip to content

Commit d056fb5

Browse files
authored
#15507 -- extract tokio runtime creation from hot loop (#15508)
1 parent 818e739 commit d056fb5

11 files changed

+112
-60
lines changed

datafusion/core/benches/aggregate_query_sql.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ use parking_lot::Mutex;
2929
use std::sync::Arc;
3030
use tokio::runtime::Runtime;
3131

32-
fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
33-
let rt = Runtime::new().unwrap();
32+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
3433
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
3534
criterion::black_box(rt.block_on(df.collect()).unwrap());
3635
}
@@ -51,11 +50,13 @@ fn criterion_benchmark(c: &mut Criterion) {
5150
let array_len = 32768 * 2; // 2^16
5251
let batch_size = 2048; // 2^11
5352
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
53+
let rt = Runtime::new().unwrap();
5454

5555
c.bench_function("aggregate_query_no_group_by 15 12", |b| {
5656
b.iter(|| {
5757
query(
5858
ctx.clone(),
59+
&rt,
5960
"SELECT MIN(f64), AVG(f64), COUNT(f64) \
6061
FROM t",
6162
)
@@ -66,6 +67,7 @@ fn criterion_benchmark(c: &mut Criterion) {
6667
b.iter(|| {
6768
query(
6869
ctx.clone(),
70+
&rt,
6971
"SELECT MIN(f64), MAX(f64) \
7072
FROM t",
7173
)
@@ -76,6 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) {
7678
b.iter(|| {
7779
query(
7880
ctx.clone(),
81+
&rt,
7982
"SELECT COUNT(DISTINCT u64_wide) \
8083
FROM t",
8184
)
@@ -86,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
8689
b.iter(|| {
8790
query(
8891
ctx.clone(),
92+
&rt,
8993
"SELECT COUNT(DISTINCT u64_narrow) \
9094
FROM t",
9195
)
@@ -96,6 +100,7 @@ fn criterion_benchmark(c: &mut Criterion) {
96100
b.iter(|| {
97101
query(
98102
ctx.clone(),
103+
&rt,
99104
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
100105
FROM t GROUP BY utf8",
101106
)
@@ -106,6 +111,7 @@ fn criterion_benchmark(c: &mut Criterion) {
106111
b.iter(|| {
107112
query(
108113
ctx.clone(),
114+
&rt,
109115
"SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) \
110116
FROM t \
111117
WHERE f32 > 10 AND f32 < 20 GROUP BY utf8",
@@ -117,6 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) {
117123
b.iter(|| {
118124
query(
119125
ctx.clone(),
126+
&rt,
120127
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
121128
FROM t GROUP BY u64_narrow",
122129
)
@@ -127,6 +134,7 @@ fn criterion_benchmark(c: &mut Criterion) {
127134
b.iter(|| {
128135
query(
129136
ctx.clone(),
137+
&rt,
130138
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
131139
FROM t \
132140
WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow",
@@ -138,6 +146,7 @@ fn criterion_benchmark(c: &mut Criterion) {
138146
b.iter(|| {
139147
query(
140148
ctx.clone(),
149+
&rt,
141150
"SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
142151
FROM t GROUP BY u64_wide, utf8",
143152
)
@@ -148,6 +157,7 @@ fn criterion_benchmark(c: &mut Criterion) {
148157
b.iter(|| {
149158
query(
150159
ctx.clone(),
160+
&rt,
151161
"SELECT utf8, approx_percentile_cont(u64_wide, 0.5, 2500) \
152162
FROM t GROUP BY utf8",
153163
)
@@ -158,6 +168,7 @@ fn criterion_benchmark(c: &mut Criterion) {
158168
b.iter(|| {
159169
query(
160170
ctx.clone(),
171+
&rt,
161172
"SELECT utf8, approx_percentile_cont(f32, 0.5, 2500) \
162173
FROM t GROUP BY utf8",
163174
)
@@ -168,6 +179,7 @@ fn criterion_benchmark(c: &mut Criterion) {
168179
b.iter(|| {
169180
query(
170181
ctx.clone(),
182+
&rt,
171183
"SELECT MEDIAN(DISTINCT u64_wide), MEDIAN(DISTINCT u64_narrow) \
172184
FROM t",
173185
)
@@ -178,6 +190,7 @@ fn criterion_benchmark(c: &mut Criterion) {
178190
b.iter(|| {
179191
query(
180192
ctx.clone(),
193+
&rt,
181194
"SELECT first_value(u64_wide order by f64, u64_narrow, utf8),\
182195
last_value(u64_wide order by f64, u64_narrow, utf8) \
183196
FROM t GROUP BY u64_narrow",
@@ -189,6 +202,7 @@ fn criterion_benchmark(c: &mut Criterion) {
189202
b.iter(|| {
190203
query(
191204
ctx.clone(),
205+
&rt,
192206
"SELECT first_value(u64_wide ignore nulls order by f64, u64_narrow, utf8), \
193207
last_value(u64_wide ignore nulls order by f64, u64_narrow, utf8) \
194208
FROM t GROUP BY u64_narrow",
@@ -200,6 +214,7 @@ fn criterion_benchmark(c: &mut Criterion) {
200214
b.iter(|| {
201215
query(
202216
ctx.clone(),
217+
&rt,
203218
"SELECT first_value(u64_wide order by f64), \
204219
last_value(u64_wide order by f64) \
205220
FROM t GROUP BY u64_narrow",

datafusion/core/benches/csv_load.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@ use std::time::Duration;
3232
use test_utils::AccessLogGenerator;
3333
use tokio::runtime::Runtime;
3434

35-
fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
36-
let rt = Runtime::new().unwrap();
35+
fn load_csv(
36+
ctx: Arc<Mutex<SessionContext>>,
37+
rt: &Runtime,
38+
path: &str,
39+
options: CsvReadOptions,
40+
) {
3741
let df = rt.block_on(ctx.lock().read_csv(path, options)).unwrap();
3842
criterion::black_box(rt.block_on(df.collect()).unwrap());
3943
}
@@ -61,6 +65,7 @@ fn generate_test_file() -> TestCsvFile {
6165

6266
fn criterion_benchmark(c: &mut Criterion) {
6367
let ctx = create_context().unwrap();
68+
let rt = Runtime::new().unwrap();
6469
let test_file = generate_test_file();
6570

6671
let mut group = c.benchmark_group("load csv testing");
@@ -70,6 +75,7 @@ fn criterion_benchmark(c: &mut Criterion) {
7075
b.iter(|| {
7176
load_csv(
7277
ctx.clone(),
78+
&rt,
7379
test_file.path().to_str().unwrap(),
7480
CsvReadOptions::default(),
7581
)
@@ -80,6 +86,7 @@ fn criterion_benchmark(c: &mut Criterion) {
8086
b.iter(|| {
8187
load_csv(
8288
ctx.clone(),
89+
&rt,
8390
test_file.path().to_str().unwrap(),
8491
CsvReadOptions::default().null_regex(Some("^NULL$|^$".to_string())),
8592
)

datafusion/core/benches/dataframe.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ fn create_context(field_count: u32) -> datafusion_common::Result<Arc<SessionCont
4444
Ok(Arc::new(ctx))
4545
}
4646

47-
fn run(column_count: u32, ctx: Arc<SessionContext>) {
48-
let rt = Runtime::new().unwrap();
49-
47+
fn run(column_count: u32, ctx: Arc<SessionContext>, rt: &Runtime) {
5048
criterion::black_box(rt.block_on(async {
5149
let mut data_frame = ctx.table("t").await.unwrap();
5250

@@ -67,11 +65,13 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
6765
}
6866

6967
fn criterion_benchmark(c: &mut Criterion) {
68+
let rt = Runtime::new().unwrap();
69+
7070
for column_count in [10, 100, 200, 500] {
7171
let ctx = create_context(column_count).unwrap();
7272

7373
c.bench_function(&format!("with_column_{column_count}"), |b| {
74-
b.iter(|| run(column_count, ctx.clone()))
74+
b.iter(|| run(column_count, ctx.clone(), &rt))
7575
});
7676
}
7777
}

datafusion/core/benches/distinct_query_sql.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ use parking_lot::Mutex;
3333
use std::{sync::Arc, time::Duration};
3434
use tokio::runtime::Runtime;
3535

36-
fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
37-
let rt = Runtime::new().unwrap();
36+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
3837
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
3938
criterion::black_box(rt.block_on(df.collect()).unwrap());
4039
}
@@ -55,6 +54,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
5554
let array_len = 1 << 26; // 64 M
5655
let batch_size = 8192;
5756
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();
57+
let rt = Runtime::new().unwrap();
5858

5959
let mut group = c.benchmark_group("custom-measurement-time");
6060
group.measurement_time(Duration::from_secs(40));
@@ -63,6 +63,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
6363
b.iter(|| {
6464
query(
6565
ctx.clone(),
66+
&rt,
6667
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10",
6768
)
6869
})
@@ -72,6 +73,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
7273
b.iter(|| {
7374
query(
7475
ctx.clone(),
76+
&rt,
7577
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 100",
7678
)
7779
})
@@ -81,6 +83,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
8183
b.iter(|| {
8284
query(
8385
ctx.clone(),
86+
&rt,
8487
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 1000",
8588
)
8689
})
@@ -90,6 +93,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
9093
b.iter(|| {
9194
query(
9295
ctx.clone(),
96+
&rt,
9397
"SELECT DISTINCT u64_narrow FROM t GROUP BY u64_narrow LIMIT 10000",
9498
)
9599
})
@@ -99,6 +103,7 @@ fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
99103
b.iter(|| {
100104
query(
101105
ctx.clone(),
106+
&rt,
102107
"SELECT u64_narrow, u64_wide, utf8, f64 FROM t GROUP BY 1, 2, 3, 4 LIMIT 10",
103108
)
104109
})

datafusion/core/benches/filter_query_sql.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@ use futures::executor::block_on;
2727
use std::sync::Arc;
2828
use tokio::runtime::Runtime;
2929

30-
async fn query(ctx: &SessionContext, sql: &str) {
31-
let rt = Runtime::new().unwrap();
32-
30+
async fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
3331
// execute the query
3432
let df = rt.block_on(ctx.sql(sql)).unwrap();
3533
criterion::black_box(rt.block_on(df.collect()).unwrap());
@@ -68,17 +66,19 @@ fn create_context(array_len: usize, batch_size: usize) -> Result<SessionContext>
6866
fn criterion_benchmark(c: &mut Criterion) {
6967
let array_len = 524_288; // 2^19
7068
let batch_size = 4096; // 2^12
69+
let rt = Runtime::new().unwrap();
7170

7271
c.bench_function("filter_array", |b| {
7372
let ctx = create_context(array_len, batch_size).unwrap();
74-
b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64")))
73+
b.iter(|| block_on(query(&ctx, &rt, "select f32, f64 from t where f32 >= f64")))
7574
});
7675

7776
c.bench_function("filter_scalar", |b| {
7877
let ctx = create_context(array_len, batch_size).unwrap();
7978
b.iter(|| {
8079
block_on(query(
8180
&ctx,
81+
&rt,
8282
"select f32, f64 from t where f32 >= 250 and f64 > 250",
8383
))
8484
})
@@ -89,6 +89,7 @@ fn criterion_benchmark(c: &mut Criterion) {
8989
b.iter(|| {
9090
block_on(query(
9191
&ctx,
92+
&rt,
9293
"select f32, f64 from t where f32 in (10, 20, 30, 40)",
9394
))
9495
})

datafusion/core/benches/math_query_sql.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@ use datafusion::datasource::MemTable;
3636
use datafusion::error::Result;
3737
use datafusion::execution::context::SessionContext;
3838

39-
fn query(ctx: Arc<Mutex<SessionContext>>, sql: &str) {
40-
let rt = Runtime::new().unwrap();
41-
39+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
4240
// execute the query
4341
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
4442
rt.block_on(df.collect()).unwrap();
@@ -81,29 +79,31 @@ fn criterion_benchmark(c: &mut Criterion) {
8179
let array_len = 1048576; // 2^20
8280
let batch_size = 512; // 2^9
8381
let ctx = create_context(array_len, batch_size).unwrap();
82+
let rt = Runtime::new().unwrap();
83+
8484
c.bench_function("sqrt_20_9", |b| {
85-
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
85+
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
8686
});
8787

8888
let array_len = 1048576; // 2^20
8989
let batch_size = 4096; // 2^12
9090
let ctx = create_context(array_len, batch_size).unwrap();
9191
c.bench_function("sqrt_20_12", |b| {
92-
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
92+
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
9393
});
9494

9595
let array_len = 4194304; // 2^22
9696
let batch_size = 4096; // 2^12
9797
let ctx = create_context(array_len, batch_size).unwrap();
9898
c.bench_function("sqrt_22_12", |b| {
99-
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
99+
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
100100
});
101101

102102
let array_len = 4194304; // 2^22
103103
let batch_size = 16384; // 2^14
104104
let ctx = create_context(array_len, batch_size).unwrap();
105105
c.bench_function("sqrt_22_14", |b| {
106-
b.iter(|| query(ctx.clone(), "SELECT sqrt(f32) FROM t"))
106+
b.iter(|| query(ctx.clone(), &rt, "SELECT sqrt(f32) FROM t"))
107107
});
108108
}
109109

datafusion/core/benches/physical_plan.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
4242
// as inputs. All record batches must have the same schema.
4343
fn sort_preserving_merge_operator(
4444
session_ctx: Arc<SessionContext>,
45+
rt: &Runtime,
4546
batches: Vec<RecordBatch>,
4647
sort: &[&str],
4748
) {
@@ -63,7 +64,6 @@ fn sort_preserving_merge_operator(
6364
.unwrap();
6465
let merge = Arc::new(SortPreservingMergeExec::new(sort, exec));
6566
let task_ctx = session_ctx.task_ctx();
66-
let rt = Runtime::new().unwrap();
6767
rt.block_on(collect(merge, task_ctx)).unwrap();
6868
}
6969

@@ -166,14 +166,16 @@ fn criterion_benchmark(c: &mut Criterion) {
166166
];
167167

168168
let ctx = Arc::new(SessionContext::new());
169+
let rt = Runtime::new().unwrap();
170+
169171
for (name, input) in benches {
170-
let ctx_clone = ctx.clone();
171-
c.bench_function(name, move |b| {
172+
c.bench_function(name, |b| {
172173
b.iter_batched(
173174
|| input.clone(),
174175
|input| {
175176
sort_preserving_merge_operator(
176-
ctx_clone.clone(),
177+
ctx.clone(),
178+
&rt,
177179
input,
178180
&["a", "b", "c", "d"],
179181
);

0 commit comments

Comments
 (0)