Skip to content

Commit cb096f6

Browse files
authored
Deprecate SessionContext::create_logical_plan (#4617) (#4679)
* Deprecate SessionContext::create_physical_plan (#4617) * Fix tests * Fix merge conflicts * Rename to_* to into_* * Add schema check to execute_to_batches
1 parent bfef105 commit cb096f6

32 files changed

+323
-546
lines changed

benchmarks/src/bin/nyctaxi.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use datafusion::arrow::util::pretty;
2828
use datafusion::error::Result;
2929
use datafusion::execution::context::{SessionConfig, SessionContext};
3030

31-
use datafusion::physical_plan::collect;
3231
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
3332
use structopt::StructOpt;
3433

@@ -119,14 +118,11 @@ async fn datafusion_sql_benchmarks(
119118
}
120119

121120
async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result<()> {
122-
let plan = ctx.create_logical_plan(sql)?;
123-
let plan = ctx.optimize(&plan)?;
121+
let dataframe = ctx.sql(sql).await?;
124122
if debug {
125-
println!("Optimized logical plan:\n{:?}", plan);
123+
println!("Optimized logical plan:\n{:?}", dataframe.logical_plan());
126124
}
127-
let physical_plan = ctx.create_physical_plan(&plan).await?;
128-
let task_ctx = ctx.task_ctx();
129-
let result = collect(physical_plan, task_ctx).await?;
125+
let result = dataframe.collect().await?;
130126
if debug {
131127
pretty::print_batches(&result)?;
132128
}

benchmarks/src/bin/tpch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ async fn execute_query(
328328
enable_scheduler: bool,
329329
) -> Result<Vec<RecordBatch>> {
330330
let plan = ctx.sql(sql).await?;
331-
let plan = plan.to_unoptimized_plan();
331+
let plan = plan.into_unoptimized_plan();
332332

333333
if debug {
334334
println!("=== Logical plan ===\n{:?}\n", plan);
@@ -643,7 +643,7 @@ mod tests {
643643
let sql = get_query_sql(query)?;
644644
for sql in &sql {
645645
let df = ctx.sql(sql.as_str()).await?;
646-
let plan = df.to_logical_plan()?;
646+
let plan = df.into_optimized_plan()?;
647647
if !actual.is_empty() {
648648
actual += "\n";
649649
}

benchmarks/src/tpch.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,7 @@ pub async fn convert_tbl(
353353
}
354354

355355
// create the physical plan
356-
let csv = csv.to_logical_plan()?;
357-
let csv = ctx.create_physical_plan(&csv).await?;
356+
let csv = csv.create_physical_plan().await?;
358357

359358
let output_path = output_root_path.join(table);
360359
let output_path = output_path.to_str().unwrap().to_owned();

datafusion/core/src/dataframe.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use async_trait::async_trait;
2424
use parquet::file::properties::WriterProperties;
2525

26-
use datafusion_common::{Column, DFSchema};
26+
use datafusion_common::{Column, DFSchema, ScalarValue};
2727
use datafusion_expr::TableProviderFilterPushDown;
2828

2929
use crate::arrow::datatypes::Schema;
@@ -505,17 +505,40 @@ impl DataFrame {
505505
self.plan.schema()
506506
}
507507

508-
/// Return the unoptimized logical plan represented by this DataFrame.
509-
pub fn to_unoptimized_plan(self) -> LogicalPlan {
508+
/// Return the unoptimized logical plan
509+
pub fn logical_plan(&self) -> &LogicalPlan {
510+
&self.plan
511+
}
512+
513+
/// Return the logical plan represented by this DataFrame without running the optimizers
514+
///
515+
/// Note: This method should not be used outside testing, as it loses the snapshot
516+
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
517+
/// operations may take place against a different state
518+
pub fn into_unoptimized_plan(self) -> LogicalPlan {
510519
self.plan
511520
}
512521

513522
/// Return the optimized logical plan represented by this DataFrame.
514-
pub fn to_logical_plan(self) -> Result<LogicalPlan> {
523+
///
524+
/// Note: This method should not be used outside testing, as it loses the snapshot
525+
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
526+
/// operations may take place against a different state
527+
pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
515528
// Optimize the plan first for better UX
516529
self.session_state.optimize(&self.plan)
517530
}
518531

532+
/// Return the optimized logical plan represented by this DataFrame.
533+
///
534+
/// Note: This method should not be used outside testing, as it loses the snapshot
535+
/// of the [`SessionState`] attached to this [`DataFrame`] and consequently subsequent
536+
/// operations may take place against a different state
537+
#[deprecated(note = "Use DataFrame::into_optimized_plan")]
538+
pub fn to_logical_plan(self) -> Result<LogicalPlan> {
539+
self.into_optimized_plan()
540+
}
541+
519542
/// Return a DataFrame with the explanation of its plan so far.
520543
///
521544
/// if `analyze` is specified, runs the plan and reports metrics
@@ -714,6 +737,12 @@ impl DataFrame {
714737
}
715738
}
716739

740+
/// Convert a prepare logical plan into its inner logical plan with all params replaced with their corresponding values
741+
pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self> {
742+
let plan = self.plan.with_param_values(param_values)?;
743+
Ok(Self::new(self.session_state, plan))
744+
}
745+
717746
/// Cache DataFrame as a memory table.
718747
///
719748
/// ```
@@ -1014,11 +1043,10 @@ mod tests {
10141043
let df = df.select(vec![expr])?;
10151044

10161045
// build query using SQL
1017-
let sql_plan =
1018-
ctx.create_logical_plan("SELECT my_fn(c12) FROM aggregate_test_100")?;
1046+
let sql_plan = ctx.sql("SELECT my_fn(c12) FROM aggregate_test_100").await?;
10191047

10201048
// the two plans should be identical
1021-
assert_same_plan(&df.plan, &sql_plan);
1049+
assert_same_plan(&df.plan, &sql_plan.plan);
10221050

10231051
Ok(())
10241052
}
@@ -1128,7 +1156,7 @@ mod tests {
11281156
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
11291157
let mut ctx = SessionContext::new();
11301158
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
1131-
ctx.create_logical_plan(sql)
1159+
Ok(ctx.sql(sql).await?.into_unoptimized_plan())
11321160
}
11331161

11341162
async fn test_table_with_name(name: &str) -> Result<DataFrame> {
@@ -1308,7 +1336,7 @@ mod tests {
13081336
\n Inner Join: t1.c1 = t2.c1\
13091337
\n TableScan: t1\
13101338
\n TableScan: t2",
1311-
format!("{:?}", df_renamed.clone().to_unoptimized_plan())
1339+
format!("{:?}", df_renamed.logical_plan())
13121340
);
13131341

13141342
assert_eq!("\
@@ -1322,7 +1350,7 @@ mod tests {
13221350
\n SubqueryAlias: t2\
13231351
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
13241352
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
1325-
format!("{:?}", df_renamed.clone().to_logical_plan()?)
1353+
format!("{:?}", df_renamed.clone().into_optimized_plan()?)
13261354
);
13271355

13281356
let df_results = df_renamed.collect().await?;
@@ -1469,7 +1497,7 @@ mod tests {
14691497

14701498
assert_eq!(
14711499
"TableScan: ?table? projection=[c2, c3, sum]",
1472-
format!("{:?}", cached_df.clone().to_logical_plan()?)
1500+
format!("{:?}", cached_df.clone().into_optimized_plan()?)
14731501
);
14741502

14751503
let df_results = df.collect().await?;

datafusion/core/src/datasource/view.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -493,12 +493,10 @@ mod tests {
493493
let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
494494
session_ctx.sql(view_sql).await?.collect().await?;
495495

496-
let plan = session_ctx
496+
let dataframe = session_ctx
497497
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
498-
.await?
499-
.to_logical_plan()
500-
.unwrap();
501-
let plan = session_ctx.optimize(&plan).unwrap();
498+
.await?;
499+
let plan = dataframe.into_optimized_plan()?;
502500
let actual = format!("{}", plan.display_indent());
503501
let expected = "\
504502
Explain\
@@ -507,12 +505,10 @@ mod tests {
507505
\n TableScan: abc projection=[column1, column2, column3]";
508506
assert_eq!(expected, actual);
509507

510-
let plan = session_ctx
508+
let dataframe = session_ctx
511509
.sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 5")
512-
.await?
513-
.to_logical_plan()
514-
.unwrap();
515-
let plan = session_ctx.optimize(&plan).unwrap();
510+
.await?;
511+
let plan = dataframe.into_optimized_plan()?;
516512
let actual = format!("{}", plan.display_indent());
517513
let expected = "\
518514
Explain\
@@ -522,12 +518,10 @@ mod tests {
522518
\n TableScan: abc projection=[column1, column2, column3]";
523519
assert_eq!(expected, actual);
524520

525-
let plan = session_ctx
521+
let dataframe = session_ctx
526522
.sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
527-
.await?
528-
.to_logical_plan()
529-
.unwrap();
530-
let plan = session_ctx.optimize(&plan).unwrap();
523+
.await?;
524+
let plan = dataframe.into_optimized_plan()?;
531525
let actual = format!("{}", plan.display_indent());
532526
let expected = "\
533527
Explain\

datafusion/core/src/execution/context.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,21 @@ impl SessionContext {
254254
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
255255
/// might require the schema to be inferred.
256256
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
257-
let plan = self.create_logical_plan(sql)?;
257+
let mut statements = DFParser::parse_sql(sql)?;
258+
if statements.len() != 1 {
259+
return Err(DataFusionError::NotImplemented(
260+
"The context currently only supports a single SQL statement".to_string(),
261+
));
262+
}
263+
264+
// create a query planner
265+
let plan = {
266+
// TODO: Move catalog off SessionState onto SessionContext
267+
let state = self.state.read();
268+
let query_planner = SqlToRel::new(&*state);
269+
query_planner.statement_to_plan(statements.pop_front().unwrap())?
270+
};
271+
258272
match plan {
259273
LogicalPlan::CreateExternalTable(cmd) => {
260274
self.create_external_table(&cmd).await
@@ -553,6 +567,7 @@ impl SessionContext {
553567
/// Creates a logical plan.
554568
///
555569
/// This function is intended for internal use and should not be called directly.
570+
#[deprecated(note = "Use SessionContext::sql which snapshots the SessionState")]
556571
pub fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
557572
let mut statements = DFParser::parse_sql(sql)?;
558573

@@ -1791,9 +1806,10 @@ impl SessionState {
17911806
&self,
17921807
logical_plan: &LogicalPlan,
17931808
) -> Result<Arc<dyn ExecutionPlan>> {
1794-
let planner = self.query_planner.clone();
17951809
let logical_plan = self.optimize(logical_plan)?;
1796-
planner.create_physical_plan(&logical_plan, self).await
1810+
self.query_planner
1811+
.create_physical_plan(&logical_plan, self)
1812+
.await
17971813
}
17981814

17991815
/// return the configuration options
@@ -2046,8 +2062,7 @@ mod tests {
20462062
use std::fs::File;
20472063
use std::path::PathBuf;
20482064
use std::sync::Weak;
2049-
use std::thread::{self, JoinHandle};
2050-
use std::{env, io::prelude::*, sync::Mutex};
2065+
use std::{env, io::prelude::*};
20512066
use tempfile::TempDir;
20522067

20532068
#[tokio::test]
@@ -2267,23 +2282,21 @@ mod tests {
22672282
// environment. Usecase is for concurrent planing.
22682283
let tmp_dir = TempDir::new()?;
22692284
let partition_count = 4;
2270-
let ctx = Arc::new(Mutex::new(create_ctx(&tmp_dir, partition_count).await?));
2285+
let ctx = Arc::new(create_ctx(&tmp_dir, partition_count).await?);
22712286

2272-
let threads: Vec<JoinHandle<Result<_>>> = (0..2)
2287+
let threads: Vec<_> = (0..2)
22732288
.map(|_| ctx.clone())
2274-
.map(|ctx_clone| {
2275-
thread::spawn(move || {
2276-
let ctx = ctx_clone.lock().expect("Locked context");
2289+
.map(|ctx| {
2290+
tokio::spawn(async move {
22772291
// Ensure we can create logical plan code on a separate thread.
2278-
ctx.create_logical_plan(
2279-
"SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3",
2280-
)
2292+
ctx.sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
2293+
.await
22812294
})
22822295
})
22832296
.collect();
22842297

2285-
for thread in threads {
2286-
thread.join().expect("Failed to join thread")?;
2298+
for handle in threads {
2299+
handle.await.unwrap().unwrap();
22872300
}
22882301
Ok(())
22892302
}

datafusion/core/src/physical_plan/coalesce_batches.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,8 @@ mod tests {
347347
let partition = create_vec_batches(&schema, 10);
348348
let table = MemTable::try_new(schema, vec![partition])?;
349349
ctx.register_table("a", Arc::new(table))?;
350-
let plan = ctx.create_logical_plan("SELECT * FROM a WHERE c0 < 1")?;
351-
ctx.create_physical_plan(&plan).await
350+
let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
351+
dataframe.create_physical_plan().await
352352
}
353353

354354
#[tokio::test(flavor = "multi_thread")]

datafusion/core/src/physical_plan/mod.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,8 @@ pub fn with_new_children_if_necessary(
317317
/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await.unwrap();
318318
///
319319
/// // create a plan to run a SQL query
320-
/// let plan = ctx
321-
/// .create_logical_plan("SELECT a FROM example WHERE a < 5")
322-
/// .unwrap();
323-
/// let plan = ctx.optimize(&plan).unwrap();
324-
/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
320+
/// let dataframe = ctx.sql("SELECT a FROM example WHERE a < 5").await.unwrap();
321+
/// let physical_plan = dataframe.create_physical_plan().await.unwrap();
325322
///
326323
/// // Format using display string
327324
/// let displayable_plan = displayable(physical_plan.as_ref());

datafusion/core/src/physical_plan/planner.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2234,10 +2234,11 @@ mod tests {
22342234
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
22352235
let ctx = SessionContext::new();
22362236

2237-
let logical_plan =
2238-
LogicalPlanBuilder::from(ctx.read_table(Arc::new(table))?.to_logical_plan()?)
2239-
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
2240-
.build()?;
2237+
let logical_plan = LogicalPlanBuilder::from(
2238+
ctx.read_table(Arc::new(table))?.into_optimized_plan()?,
2239+
)
2240+
.aggregate(vec![col("d1")], vec![sum(col("d2"))])?
2241+
.build()?;
22412242

22422243
let execution_plan = plan(&logical_plan).await?;
22432244
let formatted = format!("{:?}", execution_plan);
@@ -2445,20 +2446,21 @@ mod tests {
24452446
let testdata = crate::test_util::arrow_test_data();
24462447
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
24472448
let options = CsvReadOptions::new().schema_infer_max_records(100);
2448-
let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? {
2449-
LogicalPlan::TableScan(ref scan) => {
2450-
let mut scan = scan.clone();
2451-
scan.table_name = name.to_string();
2452-
let new_schema = scan
2453-
.projected_schema
2454-
.as_ref()
2455-
.clone()
2456-
.replace_qualifier(name);
2457-
scan.projected_schema = Arc::new(new_schema);
2458-
LogicalPlan::TableScan(scan)
2459-
}
2460-
_ => unimplemented!(),
2461-
};
2449+
let logical_plan =
2450+
match ctx.read_csv(path, options).await?.into_optimized_plan()? {
2451+
LogicalPlan::TableScan(ref scan) => {
2452+
let mut scan = scan.clone();
2453+
scan.table_name = name.to_string();
2454+
let new_schema = scan
2455+
.projected_schema
2456+
.as_ref()
2457+
.clone()
2458+
.replace_qualifier(name);
2459+
scan.projected_schema = Arc::new(new_schema);
2460+
LogicalPlan::TableScan(scan)
2461+
}
2462+
_ => unimplemented!(),
2463+
};
24622464
Ok(LogicalPlanBuilder::from(logical_plan))
24632465
}
24642466

@@ -2468,7 +2470,7 @@ mod tests {
24682470
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
24692471
let options = CsvReadOptions::new().schema_infer_max_records(100);
24702472
Ok(LogicalPlanBuilder::from(
2471-
ctx.read_csv(path, options).await?.to_logical_plan()?,
2473+
ctx.read_csv(path, options).await?.into_optimized_plan()?,
24722474
))
24732475
}
24742476
}

0 commit comments

Comments
 (0)