Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 45 additions & 32 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,50 +550,63 @@ pub(crate) mod test_util {
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;

/// How many rows per page should be written
const ROWS_PER_PAGE: usize = 2;

/// Writes `batches` to a temporary parquet file
///
/// If multi_page is set to `true`, all batches are written into
/// one temporary parquet file and the parquet file is written
/// If multi_page is set to `true`, the parquet file(s) are written
/// with 2 rows per data page (used to test page filtering and
/// boundaries).
pub async fn store_parquet(
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
if multi_page {
// All batches write in to one file, each batch must have same schema.
let mut output = NamedTempFile::new().expect("creating temp file");
let mut builder = WriterProperties::builder();
builder = builder.set_data_page_row_count_limit(2);
let proper = builder.build();
let mut writer =
ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
.expect("creating writer");
for b in batches {
writer.write(&b).expect("Writing batch");
}
writer.close().unwrap();
Ok((vec![local_unpartitioned_file(&output)], vec![output]))
} else {
// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");
// Each batch writes to their own file
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is so I can actually test evolved schemas with page indexes (aka write multiple files with different schemas)

let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

let builder = WriterProperties::builder();
let props = if multi_page {
builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
} else {
builder
}
.build();

let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");

if multi_page {
// write in smaller batches as the parquet writer
// only checks datapage size limits on the boundaries of each batch
write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
} else {
writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();
};
writer.close().unwrap();
output
})
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
}

/// Writes `batch` to `writer` in chunks of at most `chunk_size` rows.
///
/// The parquet `ArrowWriter` only checks data-page size limits on the
/// boundaries of each written batch, so writing in small chunks forces
/// page breaks at (roughly) every `chunk_size` rows.
///
/// # Panics
///
/// Panics if `chunk_size` is zero (which would otherwise loop forever),
/// or if the underlying writer reports an error.
fn write_in_chunks<W: std::io::Write>(
    writer: &mut ArrowWriter<W>,
    batch: &RecordBatch,
    chunk_size: usize,
) {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut offset = 0;
    while offset < batch.num_rows() {
        // The final chunk may be shorter than `chunk_size`
        let len = chunk_size.min(batch.num_rows() - offset);
        writer
            .write(&batch.slice(offset, len))
            .expect("writing chunk");
        offset += len;
    }
}
}
Expand Down
25 changes: 9 additions & 16 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! other source (e.g. a catalog)

use std::convert::TryFrom;
use std::{collections::HashSet, sync::Arc};
use std::sync::Arc;

use crate::execution::context::ExecutionProps;
use crate::prelude::lit;
Expand Down Expand Up @@ -233,25 +233,18 @@ impl PruningPredicate {
.unwrap_or_default()
}

/// Returns all need column indexes to evaluate this pruning predicate
pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
let mut set = HashSet::new();
self.required_columns.columns.iter().for_each(|x| {
match self.schema().column_with_name(x.0.name.as_str()) {
None => {}
Some(y) => {
set.insert(y.0);
}
}
});
set
pub(crate) fn required_columns(&self) -> &RequiredStatColumns {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core change -- need_input_columns_ids returns indexes in terms of the overall table schema. If an individual parquet file does not have all the columns or has the columns in a different order, these indexes are not correct

Copy link
Member

@Ted-Jiang Ted-Jiang Feb 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explanation! 👍

If an individual parquet file does not have all the columns or has the columns in a different order

I have a question about if file_a (c1, c2), file_b(c3, c1), do df support create external table t(c1) on both file_a and file_b 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And file_a (c1, c2), file_b(c3) , support create external table t(c1)?
Do both file have to have the c1 meta in both parquet files meta ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see both situation support in below test 😆

&self.required_columns
}
}

/// Records for which columns statistics are necessary to evaluate a
/// pruning predicate.
///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
struct RequiredStatColumns {
pub(crate) struct RequiredStatColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max or Null_Count)
Expand All @@ -267,7 +260,7 @@ impl RequiredStatColumns {

/// Returns an iterator over items in columns (see doc on
/// `self.columns` for details)
fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
pub(crate) fn iter(&self) -> impl Iterator<Item = &(Column, StatisticsType, Field)> {
self.columns.iter()
}

Expand Down Expand Up @@ -852,7 +845,7 @@ fn build_statistics_expr(expr_builder: &mut PruningExpressionBuilder) -> Result<
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum StatisticsType {
pub(crate) enum StatisticsType {
Min,
Max,
NullCount,
Expand Down
132 changes: 124 additions & 8 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,8 +836,7 @@ mod tests {

/// round-trip record batches by writing each individual RecordBatch to
/// a parquet file and then reading that parquet file with the specified
/// options. If page_index_predicate is set to `true`, all RecordBatches
/// are written into a parquet file instead.
/// options.
#[derive(Debug, Default)]
struct RoundTrip {
projection: Option<Vec<usize>>,
Expand Down Expand Up @@ -1331,6 +1330,80 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
// Page 1
Some("Foo"),
Some("Bar"),
// Page 2
Some("Foo2"),
Some("Bar2"),
// Page 3
Some("Foo3"),
Some("Bar3"),
]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![
// Page 1:
Some(1),
Some(2),
// Page 2: (pruned)
Some(3),
Some(4),
// Page 3: (pruned)
Some(5),
None,
]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2.clone())]);

// batch3 (has c2, c1) -- both columns, should still prune
let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);

// batch4 (has c2, c1) -- different column order, should still prune
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test case 👍

let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);

let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let rt = RoundTrip::new()
.with_predicate(filter)
.with_page_index_predicate()
.round_trip(vec![batch1, batch2, batch3, batch4])
.await;

let expected = vec![
"+------+----+",
"| c1 | c2 |",
"+------+----+",
"| | 1 |",
"| | 2 |",
"| Bar | |",
"| Bar | 2 |",
"| Bar | 2 |",
"| Bar2 | |",
"| Bar3 | |",
"| Foo | |",
"| Foo | 1 |",
"| Foo | 1 |",
"| Foo2 | |",
"| Foo3 | |",
"+------+----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();

// There are 4 rows pruned in each of batch2, batch3, and
// batch4 for a total of 12. batch1 had no pruning as c2 was
// filled in as null
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 12);
}

#[tokio::test]
async fn multi_column_predicate_pushdown() {
let c1: ArrayRef =
Expand Down Expand Up @@ -1362,6 +1435,38 @@ mod tests {
assert_batches_sorted_eq!(expected, &read);
}

/// Tests that page-index predicate pushdown produces correct results
/// for a multi-column OR predicate whose columns are referenced in a
/// different order (c2 first) than they appear in the batch schema
/// (c1 first).
#[tokio::test]
async fn multi_column_predicate_pushdown_page_index_pushdown() {
// c1: nullable string column (null in the second row)
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

// c2: nullable int64 column (null in the third row)
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

// Schema order is (c1, c2)
let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);

// Columns in different order to schema
// NOTE(review): the predicate names c2 before c1 — the reverse of the
// schema order above — which is the case this test exists to cover.
let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));

// read/write them files:
let read = RoundTrip::new()
.with_predicate(filter)
.with_page_index_predicate()
.round_trip_to_batches(vec![batch1])
.await
.unwrap();

// All three rows are returned: page-index pruning operates at page
// granularity, and (presumably) every page here contains at least one
// row matching the predicate — TODO confirm page layout.
let expected = vec![
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
"| | 2 |",
"| Foo | 1 |",
"| bar | |",
"+-----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
Expand Down Expand Up @@ -1635,27 +1740,38 @@ mod tests {

#[tokio::test]
async fn parquet_page_index_exec_metrics() {
let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
let c1: ArrayRef = Arc::new(Int32Array::from(vec![
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the only test that used the "merge multiple batches together" behavior of store_parquet -- so I rewrote the tests to inline the creation and ensure we got evenly created two row pages

Some(1),
None,
Some(2),
Some(3),
Some(4),
Some(5),
]));
let batch1 = create_batch(vec![("int", c1.clone())]);
let batch2 = create_batch(vec![("int", c2.clone())]);

let filter = col("int").eq(lit(4_i32));

let rt = RoundTrip::new()
.with_predicate(filter)
.with_page_index_predicate()
.round_trip(vec![batch1, batch2])
.round_trip(vec![batch1])
.await;

let metrics = rt.parquet_exec.metrics().unwrap();

// assert the batches and some metrics
#[rustfmt::skip]
let expected = vec![
"+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
"+-----+",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is different because previously the page layout was as follows

Page1:
1
None
2

Page 2
3
4
5

Now the page layout is

Page1:
1
None

Page2
2
3

Page3
4
5

"| int |",
"+-----+",
"| 4 |",
"| 5 |",
"+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
Expand Down
Loading