Skip to content

Commit 34db3aa

Browse files
committed
Add InsertExec, port in memory insert to use DataSink
1 parent d0aadd6 commit 34db3aa

File tree

8 files changed

+281
-209
lines changed

8 files changed

+281
-209
lines changed

datafusion/core/src/datasource/datasource.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ use crate::execution::context::SessionState;
3131
use crate::logical_expr::Expr;
3232
use crate::physical_plan::ExecutionPlan;
3333

34+
use super::sink::DataSink;
35+
3436
/// Source table
3537
#[async_trait]
3638
pub trait TableProvider: Sync + Send {
@@ -98,12 +100,11 @@ pub trait TableProvider: Sync + Send {
98100
None
99101
}
100102

101-
/// Insert into this table
102-
async fn insert_into(
103-
&self,
104-
_state: &SessionState,
105-
_input: Arc<dyn ExecutionPlan>,
106-
) -> Result<Arc<dyn ExecutionPlan>> {
103+
/// Return a [`DataSink`] suitable for writing to this table
104+
///
105+
/// Each insert or other DML plan will call this function. Each
106+
/// returned value can be unique.
107+
async fn write_to(&self) -> Result<Arc<dyn DataSink>> {
107108
let msg = "Insertion not implemented for this table".to_owned();
108109
Err(DataFusionError::NotImplemented(msg))
109110
}

datafusion/core/src/datasource/memory.rs

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
use futures::StreamExt;
2121
use std::any::Any;
22+
use std::fmt::Debug;
2223
use std::sync::Arc;
2324

2425
use arrow::datatypes::SchemaRef;
@@ -30,13 +31,14 @@ use crate::datasource::{TableProvider, TableType};
3031
use crate::error::{DataFusionError, Result};
3132
use crate::execution::context::SessionState;
3233
use crate::logical_expr::Expr;
33-
use crate::physical_plan::common;
3434
use crate::physical_plan::common::AbortOnDropSingle;
3535
use crate::physical_plan::memory::MemoryExec;
36-
use crate::physical_plan::memory::MemoryWriteExec;
3736
use crate::physical_plan::ExecutionPlan;
37+
use crate::physical_plan::{common, SendableRecordBatchStream};
3838
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
3939

40+
use super::sink::DataSink;
41+
4042
/// Type alias for partition data
4143
pub type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
4244

@@ -164,50 +166,58 @@ impl TableProvider for MemTable {
164166
)?))
165167
}
166168

167-
/// Inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
168-
/// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
169-
///
170-
/// # Arguments
171-
///
172-
/// * `state` - The [`SessionState`] containing the context for executing the plan.
173-
/// * `input` - The [`ExecutionPlan`] to execute and insert.
174-
///
175-
/// # Returns
176-
///
177-
/// * A `Result` indicating success or failure.
178-
async fn insert_into(
179-
&self,
180-
_state: &SessionState,
181-
input: Arc<dyn ExecutionPlan>,
182-
) -> Result<Arc<dyn ExecutionPlan>> {
183-
// Create a physical plan from the logical plan.
184-
// Check that the schema of the plan matches the schema of this table.
185-
if !input.schema().eq(&self.schema) {
186-
return Err(DataFusionError::Plan(
187-
"Inserting query must have the same schema with the table.".to_string(),
169+
async fn write_to(&self) -> Result<Arc<dyn DataSink>> {
170+
if self.batches.is_empty() {
171+
return Err(DataFusionError::Internal(
172+
"Can not insert into table without partitions.".to_string(),
188173
));
189174
}
175+
Ok(Arc::new(MemSink::new(self.batches.clone())))
176+
}
177+
}
190178

191-
if self.batches.is_empty() {
192-
return Err(DataFusionError::Plan(
193-
"The table must have partitions.".to_string(),
194-
));
179+
/// Implements for writing to a [`MemTable`]
180+
struct MemSink {
181+
/// Target locations for writing data
182+
batches: Vec<PartitionData>,
183+
}
184+
185+
impl Debug for MemSink {
186+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187+
f.debug_struct("MemSink")
188+
.field("num_partitions", &self.batches.len())
189+
.finish()
190+
}
191+
}
192+
193+
impl MemSink {
194+
fn new(batches: Vec<PartitionData>) -> Self {
195+
Self { batches }
196+
}
197+
}
198+
199+
#[async_trait]
200+
impl DataSink for MemSink {
201+
async fn write_all(&self, mut data: SendableRecordBatchStream) -> Result<u64> {
202+
let num_partitions = self.batches.len();
203+
204+
// buffer up the data round robin stle into num_partitions new buffers
205+
let mut new_batches = vec![vec![]; num_partitions];
206+
let mut i = 0;
207+
let mut row_count = 0;
208+
while let Some(batch) = data.next().await.transpose()? {
209+
row_count += batch.num_rows();
210+
new_batches[i].push(batch);
211+
i = (i + 1) % num_partitions;
195212
}
196213

197-
let input = if self.batches.len() > 1 {
198-
Arc::new(RepartitionExec::try_new(
199-
input,
200-
Partitioning::RoundRobinBatch(self.batches.len()),
201-
)?)
202-
} else {
203-
input
204-
};
214+
// write the outputs into
215+
for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
216+
// Append all the new batches in one go to minimize locking overhead
217+
target.write().await.append(&mut batches);
218+
}
205219

206-
Ok(Arc::new(MemoryWriteExec::try_new(
207-
input,
208-
self.batches.clone(),
209-
self.schema.clone(),
210-
)?))
220+
Ok(row_count as u64)
211221
}
212222
}
213223

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub mod file_format;
2525
pub mod listing;
2626
pub mod listing_table_factory;
2727
pub mod memory;
28+
pub mod sink;
2829
pub mod streaming;
2930
pub mod view;
3031

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Data sink traits
19+
use async_trait::async_trait;
20+
use datafusion_common::Result;
21+
22+
use crate::physical_plan::SendableRecordBatchStream;
23+
24+
/// `DataSink` implements writing streams of [`RecordBatch`]es to
25+
/// destinations.
26+
#[async_trait]
27+
pub trait DataSink: std::fmt::Debug + Send + Sync {
28+
// TODO add desired input ordering
29+
// How does this sink want its input ordered?
30+
31+
/// Writes the data to the sink, returns the number of rows written
32+
///
33+
/// This method will be called exactly once during each DML
34+
/// statement. Thus prior to return, the sink should do any commit
35+
/// or rollback required.
36+
async fn write_all(&self, data: SendableRecordBatchStream) -> Result<u64>;
37+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Execution plan for writing data to [`DataSink`]s
19+
20+
use super::expressions::PhysicalSortExpr;
21+
use super::{
22+
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
23+
};
24+
use crate::datasource::sink::DataSink;
25+
use crate::error::Result;
26+
use arrow::datatypes::SchemaRef;
27+
use arrow::record_batch::RecordBatch;
28+
use arrow_array::{ArrayRef, UInt64Array};
29+
use arrow_schema::{DataType, Field, Schema};
30+
use core::fmt;
31+
use futures::StreamExt;
32+
use std::any::Any;
33+
use std::sync::Arc;
34+
35+
use crate::execution::context::TaskContext;
36+
use crate::physical_plan::stream::RecordBatchStreamAdapter;
37+
use crate::physical_plan::Distribution;
38+
use datafusion_common::DataFusionError;
39+
40+
/// Execution plan for writing record batches to a [`DataSink`]
41+
///
42+
/// Returns a single row with the number of vaules written
43+
pub struct InsertExec {
44+
/// Input plan that produces the record batches to be written.
45+
input: Arc<dyn ExecutionPlan>,
46+
/// Sink to whic to write
47+
sink: Arc<dyn DataSink>,
48+
/// Schema describing the structure of the data.
49+
schema: SchemaRef,
50+
}
51+
52+
impl fmt::Debug for InsertExec {
53+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54+
write!(f, "InsertExec schema: {:?}", self.schema)
55+
}
56+
}
57+
58+
impl InsertExec {
59+
/// Create a new execution plan to write to `sink`
60+
pub fn new(input: Arc<dyn ExecutionPlan>, sink: Arc<dyn DataSink>) -> Self {
61+
Self {
62+
input,
63+
sink,
64+
schema: make_count_schema(),
65+
}
66+
}
67+
}
68+
69+
impl ExecutionPlan for InsertExec {
70+
/// Return a reference to Any that can be used for downcasting
71+
fn as_any(&self) -> &dyn Any {
72+
self
73+
}
74+
75+
/// Get the schema for this execution plan
76+
fn schema(&self) -> SchemaRef {
77+
self.schema.clone()
78+
}
79+
80+
fn output_partitioning(&self) -> Partitioning {
81+
Partitioning::UnknownPartitioning(1)
82+
}
83+
84+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
85+
None
86+
}
87+
88+
fn required_input_distribution(&self) -> Vec<Distribution> {
89+
vec![Distribution::SinglePartition]
90+
}
91+
92+
fn maintains_input_order(&self) -> Vec<bool> {
93+
vec![false]
94+
}
95+
96+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
97+
vec![self.input.clone()]
98+
}
99+
100+
fn with_new_children(
101+
self: Arc<Self>,
102+
children: Vec<Arc<dyn ExecutionPlan>>,
103+
) -> Result<Arc<dyn ExecutionPlan>> {
104+
Ok(Arc::new(Self {
105+
input: children[0].clone(),
106+
sink: self.sink.clone(),
107+
schema: self.schema.clone(),
108+
}))
109+
}
110+
111+
/// Execute the plan and return a stream of record batches for the specified partition.
112+
/// Depending on the number of input partitions and MemTable partitions, it will choose
113+
/// either a less lock acquiring or a locked implementation.
114+
fn execute(
115+
&self,
116+
partition: usize,
117+
context: Arc<TaskContext>,
118+
) -> Result<SendableRecordBatchStream> {
119+
if partition != 0 {
120+
return Err(DataFusionError::Internal(
121+
format!("Invalid requested partition {partition}. InsertExec has only a single partition."
122+
)));
123+
}
124+
125+
// Execute each of our own inputs and pass them to the sink
126+
let input_partition_count = self.input.output_partitioning().partition_count();
127+
if input_partition_count != 1 {
128+
return Err(DataFusionError::Internal(format!(
129+
"Invalid input partition count {input_partition_count}. \
130+
InsertExec needs only a single partition."
131+
)));
132+
}
133+
134+
let data = self.input.execute(0, context.clone())?;
135+
let schema = self.schema.clone();
136+
let sink = self.sink.clone();
137+
138+
let stream = futures::stream::once(async move {
139+
sink.write_all(data).await.map(make_count_batch)
140+
})
141+
.boxed();
142+
143+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
144+
}
145+
146+
fn fmt_as(
147+
&self,
148+
t: DisplayFormatType,
149+
f: &mut std::fmt::Formatter,
150+
) -> std::fmt::Result {
151+
match t {
152+
DisplayFormatType::Default => {
153+
write!(
154+
f,
155+
"InsertExec: input_partition_count={}",
156+
self.input.output_partitioning().partition_count()
157+
)
158+
}
159+
}
160+
}
161+
162+
fn statistics(&self) -> Statistics {
163+
Statistics::default()
164+
}
165+
}
166+
167+
fn make_count_batch(count: u64) -> RecordBatch {
168+
let array = Arc::new(UInt64Array::from_iter_values(vec![count])) as ArrayRef;
169+
170+
RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
171+
}
172+
173+
fn make_count_schema() -> SchemaRef {
174+
// define a schema.
175+
Arc::new(Schema::new(vec![Field::new(
176+
"count",
177+
DataType::UInt64,
178+
false,
179+
)]))
180+
}

0 commit comments

Comments
 (0)