Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: return metrics about the commit #3202

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use crate::table::config::TableConfig;
use crate::table::state::DeltaTableState;
use crate::{crate_version, DeltaResult};
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};

use serde::{Deserialize, Serialize};
pub use self::conflict_checker::CommitConflictError;
pub use self::protocol::INSTANCE as PROTOCOL;

Expand All @@ -113,6 +113,36 @@ mod state;
const DELTA_LOG_FOLDER: &str = "_delta_log";
pub(crate) const DEFAULT_RETRIES: usize = 15;

/// Metrics gathered while committing a transaction.
///
/// Serialized with camelCase field names (e.g. `numRetries`).
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitMetrics {
/// Number of retries before a successful commit
pub num_retries: u64,
}

/// Metrics gathered while running the post-commit hook
/// (checkpoint creation and log cleanup).
///
/// Serialized with camelCase field names (e.g. `newCheckpointCreated`).
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostCommitMetrics {
/// Whether a new checkpoint was created as part of this commit
pub new_checkpoint_created: bool,

/// Number of log files cleaned up
pub num_log_files_cleaned_up: u64,
}

/// Combined metrics reported for a finalized commit.
///
/// Carries the same fields as [`CommitMetrics`] (`num_retries`) and
/// [`PostCommitMetrics`] (`new_checkpoint_created`, `num_log_files_cleaned_up`)
/// in a single flat structure. Serialized with camelCase field names.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
/// Number of retries before a successful commit
pub num_retries: u64,

/// Whether a new checkpoint was created as part of this commit
pub new_checkpoint_created: bool,

/// Number of log files cleaned up
pub num_log_files_cleaned_up: u64,
}

/// Error raised while committing transaction
#[derive(thiserror::Error, Debug)]
pub enum TransactionError {
Expand Down Expand Up @@ -598,6 +628,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
log_store: this.log_store,
table_data: this.table_data,
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics { num_retries: 0 },
});
}

Expand Down Expand Up @@ -667,6 +698,9 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
log_store: this.log_store,
table_data: this.table_data,
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics {
num_retries: attempt_number as u64 - 1,
},
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
Expand Down Expand Up @@ -700,11 +734,12 @@ pub struct PostCommit<'a> {
log_store: LogStoreRef,
table_data: Option<&'a dyn TableReference>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
metrics: CommitMetrics,
}

impl PostCommit<'_> {
/// Runs the post commit activities
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
if let Some(table) = self.table_data {
let post_commit_operation_id = Uuid::new_v4();
let mut snapshot = table.eager_snapshot().clone();
Expand Down Expand Up @@ -737,26 +772,29 @@ impl PostCommit<'_> {
.await?
}

let mut new_checkpoint_created = false;
if self.create_checkpoint {
// Execute create checkpoint hook
self.create_checkpoint(
new_checkpoint_created = self.create_checkpoint(
&state,
&self.log_store,
self.version,
post_commit_operation_id,
)
.await?;
}

let mut num_log_files_cleaned_up : u64 = 0;
if cleanup_logs {
// Execute clean up logs hook
cleanup_expired_logs_for(
num_log_files_cleaned_up = cleanup_expired_logs_for(
self.version,
self.log_store.as_ref(),
Utc::now().timestamp_millis()
- state.table_config().log_retention_duration().as_millis() as i64,
Some(post_commit_operation_id),
)
.await?;
.await? as u64;
}

// Run arbitrary after_post_commit_hook code
Expand All @@ -769,7 +807,10 @@ impl PostCommit<'_> {
)
.await?
}
Ok(state)
Ok((state, PostCommitMetrics {
new_checkpoint_created,
num_log_files_cleaned_up,
}))
} else {
let state = DeltaTableState::try_new(
&Path::default(),
Expand All @@ -778,7 +819,10 @@ impl PostCommit<'_> {
Some(self.version),
)
.await?;
Ok(state)
Ok((state, PostCommitMetrics {
new_checkpoint_created: false,
num_log_files_cleaned_up: 0,
}))
}
}
async fn create_checkpoint(
Expand All @@ -787,18 +831,20 @@ impl PostCommit<'_> {
log_store: &LogStoreRef,
version: i64,
operation_id: Uuid,
) -> DeltaResult<()> {
) -> DeltaResult<bool> {
if !table_state.load_config().require_files {
warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files.");
return Ok(());
return Ok(false);
}

let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
create_checkpoint_for(version, table_state, log_store.as_ref(), Some(operation_id))
.await?
.await?;
Ok(true)
} else {
Ok(false)
}
Ok(())
}
}

Expand All @@ -809,6 +855,9 @@ pub struct FinalizedCommit {

/// Version of the finalized commit
pub version: i64,

/// Metrics associated with the commit operation
pub metrics: Metrics,
}

impl FinalizedCommit {
Expand All @@ -831,9 +880,14 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {

Box::pin(async move {
match this.run_post_commit_hook().await {
Ok(snapshot) => Ok(FinalizedCommit {
Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
snapshot,
version: this.version,
metrics: Metrics {
num_retries: this.metrics.num_retries,
new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
},
}),
Err(err) => Err(err),
}
Expand Down
Loading