Skip to content

Commit

Permalink
fix: advance state after conflict check
Browse files Browse the repository at this point in the history
Signed-off-by: Ion Koutsouris <[email protected]>
  • Loading branch information
ion-elgreco committed Feb 18, 2025
1 parent da8327d commit 44c104b
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
//! └───────────────────────────────┘
//!</pre>
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -607,7 +608,7 @@ impl PreparedCommit<'_> {
}

impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
type Output = DeltaResult<PostCommit<'a>>;
type Output = DeltaResult<PostCommit>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -626,14 +627,14 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
create_checkpoint: false,
cleanup_expired_logs: None,
log_store: this.log_store,
table_data: this.table_data,
table_data: None,
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics { num_retries: 0 },
});
}

// unwrap() is safe here due to the above check
let read_snapshot = this.table_data.unwrap().eager_snapshot();
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();

let mut attempt_number = 1;
let total_retries = this.max_retries + 1;
Expand Down Expand Up @@ -664,7 +665,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
&read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
Expand All @@ -683,6 +684,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
steps -= 1;
}
// Update snapshot to latest version after succesful conflict check
read_snapshot
.update(this.log_store.clone(), Some(latest_version))
.await?;
}
let version: i64 = latest_version + 1;

Expand All @@ -704,7 +709,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
.map(|v| v.cleanup_expired_logs)
.unwrap_or_default(),
log_store: this.log_store,
table_data: this.table_data,
table_data: Some(Box::new(read_snapshot)),
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics {
num_retries: attempt_number as u64 - 1,
Expand Down Expand Up @@ -732,23 +737,23 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}

/// Represents items for the post commit hook
pub struct PostCommit<'a> {
pub struct PostCommit {
/// The winning version number of the commit
pub version: i64,
/// The data that was committed to the log store
pub data: CommitData,
create_checkpoint: bool,
cleanup_expired_logs: Option<bool>,
log_store: LogStoreRef,
table_data: Option<&'a dyn TableReference>,
table_data: Option<Box<dyn TableReference>>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
metrics: CommitMetrics,
}

impl PostCommit<'_> {
impl PostCommit {
/// Runs the post commit activities
async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
if let Some(table) = self.table_data {
if let Some(table) = &self.table_data {
let post_commit_operation_id = Uuid::new_v4();
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
Expand Down Expand Up @@ -886,9 +891,9 @@ impl FinalizedCommit {
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
impl std::future::IntoFuture for PostCommit {
type Output = DeltaResult<FinalizedCommit>;
type IntoFuture = BoxFuture<'a, Self::Output>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;
Expand Down

0 comments on commit 44c104b

Please sign in to comment.