Skip to content

Commit cf2aeca

Browse files
authored
feat(iceberg): support remove expired files (#52)
* feat(iceberg): support remove expired files * fix(iceberg): add ut and fix rewrite action * typo * feat(iceberg): support concurrent delete files * fix compile * fix(iceberg): support limit delete concurrency
1 parent be339a0 commit cf2aeca

File tree

5 files changed

+473
-12
lines changed

5 files changed

+473
-12
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod remove_snapshots;
2222
mod rewrite_files;
2323
mod snapshot;
2424
mod sort_order;
25+
mod utils;
2526

2627
use std::cmp::Ordering;
2728
use std::collections::HashMap;

crates/iceberg/src/transaction/remove_snapshots.rs

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,27 @@
1818
//! Transaction action for removing snapshot.
1919
2020
use std::collections::{HashMap, HashSet};
21+
use std::ops::Deref;
2122

2223
use itertools::Itertools;
2324

25+
use super::utils::ReachableFileCleanupStrategy;
2426
use crate::error::Result;
27+
use crate::io::FileIO;
2528
use crate::spec::{
26-
SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT,
27-
MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP,
28-
MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
29+
SnapshotReference, SnapshotRetention, TableMetadataRef, MAIN_BRANCH, MAX_REF_AGE_MS,
30+
MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT,
31+
MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
2932
};
33+
use crate::table::Table;
3034
use crate::transaction::Transaction;
3135
use crate::utils::ancestors_of;
32-
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
36+
use crate::{Catalog, Error, ErrorKind, TableRequirement, TableUpdate};
3337

3438
/// RemoveSnapshotAction is a transaction action for removing snapshot.
3539
pub struct RemoveSnapshotAction<'a> {
3640
tx: Transaction<'a>,
37-
clear_expire_files: bool,
41+
clear_expired_files: bool,
3842
ids_to_remove: HashSet<i64>,
3943
default_expired_older_than: i64,
4044
default_min_num_snapshots: i32,
@@ -69,7 +73,7 @@ impl<'a> RemoveSnapshotAction<'a> {
6973

7074
Self {
7175
tx,
72-
clear_expire_files: false,
76+
clear_expired_files: false,
7377
ids_to_remove: HashSet::new(),
7478
default_expired_older_than: now - default_max_snapshot_age_ms,
7579
default_min_num_snapshots,
@@ -80,8 +84,8 @@ impl<'a> RemoveSnapshotAction<'a> {
8084
}
8185

8286
/// Sets whether the files of the expired snapshots should be cleaned up when the result of `apply` is committed.
83-
pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self {
84-
self.clear_expire_files = clear_expire_files;
87+
pub fn clear_expired_files(mut self, clear_expired_files: bool) -> Self {
88+
self.clear_expired_files = clear_expired_files;
8589
self
8690
}
8791

@@ -110,9 +114,12 @@ impl<'a> RemoveSnapshotAction<'a> {
110114
}
111115

112116
/// Finished building the action and apply it to the transaction.
113-
pub async fn apply(mut self) -> Result<Transaction<'a>> {
117+
pub async fn apply(mut self) -> Result<RemoveSnapshotApplyResult<'a>> {
114118
if self.tx.current_table.metadata().refs.is_empty() {
115-
return Ok(self.tx);
119+
return Ok(RemoveSnapshotApplyResult {
120+
tx: self.tx,
121+
clear_expired_files: self.clear_expired_files,
122+
});
116123
}
117124

118125
let table_meta = self.tx.current_table.metadata().clone();
@@ -255,7 +262,10 @@ impl<'a> RemoveSnapshotAction<'a> {
255262
},
256263
])?;
257264

258-
Ok(self.tx)
265+
Ok(RemoveSnapshotApplyResult {
266+
tx: self.tx,
267+
clear_expired_files: self.clear_expired_files,
268+
})
259269
}
260270

261271
fn compute_retained_refs(
@@ -403,6 +413,47 @@ impl<'a> RemoveSnapshotAction<'a> {
403413
}
404414
}
405415

416+
pub struct RemoveSnapshotApplyResult<'a> {
417+
tx: Transaction<'a>,
418+
clear_expired_files: bool,
419+
}
420+
421+
impl RemoveSnapshotApplyResult<'_> {
422+
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
423+
let after_expiration = self.tx.current_table.metadata_ref();
424+
let before_expiration = self.tx.base_table.metadata_ref();
425+
let file_io = self.tx.current_table.file_io().clone();
426+
427+
let table = self.tx.commit(catalog).await?;
428+
429+
if self.clear_expired_files {
430+
Self::clean_expired_files(file_io, &before_expiration, &after_expiration).await?;
431+
}
432+
433+
Ok(table)
434+
}
435+
436+
async fn clean_expired_files(
437+
file_io: FileIO,
438+
before_expiration: &TableMetadataRef,
439+
after_expiration: &TableMetadataRef,
440+
) -> Result<()> {
441+
let file_cleanup_strategy = ReachableFileCleanupStrategy::new(file_io);
442+
443+
file_cleanup_strategy
444+
.clean_files(before_expiration, after_expiration)
445+
.await
446+
}
447+
}
448+
449+
impl<'a> Deref for RemoveSnapshotApplyResult<'a> {
450+
type Target = Transaction<'a>;
451+
452+
fn deref(&self) -> &Self::Target {
453+
&self.tx
454+
}
455+
}
456+
406457
#[cfg(test)]
407458
mod tests {
408459
use std::fs::File;

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,16 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
273273
})
274274
.collect();
275275

276-
if found_deleted_files.is_empty() {
276+
if found_deleted_files.is_empty()
277+
&& (manifest_file.has_added_files() || manifest_file.has_existing_files())
278+
{
277279
existing_files.push(manifest_file.clone());
278280
} else {
279281
// Rewrite the manifest file without the deleted data files
280282
if manifest
281283
.entries()
282284
.iter()
285+
.filter(|entry| entry.status() != ManifestStatus::Deleted)
283286
.any(|entry| !found_deleted_files.contains(entry.data_file().file_path()))
284287
{
285288
let mut manifest_writer = snapshot_produce.new_manifest_writer(
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
use std::sync::Arc;
20+
21+
use futures::stream::{self, StreamExt};
22+
use futures::TryStreamExt;
23+
24+
use crate::error::Result;
25+
use crate::io::FileIO;
26+
use crate::spec::{ManifestFile, Snapshot, TableMetadataRef};
27+
28+
const DEFAULT_DELETE_CONCURRENCY_LIMIT: usize = 10;
29+
30+
pub struct ReachableFileCleanupStrategy {
31+
file_io: FileIO,
32+
}
33+
34+
impl ReachableFileCleanupStrategy {
35+
pub fn new(file_io: FileIO) -> Self {
36+
Self { file_io }
37+
}
38+
}
39+
40+
impl ReachableFileCleanupStrategy {
41+
pub async fn clean_files(
42+
&self,
43+
before_expiration: &TableMetadataRef,
44+
after_expiration: &TableMetadataRef,
45+
) -> Result<()> {
46+
let mut manifest_lists_to_delete: HashSet<&str> = HashSet::default();
47+
let mut expired_snapshots = Vec::default();
48+
for snapshot in before_expiration.snapshots() {
49+
if after_expiration
50+
.snapshot_by_id(snapshot.snapshot_id())
51+
.is_none()
52+
{
53+
expired_snapshots.push(snapshot);
54+
manifest_lists_to_delete.insert(snapshot.manifest_list());
55+
}
56+
}
57+
58+
let deletion_candidates = {
59+
let mut deletion_candidates = HashSet::default();
60+
// This part can also be parallelized if `load_manifest_list` is a bottleneck
61+
// and if the underlying FileIO supports concurrent reads efficiently.
62+
for snapshot in expired_snapshots {
63+
let manifest_list = snapshot
64+
.load_manifest_list(&self.file_io, before_expiration)
65+
.await?;
66+
67+
for manifest_file in manifest_list.entries() {
68+
deletion_candidates.insert(manifest_file.clone());
69+
}
70+
}
71+
deletion_candidates
72+
};
73+
74+
if !deletion_candidates.is_empty() {
75+
let (manifests_to_delete, referenced_manifests) = self
76+
.prune_referenced_manifests(
77+
after_expiration.snapshots(),
78+
after_expiration,
79+
deletion_candidates,
80+
)
81+
.await?;
82+
83+
if !manifests_to_delete.is_empty() {
84+
let files_to_delete = self
85+
.find_files_to_delete(&manifests_to_delete, &referenced_manifests)
86+
.await?;
87+
88+
stream::iter(files_to_delete)
89+
.map(|file_path| self.file_io.delete(file_path))
90+
.buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
91+
.try_collect::<Vec<_>>()
92+
.await?;
93+
94+
stream::iter(manifests_to_delete)
95+
.map(|manifest_file| self.file_io.delete(manifest_file.manifest_path))
96+
.buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
97+
.try_collect::<Vec<_>>()
98+
.await?;
99+
}
100+
}
101+
102+
stream::iter(manifest_lists_to_delete)
103+
.map(|path| self.file_io.delete(path))
104+
.buffer_unordered(DEFAULT_DELETE_CONCURRENCY_LIMIT)
105+
.try_collect::<Vec<_>>()
106+
.await?;
107+
108+
Ok(())
109+
}
110+
111+
async fn prune_referenced_manifests(
112+
&self,
113+
snapshots: impl Iterator<Item = &Arc<Snapshot>>,
114+
table_meta_data_ref: &TableMetadataRef,
115+
mut deletion_candidates: HashSet<ManifestFile>,
116+
) -> Result<(HashSet<ManifestFile>, HashSet<ManifestFile>)> {
117+
let mut referenced_manifests = HashSet::default();
118+
for snapshot in snapshots {
119+
let manifest_list = snapshot
120+
.load_manifest_list(&self.file_io, table_meta_data_ref)
121+
.await?;
122+
123+
for manifest_file in manifest_list.entries() {
124+
deletion_candidates.remove(manifest_file);
125+
referenced_manifests.insert(manifest_file.clone());
126+
127+
if deletion_candidates.is_empty() {
128+
break;
129+
}
130+
}
131+
}
132+
133+
Ok((deletion_candidates, referenced_manifests))
134+
}
135+
136+
async fn find_files_to_delete(
137+
&self,
138+
manifest_files: &HashSet<ManifestFile>,
139+
referenced_manifests: &HashSet<ManifestFile>,
140+
) -> Result<HashSet<String>> {
141+
let mut files_to_delete = HashSet::default();
142+
for manifest_file in manifest_files {
143+
let m = manifest_file.load_manifest(&self.file_io).await.unwrap();
144+
for entry in m.entries() {
145+
files_to_delete.insert(entry.data_file().file_path().to_owned());
146+
}
147+
}
148+
149+
if files_to_delete.is_empty() {
150+
return Ok(files_to_delete);
151+
}
152+
153+
for manifest_file in referenced_manifests {
154+
let m = manifest_file.load_manifest(&self.file_io).await.unwrap();
155+
for entry in m.entries() {
156+
files_to_delete.remove(entry.data_file().file_path());
157+
}
158+
}
159+
160+
Ok(files_to_delete)
161+
}
162+
}

0 commit comments

Comments
 (0)