diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 46bf45fd68..c2105c3b10 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -72,6 +72,7 @@ const READ_BYTES_TOTAL: &str = "read_bytes"; const ACK_TOTAL: &str = "ack"; const SINK_WRITE_TOTAL: &str = "write"; const DROPPED_TOTAL: &str = "dropped"; +const TRANSFORMER_DROPPED_TOTAL: &str = "dropped"; const FALLBACK_SINK_WRITE_TOTAL: &str = "write"; // pending as gauge for mvtx (these metric names are hardcoded in the auto-scaler) @@ -225,6 +226,7 @@ pub(crate) struct FallbackSinkMetrics { pub(crate) struct TransformerMetrics { /// Transformer latency pub(crate) time: Family, Histogram>, + pub(crate) dropped_total: Family, Counter>, } pub(crate) struct PipelineForwarderMetrics { @@ -290,6 +292,7 @@ impl MonoVtxMetrics { time: Family::, Histogram>::new_with_constructor(|| { Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)) }), + dropped_total: Family::, Counter>::default(), }, sink: SinkMetrics { @@ -359,6 +362,11 @@ impl MonoVtxMetrics { "A Histogram to keep track of the total time taken to Transform, in microseconds", metrics.transformer.time.clone(), ); + transformer_registry.register( + TRANSFORMER_DROPPED_TOTAL, + "A Counter to keep track of the total number of messages dropped by the transformer", + metrics.transformer.dropped_total.clone(), + ); // Sink metrics let sink_registry = registry.sub_registry_with_prefix(SINK_REGISTRY_PREFIX); @@ -1418,6 +1426,12 @@ mod tests { .get_or_create(&common_labels) .observe(5.0); + metrics + .transformer + .dropped_total + .get_or_create(&common_labels) + .inc(); + metrics.sink.write_total.get_or_create(&common_labels).inc(); metrics.sink.time.get_or_create(&common_labels).observe(4.0); @@ -1451,6 +1465,7 @@ mod tests { r#"monovtx_transformer_time_sum{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 5.0"#, r#"monovtx_transformer_time_count{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#, r#"monovtx_transformer_time_bucket{le="100.0",mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#, + r#"monovtx_transformer_dropped_total{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#, r#"monovtx_sink_write_total{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#, r#"monovtx_sink_time_sum{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 4.0"#, r#"monovtx_sink_time_count{mvtx_name="test-monovertex-metric-names",mvtx_replica="3"} 1"#, diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs index f49a473819..0b3dc97148 100644 --- a/rust/numaflow-core/src/transformer.rs +++ b/rust/numaflow-core/src/transformer.rs @@ -115,6 +115,15 @@ impl Transformer { .await .map_err(|e| Error::Transformer(format!("failed to receive message: {}", e)))??; + // Check for dropped messages in the response + let dropped_messages_count = response.iter().filter(|msg| msg.dropped()).count(); + if dropped_messages_count > 0 { + monovertex_metrics() + .transformer + .dropped_total + .get_or_create(mvtx_forward_metric_labels()) + .inc_by(dropped_messages_count as u64); + } monovertex_metrics() .transformer .time @@ -167,7 +176,6 @@ impl Transformer { Err(e) => return Err(Error::Transformer(format!("task join failed: {}", e))), } } - Ok(transformed_messages) } }