Skip to content

Commit 3c1a4dc

Browse files
committed
re-write ancestors_of() to return Iterable[Snapshot]
1 parent f58e751 commit 3c1a4dc

File tree

3 files changed

+26
-24
lines changed

3 files changed

+26
-24
lines changed

pyiceberg/table/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
SnapshotLogEntry,
113113
SnapshotSummaryCollector,
114114
Summary,
115+
ancestors_of,
115116
update_snapshot_summaries,
116117
)
117118
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -1292,7 +1293,16 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
12921293

12931294
def latest_snapshot_before_timestamp(self, timestamp_ms: int) -> Optional[Snapshot]:
    """Get the snapshot right before the given timestamp, or None if there is no matching snapshot.

    Only the snapshots returned by ``self.current_ancestors()`` are considered.

    Args:
        timestamp_ms: Exclusive upper bound, compared against each snapshot's ``timestamp_ms``.

    Returns:
        The ancestor with the largest ``timestamp_ms`` that is strictly below
        ``timestamp_ms``, or None when the table has no current snapshot or no
        ancestor qualifies.
    """
    # The lookup is done here rather than delegated to the metadata object:
    # the metadata-level helper of the same name is removed by this change.
    result, prev_timestamp = None, 0
    if self.metadata.current_snapshot_id is not None:
        for snapshot in self.current_ancestors():
            # Every ancestor is scanned and the best candidate kept, so the
            # result does not rely on the lineage being ordered by timestamp.
            if snapshot and prev_timestamp < snapshot.timestamp_ms < timestamp_ms:
                result, prev_timestamp = snapshot, snapshot.timestamp_ms
    return result
1302+
1303+
def current_ancestors(self) -> List[Optional[Snapshot]]:
    """Return the current snapshot followed by its ancestors, as a list.

    The lineage produced by ``ancestors_of`` for the current snapshot is
    materialized eagerly so callers receive a concrete list.
    """
    head = self.current_snapshot()
    lineage = ancestors_of(head, self.metadata)  # type: ignore
    return list(lineage)
12961306

12971307
def history(self) -> List[SnapshotLogEntry]:
12981308
"""Get the snapshot history of this table."""

pyiceberg/table/metadata.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -230,28 +230,6 @@ def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
230230
"""Get the snapshot by snapshot_id."""
231231
return next((snapshot for snapshot in self.snapshots if snapshot.snapshot_id == snapshot_id), None)
232232

233-
def latest_snapshot_before_timestamp(self, timestamp_ms: int) -> Optional[Snapshot]:
234-
"""Get the snapshot right before the given timestamp."""
235-
result, prev_timestamp = None, 0
236-
if self.current_snapshot_id is not None:
237-
for snapshot_id, snapshot_timestamp in self.ancestors_of(self.current_snapshot_id):
238-
snapshot = self.snapshot_by_id(snapshot_id)
239-
if prev_timestamp < snapshot_timestamp < timestamp_ms:
240-
result, prev_timestamp = snapshot, snapshot_timestamp
241-
return result
242-
243-
def ancestors_of(self, snapshot_id: int) -> List[tuple[int, int]]:
244-
"""Get the snapshot_id of the ancestors of the given snapshot."""
245-
current_id: Optional[int] = snapshot_id
246-
result = []
247-
while current_id is not None:
248-
snapshot = self.snapshot_by_id(current_id)
249-
if not snapshot:
250-
break
251-
result.append((current_id, snapshot.timestamp_ms))
252-
current_id = snapshot.parent_snapshot_id
253-
return result
254-
255233
def schema_by_id(self, schema_id: int) -> Optional[Schema]:
    """Get the schema by schema_id.

    Args:
        schema_id: Value compared against each schema's ``schema_id``.

    Returns:
        The first entry of ``self.schemas`` whose ``schema_id`` matches, or
        None when no entry matches.
    """
    return next((schema for schema in self.schemas if schema.schema_id == schema_id), None)

pyiceberg/table/snapshots.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from __future__ import annotations
18+
1719
import time
1820
from collections import defaultdict
1921
from enum import Enum
20-
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
22+
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional
2123

2224
from pydantic import Field, PrivateAttr, model_serializer
2325

2426
from pyiceberg.io import FileIO
2527
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
2628
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
2729
from pyiceberg.schema import Schema
30+
31+
if TYPE_CHECKING:
32+
from pyiceberg.table.metadata import TableMetadata
2833
from pyiceberg.typedef import IcebergBaseModel
2934

3035
ADDED_DATA_FILES = 'added-data-files'
@@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
412417
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
    """Record ``num`` under ``property_name`` only when it is greater than zero.

    Args:
        properties: Mapping updated in place.
        num: Value to record; zero and negative values leave ``properties`` untouched.
        property_name: Key under which ``str(num)`` is stored.
    """
    if num > 0:
        properties[property_name] = str(num)
420+
421+
422+
def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]:
    """Get the ancestors of and including the given snapshot.

    The parent chain is followed iteratively, so the length of the lineage is
    not bounded by the interpreter's recursion limit.

    Args:
        current_snapshot: Snapshot to start from; None yields nothing.
        table_metadata: Metadata whose ``snapshot_by_id`` resolves parent IDs.

    Yields:
        ``current_snapshot`` first, then each parent in turn. The walk stops
        at a snapshot with no parent ID, or when a parent ID cannot be
        resolved by ``table_metadata``.
    """
    snapshot = current_snapshot
    while snapshot is not None:
        yield snapshot
        parent_id = snapshot.parent_snapshot_id
        # Test against None rather than truthiness: an ID of 0 is falsy but
        # must still be looked up instead of being read as "no parent".
        if parent_id is None:
            return
        # An unresolvable parent gives None here, which ends the loop.
        snapshot = table_metadata.snapshot_by_id(parent_id)

0 commit comments

Comments
 (0)