Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update snapshot after conflict check #3205

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
//! └───────────────────────────────┘
//!</pre>
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
Expand Down Expand Up @@ -577,7 +578,7 @@ impl PreparedCommit<'_> {
}

impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
type Output = DeltaResult<PostCommit<'a>>;
type Output = DeltaResult<PostCommit>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -596,13 +597,13 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
create_checkpoint: false,
cleanup_expired_logs: None,
log_store: this.log_store,
table_data: this.table_data,
table_data: None,
custom_execute_handler: this.post_commit_hook_handler,
});
}

// unwrap() is safe here due to the above check
let read_snapshot = this.table_data.unwrap().eager_snapshot();
let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
Expand All @@ -625,7 +626,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
)
.await?;
let transaction_info = TransactionInfo::try_new(
read_snapshot,
&read_snapshot,
this.data.operation.read_predicate(),
&this.data.actions,
this.data.operation.read_whole_table(),
Expand All @@ -644,6 +645,10 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
steps -= 1;
}
// Update snapshot to latest version after succesful conflict check
read_snapshot
.update(this.log_store.clone(), Some(latest_version))
.await?;
}
let version: i64 = latest_version + 1;

Expand All @@ -665,7 +670,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
.map(|v| v.cleanup_expired_logs)
.unwrap_or_default(),
log_store: this.log_store,
table_data: this.table_data,
table_data: Some(Box::new(read_snapshot)),
custom_execute_handler: this.post_commit_hook_handler,
});
}
Expand All @@ -690,22 +695,22 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}

/// Represents items for the post commit hook
pub struct PostCommit<'a> {
pub struct PostCommit {
/// The winning version number of the commit
pub version: i64,
/// The data that was committed to the log store
pub data: CommitData,
create_checkpoint: bool,
cleanup_expired_logs: Option<bool>,
log_store: LogStoreRef,
table_data: Option<&'a dyn TableReference>,
table_data: Option<Box<dyn TableReference>>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}

impl PostCommit<'_> {
impl PostCommit {
/// Runs the post commit activities
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
if let Some(table) = self.table_data {
if let Some(table) = &self.table_data {
let post_commit_operation_id = Uuid::new_v4();
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
Expand Down Expand Up @@ -822,9 +827,9 @@ impl FinalizedCommit {
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
impl std::future::IntoFuture for PostCommit {
type Output = DeltaResult<FinalizedCommit>;
type IntoFuture = BoxFuture<'a, Self::Output>;
type IntoFuture = BoxFuture<'static, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;
Expand Down
Loading