Skip to content

Commit

Permalink
Add SyncPersistence metrics (#6232)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Apr 27, 2023
1 parent bb7601a commit 966ca0f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.protocol.models.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -197,7 +199,8 @@ public void close() {
final boolean terminated = stateFlushExecutorService.awaitTermination(flushTerminationTimeoutInSeconds, TimeUnit.SECONDS);
if (!terminated) {
if (stateToFlush != null && !stateToFlush.isEmpty()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATE_COMMIT_NOT_ATTEMPTED, 1);
emitFailedStateCloseMetrics();
emitFailedStatsCloseMetrics();
}

// Ongoing flush failed to terminate within the allocated time
Expand All @@ -209,7 +212,8 @@ public void close() {
}
} catch (final InterruptedException e) {
if (stateToFlush != null && !stateToFlush.isEmpty()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATE_COMMIT_NOT_ATTEMPTED, 1);
emitFailedStateCloseMetrics();
emitFailedStatsCloseMetrics();
}

// The current thread is getting interrupted
Expand All @@ -235,12 +239,17 @@ public void close() {
}, "Flush States from SyncPersistenceImpl");
} catch (final Exception e) {
if (stateToFlush != null && !stateToFlush.isEmpty()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATE_COMMIT_NOT_ATTEMPTED, 1);
emitFailedStateCloseMetrics();
emitFailedStatsCloseMetrics();
}
throw e;
}
}

// At this point, the final state flush is either successful or there was no state left to flush.
// From a connection point of view, it should be considered a success since no state are lost.
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATE_COMMIT_CLOSE_SUCCESSFUL, 1);

// On close, this check is independent of hasDataToFlush. We could be in a state where state flush
// was successful but stats flush failed, so we should check for stats to flush regardless of the
// states.
Expand All @@ -251,10 +260,22 @@ public void close() {
return null;
}, "Flush Stats from SyncPersistenceImpl");
} catch (final Exception e) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATS_COMMIT_NOT_ATTEMPTED, 1);
emitFailedStatsCloseMetrics();
throw e;
}
}

MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATS_COMMIT_CLOSE_SUCCESSFUL, 1);
}

private void emitFailedStateCloseMetrics() {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATE_COMMIT_NOT_ATTEMPTED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
}

private void emitFailedStatsCloseMetrics() {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.STATS_COMMIT_NOT_ATTEMPTED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
}

private boolean hasStatesToFlush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"state_commit_not_attempted",
"number of attempts to commit states dropped due to an early termination",
MetricTags.GEOGRAPHY),
STATE_COMMIT_CLOSE_SUCCESSFUL(MetricEmittingApps.WORKER,
"state_commit_close_successful",
"number of final to connection exiting with the a successful final state flush",
MetricTags.GEOGRAPHY),

STATS_COMMIT_ATTEMPT(MetricEmittingApps.WORKER,
"stats_commit_attempt",
Expand All @@ -207,6 +211,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"stats_commit_not_attempted",
"number of attempts to commit stats dropped due to an early termination",
MetricTags.GEOGRAPHY),
STATS_COMMIT_CLOSE_SUCCESSFUL(MetricEmittingApps.WORKER,
"stats_commit_close_successful",
"number of final to connection exiting with the a successful final stats flush",
MetricTags.GEOGRAPHY),
@Deprecated
// To be deleted along with PersistStateActivity
STATE_COMMIT_ATTEMPT_FROM_PERSIST_STATE(MetricEmittingApps.WORKER,
Expand Down

0 comments on commit 966ca0f

Please sign in to comment.