Skip to content

Commit 4f38d4a

Browse files
authored
refactor(snapshot): unify ancestors (#49)
1 parent 73715f0 commit 4f38d4a

File tree

4 files changed

+68
-89
lines changed

4 files changed

+68
-89
lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::spec::{
3333
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
3434
ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
3535
};
36+
use crate::utils::ancestors_between;
3637
use crate::{Error, ErrorKind, Result};
3738

3839
type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
@@ -350,57 +351,3 @@ impl PlanContext {
350351
}
351352
}
352353
}
353-
354-
struct Ancestors {
355-
next: Option<SnapshotRef>,
356-
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
357-
}
358-
359-
impl Iterator for Ancestors {
360-
type Item = SnapshotRef;
361-
362-
fn next(&mut self) -> Option<Self::Item> {
363-
let snapshot = self.next.take()?;
364-
let result = snapshot.clone();
365-
self.next = snapshot
366-
.parent_snapshot_id()
367-
.and_then(|id| (self.get_snapshot)(id));
368-
Some(result)
369-
}
370-
}
371-
372-
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
373-
fn ancestors_of(
374-
table_metadata: &TableMetadataRef,
375-
snapshot: i64,
376-
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
377-
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
378-
let table_metadata = table_metadata.clone();
379-
Box::new(Ancestors {
380-
next: Some(snapshot.clone()),
381-
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
382-
})
383-
} else {
384-
Box::new(std::iter::empty())
385-
}
386-
}
387-
388-
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
389-
fn ancestors_between(
390-
table_metadata: &TableMetadataRef,
391-
latest_snapshot_id: i64,
392-
oldest_snapshot_id: Option<i64>,
393-
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
394-
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
395-
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
396-
};
397-
398-
if latest_snapshot_id == oldest_snapshot_id {
399-
return Box::new(std::iter::empty());
400-
}
401-
402-
Box::new(
403-
ancestors_of(table_metadata, latest_snapshot_id)
404-
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
405-
)
406-
}

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ impl Snapshot {
175175
}
176176

177177
/// Get parent snapshot.
178-
pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
178+
pub fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
179179
match self.parent_snapshot_id {
180180
Some(id) => table_metadata.snapshot_by_id(id).cloned(),
181181
None => None,
@@ -400,33 +400,6 @@ impl SnapshotRetention {
400400
}
401401
}
402402

403-
/// An iterator over the ancestors of a snapshot.
404-
pub struct AncestorIterator<'a> {
405-
current: Option<SnapshotRef>,
406-
table_metadata: &'a TableMetadata,
407-
}
408-
409-
impl<'a> Iterator for AncestorIterator<'a> {
410-
type Item = SnapshotRef;
411-
412-
fn next(&mut self) -> Option<Self::Item> {
413-
let current = self.current.take()?;
414-
415-
let next = current.parent_snapshot(self.table_metadata);
416-
self.current = next;
417-
418-
Some(current)
419-
}
420-
}
421-
422-
/// Returns an iterator over the ancestors of a snapshot.
423-
pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
424-
AncestorIterator {
425-
current: Some(snapshot),
426-
table_metadata,
427-
}
428-
}
429-
430403
#[cfg(test)]
431404
mod tests {
432405
use std::collections::HashMap;

crates/iceberg/src/transaction/remove_snapshots.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ use itertools::Itertools;
2323

2424
use crate::error::Result;
2525
use crate::spec::{
26-
ancestors_of, SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS,
27-
MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT,
28-
MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
26+
SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT,
27+
MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP,
28+
MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
2929
};
3030
use crate::transaction::Transaction;
31+
use crate::utils::ancestors_of;
3132
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
3233

3334
/// RemoveSnapshotAction is a transaction action for removing snapshots.
@@ -345,9 +346,9 @@ impl<'a> RemoveSnapshotAction<'a> {
345346
min_snapshots_to_keep: usize,
346347
) -> HashSet<i64> {
347348
let mut ids_to_retain = HashSet::new();
348-
let table_meta = self.tx.current_table.metadata();
349+
let table_meta = self.tx.current_table.metadata_ref();
349350
if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) {
350-
let ancestors = ancestors_of(snapshot.clone(), table_meta);
351+
let ancestors = ancestors_of(&table_meta, snapshot.snapshot_id());
351352
for ancestor in ancestors {
352353
if ids_to_retain.len() < min_snapshots_to_keep
353354
|| ancestor.timestamp_ms() >= expire_snapshots_older_than
@@ -377,8 +378,10 @@ impl<'a> RemoveSnapshotAction<'a> {
377378
.metadata()
378379
.snapshot_by_id(snapshot_ref.snapshot_id)
379380
{
380-
let ancestors =
381-
ancestors_of(snapshot.clone(), self.tx.current_table.metadata());
381+
let ancestors = ancestors_of(
382+
&self.tx.current_table.metadata_ref(),
383+
snapshot.snapshot_id(),
384+
);
382385
for ancestor in ancestors {
383386
referenced_snapshots.insert(ancestor.snapshot_id());
384387
}

crates/iceberg/src/utils.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
use std::num::NonZeroUsize;
1919

20+
use crate::spec::{SnapshotRef, TableMetadataRef};
21+
2022
// Use a default value of 1 as the safest option.
2123
// See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations
2224
// for more details.
@@ -185,3 +187,57 @@ pub mod bin {
185187
}
186188
}
187189
}
190+
191+
pub struct Ancestors {
192+
next: Option<SnapshotRef>,
193+
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
194+
}
195+
196+
impl Iterator for Ancestors {
197+
type Item = SnapshotRef;
198+
199+
fn next(&mut self) -> Option<Self::Item> {
200+
let snapshot = self.next.take()?;
201+
let result = snapshot.clone();
202+
self.next = snapshot
203+
.parent_snapshot_id()
204+
.and_then(|id| (self.get_snapshot)(id));
205+
Some(result)
206+
}
207+
}
208+
209+
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
210+
pub fn ancestors_of(
211+
table_metadata: &TableMetadataRef,
212+
snapshot: i64,
213+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
214+
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
215+
let table_metadata = table_metadata.clone();
216+
Box::new(Ancestors {
217+
next: Some(snapshot.clone()),
218+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
219+
})
220+
} else {
221+
Box::new(std::iter::empty())
222+
}
223+
}
224+
225+
/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
226+
pub fn ancestors_between(
227+
table_metadata: &TableMetadataRef,
228+
latest_snapshot_id: i64,
229+
oldest_snapshot_id: Option<i64>,
230+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
231+
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
232+
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
233+
};
234+
235+
if latest_snapshot_id == oldest_snapshot_id {
236+
return Box::new(std::iter::empty());
237+
}
238+
239+
Box::new(
240+
ancestors_of(table_metadata, latest_snapshot_id)
241+
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
242+
)
243+
}

0 commit comments

Comments
 (0)