Skip to content

Commit

Permalink
fix: add dropped metrics for transformer for mvtx
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Feb 10, 2025
1 parent 12c2720 commit 77c3981
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
15 changes: 15 additions & 0 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const READ_BYTES_TOTAL: &str = "read_bytes";
const ACK_TOTAL: &str = "ack";
const SINK_WRITE_TOTAL: &str = "write";
const DROPPED_TOTAL: &str = "dropped";
const TRANSFORMER_DROPPED_TOTAL: &str = "dropped";
const FALLBACK_SINK_WRITE_TOTAL: &str = "write";

// pending as gauge for mvtx (these metric names are hardcoded in the auto-scaler)
Expand Down Expand Up @@ -225,6 +226,7 @@ pub(crate) struct FallbackSinkMetrics {
pub(crate) struct TransformerMetrics {
/// Transformer latency
pub(crate) time: Family<Vec<(String, String)>, Histogram>,
pub(crate) dropped_total: Family<Vec<(String, String)>, Counter>,
}

pub(crate) struct PipelineForwarderMetrics {
Expand Down Expand Up @@ -290,6 +292,7 @@ impl MonoVtxMetrics {
time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10))
}),
dropped_total: Family::<Vec<(String, String)>, Counter>::default(),
},

sink: SinkMetrics {
Expand Down Expand Up @@ -359,6 +362,11 @@ impl MonoVtxMetrics {
"A Histogram to keep track of the total time taken to Transform, in microseconds",
metrics.transformer.time.clone(),
);
transformer_registry.register(
TRANSFORMER_DROPPED_TOTAL,
"A Counter to keep track of the total number of messages dropped by the transformer",
metrics.transformer.dropped_total.clone(),
);

// Sink metrics
let sink_registry = registry.sub_registry_with_prefix(SINK_REGISTRY_PREFIX);
Expand Down Expand Up @@ -1418,6 +1426,12 @@ mod tests {
.get_or_create(&common_labels)
.observe(5.0);

metrics
.transformer
.dropped_total
.get_or_create(&common_labels)
.inc();

metrics.sink.write_total.get_or_create(&common_labels).inc();
metrics.sink.time.get_or_create(&common_labels).observe(4.0);

Expand Down Expand Up @@ -1451,6 +1465,7 @@ mod tests {
r#"monovtx_transformer_time_sum{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 5.0"#,
r#"monovtx_transformer_time_count{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#,
r#"monovtx_transformer_time_bucket{le="100.0",mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#,
r#"monovtx_transformer_dropped_total{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#,
r#"monovtx_sink_write_total{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#,
r#"monovtx_sink_time_sum{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 4.0"#,
r#"monovtx_sink_time_count{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#,
Expand Down
10 changes: 9 additions & 1 deletion rust/numaflow-core/src/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ impl Transformer {
.await
.map_err(|e| Error::Transformer(format!("failed to receive message: {}", e)))??;

// Check for dropped messages in the response
let dropped_messages_count = response.iter().filter(|msg| msg.dropped()).count();
if dropped_messages_count > 0 {
monovertex_metrics()
.transformer
.dropped_total
.get_or_create(mvtx_forward_metric_labels())
.inc_by(dropped_messages_count as u64);
}
monovertex_metrics()
.transformer
.time
Expand Down Expand Up @@ -167,7 +176,6 @@ impl Transformer {
Err(e) => return Err(Error::Transformer(format!("task join failed: {}", e))),
}
}

Ok(transformed_messages)
}
}
Expand Down

0 comments on commit 77c3981

Please sign in to comment.