Create rollback and set snapshot APIs #758
c60938f to 1af604a
@@ -1956,6 +1957,10 @@ def _commit(self) -> UpdatesAndRequirements:
        """Apply the pending changes and commit."""
        return self._updates, self._requirements

    def _commit_if_ref_updates_exist(self) -> None:
        self.commit()
        self._updates, self._requirements = (), ()
Similar to the Java implementation.
The only issue here is that self.commit will commit the transaction if the ManageSnapshots object comes from

iceberg-python/pyiceberg/table/__init__.py
Lines 1508 to 1521 in 2252e71

    def manage_snapshots(self) -> ManageSnapshots:
        """
        Shorthand to run snapshot management operations like create branch, create tag, etc.

        Use table.manage_snapshots().<operation>().commit() to run a specific operation.
        Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example,

        with table.manage_snapshots() as ms:
            ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
        """
        return ManageSnapshots(transaction=Transaction(self, autocommit=True))

where autocommit is set to true.
One possible way to fix this is to add an additional parameter to transaction._apply that overrides the autocommit behavior, and to call that directly here.
Updated! There is now an extra parameter, commit_transaction_now, that defaults to True; we override it to False when staged refs need to be applied without committing the transaction.
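As a rough illustration of the override described above, here is a minimal sketch. ToyTransaction is a hypothetical stand-in, not the pyiceberg Transaction class; only the parameter name commit_transaction_now and its default of True come from the comment, and everything else is an assumption made for the example.

```python
from typing import Tuple


class ToyTransaction:
    """Hypothetical stand-in for a transaction; not the pyiceberg class."""

    def __init__(self, autocommit: bool = False) -> None:
        self.autocommit = autocommit
        self.staged: Tuple[str, ...] = ()
        self.commits = 0  # counts how often we "pushed" to the catalog

    def _apply(self, updates: Tuple[str, ...], commit_transaction_now: bool = True) -> None:
        # Updates are always staged on the transaction.
        self.staged += updates
        # They are only committed when autocommit is on AND the caller allows it.
        if self.autocommit and commit_transaction_now:
            self.commit_transaction()

    def commit_transaction(self) -> None:
        self.commits += 1


txn = ToyTransaction(autocommit=True)

# Stage a ref update without committing, even though autocommit is True.
txn._apply(("create-branch",), commit_transaction_now=False)
commits_after_staging = txn.commits

# With the default, autocommit behaves as before.
txn._apply(("set-current-snapshot",))
commits_after_default = txn.commits
```

The point of the default being True is that existing callers keep their behavior; only the code path that stages refs opts out.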
Hi, I'm re-opening this resolved conversation, since I don't think adding the additional parameter is enough.
Say, in the future, we have more APIs like:
branch_name, min_snapshots_to_keep = "test_branch_min_snapshots_to_keep", 2
with tbl.manage_snapshots() as ms:
    ms.create_branch(branch_name=branch_name, snapshot_id=snapshot_id)
    ms.set_min_snapshots_to_keep(branch_name=branch_name, min_snapshots_to_keep=min_snapshots_to_keep)
The updates and requirements would be:
(
    SetSnapshotRefUpdate(action='set-snapshot-ref', ref_name='test_branch_min_snapshots_to_keep', type='branch', snapshot_id=71191752302974125, max_ref_age_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=None),
    SetSnapshotRefUpdate(action='set-snapshot-ref', ref_name='test_branch_min_snapshots_to_keep', type='branch', snapshot_id=71191752302974125, max_ref_age_ms=None, max_snapshot_age_ms=None, min_snapshots_to_keep=2),
)
(
    AssertRefSnapshotId(type='assert-ref-snapshot-id', ref='test_branch_min_snapshots_to_keep', snapshot_id=None),
    AssertRefSnapshotId(type='assert-ref-snapshot-id', ref='test_branch_min_snapshots_to_keep', snapshot_id=71191752302974125),
)
The 2nd requirement will fail with a CommitFailedException, as the branch would be missing.
With _commit_if_ref_updates_exist(), the transaction.table_metadata would get updated, but when the transaction exits, it will try to commit_transaction(), which runs _do_commit(), which runs _commit_table().
In _commit_table(), for non-REST catalogs, we call _update_and_stage_table(), where we check the requirements against the current table metadata; this is where the 2nd requirement fails.
To fix this, we might consider one of the following solutions:
1. In transaction._apply, identify the differences between the current table metadata and the staged metadata, and only pass those differences in self._updates, while not sending the ref update requirements (since we've already validated them once in transaction._apply), OR
2. Improve _update_and_stage_table() to iteratively apply each update with its corresponding requirement, always checking the requirements against updated_metadata. This is easier than (1), but only serves non-REST catalogs, OR
3. Continue with the original implementation, i.e. for every _commit_if_ref_updates_exist(), the Transaction commits to the table. This would be expensive IMO, but the result would remain atomic and correct, with minimal changes in the PR.
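To make the failure and the second option concrete, here is a toy model. It is only a sketch under stated assumptions: table metadata is reduced to a dict of ref name to snapshot id, ToyCommitFailed stands in for CommitFailedException, and stage_all_at_once / stage_iteratively are hypothetical names that do not exist in pyiceberg. Pairing each update with exactly one requirement is also an assumption made for illustration.

```python
from typing import Dict, List, Optional, Tuple

Refs = Dict[str, int]                    # ref name -> snapshot id
Update = Tuple[str, int]                 # (ref name, snapshot id to point the ref at)
Requirement = Tuple[str, Optional[int]]  # (ref name, expected id; None = ref must not exist)


class ToyCommitFailed(Exception):
    """Hypothetical stand-in for CommitFailedException."""


def check(requirement: Requirement, refs: Refs) -> None:
    ref, expected = requirement
    actual = refs.get(ref)  # None when the ref does not exist
    if actual != expected:
        raise ToyCommitFailed(f"ref {ref!r}: expected {expected}, found {actual}")


def stage_all_at_once(base: Refs, updates: List[Update], requirements: List[Requirement]) -> Refs:
    # Every requirement is validated against the base metadata before any update is applied.
    for requirement in requirements:
        check(requirement, base)
    staged = dict(base)
    for ref, snapshot_id in updates:
        staged[ref] = snapshot_id
    return staged


def stage_iteratively(base: Refs, updates: List[Update], requirements: List[Requirement]) -> Refs:
    # Option 2: validate each requirement against the metadata as updated so far.
    staged = dict(base)
    for requirement, (ref, snapshot_id) in zip(requirements, updates):
        check(requirement, staged)
        staged[ref] = snapshot_id
    return staged


branch = "test_branch_min_snapshots_to_keep"
snapshot_id = 71191752302974125
updates = [(branch, snapshot_id), (branch, snapshot_id)]
requirements = [(branch, None), (branch, snapshot_id)]

try:
    stage_all_at_once({}, updates, requirements)
    all_at_once_failed = False
except ToyCommitFailed:
    all_at_once_failed = True  # the 2nd requirement sees no branch in the base metadata

iterative_result = stage_iteratively({}, updates, requirements)
```

In this model the all-at-once check fails on the second requirement because the branch does not yet exist in the base metadata, while the iterative variant succeeds because the first update has already been applied when the second requirement is checked.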
@chinmay-bhat Thank you so much for digging into this issue! I think you've made a great point. I am thinking of a solution similar to your first point: deriving the list of requirements when we commit the transaction: https://github.com/apache/iceberg/blob/d69ba0568a2e07dfb5af233350ad5668d9aef134/core/src/main/java/org/apache/iceberg/UpdateRequirements.java#L50-L58
This will save us from manually specifying requirements for every UpdateTableMetadata definition and also prevent the problems described above.
Let me research more on this and get back to you.
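The idea of deriving requirements at commit time could look roughly like the sketch below. This is an assumption-laden toy, not a port of the linked Java class: metadata is again a dict of ref name to snapshot id, derive_requirements is a hypothetical name, and the rule "one assertion per touched ref, taken from the base metadata" is chosen here purely for illustration.

```python
from typing import Dict, List, Optional, Tuple

Refs = Dict[str, int]                    # ref name -> snapshot id
Update = Tuple[str, int]                 # (ref name, snapshot id to point the ref at)
Requirement = Tuple[str, Optional[int]]  # (ref name, expected id; None = ref must not exist)


def derive_requirements(base: Refs, updates: List[Update]) -> List[Requirement]:
    """Build one requirement per touched ref from the base metadata (hypothetical helper)."""
    requirements: List[Requirement] = []
    seen = set()
    for ref, _ in updates:
        if ref not in seen:
            seen.add(ref)
            # The expectation comes from the base metadata, not from earlier staged updates.
            requirements.append((ref, base.get(ref)))
    return requirements


branch = "test_branch_min_snapshots_to_keep"
snapshot_id = 71191752302974125

# The two staged updates from the example: create the branch, then modify it.
updates = [(branch, snapshot_id), (branch, snapshot_id)]

derived_for_new_branch = derive_requirements({}, updates)
derived_for_existing_branch = derive_requirements({branch: 42}, updates)
```

Because the requirements are computed once against the base metadata, the two staged updates to the same branch yield a single "branch must not exist" assertion instead of two assertions that contradict each other.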
Hi @HonahX, should I make a new issue for this? Since changing how we specify requirements is not strictly in the scope of this PR.
Hi @chinmay-bhat. Sorry for the long wait 🙏. I was distracted by other stuff and some blocking issues for the 0.7.0 release. Yes, please feel free to create an issue to further discuss it. I can reply to that when I get something.
Hi @chinmay-bhat. Sorry for the long wait...(again, my bad).. Thanks for the patience and the great work! I left some comments but I think we are close.
We don't need to find all the ancestors; we only need to validate that the snapshot is an ancestor, i.e. if it was ever current.
This reverts commit f5d489c.
We cannot use snapshot_as_of_timestamp(), as it finds previously current snapshots, which are not necessarily ancestors of the current snapshot. An example is here: https://iceberg.apache.org/docs/nightly/spark-queries/?h=ancestor#history
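The distinction between "was current at some point" and "is an ancestor" can be shown with a small toy model. Everything here is hypothetical: the parent map, the history log, and the helpers is_ancestor_of_current and toy_snapshot_as_of are invented for the example and are not pyiceberg APIs. The scenario assumes snapshot 2 was committed, the table was rolled back to snapshot 1, and snapshot 3 was then committed on top of 1.

```python
from typing import Dict, List, Optional, Tuple

# snapshot id -> parent snapshot id (hypothetical lineage)
parents: Dict[int, Optional[int]] = {1: None, 2: 1, 3: 1}

# (timestamp_ms, snapshot id that became current at that time)
history: List[Tuple[int, int]] = [(100, 1), (200, 2), (300, 1), (400, 3)]

current_snapshot_id = 3


def is_ancestor_of_current(candidate_id: int) -> bool:
    """Walk parent pointers from the current snapshot, stopping at the first match.

    In this toy, the current snapshot counts as its own ancestor.
    """
    node: Optional[int] = current_snapshot_id
    while node is not None:
        if node == candidate_id:
            return True
        node = parents.get(node)
    return False


def toy_snapshot_as_of(timestamp_ms: int) -> Optional[int]:
    """Return the snapshot that was current at the given time, based on the history log."""
    result: Optional[int] = None
    for made_current_at, snapshot_id in history:
        if made_current_at <= timestamp_ms:
            result = snapshot_id
    return result


was_current_at_250 = toy_snapshot_as_of(250)
snapshot_2_is_ancestor = is_ancestor_of_current(2)
snapshot_1_is_ancestor = is_ancestor_of_current(1)
```

Here the history lookup returns snapshot 2 for timestamp 250, yet snapshot 2 is not reachable from the current snapshot through parent pointers, so a timestamp-based lookup alone would not be a safe ancestor check in this model. The walk also stops as soon as the candidate is found, so it never needs to collect the full ancestor list.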
d7cee84 to 59f1626
Great contribution, @chinmay-bhat! What would it take to get this out of the door? Any help that you need? We would love to be able to use this feature instead of relying on Spark for this at the moment.
Creates ManageSnapshots() rollback and set snapshot APIs.
Relevant issue - #737