From 44c104b0bf8555bb81d384f5d92d0e7f42f3ac77 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Tue, 18 Feb 2025 16:27:08 +0100 Subject: [PATCH] fix: advance state after conflict check Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> --- crates/core/src/operations/transaction/mod.rs | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 3d461540a7..5afe41be81 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -74,6 +74,7 @@ //! └───────────────────────────────┘ //! use std::collections::HashMap; +use std::future::Future; use std::sync::Arc; use bytes::Bytes; @@ -607,7 +608,7 @@ impl PreparedCommit<'_> { } impl<'a> std::future::IntoFuture for PreparedCommit<'a> { - type Output = DeltaResult>; + type Output = DeltaResult; type IntoFuture = BoxFuture<'a, Self::Output>; fn into_future(self) -> Self::IntoFuture { @@ -626,14 +627,14 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { create_checkpoint: false, cleanup_expired_logs: None, log_store: this.log_store, - table_data: this.table_data, + table_data: None, custom_execute_handler: this.post_commit_hook_handler, metrics: CommitMetrics { num_retries: 0 }, }); } // unwrap() is safe here due to the above check - let read_snapshot = this.table_data.unwrap().eager_snapshot(); + let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone(); let mut attempt_number = 1; let total_retries = this.max_retries + 1; @@ -664,7 +665,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { ) .await?; let transaction_info = TransactionInfo::try_new( - read_snapshot, + &read_snapshot, this.data.operation.read_predicate(), &this.data.actions, this.data.operation.read_whole_table(), @@ -683,6 +684,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } steps -= 1; } + // Update snapshot to latest version after succesful conflict check + read_snapshot + .update(this.log_store.clone(), Some(latest_version)) + .await?; } let version: i64 = latest_version + 1; @@ -704,7 +709,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { .map(|v| v.cleanup_expired_logs) .unwrap_or_default(), log_store: this.log_store, - table_data: this.table_data, + table_data: Some(Box::new(read_snapshot)), custom_execute_handler: this.post_commit_hook_handler, metrics: CommitMetrics { num_retries: attempt_number as u64 - 1, @@ -732,7 +737,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { } /// Represents items for the post commit hook -pub struct PostCommit<'a> { +pub struct PostCommit { /// The winning version number of the commit pub version: i64, /// The data that was committed to the log store @@ -740,15 +745,15 @@ pub struct PostCommit<'a> { create_checkpoint: bool, cleanup_expired_logs: Option, log_store: LogStoreRef, - table_data: Option<&'a dyn TableReference>, + table_data: Option>, custom_execute_handler: Option>, metrics: CommitMetrics, } -impl PostCommit<'_> { +impl PostCommit { /// Runs the post commit activities async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> { - if let Some(table) = self.table_data { + if let Some(table) = &self.table_data { let post_commit_operation_id = Uuid::new_v4(); let mut snapshot = table.eager_snapshot().clone(); if self.version - snapshot.version() > 1 { @@ -886,9 +891,9 @@ impl FinalizedCommit { } } -impl<'a> std::future::IntoFuture for PostCommit<'a> { +impl std::future::IntoFuture for PostCommit { type Output = DeltaResult; - type IntoFuture = BoxFuture<'a, Self::Output>; + type IntoFuture = BoxFuture<'static, Self::Output>; fn into_future(self) -> Self::IntoFuture { let this = self;