Skip to content

Commit 9ae41b1

Browse files
authored
Update tpch, clickbench, sort_tpch to mark failed queries (#16182)
* Move struct QueryResult to util/run.rs
* Modify benches to continue query execution even on failure
* Mark benchmark query success in output json
1 parent bf7859e commit 9ae41b1

File tree

8 files changed

+132
-59
lines changed

8 files changed

+132
-59
lines changed

benchmarks/compare.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ class QueryRun:
4747
query: int
4848
iterations: List[QueryResult]
4949
start_time: int
50+
success: bool = True
5051

5152
@classmethod
5253
def load_from(cls, data: Dict[str, Any]) -> QueryRun:
5354
return cls(
5455
query=data["query"],
5556
iterations=[QueryResult(**iteration) for iteration in data["iterations"]],
5657
start_time=data["start_time"],
58+
success=data["success"],
5759
)
5860

5961
@property
@@ -125,11 +127,26 @@ def compare(
125127
faster_count = 0
126128
slower_count = 0
127129
no_change_count = 0
130+
failure_count = 0
128131
total_baseline_time = 0
129132
total_comparison_time = 0
130133

131134
for baseline_result, comparison_result in zip(baseline.queries, comparison.queries):
132135
assert baseline_result.query == comparison_result.query
136+
137+
base_failed = not baseline_result.success
138+
comp_failed = not comparison_result.success
139+
# If a query fails, its execution time is excluded from the performance comparison
140+
if base_failed or comp_failed:
141+
change_text = "incomparable"
142+
failure_count += 1
143+
table.add_row(
144+
f"Q{baseline_result.query}",
145+
"FAIL" if base_failed else f"{baseline_result.execution_time:.2f}ms",
146+
"FAIL" if comp_failed else f"{comparison_result.execution_time:.2f}ms",
147+
change_text,
148+
)
149+
continue
133150

134151
total_baseline_time += baseline_result.execution_time
135152
total_comparison_time += comparison_result.execution_time
@@ -156,8 +173,12 @@ def compare(
156173
console.print(table)
157174

158175
# Calculate averages
159-
avg_baseline_time = total_baseline_time / len(baseline.queries)
160-
avg_comparison_time = total_comparison_time / len(comparison.queries)
176+
avg_baseline_time = 0.0
177+
avg_comparison_time = 0.0
178+
if len(baseline.queries) - failure_count > 0:
179+
avg_baseline_time = total_baseline_time / (len(baseline.queries) - failure_count)
180+
if len(comparison.queries) - failure_count > 0:
181+
avg_comparison_time = total_comparison_time / (len(comparison.queries) - failure_count)
161182

162183
# Summary table
163184
summary_table = Table(show_header=True, header_style="bold magenta")
@@ -171,6 +192,7 @@ def compare(
171192
summary_table.add_row("Queries Faster", str(faster_count))
172193
summary_table.add_row("Queries Slower", str(slower_count))
173194
summary_table.add_row("Queries with No Change", str(no_change_count))
195+
summary_table.add_row("Queries with Failure", str(failure_count))
174196

175197
console.print(summary_table)
176198

benchmarks/src/bin/external_aggr.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion::execution::SessionStateBuilder;
4040
use datafusion::physical_plan::display::DisplayableExecutionPlan;
4141
use datafusion::physical_plan::{collect, displayable};
4242
use datafusion::prelude::*;
43-
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
43+
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt, QueryResult};
4444
use datafusion_common::instant::Instant;
4545
use datafusion_common::utils::get_available_parallelism;
4646
use datafusion_common::{exec_err, DEFAULT_PARQUET_EXTENSION};
@@ -77,11 +77,6 @@ struct ExternalAggrConfig {
7777
output_path: Option<PathBuf>,
7878
}
7979

80-
struct QueryResult {
81-
elapsed: std::time::Duration,
82-
row_count: usize,
83-
}
84-
8580
/// Query Memory Limits
8681
/// Map query id to predefined memory limits
8782
///

benchmarks/src/clickbench.rs

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::path::Path;
1919
use std::path::PathBuf;
2020

21-
use crate::util::{BenchmarkRun, CommonOpt};
21+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
2222
use datafusion::{
2323
error::{DataFusionError, Result},
2424
prelude::SessionContext,
@@ -128,36 +128,58 @@ impl RunOpt {
128128
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
129129
self.register_hits(&ctx).await?;
130130

131-
let iterations = self.common.iterations;
132131
let mut benchmark_run = BenchmarkRun::new();
133132
for query_id in query_range {
134-
let mut millis = Vec::with_capacity(iterations);
135133
benchmark_run.start_new_case(&format!("Query {query_id}"));
136-
let sql = queries.get_query(query_id)?;
137-
println!("Q{query_id}: {sql}");
138-
139-
for i in 0..iterations {
140-
let start = Instant::now();
141-
let results = ctx.sql(sql).await?.collect().await?;
142-
let elapsed = start.elapsed();
143-
let ms = elapsed.as_secs_f64() * 1000.0;
144-
millis.push(ms);
145-
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
146-
println!(
147-
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
148-
);
149-
benchmark_run.write_iter(elapsed, row_count);
134+
let query_run = self.benchmark_query(&queries, query_id, &ctx).await;
135+
match query_run {
136+
Ok(query_results) => {
137+
for iter in query_results {
138+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
139+
}
140+
}
141+
Err(e) => {
142+
benchmark_run.mark_failed();
143+
eprintln!("Query {query_id} failed: {e}");
144+
}
150145
}
151-
if self.common.debug {
152-
ctx.sql(sql).await?.explain(false, false)?.show().await?;
153-
}
154-
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
155-
println!("Query {query_id} avg time: {avg:.2} ms");
156146
}
157147
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
148+
benchmark_run.maybe_print_failures();
158149
Ok(())
159150
}
160151

152+
async fn benchmark_query(
153+
&self,
154+
queries: &AllQueries,
155+
query_id: usize,
156+
ctx: &SessionContext,
157+
) -> Result<Vec<QueryResult>> {
158+
let sql = queries.get_query(query_id)?;
159+
println!("Q{query_id}: {sql}");
160+
161+
let mut millis = Vec::with_capacity(self.iterations());
162+
let mut query_results = vec![];
163+
for i in 0..self.iterations() {
164+
let start = Instant::now();
165+
let results = ctx.sql(sql).await?.collect().await?;
166+
let elapsed = start.elapsed();
167+
let ms = elapsed.as_secs_f64() * 1000.0;
168+
millis.push(ms);
169+
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
170+
println!(
171+
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
172+
);
173+
query_results.push(QueryResult { elapsed, row_count })
174+
}
175+
if self.common.debug {
176+
ctx.sql(sql).await?.explain(false, false)?.show().await?;
177+
}
178+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
179+
println!("Query {query_id} avg time: {avg:.2} ms");
180+
Ok(query_results)
181+
}
182+
161183
/// Registers the `hits.parquet` as a table named `hits`
162184
async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
163185
let options = Default::default();
@@ -171,4 +193,8 @@ impl RunOpt {
171193
)
172194
})
173195
}
196+
197+
fn iterations(&self) -> usize {
198+
self.common.iterations
199+
}
174200
}

benchmarks/src/imdb/run.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::path::PathBuf;
1919
use std::sync::Arc;
2020

2121
use super::{get_imdb_table_schema, get_query_sql, IMDB_TABLES};
22-
use crate::util::{BenchmarkRun, CommonOpt};
22+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
2323

2424
use arrow::record_batch::RecordBatch;
2525
use arrow::util::pretty::{self, pretty_format_batches};
@@ -475,11 +475,6 @@ impl RunOpt {
475475
}
476476
}
477477

478-
struct QueryResult {
479-
elapsed: std::time::Duration,
480-
row_count: usize,
481-
}
482-
483478
#[cfg(test)]
484479
// Only run with "ci" mode when we have the data
485480
#[cfg(feature = "ci")]

benchmarks/src/sort_tpch.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_common::instant::Instant;
4040
use datafusion_common::utils::get_available_parallelism;
4141
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
4242

43-
use crate::util::{BenchmarkRun, CommonOpt};
43+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
4444

4545
#[derive(Debug, StructOpt)]
4646
pub struct RunOpt {
@@ -74,11 +74,6 @@ pub struct RunOpt {
7474
limit: Option<usize>,
7575
}
7676

77-
struct QueryResult {
78-
elapsed: std::time::Duration,
79-
row_count: usize,
80-
}
81-
8277
impl RunOpt {
8378
const SORT_TABLES: [&'static str; 1] = ["lineitem"];
8479

@@ -179,7 +174,7 @@ impl RunOpt {
179174
/// If query is specified from command line, run only that query.
180175
/// Otherwise, run all queries.
181176
pub async fn run(&self) -> Result<()> {
182-
let mut benchmark_run = BenchmarkRun::new();
177+
let mut benchmark_run: BenchmarkRun = BenchmarkRun::new();
183178

184179
let query_range = match self.query {
185180
Some(query_id) => query_id..=query_id,
@@ -189,14 +184,22 @@ impl RunOpt {
189184
for query_id in query_range {
190185
benchmark_run.start_new_case(&format!("{query_id}"));
191186

192-
let query_results = self.benchmark_query(query_id).await?;
193-
for iter in query_results {
194-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
187+
let query_results = self.benchmark_query(query_id).await;
188+
match query_results {
189+
Ok(query_results) => {
190+
for iter in query_results {
191+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
192+
}
193+
}
194+
Err(e) => {
195+
benchmark_run.mark_failed();
196+
eprintln!("Query {query_id} failed: {e}");
197+
}
195198
}
196199
}
197200

198201
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
199-
202+
benchmark_run.maybe_print_failures();
200203
Ok(())
201204
}
202205

@@ -294,7 +297,7 @@ impl RunOpt {
294297

295298
let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
296299
while let Some(batch) = stream.next().await {
297-
row_count += batch.unwrap().num_rows();
300+
row_count += batch?.num_rows();
298301
}
299302

300303
if debug {

benchmarks/src/tpch/run.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::sync::Arc;
2121
use super::{
2222
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
2323
};
24-
use crate::util::{BenchmarkRun, CommonOpt};
24+
use crate::util::{BenchmarkRun, CommonOpt, QueryResult};
2525

2626
use arrow::record_batch::RecordBatch;
2727
use arrow::util::pretty::{self, pretty_format_batches};
@@ -121,12 +121,21 @@ impl RunOpt {
121121

122122
for query_id in query_range {
123123
benchmark_run.start_new_case(&format!("Query {query_id}"));
124-
let query_run = self.benchmark_query(query_id, &ctx).await?;
125-
for iter in query_run {
126-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
124+
let query_run = self.benchmark_query(query_id, &ctx).await;
125+
match query_run {
126+
Ok(query_results) => {
127+
for iter in query_results {
128+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
129+
}
130+
}
131+
Err(e) => {
132+
benchmark_run.mark_failed();
133+
eprintln!("Query {query_id} failed: {e}");
134+
}
127135
}
128136
}
129137
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
138+
benchmark_run.maybe_print_failures();
130139
Ok(())
131140
}
132141

@@ -320,11 +329,6 @@ impl RunOpt {
320329
}
321330
}
322331

323-
struct QueryResult {
324-
elapsed: std::time::Duration,
325-
row_count: usize,
326-
}
327-
328332
#[cfg(test)]
329333
// Only run with "ci" mode when we have the data
330334
#[cfg(feature = "ci")]

benchmarks/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ mod run;
2222

2323
pub use access_log::AccessLogOpt;
2424
pub use options::CommonOpt;
25-
pub use run::{BenchQuery, BenchmarkRun};
25+
pub use run::{BenchQuery, BenchmarkRun, QueryResult};

benchmarks/src/util/run.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,13 @@ pub struct BenchQuery {
9090
iterations: Vec<QueryIter>,
9191
#[serde(serialize_with = "serialize_start_time")]
9292
start_time: SystemTime,
93+
success: bool,
94+
}
95+
/// Internal representation of a single benchmark query iteration result.
96+
pub struct QueryResult {
97+
pub elapsed: Duration,
98+
pub row_count: usize,
9399
}
94-
95100
/// collects benchmark run data and then serializes it at the end
96101
pub struct BenchmarkRun {
97102
context: RunContext,
@@ -120,6 +125,7 @@ impl BenchmarkRun {
120125
query: id.to_owned(),
121126
iterations: vec![],
122127
start_time: SystemTime::now(),
128+
success: true,
123129
});
124130
if let Some(c) = self.current_case.as_mut() {
125131
*c += 1;
@@ -138,6 +144,28 @@ impl BenchmarkRun {
138144
}
139145
}
140146

147+
/// Print the names of failed queries, if any
148+
pub fn maybe_print_failures(&self) {
149+
let failed_queries: Vec<&str> = self
150+
.queries
151+
.iter()
152+
.filter_map(|q| (!q.success).then_some(q.query.as_str()))
153+
.collect();
154+
155+
if !failed_queries.is_empty() {
156+
println!("Failed Queries: {}", failed_queries.join(", "));
157+
}
158+
}
159+
160+
/// Mark the current query (the current case) as failed
161+
pub fn mark_failed(&mut self) {
162+
if let Some(idx) = self.current_case {
163+
self.queries[idx].success = false;
164+
} else {
165+
unreachable!("Cannot mark failure: no current case");
166+
}
167+
}
168+
141169
/// Stringify data into formatted json
142170
pub fn to_json(&self) -> String {
143171
let mut output = HashMap::<&str, Value>::new();

0 commit comments

Comments
 (0)