feat: schema evolution nodes
Signed-off-by: Ion Koutsouris <[email protected]>
ion-elgreco committed Jan 21, 2025
1 parent e4a2b9b commit dc0c7be
Showing 4 changed files with 366 additions and 172 deletions.
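
With the with_input_batches setter and the batches field removed in this commit, callers must hand the builder a logical plan instead of raw RecordBatches. A minimal sketch of how batches can still be fed in, assuming DataFusion's MemTable API as used by the deleted code path; the function name batches_to_plan is illustrative, not part of this crate:

use std::sync::Arc;

use arrow_array::RecordBatch;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;

// Build a LogicalPlan from in-memory RecordBatches (assumed non-empty),
// mirroring what the deleted batch path did internally with a MemTable.
fn batches_to_plan(batches: Vec<RecordBatch>) -> Result<LogicalPlan> {
    let schema = batches[0].schema();
    let provider: Arc<dyn TableProvider> =
        Arc::new(MemTable::try_new(schema, vec![batches])?);
    let ctx = SessionContext::new();
    let df = ctx.read_table(provider)?;
    Ok(df.logical_plan().clone())
}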
295 changes: 125 additions & 170 deletions crates/core/src/operations/write/mod.rs
@@ -24,9 +24,10 @@
//! ````
pub mod configs;
pub mod execution;
pub mod schema_evolution;
pub(crate) mod execution;
pub(crate) mod schema_evolution;

use arrow_schema::Schema;
pub use configs::WriterStatsConfig;
use std::collections::HashMap;
use std::str::FromStr;
@@ -44,7 +45,7 @@ use datafusion_expr::{col, when, Expr, ExprSchemable, LogicalPlan};
use execution::{prepare_predicate_actions, write_execution_plan_with_predicate};
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use schema_evolution::try_cast_batch;
use schema_evolution::try_cast_schema;
use serde::{Deserialize, Serialize};
use tracing::log::*;
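
The import switch from try_cast_batch to try_cast_schema reflects that the compatibility check now runs on schema fields rather than on a materialized batch. A toy analogue of such a fields-level check, built on Arrow's public can_cast_types; this is an illustration, not the crate's actual implementation, whose rules are stricter:

use arrow_cast::can_cast_types;
use arrow_schema::Fields;

// Toy fields-level castability check: every incoming field must exist on the
// target side and its data type must be castable to the target's type.
fn fields_castable(from: &Fields, to: &Fields) -> bool {
    from.iter().all(|f| {
        to.iter()
            .find(|t| t.name() == f.name())
            .is_some_and(|t| can_cast_types(f.data_type(), t.data_type()))
    })
}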

@@ -139,8 +140,6 @@ pub struct WriteBuilder {
target_file_size: Option<usize>,
/// Number of records to be written in single batch to underlying writer
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema or to merge it. None means to fail on schema drift
schema_mode: Option<SchemaMode>,
/// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
@@ -195,7 +194,6 @@ impl WriteBuilder {
predicate: None,
target_file_size: None,
write_batch_size: None,
batches: None,
safe_cast: false,
schema_mode: None,
writer_properties: None,
@@ -247,12 +245,6 @@ impl WriteBuilder {
self
}

/// Execution plan that produces the data to be written to the delta table
pub fn with_input_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
self.batches = Some(batches.into_iter().collect());
self
}

/// Specify the target file size for data files written to the delta table.
pub fn with_target_file_size(mut self, target_file_size: usize) -> Self {
self.target_file_size = Some(target_file_size);
@@ -322,28 +314,11 @@ impl WriteBuilder {
));
}

let batches: &Vec<RecordBatch> = match &self.batches {
Some(batches) => {
if batches.is_empty() {
error!("The WriteBuilder was an empty set of batches!");
return Err(WriteError::MissingData.into());
}
batches
}
None => {
if self.input.is_none() {
error!("The WriteBuilder must have an input plan _or_ batches!");
return Err(WriteError::MissingData.into());
}
// provide an empty array in the case that an input plan exists
&vec![]
}
};

let schema: StructType = match &self.input {
Some(plan) => (plan.schema().as_arrow()).try_into()?,
None => (batches[0].schema()).try_into()?,
};
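// With the batch path removed, a logical-plan input is now mandatory: builds
// without one fail up front with WriteError::MissingData instead of falling
// back to in-memory RecordBatches.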
let input = self
.input
.clone()
.ok_or::<DeltaTableError>(WriteError::MissingData.into())?;
let schema: StructType = input.schema().as_arrow().try_into()?;

match &self.snapshot {
Some(snapshot) => {
@@ -447,148 +422,128 @@ impl std::future::IntoFuture for WriteBuilder {
.map(|v| v.schema().get_generated_columns().unwrap_or_default())
.unwrap_or_default();
let mut schema_drift = false;
let mut df = if let Some(plan) = this.input {
if this.schema_mode == Some(SchemaMode::Merge) {
return Err(DeltaTableError::Generic(
"Schema merge not supported yet for Datafusion".to_string(),
));

let mut input_df = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
let mut schema: Arc<Schema> = (*input_df.schema().as_arrow()).into();

// Schema merging code should be aware of columns that can be generated during write:
// they might be empty in the batch, but they will exist in the input_schema().
// In this case we have to insert the generated column and its type into the schema of the batch.
let mut new_schema = None;
if let Some(snapshot) = &this.snapshot {
let table_schema = snapshot.input_schema()?;

// Merge schemas in an initial round when there are generated column expressions.
// This keeps the batch schema the same as the input schema, without adding new fields
// from the incoming batch.
if !generated_col_expressions.is_empty() {
schema = merge_arrow_schema(table_schema.clone(), schema, true)?;
}
Ok(DataFrame::new(state.clone(), plan.as_ref().clone()))
} else if let Some(batches) = this.batches {
if batches.is_empty() {
Err(WriteError::MissingData)
} else {
let mut schema = batches[0].schema();

// Schema merging code should be aware of columns that can be generated during write:
// they might be empty in the batch, but they will exist in the input_schema().
// In this case we have to insert the generated column and its type into the schema of the batch.
let mut new_schema = None;
if let Some(snapshot) = &this.snapshot {
let table_schema = snapshot.input_schema()?;

// Merge schemas in an initial round when there are generated column expressions.
// This keeps the batch schema the same as the input schema, without adding new fields
// from the incoming batch.
if !generated_col_expressions.is_empty() {
schema = merge_arrow_schema(table_schema.clone(), schema, true)?;
}

if let Err(schema_err) =
try_cast_batch(schema.fields(), table_schema.fields())
{
schema_drift = true;
if this.mode == SaveMode::Overwrite
&& this.schema_mode == Some(SchemaMode::Overwrite)
{
if generated_col_expressions.is_empty() {
new_schema = None // we overwrite anyway, so no need to cast
} else {
new_schema = Some(schema.clone()) // we need to cast the batch to include the generated col as empty null
}
} else if this.schema_mode == Some(SchemaMode::Merge) {
new_schema = Some(merge_arrow_schema(
table_schema.clone(),
schema.clone(),
schema_drift,
)?);
} else {
return Err(schema_err.into());
}
} else if this.mode == SaveMode::Overwrite
&& this.schema_mode == Some(SchemaMode::Overwrite)
{
if generated_col_expressions.is_empty() {
new_schema = None // we overwrite anyway, so no need to cast
} else {
new_schema = Some(schema.clone()) // we need to cast the batch to include the generated col as empty null
}
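// Reconciliation decision tree when the incoming schema cannot be cast to
// the table schema:
//   Overwrite mode + Overwrite schema_mode -> keep the incoming schema (cast only
//     when generated columns must be injected as empty/null fields)
//   Merge schema_mode                      -> merge table and incoming schemas
//   anything else                          -> surface the cast error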
if let Err(schema_err) = try_cast_schema(schema.fields(), table_schema.fields()) {
schema_drift = true;
if this.mode == SaveMode::Overwrite
&& this.schema_mode == Some(SchemaMode::Overwrite)
{
if generated_col_expressions.is_empty() {
new_schema = None // we overwrite anyway, so no need to cast
} else {
// Schema needs to be merged so that utf8/binary/list types are preserved from the batch side
// if both the table and the batch contain such a type. Other types are preserved from the table side.
// At this stage it will never introduce more fields since try_cast_batch passed correctly.
new_schema = Some(merge_arrow_schema(
table_schema.clone(),
schema.clone(),
schema_drift,
)?);
new_schema = Some(schema.clone()) // we need to cast the batch to include the generated col as empty null
}
} else if this.schema_mode == Some(SchemaMode::Merge) {
new_schema = Some(merge_arrow_schema(
table_schema.clone(),
schema.clone(),
schema_drift,
)?);
} else {
return Err(schema_err.into());
}
let data = if !partition_columns.is_empty() {
// TODO partitioning should probably happen in its own plan ...
let mut partitions: HashMap<String, Vec<RecordBatch>> = HashMap::new();
let mut num_partitions = 0;
let mut num_added_rows = 0;
for batch in batches {
let real_batch = match new_schema.clone() {
Some(new_schema) => cast_record_batch(
&batch,
new_schema,
this.safe_cast,
schema_drift || !generated_col_expressions.is_empty(), // Schema drifted so we have to add the missing columns/structfields or missing generated cols..
)?,
None => batch,
};

let divided = divide_by_partition_values(
new_schema.clone().unwrap_or(schema.clone()),
partition_columns.clone(),
&real_batch,
)?;
num_partitions += divided.len();
for part in divided {
num_added_rows += part.record_batch.num_rows();
let key = part.partition_values.hive_partition_path();
match partitions.get_mut(&key) {
Some(part_batches) => {
part_batches.push(part.record_batch);
}
None => {
partitions.insert(key, vec![part.record_batch]);
}
}
}
}
metrics.num_partitions = num_partitions;
metrics.num_added_rows = num_added_rows;
partitions.into_values().collect::<Vec<_>>()
} else if this.mode == SaveMode::Overwrite
&& this.schema_mode == Some(SchemaMode::Overwrite)
{
if generated_col_expressions.is_empty() {
new_schema = None // we overwrite anyway, so no need to cast
} else {
match new_schema {
Some(ref new_schema) => {
let mut new_batches = vec![];
let mut num_added_rows = 0;
for batch in batches {
new_batches.push(cast_record_batch(
&batch,
new_schema.clone(),
this.safe_cast,
schema_drift || !generated_col_expressions.is_empty(), // Schema drifted so we have to add the missing columns/structfields or missing generated cols.
)?);
num_added_rows += batch.num_rows();
}
metrics.num_added_rows = num_added_rows;
vec![new_batches]
}
None => {
metrics.num_added_rows = batches.iter().map(|b| b.num_rows()).sum();
vec![batches]
}
}
};

let ctx = SessionContext::new();
let table_provider: Arc<dyn TableProvider> = Arc::new(
MemTable::try_new(new_schema.unwrap_or(schema).clone(), data).unwrap(),
);
let df = ctx.read_table(table_provider).unwrap();

Ok(df)
new_schema = Some(schema.clone()) // we need to cast the batch to include the generated col as empty null
}
} else {
// Schema needs to be merged so that utf8/binary/list types are preserved from the batch side
// if both the table and the batch contain such a type. Other types are preserved from the table side.
// At this stage it will never introduce more fields since try_cast_schema passed correctly.
new_schema = Some(merge_arrow_schema(
table_schema.clone(),
schema.clone(),
schema_drift,
)?);
}
} else {
Err(WriteError::MissingData)
}?;
}

let schema = Arc::new(df.schema().as_arrow().clone());
// CONVERT THIS TO LOGICAL PLAN STEP
// let data = if !partition_columns.is_empty() {
// // TODO partitioning should probably happen in its own plan ...
// let mut partitions: HashMap<String, Vec<RecordBatch>> = HashMap::new();
// let mut num_partitions = 0;
// let mut num_added_rows = 0;
// for batch in batches {
// let real_batch = match new_schema.clone() {
// Some(new_schema) => cast_record_batch(
// &batch,
// new_schema,
// this.safe_cast,
// schema_drift || !generated_col_expressions.is_empty(), // Schema drifted so we have to add the missing columns/structfields or missing generated cols..
// )?,
// None => batch,
// };

// let divided = divide_by_partition_values(
// new_schema.clone().unwrap_or(schema.clone()),
// partition_columns.clone(),
// &real_batch,
// )?;
// num_partitions += divided.len();
// for part in divided {
// num_added_rows += part.record_batch.num_rows();
// let key = part.partition_values.hive_partition_path();
// match partitions.get_mut(&key) {
// Some(part_batches) => {
// part_batches.push(part.record_batch);
// }
// None => {
// partitions.insert(key, vec![part.record_batch]);
// }
// }
// }
// }
// metrics.num_partitions = num_partitions;
// metrics.num_added_rows = num_added_rows;
// partitions.into_values().collect::<Vec<_>>()
// } else {
// match new_schema {
// Some(ref new_schema) => {
// let mut new_batches = vec![];
// let mut num_added_rows = 0;
// for batch in batches {
// new_batches.push(cast_record_batch(
// &batch,
// new_schema.clone(),
// this.safe_cast,
// schema_drift || !generated_col_expressions.is_empty(), // Schema drifted so we have to add the missing columns/structfields or missing generated cols.
// )?);
// num_added_rows += batch.num_rows();
// }
// metrics.num_added_rows = num_added_rows;
// vec![new_batches]
// }
// None => {
// metrics.num_added_rows = batches.iter().map(|b| b.num_rows()).sum();
// vec![batches]
// }
// }
// };


let schema = Arc::new(input_df.schema().as_arrow().clone());
if this.schema_mode == Some(SchemaMode::Merge) && schema_drift {
if let Some(snapshot) = &this.snapshot {
let schema_struct: StructType = schema.clone().try_into()?;
@@ -691,7 +646,7 @@ impl std::future::IntoFuture for WriteBuilder {
predicate.clone(),
this.snapshot.as_ref(),
state.clone(),
df.clone().create_physical_plan().await?,
input_df.clone().create_physical_plan().await?,
partition_columns.clone(),
this.log_store.object_store(Some(operation_id)).clone(),
target_file_size,
@@ -745,7 +700,7 @@ impl std::future::IntoFuture for WriteBuilder {
this.writer_properties,
deletion_timestamp,
writer_stats_config,
df,
input_df,
operation_id,
)
.await?;
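
For intuition on the merge branch above, Arrow's built-in Schema::try_merge shows the basic unioning behavior that schema evolution relies on. The crate's merge_arrow_schema additionally reconciles utf8/binary/list types and generated columns, which this sketch does not:

use arrow_schema::{ArrowError, DataType, Field, Schema};

// Merging a table schema with a drifted batch schema: the result keeps the
// table's fields and appends the batch's new column.
fn merged_example() -> Result<Schema, ArrowError> {
    let table = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]);
    let batch = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("extra", DataType::Float64, true),
    ]);
    // Yields a schema with fields: id, value, extra.
    Schema::try_merge(vec![table, batch])
}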