Skip to content

Commit df5b5dc

Browse files
committed
dml: store destination as table source
This patch adds a field `dst` to struct `DmlStatement`. There is no reason to handle this differently from a table scan, where we store the table source. Storing a `dst` adds the ability to analyze the table on which the DML operation is performed, which is useful when writing analyzer/optimizer passes.
1 parent d6ff3db commit df5b5dc

File tree

11 files changed

+108
-55
lines changed

11 files changed

+108
-55
lines changed

datafusion-examples/examples/dataframe_output.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion::datasource::provider_as_source;
1819
use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
1920
use datafusion_common::config::CsvOptions;
2021
use datafusion_common::{parsers::CompressionTypeVariant, DataFusionError};
22+
use datafusion_sql::TableReference;
2123

2224
/// This example demonstrates the various methods to write out a DataFrame to local storage.
2325
/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
@@ -32,20 +34,26 @@ async fn main() -> Result<(), DataFusionError> {
3234
df = df.with_column_renamed("column1", "tablecol1").unwrap();
3335

3436
ctx.sql(
35-
"create external table
37+
"create external table
3638
test(tablecol1 varchar)
37-
stored as parquet
39+
stored as parquet
3840
location './datafusion-examples/test_table/'",
3941
)
4042
.await?
4143
.collect()
4244
.await?;
4345

46+
let dst = ctx.table_provider(TableReference::bare("test")).await?;
47+
4448
// This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
4549
// The behavior of write_table depends on the TableProvider's implementation
4650
// of the insert_into method.
4751
df.clone()
48-
.write_table("test", DataFrameWriteOptions::new())
52+
.write_table(
53+
"test",
54+
provider_as_source(dst),
55+
DataFrameWriteOptions::new(),
56+
)
4957
.await?;
5058

5159
df.clone()

datafusion/core/src/dataframe/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
5252
use datafusion_common::{
5353
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
5454
};
55-
use datafusion_expr::{case, is_null, lit, SortExpr};
55+
use datafusion_expr::{case, is_null, lit, SortExpr, TableSource};
5656
use datafusion_expr::{
5757
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
5858
};
@@ -1296,13 +1296,13 @@ impl DataFrame {
12961296
pub async fn write_table(
12971297
self,
12981298
table_name: &str,
1299+
dst: Arc<dyn TableSource>,
12991300
write_options: DataFrameWriteOptions,
13001301
) -> Result<Vec<RecordBatch>, DataFusionError> {
1301-
let arrow_schema = Schema::from(self.schema());
13021302
let plan = LogicalPlanBuilder::insert_into(
13031303
self.plan,
13041304
table_name.to_owned(),
1305-
&arrow_schema,
1305+
dst,
13061306
write_options.overwrite,
13071307
)?
13081308
.build()?;

datafusion/core/src/datasource/listing/table.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ mod tests {
10911091
use datafusion_physical_expr::PhysicalSortExpr;
10921092
use datafusion_physical_plan::ExecutionPlanProperties;
10931093

1094+
use datafusion_sql::TableReference;
10941095
use tempfile::TempDir;
10951096

10961097
#[tokio::test]
@@ -1890,6 +1891,11 @@ mod tests {
18901891
_ => panic!("Unrecognized file extension {file_type_ext}"),
18911892
}
18921893

1894+
let dst_table = session_ctx
1895+
.table_provider(TableReference::bare("t"))
1896+
.await
1897+
.unwrap();
1898+
18931899
// Create and register the source table with the provided schema and inserted data
18941900
let source_table = Arc::new(MemTable::try_new(
18951901
schema.clone(),
@@ -1905,8 +1911,13 @@ mod tests {
19051911
// Since logical plan contains a filter, increasing parallelism is helpful.
19061912
// Therefore, we will have 8 partitions in the final plan.
19071913
// Create an insert plan to insert the source data into the initial table
1908-
let insert_into_table =
1909-
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
1914+
let insert_into_table = LogicalPlanBuilder::insert_into(
1915+
scan_plan,
1916+
"t",
1917+
provider_as_source(dst_table),
1918+
false,
1919+
)?
1920+
.build()?;
19101921
// Create a physical plan from the insert plan
19111922
let plan = session_ctx
19121923
.state()

datafusion/core/src/datasource/memory.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,13 @@ mod tests {
625625
// Create a table scan logical plan to read from the source table
626626
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
627627
// Create an insert plan to insert the source data into the initial table
628-
let insert_into_table =
629-
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
628+
let insert_into_table = LogicalPlanBuilder::insert_into(
629+
scan_plan,
630+
"t",
631+
provider_as_source(Arc::clone(&initial_table) as Arc<_>),
632+
false,
633+
)?
634+
.build()?;
630635
// Create a physical plan from the insert plan
631636
let plan = session_ctx
632637
.state()

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,7 @@ mod tests {
792792
use crate::datasource::file_format::test_util::scan_format;
793793
use crate::datasource::listing::{FileRange, ListingOptions};
794794
use crate::datasource::object_store::ObjectStoreUrl;
795+
use crate::datasource::provider_as_source;
795796
use crate::execution::context::SessionState;
796797
use crate::physical_plan::displayable;
797798
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
@@ -816,6 +817,7 @@ mod tests {
816817
use datafusion_physical_plan::ExecutionPlanProperties;
817818

818819
use chrono::{TimeZone, Utc};
820+
use datafusion_sql::TableReference;
819821
use futures::StreamExt;
820822
use object_store::local::LocalFileSystem;
821823
use object_store::path::Path;
@@ -2024,8 +2026,16 @@ mod tests {
20242026
)
20252027
.await
20262028
.unwrap();
2027-
df.write_table("my_table", DataFrameWriteOptions::new())
2028-
.await?;
2029+
let dst = ctx
2030+
.table_provider(TableReference::bare("my_table"))
2031+
.await
2032+
.unwrap();
2033+
df.write_table(
2034+
"my_table",
2035+
provider_as_source(dst),
2036+
DataFrameWriteOptions::new(),
2037+
)
2038+
.await?;
20292039

20302040
// create a new context and verify that the results were saved to a partitioned parquet file
20312041
let ctx = SessionContext::new();

datafusion/core/src/physical_planner.rs

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ use arrow_array::builder::StringBuilder;
6868
use arrow_array::RecordBatch;
6969
use datafusion_common::display::ToStringifiedPlan;
7070
use datafusion_common::{
71-
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
72-
ScalarValue,
71+
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, ScalarValue,
7372
};
7473
use datafusion_expr::dml::CopyTo;
7574
use datafusion_expr::expr::{
@@ -540,36 +539,26 @@ impl DefaultPhysicalPlanner {
540539
.await?
541540
}
542541
LogicalPlan::Dml(DmlStatement {
543-
table_name,
544542
op: WriteOp::InsertInto,
543+
dst,
545544
..
546545
}) => {
547-
let name = table_name.table();
548-
let schema = session_state.schema_for_ref(table_name.clone())?;
549-
if let Some(provider) = schema.table(name).await? {
550-
let input_exec = children.one()?;
551-
provider
552-
.insert_into(session_state, input_exec, false)
553-
.await?
554-
} else {
555-
return exec_err!("Table '{table_name}' does not exist");
556-
}
546+
let input_exec = children.one()?;
547+
let provider = source_as_provider(dst)?;
548+
provider
549+
.insert_into(session_state, input_exec, false)
550+
.await?
557551
}
558552
LogicalPlan::Dml(DmlStatement {
559-
table_name,
553+
dst,
560554
op: WriteOp::InsertOverwrite,
561555
..
562556
}) => {
563-
let name = table_name.table();
564-
let schema = session_state.schema_for_ref(table_name.clone())?;
565-
if let Some(provider) = schema.table(name).await? {
566-
let input_exec = children.one()?;
567-
provider
568-
.insert_into(session_state, input_exec, true)
569-
.await?
570-
} else {
571-
return exec_err!("Table '{table_name}' does not exist");
572-
}
557+
let input_exec = children.one()?;
558+
let provider = source_as_provider(dst)?;
559+
provider
560+
.insert_into(session_state, input_exec, true)
561+
.await?
573562
}
574563
LogicalPlan::Window(Window {
575564
input, window_expr, ..

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,9 @@ impl LogicalPlanBuilder {
302302
pub fn insert_into(
303303
input: LogicalPlan,
304304
table_name: impl Into<TableReference>,
305-
table_schema: &Schema,
305+
dst: Arc<dyn TableSource>,
306306
overwrite: bool,
307307
) -> Result<Self> {
308-
let table_schema = table_schema.clone().to_dfschema_ref()?;
309-
310308
let op = if overwrite {
311309
WriteOp::InsertOverwrite
312310
} else {
@@ -315,7 +313,7 @@ impl LogicalPlanBuilder {
315313

316314
Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
317315
table_name.into(),
318-
table_schema,
316+
dst,
319317
op,
320318
Arc::new(input),
321319
))))

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow::datatypes::{DataType, Field, Schema};
2424
use datafusion_common::file_options::file_type::FileType;
2525
use datafusion_common::{DFSchemaRef, TableReference};
2626

27-
use crate::LogicalPlan;
27+
use crate::{LogicalPlan, TableSource};
2828

2929
/// Operator that copies the contents of a database to file(s)
3030
#[derive(Clone)]
@@ -73,12 +73,12 @@ impl Hash for CopyTo {
7373

7474
/// The operator that modifies the content of a database (adapted from
7575
/// substrait WriteRel)
76-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76+
#[derive(Clone)]
7777
pub struct DmlStatement {
7878
/// The table name
7979
pub table_name: TableReference,
80-
/// The schema of the table (must align with Rel input)
81-
pub table_schema: DFSchemaRef,
80+
/// The operation destination
81+
pub dst: Arc<dyn TableSource>,
8282
/// The type of operation to perform
8383
pub op: WriteOp,
8484
/// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
@@ -87,17 +87,49 @@ pub struct DmlStatement {
8787
pub output_schema: DFSchemaRef,
8888
}
8989

90+
impl Debug for DmlStatement {
91+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
92+
f.debug_struct("TableScan")
93+
.field("table_name", &self.table_name)
94+
.field("source", &"...")
95+
.field("op", &self.op)
96+
.field("input", &self.input)
97+
.field("output_schema", &self.output_schema)
98+
.finish_non_exhaustive()
99+
}
100+
}
101+
102+
impl PartialEq for DmlStatement {
103+
fn eq(&self, other: &Self) -> bool {
104+
self.table_name == other.table_name
105+
&& self.op == other.op
106+
&& self.input == other.input
107+
&& self.output_schema == other.output_schema
108+
}
109+
}
110+
111+
impl Eq for DmlStatement {}
112+
113+
impl Hash for DmlStatement {
114+
fn hash<H: Hasher>(&self, state: &mut H) {
115+
self.table_name.hash(state);
116+
self.op.hash(state);
117+
self.input.hash(state);
118+
self.output_schema.hash(state);
119+
}
120+
}
121+
90122
impl DmlStatement {
91123
/// Creates a new DML statement with the output schema set to a single `count` column.
92124
pub fn new(
93125
table_name: TableReference,
94-
table_schema: DFSchemaRef,
126+
dst: Arc<dyn TableSource>,
95127
op: WriteOp,
96128
input: Arc<LogicalPlan>,
97129
) -> Self {
98130
Self {
99131
table_name,
100-
table_schema,
132+
dst,
101133
op,
102134
input,
103135

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -810,15 +810,15 @@ impl LogicalPlan {
810810
}
811811
LogicalPlan::Dml(DmlStatement {
812812
table_name,
813-
table_schema,
813+
dst,
814814
op,
815815
..
816816
}) => {
817817
self.assert_no_expressions(expr)?;
818818
let input = self.only_input(inputs)?;
819819
Ok(LogicalPlan::Dml(DmlStatement::new(
820820
table_name.clone(),
821-
Arc::clone(table_schema),
821+
Arc::clone(dst),
822822
op.clone(),
823823
Arc::new(input),
824824
)))
@@ -1987,7 +1987,7 @@ impl LogicalPlan {
19871987
.map(|i| &input_columns[*i])
19881988
.collect::<Vec<&Column>>();
19891989
// get items from input_columns indexed by list_col_indices
1990-
write!(f, "Unnest: lists[{}] structs[{}]",
1990+
write!(f, "Unnest: lists[{}] structs[{}]",
19911991
expr_vec_fmt!(list_type_columns),
19921992
expr_vec_fmt!(struct_type_columns))
19931993
}

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,14 @@ impl TreeNode for LogicalPlan {
248248
}),
249249
LogicalPlan::Dml(DmlStatement {
250250
table_name,
251-
table_schema,
251+
dst,
252252
op,
253253
input,
254254
output_schema,
255255
}) => rewrite_arc(input, f)?.update_data(|input| {
256256
LogicalPlan::Dml(DmlStatement {
257257
table_name,
258-
table_schema,
258+
dst,
259259
op,
260260
input,
261261
output_schema,

0 commit comments

Comments
 (0)