Simplify Write API: Add InsertExec, port in memory insert to use DataSink #6347
Conversation
```rust
    _state: &SessionState,
    _input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {

/// Return a [`DataSink`] suitable for writing to this table
```
Here is the new interface -- I think this probably should also get some sort of "options" parameter so that we can provide a way to pass in settings like `row_group_size` from the various inserts. I prefer to add that parameter at some future point, when I have an actual use case with COPY, rather than guessing exactly what is needed.
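For reference, a minimal sketch of the shape this interface takes (the trait and `WriteOptions` names here are illustrative, and the options parameter is the hypothetical future addition discussed above, not part of this PR):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Sketch of a sink-style write trait: the table provider hands back an
/// object that knows how to durably write streams of record batches.
#[async_trait]
pub trait DataSinkSketch: Send + Sync {
    /// Write out all partitions of input data, returning the number of
    /// rows written.
    async fn write_all(
        &self,
        data: Vec<SendableRecordBatchStream>,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}

/// Hypothetical options struct that a future version could thread through
/// to carry settings such as Parquet row group size.
#[derive(Debug, Default, Clone)]
pub struct WriteOptions {
    pub row_group_size: Option<usize>,
}
```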
```rust
let num_partitions = self.batches.len();

// buffer up the data round robin style into num_partitions new buffers
let mut new_batches = vec![vec![]; num_partitions];
```
I am quite pleased that, by following @tustvold's guidance and pushing the partitioning choice into the `DataSink` implementations, the logic becomes quite a bit simpler and more flexible, with seemingly no performance penalty.
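As a standalone illustration of that buffering strategy (a sketch of the idea, not the PR's exact code):

```rust
use arrow::record_batch::RecordBatch;

/// Distribute incoming batches round-robin across `num_partitions`
/// buffers, so each output partition receives roughly the same number
/// of batches regardless of how the input was partitioned.
fn round_robin_partition(
    batches: Vec<RecordBatch>,
    num_partitions: usize,
) -> Vec<Vec<RecordBatch>> {
    let mut new_batches = vec![vec![]; num_partitions];
    for (i, batch) in batches.into_iter().enumerate() {
        new_batches[i % num_partitions].push(batch);
    }
    new_batches
}
```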
```rust
use crate::physical_plan::Distribution;
use datafusion_common::DataFusionError;

/// Execution plan for writing record batches to a [`DataSink`]
```
This is basically a generic version of `MemoryWriteExec` that calls a `dyn DataSink` to do the actual writing.
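A rough sketch of that shape, reusing the `DataSinkSketch` trait from above and assuming the `ExecutionPlan` API of this era (the real operator also implements `ExecutionPlan` itself, elided here):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;

/// Sketch of a generic insert operator: it owns the input plan and a
/// type-erased sink, so any table that can produce a sink gets write
/// support without a bespoke execution plan.
pub struct InsertExecSketch {
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSinkSketch>,
}

impl InsertExecSketch {
    /// Drive every input partition into the sink and return the number
    /// of rows written.
    async fn do_insert(&self, context: Arc<TaskContext>) -> Result<u64> {
        let n = self.input.output_partitioning().partition_count();
        let streams = (0..n)
            .map(|i| self.input.execute(i, context.clone()))
            .collect::<Result<Vec<_>>>()?;
        self.sink.write_all(streams, &context).await
    }
}
```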
|
||
// Test the less-lock mode by inserting a large number of batches into a table. | ||
#[tokio::test] | ||
async fn test_one_to_one_mode() -> Result<()> { |
I am not quite sure how to port these tests yet -- I am thinking about it
I studied these tests carefully. I believe these cases with different partitioning strategies are already covered by https://github.com/apache/arrow-datafusion/blob/eb918ab217213d5e07e71e53c118a8409d2f71a0/datafusion/core/src/datasource/memory.rs#L455-L476
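For example, a test in that style might exercise a multi-partition `MemTable` roughly like this (a sketch written against public APIs, not copied from the linked file):

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::test]
async fn insert_into_multi_partition_mem_table() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    // Two initial partitions, so the write path must handle
    // num_partitions > 1.
    let table = MemTable::try_new(schema, vec![vec![batch.clone()], vec![batch]])?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql("INSERT INTO t VALUES (4), (5)").await?.collect().await?;

    // 3 + 3 initial rows plus 2 inserted rows.
    let rows: usize = ctx
        .sql("SELECT a FROM t")
        .await?
        .collect()
        .await?
        .iter()
        .map(|b| b.num_rows())
        .sum();
    assert_eq!(rows, 8);
    Ok(())
}
```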
Given that the API change proposal does not have consensus, I am closing this PR for now until we get one. I think I can get most of the benefit I was looking for from #6354.
Which issue does this PR close?
Closes #6339
Rationale for this change
I want to make it easier for DataFusion to be extended with new write sources. I want to use this both for `COPY ... TO ...` statements, and @JanKaul is looking into using it for Delta.rs: #6339 (comment)

What changes are included in this PR?
- Add `InsertExec`, an execution plan for writing record batches to a [`DataSink`]
- Port `MemTable` write to be in terms of `DataSink` (it was really helpful to have a specific implementation to try this with -- thanks @metesynnada)
- Remove `MemoryWriteExec`, the `MemTable`-specific execution plan

Are these changes tested?
Yes
Are there any user-facing changes?
The API for insert changes.
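To illustrate the direction of that change (again reusing the hypothetical `DataSinkSketch` trait from the sketch above, not the merged API): a table provider now implements a small sink instead of a whole execution plan. A trivial sink that only counts rows might look like this:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Sketch: a sink that just counts rows, standing in for a real writer
/// (e.g. one appending to in-memory partitions or writing files).
struct CountingSink;

#[async_trait]
impl DataSinkSketch for CountingSink {
    async fn write_all(
        &self,
        data: Vec<SendableRecordBatchStream>,
        _context: &Arc<TaskContext>,
    ) -> Result<u64> {
        let mut rows = 0u64;
        for mut stream in data {
            while let Some(batch) = stream.next().await {
                rows += batch?.num_rows() as u64;
            }
        }
        Ok(rows)
    }
}
```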