Skip to content

Commit 73715f0

Browse files
authored
feat(iceberg): rewrite files action 0325 (#47)
* feat(iceberg): introduce rewrite files action * fix(iceberg): add test * fix test
1 parent 9c210b9 commit 73715f0

File tree

7 files changed

+722
-19
lines changed

7 files changed

+722
-19
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ use arrow_array::StringArray;
2121
use futures::TryStreamExt;
2222
use uuid::Uuid;
2323

24+
use super::{
25+
MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
26+
MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
27+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
28+
};
2429
use crate::error::Result;
2530
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
2631
use crate::transaction::snapshot::{
@@ -30,16 +35,6 @@ use crate::transaction::Transaction;
3035
use crate::writer::file_writer::ParquetWriter;
3136
use crate::{Error, ErrorKind};
3237

33-
/// Target size of manifest file when merging manifests.
34-
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
35-
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
36-
/// Minimum number of manifests to merge.
37-
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
38-
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
39-
/// Whether allow to merge manifests.
40-
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
41-
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
42-
4338
/// FastAppendAction is a transaction action for fast append data files to the table.
4439
pub struct FastAppendAction<'a> {
4540
snapshot_produce_action: SnapshotProduceAction<'a>,
@@ -187,7 +182,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
187182

188183
async fn existing_manifest(
189184
&self,
190-
snapshot_produce: &SnapshotProduceAction<'_>,
185+
snapshot_produce: &mut SnapshotProduceAction<'_>,
191186
) -> Result<Vec<ManifestFile>> {
192187
let Some(snapshot) = snapshot_produce
193188
.tx

crates/iceberg/src/transaction/mod.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
mod append;
2121
mod remove_snapshots;
22+
mod rewrite_files;
2223
mod snapshot;
2324
mod sort_order;
2425

@@ -27,8 +28,8 @@ use std::collections::HashMap;
2728
use std::mem::discriminant;
2829
use std::sync::Arc;
2930

30-
pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
3131
use remove_snapshots::RemoveSnapshotAction;
32+
use rewrite_files::RewriteFilesAction;
3233
use uuid::Uuid;
3334

3435
use crate::error::Result;
@@ -39,6 +40,19 @@ use crate::transaction::sort_order::ReplaceSortOrderAction;
3940
use crate::TableUpdate::UpgradeFormatVersion;
4041
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
4142

43+
/// Target size of manifest file when merging manifests.
44+
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
45+
/// This is the default value for `MANIFEST_TARGET_SIZE_BYTES`.
46+
pub const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
47+
/// Minimum number of manifests to merge.
48+
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
49+
/// This is the default value for `MANIFEST_MIN_MERGE_COUNT`.
50+
pub const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
51+
/// Whether allow to merge manifests.
52+
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
53+
/// This is the default value for `MANIFEST_MERGE_ENABLED`.
54+
pub const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;
55+
4256
/// Table transaction.
4357
pub struct Transaction<'a> {
4458
base_table: &'a Table,
@@ -215,6 +229,22 @@ impl<'a> Transaction<'a> {
215229
RemoveSnapshotAction::new(self)
216230
}
217231

232+
/// Creates rewrite files action.
233+
pub fn rewrite_files(
234+
self,
235+
commit_uuid: Option<Uuid>,
236+
key_metadata: Vec<u8>,
237+
) -> Result<RewriteFilesAction<'a>> {
238+
let snapshot_id = self.generate_unique_snapshot_id();
239+
RewriteFilesAction::new(
240+
self,
241+
snapshot_id,
242+
commit_uuid.unwrap_or_else(Uuid::now_v7),
243+
key_metadata,
244+
HashMap::new(),
245+
)
246+
}
247+
218248
/// Remove properties in table.
219249
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
220250
self.apply(
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::{HashMap, HashSet};
19+
use std::sync::Arc;
20+
21+
use uuid::Uuid;
22+
23+
use super::snapshot::{
24+
DefaultManifestProcess, MergeManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
25+
};
26+
use super::{
27+
Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
28+
MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
29+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
30+
};
31+
use crate::error::Result;
32+
use crate::spec::{
33+
DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestFile, ManifestStatus,
34+
Operation,
35+
};
36+
37+
/// Transaction action for rewriting files.
38+
pub struct RewriteFilesAction<'a> {
39+
snapshot_produce_action: SnapshotProduceAction<'a>,
40+
target_size_bytes: u32,
41+
min_count_to_merge: u32,
42+
merge_enabled: bool,
43+
}
44+
45+
struct RewriteFilesOperation;
46+
47+
impl<'a> RewriteFilesAction<'a> {
48+
pub fn new(
49+
tx: Transaction<'a>,
50+
snapshot_id: i64,
51+
commit_uuid: Uuid,
52+
key_metadata: Vec<u8>,
53+
snapshot_properties: HashMap<String, String>,
54+
) -> Result<Self> {
55+
let target_size_bytes: u32 = tx
56+
.current_table
57+
.metadata()
58+
.properties()
59+
.get(MANIFEST_TARGET_SIZE_BYTES)
60+
.and_then(|s| s.parse().ok())
61+
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
62+
let min_count_to_merge: u32 = tx
63+
.current_table
64+
.metadata()
65+
.properties()
66+
.get(MANIFEST_MIN_MERGE_COUNT)
67+
.and_then(|s| s.parse().ok())
68+
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
69+
let merge_enabled = tx
70+
.current_table
71+
.metadata()
72+
.properties()
73+
.get(MANIFEST_MERGE_ENABLED)
74+
.and_then(|s| s.parse().ok())
75+
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
76+
77+
Ok(Self {
78+
snapshot_produce_action: SnapshotProduceAction::new(
79+
tx,
80+
snapshot_id,
81+
key_metadata,
82+
commit_uuid,
83+
snapshot_properties,
84+
)
85+
.unwrap(),
86+
target_size_bytes,
87+
min_count_to_merge,
88+
merge_enabled,
89+
})
90+
}
91+
92+
/// Add data files to the snapshot.
93+
94+
pub fn add_data_files(
95+
&mut self,
96+
data_files: impl IntoIterator<Item = DataFile>,
97+
) -> Result<&mut Self> {
98+
self.snapshot_produce_action.add_data_files(data_files)?;
99+
Ok(self)
100+
}
101+
102+
/// Add remove files to the snapshot.
103+
pub fn delete_files(
104+
&mut self,
105+
remove_data_files: impl IntoIterator<Item = DataFile>,
106+
) -> Result<&mut Self> {
107+
self.snapshot_produce_action
108+
.delete_files(remove_data_files)?;
109+
Ok(self)
110+
}
111+
112+
/// Finished building the action and apply it to the transaction.
113+
pub async fn apply(self) -> Result<Transaction<'a>> {
114+
if self.merge_enabled {
115+
let process =
116+
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
117+
self.snapshot_produce_action
118+
.apply(RewriteFilesOperation, process)
119+
.await
120+
} else {
121+
self.snapshot_produce_action
122+
.apply(RewriteFilesOperation, DefaultManifestProcess)
123+
.await
124+
}
125+
}
126+
}
127+
128+
impl SnapshotProduceOperation for RewriteFilesOperation {
129+
fn operation(&self) -> Operation {
130+
Operation::Replace
131+
}
132+
133+
async fn delete_entries(
134+
&self,
135+
snapshot_produce: &SnapshotProduceAction<'_>,
136+
) -> Result<Vec<ManifestEntry>> {
137+
// generate delete manifest entries from removed files
138+
let snapshot = snapshot_produce
139+
.tx
140+
.current_table
141+
.metadata()
142+
.current_snapshot();
143+
144+
if let Some(snapshot) = snapshot {
145+
let gen_manifest_entry = |old_entry: &Arc<ManifestEntry>| {
146+
let builder = ManifestEntry::builder()
147+
.status(ManifestStatus::Deleted)
148+
.snapshot_id(old_entry.snapshot_id().unwrap())
149+
.sequence_number(old_entry.sequence_number().unwrap())
150+
.file_sequence_number(old_entry.file_sequence_number().unwrap())
151+
.data_file(old_entry.data_file().clone());
152+
153+
builder.build()
154+
};
155+
156+
let manifest_list = snapshot
157+
.load_manifest_list(
158+
snapshot_produce.tx.current_table.file_io(),
159+
snapshot_produce.tx.current_table.metadata(),
160+
)
161+
.await?;
162+
163+
let mut deleted_entries = Vec::new();
164+
165+
for manifest_file in manifest_list.entries() {
166+
let manifest = manifest_file
167+
.load_manifest(snapshot_produce.tx.current_table.file_io())
168+
.await?;
169+
170+
for entry in manifest.entries() {
171+
if entry.content_type() == DataContentType::Data
172+
&& snapshot_produce
173+
.removed_data_file_paths
174+
.contains(entry.data_file().file_path())
175+
{
176+
deleted_entries.push(gen_manifest_entry(entry));
177+
}
178+
179+
if entry.content_type() == DataContentType::PositionDeletes
180+
|| entry.content_type() == DataContentType::EqualityDeletes
181+
&& snapshot_produce
182+
.removed_delete_file_paths
183+
.contains(entry.data_file().file_path())
184+
{
185+
deleted_entries.push(gen_manifest_entry(entry));
186+
}
187+
}
188+
}
189+
190+
Ok(deleted_entries)
191+
} else {
192+
Ok(vec![])
193+
}
194+
}
195+
196+
async fn existing_manifest(
197+
&self,
198+
snapshot_produce: &mut SnapshotProduceAction<'_>,
199+
) -> Result<Vec<ManifestFile>> {
200+
let Some(snapshot) = snapshot_produce
201+
.tx
202+
.current_table
203+
.metadata()
204+
.current_snapshot()
205+
else {
206+
return Ok(vec![]);
207+
};
208+
209+
let manifest_list = snapshot
210+
.load_manifest_list(
211+
snapshot_produce.tx.current_table.file_io(),
212+
snapshot_produce.tx.current_table.metadata(),
213+
)
214+
.await?;
215+
216+
let mut existing_files = Vec::new();
217+
218+
for manifest_file in manifest_list.entries() {
219+
let manifest = manifest_file
220+
.load_manifest(snapshot_produce.tx.current_table.file_io())
221+
.await?;
222+
223+
let found_deleted_files: HashSet<_> = manifest
224+
.entries()
225+
.iter()
226+
.filter_map(|entry| {
227+
if snapshot_produce
228+
.removed_data_file_paths
229+
.contains(entry.data_file().file_path())
230+
|| snapshot_produce
231+
.removed_delete_file_paths
232+
.contains(entry.data_file().file_path())
233+
{
234+
Some(entry.data_file().file_path().to_string())
235+
} else {
236+
None
237+
}
238+
})
239+
.collect();
240+
241+
if found_deleted_files.is_empty() {
242+
existing_files.push(manifest_file.clone());
243+
} else {
244+
// Rewrite the manifest file without the deleted data files
245+
if manifest
246+
.entries()
247+
.iter()
248+
.any(|entry| !found_deleted_files.contains(entry.data_file().file_path()))
249+
{
250+
let mut manifest_writer = snapshot_produce.new_manifest_writer(
251+
&ManifestContentType::Data,
252+
snapshot_produce
253+
.tx
254+
.current_table
255+
.metadata()
256+
.default_partition_spec_id(),
257+
)?;
258+
259+
for entry in manifest.entries() {
260+
if !found_deleted_files.contains(entry.data_file().file_path()) {
261+
manifest_writer.add_entry((**entry).clone())?;
262+
}
263+
}
264+
265+
existing_files.push(manifest_writer.write_manifest_file().await?);
266+
}
267+
}
268+
}
269+
270+
Ok(existing_files)
271+
}
272+
}

0 commit comments

Comments
 (0)