Commit 41467ab

Extract common parquet testing code to parquet-test-util crate (#4042)
* Extract common parquet testing code to `parquet-test-util` crate
* fix doc tests
1 parent ea31da9 · commit 41467ab

6 files changed: +306 −133 lines changed


Cargo.toml

Lines changed: 14 additions & 1 deletion
@@ -17,7 +17,20 @@
 
 [workspace]
 exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
+members = [
+    "datafusion/common",
+    "datafusion/core",
+    "datafusion/expr",
+    "datafusion/jit",
+    "datafusion/optimizer",
+    "datafusion/physical-expr",
+    "datafusion/proto",
+    "datafusion/row",
+    "datafusion/sql",
+    "datafusion-examples",
+    "test-utils",
+    "parquet-test-utils",
+    "benchmarks",
 ]
 
 [profile.release]

benchmarks/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 object_store = "0.5.0"
 parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"

benchmarks/src/bin/parquet_filter_pushdown.rs

Lines changed: 16 additions & 131 deletions
@@ -15,30 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::SchemaRef;
 use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
-use std::sync::Arc;
 use std::time::Instant;
 use structopt::StructOpt;
 use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
 
-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
     Ok(())
 }
 
-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
 async fn run_benchmarks(
     ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
@@ -156,8 +122,7 @@ async fn run_benchmarks(
         disjunction([
             col("request_method").not_eq(lit("GET")),
             col("response_status").eq(lit(400_u16)),
-            // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
-            // col("service").eq(lit("backend")),
+            col("service").eq(lit("backend")),
         ])
         .unwrap(),
         // Filter everything
@@ -174,9 +139,7 @@
         let start = Instant::now();
         let rows = exec_scan(
             ctx,
-            schema.clone(),
-            object_store_url.clone(),
-            object_meta.clone(),
+            test_file,
             filter_expr.clone(),
             scan_options.clone(),
             debug,
@@ -197,52 +160,12 @@ async fn run_benchmarks(
 
 async fn exec_scan(
     ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     filter: Expr,
     scan_options: ParquetScanOptions,
     debug: bool,
 ) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;
 
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
@@ -258,53 +181,15 @@ fn gen_data(
     scale_factor: f32,
     page_size: Option<usize>,
     row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-
     let num_batches = 100_f32 * scale_factor;
 
-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
 }
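
Taken together, the benchmark now delegates both file generation and plan construction to the extracted crate. Below is a rough end-to-end sketch of the new call pattern, assembled only from the call sites visible in this diff; the `/tmp/logs.parquet` path and the field-by-field construction of `ParquetScanOptions` are illustrative assumptions, since the struct's definition moved into the new crate and its `src/lib.rs` is not shown on this page.

```rust
use datafusion::common::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, lit, SessionContext};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
use test_utils::AccessLogGenerator;

async fn filtered_scan_example() -> Result<()> {
    // As in gen_data: write 100 generated access-log batches to one parquet file
    let generator = AccessLogGenerator::new();
    let test_file = TestParquetFile::try_new(
        PathBuf::from("/tmp/logs.parquet"), // hypothetical output path
        generator.take(100),
        None, // default page size
        None, // default row group size
    )?;

    // As in exec_scan: the helper builds the FilterExec-over-ParquetExec plan
    // that the benchmark previously assembled by hand
    let scan_options = ParquetScanOptions {
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };
    let filter = col("response_status").eq(lit(400_u16));
    let exec = test_file.create_scan(filter, scan_options).await?;

    let ctx = SessionContext::new();
    let results = collect(exec, ctx.task_ctx()).await?;
    let rows: usize = results.iter().map(|b| b.num_rows()).sum();
    println!("scan returned {} rows", rows);
    Ok(())
}
```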

parquet-test-utils/Cargo.toml

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "parquet-test-utils"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+datafusion = { path = "../datafusion/core" }
+object_store = "0.5.0"
+parquet = "25.0.0"
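
The crate's `src/lib.rs`, which defines `TestParquetFile` and `ParquetScanOptions`, is among the six changed files but is not part of this excerpt. Judging by the code deleted from `gen_data` above, the write path inside `TestParquetFile::try_new` presumably looks something like the sketch below. The free-standing `write_batches` helper, its signature, and taking the schema as a parameter are assumptions; error propagation via `?` replaces the old `unwrap()` calls.

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::path::PathBuf;

/// Writes `batches` to a single parquet file at `path`, applying the optional
/// page size and row group size limits. Mirrors the body deleted from
/// `gen_data`; the real helper would also keep the path, schema, and
/// ObjectMeta that `create_scan` later needs.
fn write_batches(
    path: PathBuf,
    schema: SchemaRef,
    batches: impl IntoIterator<Item = RecordBatch>,
    page_size: Option<usize>,
    row_group_size: Option<usize>,
) -> Result<usize> {
    let mut props_builder = WriterProperties::builder();
    if let Some(s) = page_size {
        // cap both the data page size and the write batch size
        props_builder = props_builder
            .set_data_pagesize_limit(s)
            .set_write_batch_size(s);
    }
    if let Some(s) = row_group_size {
        props_builder = props_builder.set_max_row_group_size(s);
    }

    let file = File::create(&path)?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props_builder.build()))?;

    let mut num_rows = 0;
    for batch in batches {
        // write and flush each batch, as the deleted gen_data loop did
        writer.write(&batch)?;
        writer.flush()?;
        num_rows += batch.num_rows();
    }
    writer.close()?;

    println!("Generated test dataset with {} rows", num_rows);
    Ok(num_rows)
}
```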
