Skip to content

Commit b6a6f6d

Browse files
refine: refine the interface of SnapshotProducer (#1490)
## Which issue does this PR close? ## What changes are included in this PR? This PR refine the interface of SnapshotProducer: 1. include the table in SnapshotProducer rather than pass using function param 2. add SnapshotProducer as param for ManifestProcess, SnapshotProduceOperation I find that it would convenient when working on merge append: ManifestProcess and SnapshotProduceOperation can be consider as custom extension for SnapshotProducer and they would reuse SnapshotProducer as for common usage. So we should include it in their function param. At the same time, include Table in SnapshotProducer make thing easier they can use SnapshotProducer diretcly. ## Are these changes tested? --------- Co-authored-by: ZENOTME <[email protected]> Co-authored-by: Renjie Liu <[email protected]>
1 parent 8dd4a22 commit b6a6f6d

File tree

2 files changed

+84
-69
lines changed

2 files changed

+84
-69
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,6 @@ impl FastAppendAction {
8484
#[async_trait]
8585
impl TransactionAction for FastAppendAction {
8686
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
87-
// validate added files
88-
SnapshotProducer::validate_added_data_files(table, &self.added_data_files)?;
89-
90-
// Checks duplicate files
91-
if self.check_duplicate {
92-
SnapshotProducer::validate_duplicate_files(table, &self.added_data_files).await?;
93-
}
94-
9587
let snapshot_producer = SnapshotProducer::new(
9688
table,
9789
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
@@ -100,8 +92,18 @@ impl TransactionAction for FastAppendAction {
10092
self.added_data_files.clone(),
10193
);
10294

95+
// validate added files
96+
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
97+
98+
// Checks duplicate files
99+
if self.check_duplicate {
100+
snapshot_producer
101+
.validate_duplicate_files(&self.added_data_files)
102+
.await?;
103+
}
104+
103105
snapshot_producer
104-
.commit(table, FastAppendOperation, DefaultManifestProcess)
106+
.commit(FastAppendOperation, DefaultManifestProcess)
105107
.await
106108
}
107109
}
@@ -115,18 +117,24 @@ impl SnapshotProduceOperation for FastAppendOperation {
115117

116118
async fn delete_entries(
117119
&self,
118-
_snapshot_produce: &SnapshotProducer,
120+
_snapshot_produce: &SnapshotProducer<'_>,
119121
) -> Result<Vec<ManifestEntry>> {
120122
Ok(vec![])
121123
}
122124

123-
async fn existing_manifest(&self, table: &Table) -> Result<Vec<ManifestFile>> {
124-
let Some(snapshot) = table.metadata().current_snapshot() else {
125+
async fn existing_manifest(
126+
&self,
127+
snapshot_produce: &SnapshotProducer<'_>,
128+
) -> Result<Vec<ManifestFile>> {
129+
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
125130
return Ok(vec![]);
126131
};
127132

128133
let manifest_list = snapshot
129-
.load_manifest_list(table.file_io(), &table.metadata_ref())
134+
.load_manifest_list(
135+
snapshot_produce.table.file_io(),
136+
&snapshot_produce.table.metadata_ref(),
137+
)
130138
.await?;
131139

132140
Ok(manifest_list

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 63 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
4444
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
4545
fn existing_manifest(
4646
&self,
47-
table: &Table,
47+
snapshot_produce: &SnapshotProducer<'_>,
4848
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
4949
}
5050

5151
pub(crate) struct DefaultManifestProcess;
5252

5353
impl ManifestProcess for DefaultManifestProcess {
54-
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
54+
fn process_manifests(
55+
&self,
56+
_snapshot_produce: &SnapshotProducer<'_>,
57+
manifests: Vec<ManifestFile>,
58+
) -> Vec<ManifestFile> {
5559
manifests
5660
}
5761
}
5862

5963
pub(crate) trait ManifestProcess: Send + Sync {
60-
fn process_manifests(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
64+
fn process_manifests(
65+
&self,
66+
snapshot_produce: &SnapshotProducer<'_>,
67+
manifests: Vec<ManifestFile>,
68+
) -> Vec<ManifestFile>;
6169
}
6270

63-
pub(crate) struct SnapshotProducer {
71+
pub(crate) struct SnapshotProducer<'a> {
72+
pub(crate) table: &'a Table,
6473
snapshot_id: i64,
6574
commit_uuid: Uuid,
6675
key_metadata: Option<Vec<u8>>,
@@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer {
7281
manifest_counter: RangeFrom<u64>,
7382
}
7483

75-
impl SnapshotProducer {
84+
impl<'a> SnapshotProducer<'a> {
7685
pub(crate) fn new(
77-
table: &Table,
86+
table: &'a Table,
7887
commit_uuid: Uuid,
7988
key_metadata: Option<Vec<u8>>,
8089
snapshot_properties: HashMap<String, String>,
8190
added_data_files: Vec<DataFile>,
8291
) -> Self {
8392
Self {
93+
table,
8494
snapshot_id: Self::generate_unique_snapshot_id(table),
8595
commit_uuid,
8696
key_metadata,
@@ -90,10 +100,7 @@ impl SnapshotProducer {
90100
}
91101
}
92102

93-
pub(crate) fn validate_added_data_files(
94-
table: &Table,
95-
added_data_files: &[DataFile],
96-
) -> Result<()> {
103+
pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> {
97104
for data_file in added_data_files {
98105
if data_file.content_type() != crate::spec::DataContentType::Data {
99106
return Err(Error::new(
@@ -102,23 +109,23 @@ impl SnapshotProducer {
102109
));
103110
}
104111
// Check if the data file partition spec id matches the table default partition spec id.
105-
if table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
112+
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id {
106113
return Err(Error::new(
107114
ErrorKind::DataInvalid,
108115
"Data file partition spec id does not match table default partition spec id",
109116
));
110117
}
111118
Self::validate_partition_value(
112119
data_file.partition(),
113-
table.metadata().default_partition_type(),
120+
self.table.metadata().default_partition_type(),
114121
)?;
115122
}
116123

117124
Ok(())
118125
}
119126

120127
pub(crate) async fn validate_duplicate_files(
121-
table: &Table,
128+
&self,
122129
added_data_files: &[DataFile],
123130
) -> Result<()> {
124131
let new_files: HashSet<&str> = added_data_files
@@ -127,12 +134,14 @@ impl SnapshotProducer {
127134
.collect();
128135

129136
let mut referenced_files = Vec::new();
130-
if let Some(current_snapshot) = table.metadata().current_snapshot() {
137+
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
131138
let manifest_list = current_snapshot
132-
.load_manifest_list(table.file_io(), &table.metadata_ref())
139+
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
133140
.await?;
134141
for manifest_list_entry in manifest_list.entries() {
135-
let manifest = manifest_list_entry.load_manifest(table.file_io()).await?;
142+
let manifest = manifest_list_entry
143+
.load_manifest(self.table.file_io())
144+
.await?;
136145
for entry in manifest.entries() {
137146
let file_path = entry.file_path();
138147
if new_files.contains(file_path) && entry.is_alive() {
@@ -177,28 +186,28 @@ impl SnapshotProducer {
177186
snapshot_id
178187
}
179188

180-
fn new_manifest_writer(
181-
&mut self,
182-
content: ManifestContentType,
183-
table: &Table,
184-
) -> Result<ManifestWriter> {
189+
fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
185190
let new_manifest_path = format!(
186191
"{}/{}/{}-m{}.{}",
187-
table.metadata().location(),
192+
self.table.metadata().location(),
188193
META_ROOT_PATH,
189194
self.commit_uuid,
190195
self.manifest_counter.next().unwrap(),
191196
DataFileFormat::Avro
192197
);
193-
let output_file = table.file_io().new_output(new_manifest_path)?;
198+
let output_file = self.table.file_io().new_output(new_manifest_path)?;
194199
let builder = ManifestWriterBuilder::new(
195200
output_file,
196201
Some(self.snapshot_id),
197202
self.key_metadata.clone(),
198-
table.metadata().current_schema().clone(),
199-
table.metadata().default_partition_spec().as_ref().clone(),
203+
self.table.metadata().current_schema().clone(),
204+
self.table
205+
.metadata()
206+
.default_partition_spec()
207+
.as_ref()
208+
.clone(),
200209
);
201-
if table.metadata().format_version() == FormatVersion::V1 {
210+
if self.table.metadata().format_version() == FormatVersion::V1 {
202211
Ok(builder.build_v1())
203212
} else {
204213
match content {
@@ -240,7 +249,7 @@ impl SnapshotProducer {
240249
}
241250

242251
// Write manifest file for added data files and return the ManifestFile for ManifestList.
243-
async fn write_added_manifest(&mut self, table: &Table) -> Result<ManifestFile> {
252+
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
244253
let added_data_files = std::mem::take(&mut self.added_data_files);
245254
if added_data_files.is_empty() {
246255
return Err(Error::new(
@@ -250,7 +259,7 @@ impl SnapshotProducer {
250259
}
251260

252261
let snapshot_id = self.snapshot_id;
253-
let format_version = table.metadata().format_version();
262+
let format_version = self.table.metadata().format_version();
254263
let manifest_entries = added_data_files.into_iter().map(|data_file| {
255264
let builder = ManifestEntry::builder()
256265
.status(crate::spec::ManifestStatus::Added)
@@ -263,7 +272,7 @@ impl SnapshotProducer {
263272
builder.build()
264273
}
265274
});
266-
let mut writer = self.new_manifest_writer(ManifestContentType::Data, table)?;
275+
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
267276
for entry in manifest_entries {
268277
writer.add_entry(entry)?;
269278
}
@@ -272,29 +281,27 @@ impl SnapshotProducer {
272281

273282
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
274283
&mut self,
275-
table: &Table,
276284
snapshot_produce_operation: &OP,
277285
manifest_process: &MP,
278286
) -> Result<Vec<ManifestFile>> {
279-
let added_manifest = self.write_added_manifest(table).await?;
280-
let existing_manifests = snapshot_produce_operation.existing_manifest(table).await?;
287+
let added_manifest = self.write_added_manifest().await?;
288+
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
281289
// # TODO
282290
// Support process delete entries.
283291

284292
let mut manifest_files = vec![added_manifest];
285293
manifest_files.extend(existing_manifests);
286-
let manifest_files = manifest_process.process_manifests(manifest_files);
294+
let manifest_files = manifest_process.process_manifests(self, manifest_files);
287295
Ok(manifest_files)
288296
}
289297

290298
// Returns a `Summary` of the current snapshot
291299
fn summary<OP: SnapshotProduceOperation>(
292300
&self,
293-
table: &Table,
294301
snapshot_produce_operation: &OP,
295302
) -> Result<Summary> {
296303
let mut summary_collector = SnapshotSummaryCollector::default();
297-
let table_metadata = table.metadata_ref();
304+
let table_metadata = self.table.metadata_ref();
298305

299306
let partition_summary_limit = if let Some(limit) = table_metadata
300307
.properties()
@@ -339,10 +346,10 @@ impl SnapshotProducer {
339346
)
340347
}
341348

342-
fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) -> String {
349+
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
343350
format!(
344351
"{}/{}/snap-{}-{}-{}.{}",
345-
table.metadata().location(),
352+
self.table.metadata().location(),
346353
META_ROOT_PATH,
347354
self.snapshot_id,
348355
attempt,
@@ -354,34 +361,34 @@ impl SnapshotProducer {
354361
/// Finished building the action and return the [`ActionCommit`] to the transaction.
355362
pub(crate) async fn commit<OP: SnapshotProduceOperation, MP: ManifestProcess>(
356363
mut self,
357-
table: &Table,
358364
snapshot_produce_operation: OP,
359365
process: MP,
360366
) -> Result<ActionCommit> {
361367
let new_manifests = self
362-
.manifest_file(table, &snapshot_produce_operation, &process)
368+
.manifest_file(&snapshot_produce_operation, &process)
363369
.await?;
364-
let next_seq_num = table.metadata().next_sequence_number();
370+
let next_seq_num = self.table.metadata().next_sequence_number();
365371

366-
let summary = self
367-
.summary(table, &snapshot_produce_operation)
368-
.map_err(|err| {
369-
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
370-
.with_source(err)
371-
})?;
372+
let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
373+
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.").with_source(err)
374+
})?;
372375

373-
let manifest_list_path = self.generate_manifest_list_file_path(table, 0);
376+
let manifest_list_path = self.generate_manifest_list_file_path(0);
374377

375-
let mut manifest_list_writer = match table.metadata().format_version() {
378+
let mut manifest_list_writer = match self.table.metadata().format_version() {
376379
FormatVersion::V1 => ManifestListWriter::v1(
377-
table.file_io().new_output(manifest_list_path.clone())?,
380+
self.table
381+
.file_io()
382+
.new_output(manifest_list_path.clone())?,
378383
self.snapshot_id,
379-
table.metadata().current_snapshot_id(),
384+
self.table.metadata().current_snapshot_id(),
380385
),
381386
FormatVersion::V2 => ManifestListWriter::v2(
382-
table.file_io().new_output(manifest_list_path.clone())?,
387+
self.table
388+
.file_io()
389+
.new_output(manifest_list_path.clone())?,
383390
self.snapshot_id,
384-
table.metadata().current_snapshot_id(),
391+
self.table.metadata().current_snapshot_id(),
385392
next_seq_num,
386393
),
387394
};
@@ -392,10 +399,10 @@ impl SnapshotProducer {
392399
let new_snapshot = Snapshot::builder()
393400
.with_manifest_list(manifest_list_path)
394401
.with_snapshot_id(self.snapshot_id)
395-
.with_parent_snapshot_id(table.metadata().current_snapshot_id())
402+
.with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
396403
.with_sequence_number(next_seq_num)
397404
.with_summary(summary)
398-
.with_schema_id(table.metadata().current_schema_id())
405+
.with_schema_id(self.table.metadata().current_schema_id())
399406
.with_timestamp_ms(commit_ts)
400407
.build();
401408

@@ -414,11 +421,11 @@ impl SnapshotProducer {
414421

415422
let requirements = vec![
416423
TableRequirement::UuidMatch {
417-
uuid: table.metadata().uuid(),
424+
uuid: self.table.metadata().uuid(),
418425
},
419426
TableRequirement::RefSnapshotIdMatch {
420427
r#ref: MAIN_BRANCH.to_string(),
421-
snapshot_id: table.metadata().current_snapshot_id(),
428+
snapshot_id: self.table.metadata().current_snapshot_id(),
422429
},
423430
];
424431

0 commit comments

Comments
 (0)