Skip to content

Commit 052caeb

Browse files
committed
Non-deprecated support for planning SQL without DDL
1 parent 2f5b25d commit 052caeb

File tree

13 files changed

+150
-108
lines changed

13 files changed

+150
-108
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -324,18 +324,17 @@ async fn execute_query(
324324
debug: bool,
325325
enable_scheduler: bool,
326326
) -> Result<Vec<RecordBatch>> {
327-
let plan = ctx.sql(sql).await?;
328-
let plan = plan.into_unoptimized_plan();
327+
let plan = ctx.sql(sql).await?.into_unoptimized_plan();
329328

330329
if debug {
331330
println!("=== Logical plan ===\n{:?}\n", plan);
332331
}
333332

334-
let plan = ctx.optimize(&plan)?;
333+
let plan = ctx.dataframe(plan).await?.into_optimized_plan()?;
335334
if debug {
336335
println!("=== Optimized logical plan ===\n{:?}\n", plan);
337336
}
338-
let physical_plan = ctx.create_physical_plan(&plan).await?;
337+
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;
339338
if debug {
340339
println!(
341340
"=== Physical plan ===\n{}\n",

datafusion/core/src/execution/context.rs

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -245,26 +245,31 @@ impl SessionContext {
245245
self.state.read().config.clone()
246246
}
247247

248-
/// Creates a [`DataFrame`] that will execute a SQL query.
248+
/// Creates a [`DataFrame`] that executes a SQL query supported by
249+
/// DataFusion, including DDL (such as `CREATE TABLE`).
249250
///
250-
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
251-
/// might require the schema to be inferred.
251+
/// You can use [`Self::plan_sql`] and
252+
/// [`DataFrame::create_physical_plan`] directly if you need read only
253+
/// query support (no way to create external tables, for example)
254+
///
255+
/// This method is `async` because queries of type `CREATE
256+
/// EXTERNAL TABLE` might require the schema to be inferred.
252257
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
253-
let mut statements = DFParser::parse_sql(sql)?;
254-
if statements.len() != 1 {
255-
return Err(DataFusionError::NotImplemented(
256-
"The context currently only supports a single SQL statement".to_string(),
257-
));
258-
}
259-
260-
// create a query planner
261-
let plan = {
262-
// TODO: Move catalog off SessionState onto SessionContext
263-
let state = self.state.read();
264-
let query_planner = SqlToRel::new(&*state);
265-
query_planner.statement_to_plan(statements.pop_front().unwrap())?
266-
};
258+
let plan = self.plan_sql(sql)?;
259+
self.dataframe(plan).await
260+
}
267261

262+
/// Creates a [`DataFrame`] that will execute the specified
263+
/// LogicalPlan, including DDL (such as `CREATE TABLE`).
264+
/// Use [`Self::dataframe_without_ddl`] if you do not want
265+
/// to support DDL statements.
266+
///
267+
/// Any DDL statements are executed during this function (not when
268+
/// the [`DataFrame`] is evaluated)
269+
///
270+
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
271+
/// might require the schema to be inferred by performing I/O.
272+
pub async fn dataframe(&self, plan: LogicalPlan) -> Result<DataFrame> {
268273
match plan {
269274
LogicalPlan::CreateExternalTable(cmd) => {
270275
self.create_external_table(&cmd).await
@@ -492,6 +497,15 @@ impl SessionContext {
492497
}
493498
}
494499

500+
/// Creates a [`DataFrame`] that will execute the specified
501+
/// LogicalPlan, but will error if the plans represent DDL such as
502+
/// `CREATE TABLE`
503+
///
504+
/// Use [`Self::dataframe`] to run plans with DDL
505+
pub fn dataframe_without_ddl(&self, plan: LogicalPlan) -> Result<DataFrame> {
506+
Ok(DataFrame::new(self.state(), plan))
507+
}
508+
495509
// return an empty dataframe
496510
fn return_empty_dataframe(&self) -> Result<DataFrame> {
497511
let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -559,11 +573,9 @@ impl SessionContext {
559573
}
560574
Ok(false)
561575
}
562-
/// Creates a logical plan.
563-
///
564-
/// This function is intended for internal use and should not be called directly.
565-
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
566-
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
576+
577+
/// Creates a [`LogicalPlan`] from a SQL query.
578+
pub fn plan_sql(&self, sql: &str) -> Result<LogicalPlan> {
567579
let mut statements = DFParser::parse_sql(sql)?;
568580

569581
if statements.len() != 1 {
@@ -1000,32 +1012,24 @@ impl SessionContext {
10001012
}
10011013

10021014
/// Optimizes the logical plan by applying optimizer rules.
1003-
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
1004-
self.state.read().optimize(plan)
1015+
#[deprecated(
1016+
note = "Use `SessionContext::dataframe_without_ddl` and `DataFrame::into_optimized_plan`"
1017+
)]
1018+
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
1019+
self.dataframe_without_ddl(plan)?.into_optimized_plan()
10051020
}
10061021

1007-
/// Creates a physical plan from a logical plan.
1022+
/// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`].
1023+
#[deprecated(
1024+
note = "Use `SessionContext::::dataframe_without_ddl` and `DataFrame::create_physical_plan`"
1025+
)]
10081026
pub async fn create_physical_plan(
10091027
&self,
1010-
logical_plan: &LogicalPlan,
1028+
logical_plan: LogicalPlan,
10111029
) -> Result<Arc<dyn ExecutionPlan>> {
1012-
let state_cloned = {
1013-
let mut state = self.state.write();
1014-
state.execution_props.start_execution();
1015-
1016-
// We need to clone `state` to release the lock that is not `Send`. We could
1017-
// make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
1018-
// propagate async even to the `LogicalPlan` building methods.
1019-
// Cloning `state` here is fine as we then pass it as immutable `&state`, which
1020-
// means that we avoid write consistency issues as the cloned version will not
1021-
// be written to. As for eventual modifications that would be applied to the
1022-
// original state after it has been cloned, they will not be picked up by the
1023-
// clone but that is okay, as it is equivalent to postponing the state update
1024-
// by keeping the lock until the end of the function scope.
1025-
state.clone()
1026-
};
1027-
1028-
state_cloned.create_physical_plan(logical_plan).await
1030+
self.dataframe_without_ddl(logical_plan)?
1031+
.create_physical_plan()
1032+
.await
10291033
}
10301034

10311035
/// Executes a query and writes the results to a partitioned CSV file.

datafusion/core/src/physical_plan/planner.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,15 +1097,15 @@ impl DefaultPhysicalPlanner {
10971097
// TABLE" -- it must be handled at a higher level (so
10981098
// that the appropriate table can be registered with
10991099
// the context)
1100-
Err(DataFusionError::Internal(
1100+
Err(DataFusionError::Plan(
11011101
"Unsupported logical plan: CreateExternalTable".to_string(),
11021102
))
11031103
}
11041104
LogicalPlan::Prepare(_) => {
11051105
// There is no default plan for "PREPARE" -- it must be
11061106
// handled at a higher level (so that the appropriate
11071107
// statement can be prepared)
1108-
Err(DataFusionError::Internal(
1108+
Err(DataFusionError::Plan(
11091109
"Unsupported logical plan: Prepare".to_string(),
11101110
))
11111111
}
@@ -1114,7 +1114,7 @@ impl DefaultPhysicalPlanner {
11141114
// It must be handled at a higher level (so
11151115
// that the schema can be registered with
11161116
// the context)
1117-
Err(DataFusionError::Internal(
1117+
Err(DataFusionError::Plan(
11181118
"Unsupported logical plan: CreateCatalogSchema".to_string(),
11191119
))
11201120
}
@@ -1123,7 +1123,7 @@ impl DefaultPhysicalPlanner {
11231123
// It must be handled at a higher level (so
11241124
// that the schema can be registered with
11251125
// the context)
1126-
Err(DataFusionError::Internal(
1126+
Err(DataFusionError::Plan(
11271127
"Unsupported logical plan: CreateCatalog".to_string(),
11281128
))
11291129
}
@@ -1132,7 +1132,7 @@ impl DefaultPhysicalPlanner {
11321132
// It must be handled at a higher level (so
11331133
// that the schema can be registered with
11341134
// the context)
1135-
Err(DataFusionError::Internal(
1135+
Err(DataFusionError::Plan(
11361136
"Unsupported logical plan: CreateMemoryTable".to_string(),
11371137
))
11381138
}
@@ -1141,7 +1141,7 @@ impl DefaultPhysicalPlanner {
11411141
// It must be handled at a higher level (so
11421142
// that the schema can be registered with
11431143
// the context)
1144-
Err(DataFusionError::Internal(
1144+
Err(DataFusionError::Plan(
11451145
"Unsupported logical plan: DropTable".to_string(),
11461146
))
11471147
}
@@ -1150,7 +1150,7 @@ impl DefaultPhysicalPlanner {
11501150
// It must be handled at a higher level (so
11511151
// that the schema can be registered with
11521152
// the context)
1153-
Err(DataFusionError::Internal(
1153+
Err(DataFusionError::Plan(
11541154
"Unsupported logical plan: DropView".to_string(),
11551155
))
11561156
}
@@ -1159,16 +1159,16 @@ impl DefaultPhysicalPlanner {
11591159
// It must be handled at a higher level (so
11601160
// that the schema can be registered with
11611161
// the context)
1162-
Err(DataFusionError::Internal(
1162+
Err(DataFusionError::Plan(
11631163
"Unsupported logical plan: CreateView".to_string(),
11641164
))
11651165
}
11661166
LogicalPlan::SetVariable(_) => {
1167-
Err(DataFusionError::Internal(
1167+
Err(DataFusionError::Plan(
11681168
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
11691169
))
11701170
}
1171-
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
1171+
LogicalPlan::Explain(_) => Err(DataFusionError::Plan(
11721172
"Unsupported logical plan: Explain must be root of the plan".to_string(),
11731173
)),
11741174
LogicalPlan::Analyze(a) => {

datafusion/core/tests/custom_sources.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ async fn custom_source_dataframe() -> Result<()> {
212212
.project(vec![col("c2")])?
213213
.build()?;
214214

215-
let optimized_plan = ctx.optimize(&logical_plan)?;
215+
let optimized_plan = ctx.dataframe(logical_plan).await?.into_optimized_plan()?;
216+
216217
match &optimized_plan {
217218
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
218219
LogicalPlan::TableScan(TableScan {
@@ -235,7 +236,11 @@ async fn custom_source_dataframe() -> Result<()> {
235236
);
236237
assert_eq!(format!("{:?}", optimized_plan), expected);
237238

238-
let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
239+
let physical_plan = ctx
240+
.dataframe(optimized_plan)
241+
.await?
242+
.create_physical_plan()
243+
.await?;
239244

240245
assert_eq!(1, physical_plan.schema().fields().len());
241246
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
@@ -261,10 +266,7 @@ async fn optimizers_catch_all_statistics() {
261266
.await
262267
.unwrap();
263268

264-
let physical_plan = ctx
265-
.create_physical_plan(&df.into_optimized_plan().unwrap())
266-
.await
267-
.unwrap();
269+
let physical_plan = df.create_physical_plan().await.unwrap();
268270

269271
// when the optimization kicks in, the source is replaced by an EmptyExec
270272
assert!(

datafusion/core/tests/parquet/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,12 @@ impl ContextWithParquet {
210210
.expect("getting input");
211211
let pretty_input = pretty_format_batches(&input).unwrap().to_string();
212212

213-
let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
214-
215213
let physical_plan = self
216214
.ctx
217-
.create_physical_plan(&logical_plan)
215+
.dataframe(logical_plan)
216+
.await
217+
.expect("planning")
218+
.create_physical_plan()
218219
.await
219220
.expect("creating physical plan");
220221

datafusion/core/tests/sql/aggregates.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,9 +1120,8 @@ async fn aggregate_with_alias() -> Result<()> {
11201120
.project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
11211121
.build()?;
11221122

1123-
let plan = ctx.optimize(&plan)?;
1123+
let physical_plan = ctx.dataframe(plan).await?.create_physical_plan().await?;
11241124

1125-
let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
11261125
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
11271126
assert_eq!(
11281127
"total_salary",

datafusion/core/tests/sql/errors.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -135,39 +135,46 @@ async fn invalid_qualified_table_references() -> Result<()> {
135135
}
136136

137137
#[tokio::test]
138-
#[allow(deprecated)] // TODO: Remove this test once create_logical_plan removed
138+
/// This test demonstrates it is posible to run SQL queries in DataFusion without
139+
/// any DDL Support (such as CREATE TABLE / CREATE VIEW, ETC).
139140
async fn unsupported_sql_returns_error() -> Result<()> {
140141
let ctx = SessionContext::new();
141142
register_aggregate_csv(&ctx).await?;
142143
// create view
143144
let sql = "create view test_view as select * from aggregate_test_100";
144-
let plan = ctx.create_logical_plan(sql);
145-
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
146-
assert!(physical_plan.is_err());
145+
let plan = ctx.plan_sql(sql)?;
146+
let err = ctx
147+
.dataframe_without_ddl(plan)?
148+
.create_physical_plan()
149+
.await
150+
.unwrap_err();
147151
assert_eq!(
148-
format!("{}", physical_plan.unwrap_err()),
149-
"Internal error: Unsupported logical plan: CreateView. \
150-
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
152+
err.to_string(),
153+
"Error during planning: Unsupported logical plan: CreateView"
151154
);
152155
// // drop view
153156
let sql = "drop view test_view";
154-
let plan = ctx.create_logical_plan(sql);
155-
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
156-
assert!(physical_plan.is_err());
157+
let plan = ctx.plan_sql(sql)?;
158+
let err = ctx
159+
.dataframe_without_ddl(plan)?
160+
.create_physical_plan()
161+
.await
162+
.unwrap_err();
157163
assert_eq!(
158-
format!("{}", physical_plan.unwrap_err()),
159-
"Internal error: Unsupported logical plan: DropView. \
160-
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
164+
err.to_string(),
165+
"Error during planning: Unsupported logical plan: DropView"
161166
);
162167
// // drop table
163168
let sql = "drop table aggregate_test_100";
164-
let plan = ctx.create_logical_plan(sql);
165-
let physical_plan = ctx.create_physical_plan(&plan.unwrap()).await;
166-
assert!(physical_plan.is_err());
169+
let plan = ctx.plan_sql(sql)?;
170+
let err = ctx
171+
.dataframe_without_ddl(plan)?
172+
.create_physical_plan()
173+
.await
174+
.unwrap_err();
167175
assert_eq!(
168-
format!("{}", physical_plan.unwrap_err()),
169-
"Internal error: Unsupported logical plan: DropTable. \
170-
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker"
176+
err.to_string(),
177+
"Error during planning: Unsupported logical plan: DropTable"
171178
);
172179
Ok(())
173180
}

datafusion/core/tests/sql/explain.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ fn optimize_explain() {
3838
}
3939

4040
// now optimize the plan and expect to see more plans
41-
let optimized_plan = SessionContext::new().optimize(&plan).unwrap();
41+
let optimized_plan = SessionContext::new()
42+
.dataframe_without_ddl(plan.clone())
43+
.unwrap()
44+
.into_optimized_plan()
45+
.unwrap();
4246
if let LogicalPlan::Explain(e) = &optimized_plan {
4347
// should have more than one plan
4448
assert!(

0 commit comments

Comments
 (0)