crates/executor/src/datafusion/logical_plan/merge.rs (29 changes: 24 additions & 5 deletions)
@@ -25,12 +25,31 @@ impl MergeIntoCOWSink {
     pub fn new(
         input: Arc<LogicalPlan>,
         target: DataFusionTable,
+        has_insert: bool,
+        has_update: bool,
+        has_delete: bool,
     ) -> datafusion_common::Result<Self> {
-        let field = Field::new("number of rows updated", DataType::Int64, false);
-        let schema = DFSchema::new_with_metadata(
-            vec![(None, Arc::new(field))],
-            std::collections::HashMap::new(),
-        )?;
+        let inserted = Arc::new(Field::new(
+            "number of rows inserted",
+            DataType::Int64,
+            false,
+        ));
+        let updated = Arc::new(Field::new("number of rows updated", DataType::Int64, false));
+        let deleted = Arc::new(Field::new("number of rows deleted", DataType::Int64, false));
+        let mut fields: Vec<(Option<datafusion_common::TableReference>, Arc<Field>)> = Vec::new();
+        if has_insert {
+            fields.push((None, inserted));
+        }
+        if has_update {
+            fields.push((None, updated.clone()));
+        }
+        if has_delete {
+            fields.push((None, deleted));
+        }
+        if fields.is_empty() {
+            fields.push((None, updated));
+        }
+        let schema = DFSchema::new_with_metadata(fields, std::collections::HashMap::new())?;
 
         Ok(Self {
             input,
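With this change, the sink's result schema carries one counter column per MERGE clause that is actually present, and falls back to a lone "number of rows updated" column when no clause is given, so the plan never ends up with an empty schema. A condensed standalone sketch of the same pattern (delete omitted, since MERGE DELETE is not supported yet; assumes the `datafusion_common` APIs used above):

```rust
use std::{collections::HashMap, sync::Arc};

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion_common::{DFSchema, Result, TableReference};

/// Build a one-row DML result schema with a column per present MERGE clause.
fn merge_result_schema(has_insert: bool, has_update: bool) -> Result<DFSchema> {
    let mut fields: Vec<(Option<TableReference>, Arc<Field>)> = Vec::new();
    if has_insert {
        fields.push((
            None,
            Arc::new(Field::new("number of rows inserted", DataType::Int64, false)),
        ));
    }
    // Also push "updated" as the fallback so the schema is never empty.
    if has_update || fields.is_empty() {
        fields.push((
            None,
            Arc::new(Field::new("number of rows updated", DataType::Int64, false)),
        ));
    }
    DFSchema::new_with_metadata(fields, HashMap::new())
}
```

For a MERGE with both clauses this yields the two-column schema that the sink fills in at commit time.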
crates/executor/src/datafusion/physical_plan/merge.rs (233 changes: 189 additions & 44 deletions)
@@ -1,6 +1,6 @@
 use datafusion::{
     arrow::{
-        array::{Array, BooleanArray, RecordBatch, StringArray, downcast_array},
+        array::{Array, ArrayRef, BooleanArray, RecordBatch, StringArray, downcast_array},
         compute::{
             filter, filter_record_batch,
             kernels::cmp::{distinct, eq},
@@ -15,12 +15,10 @@ use datafusion_iceberg::{
     DataFusionTable, error::Error as DataFusionIcebergError, table::write_parquet_data_files,
 };
 use datafusion_physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
-    RecordBatchStream, SendableRecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
+    SendableRecordBatchStream,
     coalesce_partitions::CoalescePartitionsExec,
     execution_plan::{Boundedness, EmissionType},
-    expressions::Column,
-    projection::ProjectionExec,
     stream::RecordBatchStreamAdapter,
 };
 use futures::{Stream, StreamExt};
@@ -31,6 +29,8 @@ use snafu::ResultExt;
 use std::{
     collections::{HashMap, HashSet},
     num::NonZeroUsize,
+    ops::BitAnd,
+    sync::atomic::{AtomicI64, Ordering},
     sync::{Arc, Mutex},
     task::Poll,
     thread::available_parallelism,
@@ -42,6 +42,8 @@ pub(crate) static TARGET_EXISTS_COLUMN: &str = "__target_exists";
 pub(crate) static SOURCE_EXISTS_COLUMN: &str = "__source_exists";
 pub(crate) static DATA_FILE_PATH_COLUMN: &str = "__data_file_path";
 pub(crate) static MANIFEST_FILE_PATH_COLUMN: &str = "__manifest_file_path";
+pub(crate) static MERGE_UPDATED_COLUMN: &str = "__merge_row_updated";
+pub(crate) static MERGE_INSERTED_COLUMN: &str = "__merge_row_inserted";
 static THREAD_FILE_RATIO: usize = 4;
 
 #[derive(Debug)]
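The two marker columns added here are booleans that the upstream merge plan is assumed to attach to every row it emits, flagging which WHEN clause produced the row; they are counted and then stripped by the sink below. A hypothetical batch illustrating the convention (names follow the constants above, values are made up):

```rust
use std::sync::Arc;

use datafusion::arrow::{
    array::{ArrayRef, BooleanArray, Int64Array, RecordBatch},
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
};

/// Hypothetical batch: "id" is real table data; the __merge_* columns are
/// per-row action markers (NULL counts the same as false downstream).
fn example_tagged_batch() -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("__merge_row_inserted", DataType::Boolean, true),
        Field::new("__merge_row_updated", DataType::Boolean, true),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2, 3])),
        // Row 2 was inserted; row 1 was updated; row 3 was carried over untouched.
        Arc::new(BooleanArray::from(vec![Some(false), Some(true), None])),
        Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])),
    ];
    RecordBatch::try_new(schema, columns)
}
```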
@@ -137,6 +139,8 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         let schema = Arc::new(self.schema.as_arrow().clone());
 
         let matching_files: Arc<Mutex<Option<ManifestAndDataFiles>>> = Arc::default();
+        let updated_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
+        let inserted_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
 
         let coalesce = CoalescePartitionsExec::new(self.input.clone());
 
@@ -146,16 +150,24 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
             matching_files.clone(),
         ));
 
-        // Remove auxiliary columns
-        let projection =
-            ProjectionExec::try_new(schema_projection(&self.input.schema()), filtered)?;
-
-        let batches = projection.execute(partition, context.clone())?;
+        let input_batches = filtered.execute(partition, context.clone())?;
+        let count_and_project_stream = MergeCOWCountAndProjectStream::new(
+            input_batches,
+            updated_rows.clone(),
+            inserted_rows.clone(),
+        );
 
         let stream = futures::stream::once({
             let tabular = self.target.tabular.clone();
             let branch = self.target.branch.clone();
             let schema = schema.clone();
+            let updated_rows = Arc::clone(&updated_rows);
+            let inserted_rows = Arc::clone(&inserted_rows);
+            let projected_schema = count_and_project_stream.projected_schema();
+            let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
+                projected_schema,
+                count_and_project_stream,
+            ));
             async move {
                 #[allow(clippy::unwrap_used)]
                 let value = tabular.read().unwrap().clone();
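For context, `RecordBatchStreamAdapter` is DataFusion's standard way to pair a plain `futures` stream with a known schema so it can travel as a `SendableRecordBatchStream`, which is how the counting stream above is rewrapped. A self-contained sketch of the pattern with a placeholder schema and batch:

```rust
use std::sync::Arc;

use datafusion::{
    arrow::{
        array::{ArrayRef, Int64Array, RecordBatch},
        datatypes::{DataType, Field, Schema},
    },
    error::DataFusionError,
};
use datafusion_physical_plan::{SendableRecordBatchStream, stream::RecordBatchStreamAdapter};
use futures::stream;

/// Wrap a single placeholder batch as a SendableRecordBatchStream.
fn one_batch_stream() -> SendableRecordBatchStream {
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1_i64]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column]);
    // The adapter carries the schema; the inner stream just yields Results.
    Box::pin(RecordBatchStreamAdapter::new(
        schema,
        stream::once(async move { batch.map_err(DataFusionError::from) }),
    ))
}
```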
Expand Down Expand Up @@ -204,7 +216,37 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
#[allow(clippy::unwrap_used)]
let mut lock = tabular.write().unwrap();
*lock = Tabular::Table(table);
Ok(RecordBatch::new_empty(schema))
// Return a one-row result for DML, so clients don't render "No data result" on success.
let updated = updated_rows.load(Ordering::Relaxed);
let inserted = inserted_rows.load(Ordering::Relaxed);
// MERGE DELETE is not supported yet
let deleted = 0i64;

let arrays = schema
.fields()
.iter()
.map(|f| {
let v = match f.name().as_str() {
"number of rows inserted" => inserted,
"number of rows updated" => updated,
"number of rows deleted" => deleted,
other => {
return Err(DataFusionError::Internal(format!(
"Unexpected MERGE result column: {other}"
)));
}
};
let a: ArrayRef =
Arc::new(datafusion::arrow::array::Int64Array::from(vec![v]));
Ok(a)
})
.collect::<Result<Vec<_>, DataFusionError>>()?;

RecordBatch::try_new(schema.clone(), arrays).map_err(|e| {
DataFusionError::Internal(format!(
"Failed to build MERGE result record batch: {e}"
))
})
}
})
.boxed();
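Illustratively, with hypothetical counts, a MERGE whose insert clause added 2 rows and whose update clause touched 3 would now surface to the client as a single-row batch like:

```
+-------------------------+------------------------+
| number of rows inserted | number of rows updated |
+-------------------------+------------------------+
| 2                       | 3                      |
+-------------------------+------------------------+
```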
@@ -213,6 +255,142 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
     }
 }
 
+pin_project! {
+    /// Stream wrapper that counts per-action MERGE rows (insert/update markers) and projects away
+    /// auxiliary merge columns before writing to data files.
+    pub struct MergeCOWCountAndProjectStream {
+        projection_indices: Vec<usize>,
+        projected_schema: Arc<Schema>,
+        updated_idx: Option<usize>,
+        inserted_idx: Option<usize>,
+        updated_rows: Arc<AtomicI64>,
+        inserted_rows: Arc<AtomicI64>,
+
+        #[pin]
+        input: SendableRecordBatchStream,
+    }
+}
+
+impl MergeCOWCountAndProjectStream {
+    fn new(
+        input: SendableRecordBatchStream,
+        updated_rows: Arc<AtomicI64>,
+        inserted_rows: Arc<AtomicI64>,
+    ) -> Self {
+        let input_schema = input.schema();
+
+        let updated_idx = input_schema.index_of(MERGE_UPDATED_COLUMN).ok();
+        let inserted_idx = input_schema.index_of(MERGE_INSERTED_COLUMN).ok();
+
+        // Drop auxiliary columns so we only write table columns to parquet
+        let projection_indices: Vec<usize> = input_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .filter_map(|(i, f)| {
+                let name = f.name();
+                if name != SOURCE_EXISTS_COLUMN
+                    && name != DATA_FILE_PATH_COLUMN
+                    && name != MANIFEST_FILE_PATH_COLUMN
+                    && name != MERGE_UPDATED_COLUMN
+                    && name != MERGE_INSERTED_COLUMN
+                {
+                    Some(i)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let projected_fields = projection_indices
+            .iter()
+            .map(|i| input_schema.field(*i).clone())
+            .collect::<Vec<_>>();
+
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
+        Self {
+            projection_indices,
+            projected_schema,
+            updated_idx,
+            inserted_idx,
+            updated_rows,
+            inserted_rows,
+            input,
+        }
+    }
+
+    fn projected_schema(&self) -> Arc<Schema> {
+        self.projected_schema.clone()
+    }
+}
+
+impl Stream for MergeCOWCountAndProjectStream {
+    type Item = Result<RecordBatch, DataFusionError>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let mut project = self.project();
+        match project.input.as_mut().poll_next(cx) {
+            Poll::Ready(Some(Ok(batch))) => {
+                if let Some(updated_idx) = *project.updated_idx
+                    && let Some(col) = batch.columns().get(updated_idx)
+                {
+                    let updated = downcast_array::<BooleanArray>(col.as_ref());
+                    let n = usize_to_i64_saturating(count_true_and_valid(&updated));
+                    project.updated_rows.fetch_add(n, Ordering::Relaxed);
+                }
+                if let Some(inserted_idx) = *project.inserted_idx
+                    && let Some(col) = batch.columns().get(inserted_idx)
+                {
+                    let inserted = downcast_array::<BooleanArray>(col.as_ref());
+                    let n = usize_to_i64_saturating(count_true_and_valid(&inserted));
+                    project.inserted_rows.fetch_add(n, Ordering::Relaxed);
+                }
+
+                let cols = project
+                    .projection_indices
+                    .iter()
+                    .map(|i| batch.column(*i).clone())
+                    .collect::<Vec<_>>();
+
+                let projected = RecordBatch::try_new(project.projected_schema.clone(), cols)
+                    .map_err(|e| {
+                        DataFusionError::Internal(format!(
+                            "Failed to project MERGE record batch: {e}"
+                        ))
+                    })?;
+                Poll::Ready(Some(Ok(projected)))
+            }
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+/// Fast count of `true` values, treating NULL as false, using Arrow bitmaps.
+#[inline]
+fn count_true_and_valid(arr: &BooleanArray) -> usize {
+    if arr.null_count() == 0 {
+        return arr.values().count_set_bits();
+    }
+
+    if let Some(nulls) = arr.logical_nulls() {
+        let valid = nulls.inner();
+        return arr.values().bitand(valid).count_set_bits();
+    }
+
+    arr.values().count_set_bits()
+}
+
+#[inline]
+fn usize_to_i64_saturating(v: usize) -> i64 {
+    i64::try_from(v).unwrap_or(i64::MAX)
+}
+
 #[derive(Debug)]
 struct MergeCOWFilterExec {
     input: Arc<dyn ExecutionPlan>,
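The NULL-handling contract of `count_true_and_valid` (NULL markers never count, even when the underlying value bit happens to be set) can be pinned down with a small test; this one is illustrative and not part of the change:

```rust
#[test]
fn null_markers_are_not_counted() {
    use datafusion::arrow::array::BooleanArray;

    // NULL slots are ignored: only valid `true` values count.
    let sparse = BooleanArray::from(vec![Some(true), Some(false), None, Some(true), None]);
    assert_eq!(count_true_and_valid(&sparse), 2);

    // Fast path: no validity bitmap present at all.
    let dense = BooleanArray::from(vec![true, true, false]);
    assert_eq!(count_true_and_valid(&dense), 2);
}
```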
@@ -672,39 +850,6 @@ fn unique_files_and_manifests(
     Ok(result)
 }
 
-/// Creates a projection expression list from a schema by filtering out auxiliary columns.
-///
-/// This function builds a vector of physical expressions and column names from the given schema,
-/// excluding internal auxiliary columns used for merge operations. The auxiliary columns that
-/// are filtered out are:
-/// - `__source_exists`: Indicates if the source record exists
-/// - `__data_file_path`: Path to the data file
-/// - `__manifest_file_path`: Path to the manifest file
-///
-/// # Arguments
-/// * `schema` - The schema to create projections from
-///
-/// # Returns
-/// * `Vec<(Arc<dyn PhysicalExpr>, String)>` - Vector of tuples containing physical expressions and column names
-fn schema_projection(schema: &Schema) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
-    schema
-        .fields()
-        .iter()
-        .enumerate()
-        .filter_map(|(i, field)| -> Option<(Arc<dyn PhysicalExpr>, String)> {
-            let name = field.name();
-            if name != SOURCE_EXISTS_COLUMN
-                && name != DATA_FILE_PATH_COLUMN
-                && name != MANIFEST_FILE_PATH_COLUMN
-            {
-                Some((Arc::new(Column::new(name, i)), name.to_owned()))
-            } else {
-                None
-            }
-        })
-        .collect()
-}
-
 #[cfg(test)]
 mod tests {
     #![allow(clippy::unwrap_used)]