Skip to content

Commit 398f6c0

Browse files
committed
Fixed bugs in delete and overwrite
1 parent 917b044 commit 398f6c0

File tree

2 files changed

+12
-6
lines changed

2 files changed

+12
-6
lines changed

pyiceberg/table/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ def overwrite(
501501
self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
502502
)
503503

504-
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
504+
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties,branch=branch)
505505

506506
with self.update_snapshot(branch=branch, snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
507507
# skip writing data files if the dataframe is empty
@@ -554,7 +554,7 @@ def delete(
554554
bound_delete_filter = bind(self.table_metadata.schema(), delete_filter, case_sensitive=True)
555555
preserve_row_filter = _expression_to_complementary_pyarrow(bound_delete_filter)
556556

557-
files = self._scan(row_filter=delete_filter).plan_files()
557+
files = self._scan(row_filter=delete_filter).use_ref(branch).plan_files()
558558

559559
commit_uuid = uuid.uuid4()
560560
counter = itertools.count(0)
@@ -1019,6 +1019,7 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT,
10191019
Args:
10201020
df: The Arrow dataframe that will be appended to the table
10211021
snapshot_properties: Custom properties to be added to the snapshot summary
1022+
branch: Branch Reference to run the append operation
10221023
"""
10231024
with self.transaction() as tx:
10241025
tx.append(df=df, snapshot_properties=snapshot_properties, branch=branch)
@@ -1044,22 +1045,27 @@ def overwrite(
10441045
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
10451046
or a boolean expression in case of a partial overwrite
10461047
snapshot_properties: Custom properties to be added to the snapshot summary
1048+
branch: Branch Reference to run the overwrite operation
10471049
"""
10481050
with self.transaction() as tx:
10491051
tx.overwrite(df=df, overwrite_filter=overwrite_filter, snapshot_properties=snapshot_properties, branch=branch)
10501052

10511053
def delete(
1052-
self, delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
1054+
self,
1055+
delete_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
1056+
snapshot_properties: Dict[str, str] = EMPTY_DICT,
1057+
branch: str = MAIN_BRANCH,
10531058
) -> None:
10541059
"""
10551060
Shorthand for deleting rows from the table.
10561061
10571062
Args:
10581063
delete_filter: The predicate that is used to remove rows
10591064
snapshot_properties: Custom properties to be added to the snapshot summary
1065+
branch: Branch Reference to run the delete operation
10601066
"""
10611067
with self.transaction() as tx:
1062-
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties)
1068+
tx.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
10631069

10641070
def add_files(
10651071
self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True

pyiceberg/table/update/snapshot.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def __init__(
122122
self._snapshot_id = self._transaction.table_metadata.new_snapshot_id()
123123
self._branch = branch
124124
self._parent_snapshot_id = (
125-
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
125+
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._branch)) else None
126126
)
127127
self._added_data_files = []
128128
self._deleted_data_files = set()
@@ -548,7 +548,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
548548
"""Determine if there are any existing manifest files."""
549549
existing_files = []
550550

551-
if snapshot := self._transaction.table_metadata.current_snapshot():
551+
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._branch):
552552
for manifest_file in snapshot.manifests(io=self._io):
553553
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
554554
found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]

0 commit comments

Comments
 (0)