Skip to content

Commit d3612f9

Browse files
committed
[HSTACK] Push projection_deep down the physical nodes path
1 parent f03e9fe commit d3612f9

File tree

14 files changed

+426
-25
lines changed

14 files changed

+426
-25
lines changed

datafusion/core/src/datasource/listing/table.rs

+112
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,118 @@ impl TableProvider for ListingTable {
958958
.await
959959
}
960960

961+
async fn scan_deep(
962+
&self,
963+
state: &dyn Session,
964+
projection: Option<&Vec<usize>>,
965+
projection_deep: Option<&HashMap<usize, Vec<String>>>,
966+
filters: &[Expr],
967+
limit: Option<usize>,
968+
) -> Result<Arc<dyn ExecutionPlan>> {
969+
// extract types of partition columns
970+
let table_partition_cols = self
971+
.options
972+
.table_partition_cols
973+
.iter()
974+
.map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
975+
.collect::<Result<Vec<_>>>()?;
976+
977+
let table_partition_col_names = table_partition_cols
978+
.iter()
979+
.map(|field| field.name().as_str())
980+
.collect::<Vec<_>>();
981+
// If the filters can be resolved using only partition cols, there is no need to
982+
// pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
983+
let (partition_filters, filters): (Vec<_>, Vec<_>) =
984+
filters.iter().cloned().partition(|filter| {
985+
can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter)
986+
});
987+
// TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
988+
let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
989+
990+
// We should not limit the number of partitioned files to scan if there are filters and limit
991+
// at the same time. This is because the limit should be applied after the filters are applied.
992+
let statistic_file_limit = if filters.is_empty() { limit } else { None };
993+
994+
let (mut partitioned_file_lists, statistics) = self
995+
.list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
996+
.await?;
997+
998+
// if no files need to be read, return an `EmptyExec`
999+
if partitioned_file_lists.is_empty() {
1000+
let projected_schema = project_schema(&self.schema(), projection)?;
1001+
return Ok(Arc::new(EmptyExec::new(projected_schema)));
1002+
}
1003+
1004+
let output_ordering = self.try_create_output_ordering()?;
1005+
match state
1006+
.config_options()
1007+
.execution
1008+
.split_file_groups_by_statistics
1009+
.then(|| {
1010+
output_ordering.first().map(|output_ordering| {
1011+
FileScanConfig::split_groups_by_statistics(
1012+
&self.table_schema,
1013+
&partitioned_file_lists,
1014+
output_ordering,
1015+
)
1016+
})
1017+
})
1018+
.flatten()
1019+
{
1020+
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
1021+
Some(Ok(new_groups)) => {
1022+
if new_groups.len() <= self.options.target_partitions {
1023+
partitioned_file_lists = new_groups;
1024+
} else {
1025+
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
1026+
}
1027+
}
1028+
None => {} // no ordering required
1029+
};
1030+
1031+
let filters = match conjunction(filters.to_vec()) {
1032+
Some(expr) => {
1033+
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
1034+
let filters = create_physical_expr(
1035+
&expr,
1036+
&table_df_schema,
1037+
state.execution_props(),
1038+
)?;
1039+
Some(filters)
1040+
}
1041+
None => None,
1042+
};
1043+
1044+
let Some(object_store_url) =
1045+
self.table_paths.first().map(ListingTableUrl::object_store)
1046+
else {
1047+
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
1048+
};
1049+
1050+
// create the execution plan
1051+
self.options
1052+
.format
1053+
.create_physical_plan(
1054+
session_state,
1055+
FileScanConfig::new(
1056+
object_store_url,
1057+
Arc::clone(&self.file_schema),
1058+
self.options.format.file_source(),
1059+
)
1060+
.with_file_groups(partitioned_file_lists)
1061+
.with_constraints(self.constraints.clone())
1062+
.with_statistics(statistics)
1063+
.with_projection(projection.cloned())
1064+
.with_projection_deep(projection_deep.cloned())
1065+
.with_limit(limit)
1066+
.with_output_ordering(output_ordering)
1067+
.with_table_partition_cols(table_partition_cols),
1068+
filters.as_ref(),
1069+
)
1070+
.await
1071+
}
1072+
9611073
fn supports_filters_pushdown(
9621074
&self,
9631075
filters: &[&Expr],

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ impl ParquetExecBuilder {
233233
}
234234

235235
let base_config = file_scan_config.with_source(Arc::new(parquet.clone()));
236-
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
237-
base_config.file_groups, base_config.projection, predicate, base_config.limit);
236+
debug!("Creating ParquetExec, files: {:?}, projection {:?}, projection deep {:?}, predicate: {:?}, limit: {:?}",
237+
base_config.file_groups, base_config.projection, base_config.projection_deep, predicate, base_config.limit);
238238

239239
ParquetExec {
240240
inner: DataSourceExec::new(Arc::new(base_config.clone())),

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

+160-10
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
//! [`ParquetOpener`] for opening Parquet files
1919
20-
use std::sync::Arc;
21-
2220
use crate::datasource::file_format::parquet::{
2321
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
2422
};
@@ -31,6 +29,9 @@ use crate::datasource::physical_plan::{
3129
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
3230
};
3331
use crate::datasource::schema_adapter::SchemaAdapterFactory;
32+
use std::cmp::min;
33+
use std::collections::HashMap;
34+
use std::sync::Arc;
3435

3536
use arrow::datatypes::SchemaRef;
3637
use arrow::error::ArrowError;
@@ -40,18 +41,21 @@ use datafusion_physical_optimizer::pruning::PruningPredicate;
4041
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
4142

4243
use futures::{StreamExt, TryStreamExt};
43-
use log::debug;
44+
use log::{debug, info, trace};
4445
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
4546
use parquet::arrow::async_reader::AsyncFileReader;
4647
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
48+
use parquet::schema::types::SchemaDescriptor;
49+
// use datafusion_common::DataFusionError;
50+
use datafusion_common::deep::{has_deep_projection, rewrite_schema, splat_columns};
4751

4852
/// Implements [`FileOpener`] for a parquet file
4953
pub(super) struct ParquetOpener {
5054
/// Execution partition index
5155
pub partition_index: usize,
5256
/// Column indexes in `table_schema` needed by the query
5357
pub projection: Arc<[usize]>,
54-
/// Target number of rows in each output RecordBatch
58+
pub projection_deep: Arc<HashMap<usize, Vec<String>>>,
5559
pub batch_size: usize,
5660
/// Optional limit on the number of rows to read
5761
pub limit: Option<usize>,
@@ -105,11 +109,31 @@ impl FileOpener for ParquetOpener {
105109

106110
let batch_size = self.batch_size;
107111

108-
let projected_schema =
109-
SchemaRef::from(self.table_schema.project(&self.projection)?);
112+
let projection = self.projection.clone();
113+
let projection_vec = projection
114+
.as_ref()
115+
.iter()
116+
.map(|i| *i)
117+
.collect::<Vec<usize>>();
118+
info!(
119+
"ParquetOpener::open projection={:?}, deep_projection: {:?}",
120+
projection, &self.projection_deep
121+
);
122+
// FIXME @HStack: ADR: why do we need to do this ? our function needs another param maybe ?
123+
// In the case when the projections requested are empty, we should return an empty schema
124+
let projected_schema = if projection_vec.len() == 0 {
125+
SchemaRef::from(self.table_schema.project(&projection)?)
126+
} else {
127+
rewrite_schema(
128+
self.table_schema.clone(),
129+
&projection_vec,
130+
self.projection_deep.as_ref(),
131+
)
132+
};
110133
let schema_adapter = self
111134
.schema_adapter_factory
112135
.create(projected_schema, Arc::clone(&self.table_schema));
136+
let projection_deep = self.projection_deep.clone();
113137
let predicate = self.predicate.clone();
114138
let pruning_predicate = self.pruning_predicate.clone();
115139
let page_pruning_predicate = self.page_pruning_predicate.clone();
@@ -159,11 +183,37 @@ impl FileOpener for ParquetOpener {
159183
let (schema_mapping, adapted_projections) =
160184
schema_adapter.map_schema(&file_schema)?;
161185

162-
let mask = ProjectionMask::roots(
163-
builder.parquet_schema(),
164-
adapted_projections.iter().cloned(),
165-
);
186+
// let mask = ProjectionMask::roots(
187+
// builder.parquet_schema(),
188+
// adapted_projections.iter().cloned(),
189+
// );
190+
let mask = if has_deep_projection(Some(projection_deep.clone().as_ref())) {
191+
let leaves = generate_leaf_paths(
192+
table_schema.clone(),
193+
builder.parquet_schema(),
194+
&projection_vec,
195+
projection_deep.clone().as_ref(),
196+
);
197+
info!(
198+
"ParquetOpener::open, using deep projection parquet leaves: {:?}",
199+
leaves.clone()
200+
);
201+
// let tmp = builder.parquet_schema();
202+
// for (i, col) in tmp.columns().iter().enumerate() {
203+
// info!(" {} {}= {:?}", i, col.path(), col);
204+
// }
205+
ProjectionMask::leaves(builder.parquet_schema(), leaves)
206+
} else {
207+
info!(
208+
"ParquetOpener::open, using root projections: {:?}",
209+
&adapted_projections
210+
);
166211

212+
ProjectionMask::roots(
213+
builder.parquet_schema(),
214+
adapted_projections.iter().cloned(),
215+
)
216+
};
167217
// Filter pushdown: evaluate predicates during scan
168218
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
169219
let row_filter = row_filter::build_row_filter(
@@ -303,3 +353,103 @@ fn create_initial_plan(
303353
// default to scanning all row groups
304354
Ok(ParquetAccessPlan::new_all(row_group_count))
305355
}
356+
357+
// FIXME: @HStack ACTUALLY look at the arrow schema and handle map types correctly
358+
// Right now, we are matching "map-like" parquet leaves like "key_value.key" etc
359+
// But, we neeed to walk through both the arrow schema (which KNOWS about the map type)
360+
// and the parquet leaves to do this correctly.
361+
fn equivalent_projection_paths_from_parquet_schema(
362+
_arrow_schema: SchemaRef,
363+
parquet_schema: &SchemaDescriptor,
364+
) -> Vec<(usize, (String, String))> {
365+
let mut output: Vec<(usize, (String, String))> = vec![];
366+
for (i, col) in parquet_schema.columns().iter().enumerate() {
367+
let original_path = col.path().string();
368+
let converted_path =
369+
convert_parquet_path_to_deep_projection_path(&original_path.as_str());
370+
output.push((i, (original_path.clone(), converted_path)));
371+
}
372+
output
373+
}
374+
375+
fn convert_parquet_path_to_deep_projection_path(parquet_path: &str) -> String {
376+
if parquet_path.contains(".key_value.key")
377+
|| parquet_path.contains(".key_value.value")
378+
|| parquet_path.contains(".entries.keys")
379+
|| parquet_path.contains(".entries.values")
380+
|| parquet_path.contains(".list.element")
381+
{
382+
let tmp = parquet_path
383+
.replace("key_value.key", "*")
384+
.replace("key_value.value", "*")
385+
.replace("entries.keys", "*")
386+
.replace("entries.values", "*")
387+
.replace("list.element", "*");
388+
tmp
389+
} else {
390+
parquet_path.to_string()
391+
}
392+
}
393+
394+
fn generate_leaf_paths(
395+
arrow_schema: SchemaRef,
396+
parquet_schema: &SchemaDescriptor,
397+
projection: &Vec<usize>,
398+
projection_deep: &HashMap<usize, Vec<String>>,
399+
) -> Vec<usize> {
400+
let actual_projection = if projection.len() == 0 {
401+
(0..arrow_schema.fields().len()).collect()
402+
} else {
403+
projection.clone()
404+
};
405+
let splatted =
406+
splat_columns(arrow_schema.clone(), &actual_projection, &projection_deep);
407+
trace!(target: "deep", "generate_leaf_paths: splatted: {:?}", &splatted);
408+
409+
let mut out: Vec<usize> = vec![];
410+
for (i, (original, converted)) in
411+
equivalent_projection_paths_from_parquet_schema(arrow_schema, parquet_schema)
412+
{
413+
// FIXME: @HStack
414+
// for map fields, the actual parquet paths look like x.y.z.key_value.key, x.y.z.key_value.value
415+
// since we are ignoring these names in the paths, we need to actually collapse this access to a *
416+
// so we can filter for them
417+
// also, we need BOTH the key and the value for maps otherwise we run into an arrow-rs error
418+
// "partial projection of MapArray is not supported"
419+
420+
trace!(target: "deep", " generate_leaf_paths looking at index {} {} = {}", i, &original, &converted);
421+
422+
let mut found = false;
423+
for filter in splatted.iter() {
424+
// check if this filter matches this leaf path
425+
let filter_pieces = filter.split(".").collect::<Vec<&str>>();
426+
// let col_pieces = col_path.parts();
427+
let col_pieces = converted.split(".").collect::<Vec<_>>();
428+
// let's check
429+
let mut filter_found = true;
430+
for i in 0..min(filter_pieces.len(), col_pieces.len()) {
431+
if i >= filter_pieces.len() {
432+
// we are at the end of the filter, and we matched until now, so we break, we match !
433+
break;
434+
}
435+
if i >= col_pieces.len() {
436+
// we have a longer filter, we matched until now, we match !
437+
break;
438+
}
439+
// we can actually check
440+
if !(col_pieces[i] == filter_pieces[i] || filter_pieces[i] == "*") {
441+
filter_found = false;
442+
break;
443+
}
444+
}
445+
if filter_found {
446+
found = true;
447+
break;
448+
}
449+
}
450+
if found {
451+
out.push(i);
452+
}
453+
}
454+
out
455+
}

0 commit comments

Comments
 (0)