
Commit fa5cef8

Fixed parquet path partitioning when only selecting partitioned columns (#2000)

* Fixed parquet path partitioning when only selecting partitioned columns
* Removed unnecessary row group pruning and file metrics
* Ran cargo fmt
* Switched from row-group-level metadata to file-level metadata to determine the number of rows to emit
* Reworked task spawning in ParquetExec::execute
* Changed index-based partition column generation to an iterator version
* Moved the limit unwrap outside of the loop
* Fixed a bug in the number of rows emitted when querying only partition columns, and reused the partition record batch
* Added limit logic tests for a hive-partitioned parquet file
* Formatted code
* Fixed clippy lint
* Fixed other clippy lint

1 parent a0d8b66 commit fa5cef8
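For context, a minimal sketch of the kind of query this commit fixes (not part of the diff): selecting only hive-style partition columns from a partitioned parquet table. It assumes a table `t` has already been registered with `year`, `month` and `day` partition columns, as the new test below does; the registration itself is elided here.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Assumes `t` is a parquet listing table registered with year/month/day partition columns.
async fn distinct_partition_values(ctx: &SessionContext) -> Result<()> {
    // Before this fix, a projection made up solely of partition columns was mishandled
    // by ParquetExec; it now emits batches built purely from the partition values.
    let batches = ctx
        .sql("SELECT DISTINCT year, month, day FROM t")
        .await?
        .collect()
        .await?;
    for batch in &batches {
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}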

File tree

3 files changed: +237 / -14 lines

datafusion/core/src/physical_plan/file_format/mod.rs

Lines changed: 30 additions & 1 deletion
@@ -310,6 +310,36 @@ impl PartitionColumnProjector {
         }
     }
 
+    // Creates a RecordBatch with values from the partition_values. Used when no non-partition values are read
+    fn project_from_size(
+        &mut self,
+        batch_size: usize,
+        partition_values: &[ScalarValue],
+    ) -> ArrowResult<RecordBatch> {
+        let expected_cols = self.projected_schema.fields().len();
+        if expected_cols != self.projected_partition_indexes.len() {
+            return Err(ArrowError::SchemaError(format!(
+                "Unexpected number of partition values, expected {} but got {}",
+                expected_cols,
+                partition_values.len()
+            )));
+        }
+        // The destination index is not needed. Since there are no non-partition columns it will simply be equivalent to
+        // the index that would be provided by .enumerate()
+        let cols = self
+            .projected_partition_indexes
+            .iter()
+            .map(|(pidx, _)| {
+                create_dict_array(
+                    &mut self.key_buffer_cache,
+                    &partition_values[*pidx],
+                    batch_size,
+                )
+            })
+            .collect();
+        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+    }
+
     // Transform the batch read from the file by inserting the partitioning columns
     // to the right positions as deduced from `projected_schema`
     // - file_batch: batch read from the file, with internal projection applied
@@ -329,7 +359,6 @@
                 file_batch.columns().len()
             )));
         }
-
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
             cols.insert(
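Conceptually, project_from_size builds a batch of batch_size rows in which every projected column is a single partition value repeated; the diff does this with create_dict_array, which reuses a cached dictionary key buffer. Below is a simplified, self-contained sketch of the same idea, using a plain StringArray instead of a dictionary array and a hypothetical single "year" column.

use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Build a batch_size-row batch whose only column repeats one partition value.
fn partition_only_batch(batch_size: usize, year: &str) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("year", DataType::Utf8, false)]));
    let col: ArrayRef = Arc::new(StringArray::from(vec![year; batch_size]));
    RecordBatch::try_new(schema, vec![col])
}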

datafusion/core/src/physical_plan/file_format/parquet.rs

Lines changed: 76 additions & 13 deletions
@@ -235,19 +235,32 @@ impl ExecutionPlan for ParquetExec {
         let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
 
         let join_handle = task::spawn_blocking(move || {
-            if let Err(e) = read_partition(
-                object_store.as_ref(),
-                adapter,
-                partition_index,
-                &partition,
-                metrics,
-                &projection,
-                &pruning_predicate,
-                batch_size,
-                response_tx.clone(),
-                limit,
-                partition_col_proj,
-            ) {
+            let res = if projection.is_empty() {
+                read_partition_no_file_columns(
+                    object_store.as_ref(),
+                    &partition,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                )
+            } else {
+                read_partition(
+                    object_store.as_ref(),
+                    adapter,
+                    partition_index,
+                    &partition,
+                    metrics,
+                    &projection,
+                    &pruning_predicate,
+                    batch_size,
+                    response_tx.clone(),
+                    limit,
+                    partition_col_proj,
+                )
+            };
+
+            if let Err(e) = res {
                 warn!(
                     "Parquet reader thread terminated due to error: {:?} for files: {:?}",
                     e, partition
@@ -448,6 +461,56 @@ fn build_row_group_predicate(
     )
 }
 
+fn read_partition_no_file_columns(
+    object_store: &dyn ObjectStore,
+    partition: &[PartitionedFile],
+    batch_size: usize,
+    response_tx: Sender<ArrowResult<RecordBatch>>,
+    limit: Option<usize>,
+    mut partition_column_projector: PartitionColumnProjector,
+) -> Result<()> {
+    use parquet::file::reader::FileReader;
+    let mut limit = limit.unwrap_or(usize::MAX);
+
+    for partitioned_file in partition {
+        if limit == 0 {
+            break;
+        }
+        let object_reader =
+            object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
+        let file_reader = SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+        let mut num_rows = usize::min(
+            limit,
+            file_reader
+                .metadata()
+                .file_metadata()
+                .num_rows()
+                .try_into()
+                .expect("Row count should always be greater than or equal to 0 and less than usize::MAX"),
+        );
+        limit -= num_rows;
+
+        let partition_batch = partition_column_projector
+            .project_from_size(batch_size, &partitioned_file.partition_values)
+            .map_err(|e| {
+                let err_msg =
+                    format!("Error reading batch from {}: {}", partitioned_file, e);
+                if let Err(send_err) = send_result(
+                    &response_tx,
+                    Err(ArrowError::ParquetError(err_msg.clone())),
+                ) {
+                    return send_err;
+                }
+                DataFusionError::Execution(err_msg)
+            })?;
+
+        while num_rows > batch_size {
+            send_result(&response_tx, Ok(partition_batch.clone()))?;
+            num_rows -= batch_size;
+        }
+        let residual_batch = partition_batch.slice(0, num_rows);
+        send_result(&response_tx, Ok(residual_batch))?;
+    }
+    Ok(())
+}
+
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
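The core of read_partition_no_file_columns is the emission loop: each file contributes min(remaining limit, file row count) rows, sent as repeated batch_size-row clones of the partition-only batch followed by one residual slice, with the remaining limit shared across files. A pure-logic sketch of that arithmetic follows; the names here are illustrative, not from the diff.

// Returns (full batches emitted, rows in the final residual batch) for one file,
// updating the remaining LIMIT in place, mirroring the loop in the diff above.
fn emit_counts(file_rows: usize, batch_size: usize, limit: &mut usize) -> (usize, usize) {
    let mut num_rows = usize::min(*limit, file_rows);
    *limit -= num_rows;
    let mut full_batches = 0;
    // Emit whole batches while more than one batch worth of rows remains,
    // then the leftover rows go out as a final (possibly full-sized) slice.
    while num_rows > batch_size {
        full_batches += 1;
        num_rows -= batch_size;
    }
    (full_batches, num_rows)
}

fn main() {
    let mut limit = usize::MAX; // no LIMIT clause
    assert_eq!(emit_counts(10_000, 4_096, &mut limit), (2, 1_808));

    let mut limit = 5; // LIMIT 5 is satisfied by the first file
    assert_eq!(emit_counts(10_000, 4_096, &mut limit), (0, 5));
    assert_eq!(limit, 0); // subsequent files are skipped by the `limit == 0` check
}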

datafusion/core/tests/path_partition.rs

Lines changed: 131 additions & 0 deletions
@@ -38,8 +38,139 @@ use datafusion::{
     prelude::SessionContext,
     test_util::{self, arrow_test_data, parquet_test_data},
 };
+use datafusion_common::ScalarValue;
 use futures::{stream, StreamExt};
 
+#[tokio::test]
+async fn parquet_distinct_partition_col() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    register_partitioned_alltypes_parquet(
+        &ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &["year", "month", "day"],
+        "",
+        "alltypes_plain.parquet",
+    )
+    .await;
+    // Test that only selecting partition columns is possible
+    let result = ctx
+        .sql("SELECT distinct year,month,day FROM t")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = vec![
+        "+------+-------+-----+",
+        "| year | month | day |",
+        "+------+-------+-----+",
+        "| 2021 | 09    | 09  |",
+        "| 2021 | 10    | 09  |",
+        "| 2021 | 10    | 28  |",
+        "+------+-------+-----+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+    // Test that a partition column scan and actually reading the parquet file return the same number of rows
+    let actual_row_count: usize = ctx
+        .sql("SELECT id from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+
+    let partition_row_count: usize = ctx
+        .sql("SELECT year from t")
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|batch| batch.num_rows())
+        .sum();
+    assert_eq!(actual_row_count, partition_row_count);
+
+    // Test limit logic. 3 test cases:
+    // 1. limit is contained within a single partition with leftover rows
+    // 2. limit is contained within a single partition without leftover rows
+    // 3. limit is not contained within a single partition
+    // The id column is included to ensure that the parquet file is actually scanned.
+    let results = ctx
+        .sql("SELECT COUNT(*) as num_rows_per_month, month, MAX(id) from t group by month order by num_rows_per_month desc")
+        .await?
+        .collect()
+        .await?;
+
+    let mut max_limit = match ScalarValue::try_from_array(results[0].column(0), 0)? {
+        ScalarValue::UInt64(Some(count)) => count,
+        s => panic!("Expected count as UInt64 found {}", s),
+    };
+
+    max_limit += 1;
+    let last_batch = results
+        .last()
+        .expect("There should be at least one record batch returned");
+    let last_row_idx = last_batch.num_rows() - 1;
+    let mut min_limit =
+        match ScalarValue::try_from_array(last_batch.column(0), last_row_idx)? {
+            ScalarValue::UInt64(Some(count)) => count,
+            s => panic!("Expected count as UInt64 found {}", s),
+        };
+
+    min_limit -= 1;
+
+    let sql_cross_partition_boundary = format!("SELECT month FROM t limit {}", max_limit);
+    let resulting_limit: u64 = ctx
+        .sql(sql_cross_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+
+    assert_eq!(max_limit, resulting_limit);
+
+    let sql_within_partition_boundary =
+        format!("SELECT month from t limit {}", min_limit);
+    let resulting_limit: u64 = ctx
+        .sql(sql_within_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+
+    assert_eq!(min_limit, resulting_limit);
+
+    let month = match ScalarValue::try_from_array(results[0].column(1), 0)? {
+        ScalarValue::Utf8(Some(month)) => month,
+        s => panic!("Expected month as Utf8 found {}", s),
+    };
+
+    let sql_on_partition_boundary = format!(
+        "SELECT month from t where month = '{}' LIMIT {}",
+        month,
+        max_limit - 1
+    );
+    let resulting_limit: u64 = ctx
+        .sql(sql_on_partition_boundary.as_str())
+        .await?
+        .collect()
+        .await?
+        .into_iter()
+        .map(|r| r.num_rows() as u64)
+        .sum();
+    let partition_row_count = max_limit - 1;
+    assert_eq!(partition_row_count, resulting_limit);
+    Ok(())
+}
+
 #[tokio::test]
 async fn csv_filter_with_file_col() -> Result<()> {
     let ctx = SessionContext::new();
