feat: enable file merging by last modification time using preserve-insertion-order

This change leverages the previously unused `preserve-insertion-order`
configuration to enable merging files sorted by their last modification
time during compaction. This is particularly beneficial for append-only
workloads, improving data locality after optimize runs by merging files
that were created around the same time.
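
A minimal usage sketch, based on the builder API exercised by the new test below (assumes an already-loaded table; error handling elided):

let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
let (table, metrics) = DeltaOps(table)
    .optimize()
    .with_filters(&filter)
    .with_preserve_insertion_order(true)
    .await?;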

Signed-off-by: esarili <[email protected]>
esarili committed Jan 31, 2025
1 parent f67e828 commit 7333a36
Showing 2 changed files with 71 additions and 3 deletions.
16 changes: 13 additions & 3 deletions crates/core/src/operations/optimize.rs
@@ -331,6 +331,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.preserve_insertion_order,
)?;
let metrics = plan
.execute(
@@ -877,12 +878,15 @@ pub fn create_merge_plan(
filters: &[PartitionFilter],
target_size: Option<i64>,
writer_properties: WriterProperties,
preserve_insertion_order: bool,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
let partitions_keys = &snapshot.metadata().partition_columns;

let (operations, metrics) = match optimize_type {
OptimizeType::Compact => build_compaction_plan(snapshot, filters, target_size)?,
OptimizeType::Compact => {
build_compaction_plan(snapshot, filters, target_size, preserve_insertion_order)?
}
OptimizeType::ZOrder(zorder_columns) => {
build_zorder_plan(zorder_columns, snapshot, partitions_keys, filters)?
}
@@ -958,6 +962,7 @@ fn build_compaction_plan(
snapshot: &DeltaTableState,
filters: &[PartitionFilter],
target_size: i64,
preserve_insertion_order: bool,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
let mut metrics = Metrics::default();

@@ -985,8 +990,13 @@
}

for (_, file) in partition_files.values_mut() {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
if preserve_insertion_order {
// Sort files by last modification time: newest to oldest
file.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
} else {
// Sort files by size: largest to smallest
file.sort_by(|a, b| b.size.cmp(&a.size));
}
}

let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
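
For context, a self-contained sketch of the two orderings introduced above (plain Rust, not delta-rs code; FileMeta is a hypothetical stand-in for the file metadata the plan builder sorts):

struct FileMeta {
    size: i64,
    last_modified: i64,
}

// Newest-first when preserving insertion order (the new behavior),
// largest-first otherwise (the existing default).
fn sort_for_compaction(files: &mut [FileMeta], preserve_insertion_order: bool) {
    if preserve_insertion_order {
        files.sort_by(|a, b| b.last_modified.cmp(&a.last_modified));
    } else {
        files.sort_by(|a, b| b.size.cmp(&a.size));
    }
}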
58 changes: 58 additions & 0 deletions crates/core/tests/command_optimize.rs
@@ -289,6 +289,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -351,6 +352,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
false,
)?;

let uri = context.tmp_dir.path().to_str().to_owned().unwrap();
@@ -410,6 +412,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
&[],
None,
WriterProperties::builder().build(),
false,
)?;

let metrics = plan
@@ -867,6 +870,61 @@ async fn test_zorder_respects_target_size() -> Result<(), Box<dyn Error>> {
Ok(())
}

#[tokio::test]
async fn test_preserve_insertion_order() -> Result<(), Box<dyn Error>> {
let context = setup_test(true).await?;
let mut dt = context.table;
let mut writer = RecordBatchWriter::for_table(&dt)?;

// First file (written earlier, so it has the older modification time)
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(1, 1), (1, 2), (1, 3), (1, 4)], "2022-05-22")?,
)
.await?;

// Second file (written later, so it has the newer modification time)
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(2, 5), (2, 6), (2, 7), (2, 8)], "2022-05-22")?,
)
.await?;

let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];

let optimize = DeltaOps(dt)
.optimize()
.with_target_size(2_000_000)
.with_filters(&filter)
.with_preserve_insertion_order(true);
let (dt, metrics) = optimize.await?;

assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
assert_eq!(metrics.total_files_skipped, 0);
assert_eq!(metrics.total_considered_files, 2);

// Check data
let files = dt.get_files_iter()?.collect::<Vec<_>>();
assert_eq!(files.len(), 1);

let actual = read_parquet_file(&files[0], dt.object_store()).await?;
let expected = RecordBatch::try_new(
actual.schema(),
// The file created later is merged first
vec![
Arc::new(Int32Array::from(vec![2, 2, 2, 2, 1, 1, 1, 1])),
Arc::new(Int32Array::from(vec![5, 6, 7, 8, 1, 2, 3, 4])),
],
)?;

assert_eq!(actual, expected);

Ok(())
}

async fn read_parquet_file(
path: &Path,
object_store: ObjectStoreRef,
