Skip to content

Support getting snapshot at or right before the given timestamp #748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,18 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
return self.snapshot_by_id(ref.snapshot_id)
return None

def snapshot_as_of_timestamp(self, timestamp_ms: int, inclusive: bool = True) -> Optional[Snapshot]:
"""Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

Args:
timestamp_ms: Find snapshot that was current at/before this timestamp
inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
"""
for log_entry in reversed(self.history()):
if (inclusive and log_entry.timestamp_ms <= timestamp_ms) or log_entry.timestamp_ms < timestamp_ms:
return self.snapshot_by_id(log_entry.snapshot_id)
return None

def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log
Expand Down
16 changes: 15 additions & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from collections import defaultdict
from enum import Enum
from typing import Any, DefaultDict, Dict, List, Mapping, Optional
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema

if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.typedef import IcebergBaseModel

ADDED_DATA_FILES = "added-data-files"
Expand Down Expand Up @@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)


def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]:
"""Get the ancestors of and including the given snapshot."""
if current_snapshot:
yield current_snapshot
if current_snapshot.parent_snapshot_id is not None:
if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id):
yield from ancestors_of(parent, table_metadata)
37 changes: 37 additions & 0 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
Snapshot,
SnapshotLogEntry,
Summary,
ancestors_of,
)
from pyiceberg.table.sorting import (
NullOrder,
Expand Down Expand Up @@ -204,6 +205,42 @@ def test_snapshot_by_id(table_v2: Table) -> None:
)


def test_snapshot_by_timestamp(table_v2: Table) -> None:
assert table_v2.snapshot_as_of_timestamp(1515100955770) == Snapshot(
snapshot_id=3051729675574597004,
parent_snapshot_id=None,
sequence_number=0,
timestamp_ms=1515100955770,
manifest_list="s3://a/b/1.avro",
summary=Summary(Operation.APPEND),
schema_id=None,
)
assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None


def test_ancestors_of(table_v2: Table) -> None:
assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [
Snapshot(
snapshot_id=3055729675574597004,
parent_snapshot_id=3051729675574597004,
sequence_number=1,
timestamp_ms=1555100955770,
manifest_list="s3://a/b/2.avro",
summary=Summary(Operation.APPEND),
schema_id=1,
),
Snapshot(
snapshot_id=3051729675574597004,
parent_snapshot_id=None,
sequence_number=0,
timestamp_ms=1515100955770,
manifest_list="s3://a/b/1.avro",
summary=Summary(Operation.APPEND),
schema_id=None,
),
]


def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None:
assert table_v2.snapshot_by_id(-1) is None

Expand Down