Skip to content

Commit 515a64e

Browse files
devanbenz and alamb authored
feat(planner): Allowing setting sort order of parquet files without specifying the schema (#12466)
* fix(planner): Allowing setting sort order of parquet files without specifying the schema

  This PR allows the following SQL query to be passed without a schema:

      create external table cpu stored as parquet location 'cpu.parquet' with order (time);

  closes #7317
* chore: fmt'ing
* fix: fmt
* fix: remove test that checks for error with schema
* Add some more tests
* fix: use !asc

  Co-authored-by: Andrew Lamb <[email protected]>
* feat: clean up some testing and modify statement when building order by expr

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent e1b992a commit 515a64e

File tree

5 files changed

+106
-16
lines changed

5 files changed

+106
-16
lines changed

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Factory for creating ListingTables with default options
1919
20+
use std::collections::HashSet;
2021
use std::path::Path;
2122
use std::sync::Arc;
2223

@@ -27,7 +28,7 @@ use crate::datasource::listing::{
2728
use crate::execution::context::SessionState;
2829

2930
use arrow::datatypes::{DataType, SchemaRef};
30-
use datafusion_common::{arrow_datafusion_err, DataFusionError};
31+
use datafusion_common::{arrow_datafusion_err, plan_err, DataFusionError, ToDFSchema};
3132
use datafusion_common::{config_datafusion_err, Result};
3233
use datafusion_expr::CreateExternalTable;
3334

@@ -113,19 +114,39 @@ impl TableProviderFactory for ListingTableFactory {
113114
.with_collect_stat(state.config().collect_statistics())
114115
.with_file_extension(file_extension)
115116
.with_target_partitions(state.config().target_partitions())
116-
.with_table_partition_cols(table_partition_cols)
117-
.with_file_sort_order(cmd.order_exprs.clone());
117+
.with_table_partition_cols(table_partition_cols);
118118

119119
options
120120
.validate_partitions(session_state, &table_path)
121121
.await?;
122122

123123
let resolved_schema = match provided_schema {
124-
None => options.infer_schema(session_state, &table_path).await?,
124+
// We will need to check the table columns against the schema
125+
// this is done so that we can do an ORDER BY for external table creation
126+
// specifically for parquet file format.
127+
// See: https://github.com/apache/datafusion/issues/7317
128+
None => {
129+
let schema = options.infer_schema(session_state, &table_path).await?;
130+
let df_schema = schema.clone().to_dfschema()?;
131+
let column_refs: HashSet<_> = cmd
132+
.order_exprs
133+
.iter()
134+
.flat_map(|sort| sort.iter())
135+
.flat_map(|s| s.expr.column_refs())
136+
.collect();
137+
138+
for column in &column_refs {
139+
if !df_schema.has_column(column) {
140+
return plan_err!("Column {column} is not in schema");
141+
}
142+
}
143+
144+
schema
145+
}
125146
Some(s) => s,
126147
};
127148
let config = ListingTableConfig::new(table_path)
128-
.with_listing_options(options)
149+
.with_listing_options(options.with_file_sort_order(cmd.order_exprs.clone()))
129150
.with_schema(resolved_schema);
130151
let provider = ListingTable::try_new(config)?
131152
.with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());

datafusion/sql/src/statement.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,11 +1136,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
11361136
schema: &DFSchemaRef,
11371137
planner_context: &mut PlannerContext,
11381138
) -> Result<Vec<Vec<SortExpr>>> {
1139-
// Ask user to provide a schema if schema is empty.
11401139
if !order_exprs.is_empty() && schema.fields().is_empty() {
1141-
return plan_err!(
1142-
"Provide a schema before specifying the order while creating a table."
1143-
);
1140+
let results = order_exprs
1141+
.iter()
1142+
.map(|lex_order| {
1143+
let result = lex_order
1144+
.iter()
1145+
.map(|order_by_expr| {
1146+
let ordered_expr = &order_by_expr.expr;
1147+
let ordered_expr = ordered_expr.to_owned();
1148+
let ordered_expr = self
1149+
.sql_expr_to_logical_expr(
1150+
ordered_expr,
1151+
schema,
1152+
planner_context,
1153+
)
1154+
.unwrap();
1155+
let asc = order_by_expr.asc.unwrap_or(true);
1156+
let nulls_first = order_by_expr.nulls_first.unwrap_or(!asc);
1157+
1158+
SortExpr::new(ordered_expr, asc, nulls_first)
1159+
})
1160+
.collect::<Vec<SortExpr>>();
1161+
result
1162+
})
1163+
.collect::<Vec<Vec<SortExpr>>>();
1164+
1165+
return Ok(results);
11441166
}
11451167

11461168
let mut all_results = vec![];

datafusion/sql/tests/sql_integration.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2002,6 +2002,13 @@ fn create_external_table_parquet_no_schema() {
20022002
quick_test(sql, expected);
20032003
}
20042004

2005+
#[test]
2006+
fn create_external_table_parquet_no_schema_sort_order() {
2007+
let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet' WITH ORDER (id)";
2008+
let expected = "CreateExternalTable: Bare { table: \"t\" }";
2009+
quick_test(sql, expected);
2010+
}
2011+
20052012
#[test]
20062013
fn equijoin_explicit_syntax() {
20072014
let sql = "SELECT id, order_id \

datafusion/sqllogictest/test_files/create_external_table.slt

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,50 @@ OPTIONS (
228228
format.delimiter '|',
229229
has_header false,
230230
compression gzip);
231+
232+
# Create an external parquet table and infer schema to order by
233+
234+
# query should succeed
235+
statement ok
236+
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id);
237+
238+
## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 ASC NULLS LAST]
239+
query TT
240+
EXPLAIN SELECT id FROM t ORDER BY id ASC;
241+
----
242+
logical_plan
243+
01)Sort: t.id ASC NULLS LAST
244+
02)--TableScan: t projection=[id]
245+
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]
246+
247+
## Test a DESC order and verify that output_ordering is still ASC from the table's WITH ORDER clause, so a SortExec is required
248+
query TT
249+
EXPLAIN SELECT id FROM t ORDER BY id DESC;
250+
----
251+
logical_plan
252+
01)Sort: t.id DESC NULLS FIRST
253+
02)--TableScan: t projection=[id]
254+
physical_plan
255+
01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false]
256+
02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST]
257+
258+
statement ok
259+
DROP TABLE t;
260+
261+
# Create table with non default sort order
262+
statement ok
263+
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (id DESC NULLS FIRST);
264+
265+
## Verify that the table is created with a sort order. Explain should show output_ordering=[id@0 DESC] (the physical plan prints DESC NULLS FIRST as plain DESC)
266+
query TT
267+
EXPLAIN SELECT id FROM t;
268+
----
269+
logical_plan TableScan: t projection=[id]
270+
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 DESC]
271+
272+
statement ok
273+
DROP TABLE t;
274+
275+
# query should fail with bad column
276+
statement error DataFusion error: Error during planning: Column foo is not in schema
277+
CREATE EXTERNAL TABLE t STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet' WITH ORDER (foo);

datafusion/sqllogictest/test_files/order.slt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -653,13 +653,6 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
653653
query error DataFusion error: Error during planning: Column a is not in schema
654654
CREATE EXTERNAL TABLE dt (a_id integer, a_str string, a_bool boolean) STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
655655

656-
657-
# Create external table with DDL ordered columns without schema
658-
# When schema is missing the query is expected to fail
659-
query error DataFusion error: Error during planning: Provide a schema before specifying the order while creating a table\.
660-
CREATE EXTERNAL TABLE dt STORED AS CSV WITH ORDER (a ASC) LOCATION 'file://path/to/table';
661-
662-
663656
# Sort with duplicate sort expressions
664657
# Table is sorted multiple times on the same column name and should not fail
665658
statement ok

0 commit comments

Comments
 (0)