-
Notifications
You must be signed in to change notification settings - Fork 323
refactor: consolidate snapshot expiration into MaintenanceTable #2143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
0a94d96
5f0b62b
f995daa
65365e1
e28815f
4628ede
e80c41c
cb9f0c9
ebcff2b
97399bf
95e5af2
5ab5890
5acd690
d30a08c
e62ab58
1af3258
352b48f
382e0ea
549c183
386cb15
12729fa
ce3515c
28fce4b
2c3153e
27c3ece
fe73a34
8dfa038
42e55c9
e1627c4
0e6d45c
311c442
fba592d
b837f86
4605a04
536528e
6036e12
9dc9c82
635a1d9
73658e0
a9a01ee
8c906d2
0e72ccc
cfb4061
9371bca
881fab9
acb70da
54c1f7f
4c6f86c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,8 @@ | |
from __future__ import annotations | ||
|
||
from datetime import datetime, timezone | ||
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple | ||
from functools import reduce | ||
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union | ||
|
||
from pyiceberg.conversions import from_bytes | ||
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary | ||
|
@@ -650,14 +651,11 @@ def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[S | |
|
||
snapshot = self._get_snapshot(snapshot_id) | ||
io = self.tbl.io | ||
files_table: list[pa.Table] = [] | ||
for manifest_list in snapshot.manifests(io): | ||
files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter)) | ||
|
||
executor = ExecutorFactory.get_or_create() | ||
results = list( | ||
executor.map( | ||
lambda manifest_list: self._get_files_from_manifest(manifest_list, data_file_filter), snapshot.manifests(io) | ||
) | ||
) | ||
return pa.concat_tables(results) | ||
return pa.concat_tables(files_table) | ||
|
||
def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": | ||
return self._files(snapshot_id) | ||
|
@@ -668,10 +666,20 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": | |
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": | ||
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) | ||
|
||
def all_manifests(self) -> "pa.Table": | ||
def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
snapshots = self.tbl.snapshots() | ||
# coerce into snapshot objects if users passes in snapshot ids | ||
if snapshots is not None: | ||
if isinstance(snapshots[0], int): | ||
snapshots = [ | ||
snapshot | ||
for snapshot_id in snapshots | ||
if (snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id)) is not None | ||
] | ||
else: | ||
snapshots = self.tbl.snapshots() | ||
|
||
Comment on lines
+669
to
+682
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might have written this and it got cherry picked in but I think its simpler to only allow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tl;dr I originally forked your branch so I could stack my PR on your “Delete Orphans” PR. As July began, my schedule looked pretty rough, so I converted my draft into a PR against the main pyiceberg branch—since I wasn’t sure how much time I’d have later in the month—but I forgot to rebase. As a result, I inadvertently removed the code for deleting orphans while keeping your MaintenanceTable implementation in a more… manual way 😟, so there may still be some remnants. They say the best form of flattery is imitation 😉. |
||
if not snapshots: | ||
return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) | ||
|
||
|
@@ -681,6 +689,32 @@ def all_manifests(self) -> "pa.Table": | |
) | ||
return pa.concat_tables(manifests_by_snapshots) | ||
|
||
def _all_known_files(self) -> dict[str, set[str]]: | ||
"""Get all the known files in the table. | ||
|
||
Returns: | ||
dict of {file_type: set of file paths} for each file type. | ||
""" | ||
snapshots = self.tbl.snapshots() | ||
|
||
_all_known_files = {} | ||
_all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) | ||
_all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} | ||
_all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} | ||
|
||
metadata_files = {entry.metadata_file for entry in self.tbl.metadata.metadata_log} | ||
metadata_files.add(self.tbl.metadata_location) # Include current metadata file | ||
_all_known_files["metadata"] = metadata_files | ||
|
||
executor = ExecutorFactory.get_or_create() | ||
snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] | ||
files_by_snapshots: Iterator[Set[str]] = executor.map( | ||
lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids | ||
) | ||
_all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) | ||
|
||
return _all_known_files | ||
|
||
def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": | ||
import pyarrow as pa | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is de-parallelized? Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, it was not :) Good catch!