Skip to content

Revert removal of external producers #3025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- TODO: Placeholder for Span processor related things
- *Fix* SpanProcessor::on_start is no longer called on non recording spans
- Revert removal of `MetricProducer` which allowed metrics from
external sources to be sent through OpenTelemetry.

## 0.30.0

Expand Down
38 changes: 33 additions & 5 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use super::{
data::ResourceMetrics,
pipeline::Pipeline,
reader::{MetricReader, SdkProducer},
reader::{MetricProducer, MetricReader, SdkProducer},
};

/// A simple [MetricReader] that allows an application to read metrics on demand.
Expand Down Expand Up @@ -50,6 +50,7 @@
struct ManualReaderInner {
sdk_producer: Option<Weak<dyn SdkProducer>>,
is_shutdown: bool,
external_producers: Vec<Box<dyn MetricProducer>>,
}

impl ManualReader {
Expand All @@ -59,11 +60,15 @@
}

/// A [MetricReader] which is directly called to collect metrics.
pub(crate) fn new(temporality: Temporality) -> Self {
pub(crate) fn new(
temporality: Temporality,
external_producers: Vec<Box<dyn MetricProducer>>,
) -> Self {

Check warning on line 66 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L63-L66

Added lines #L63 - L66 were not covered by tests
ManualReader {
inner: Mutex::new(ManualReaderInner {
sdk_producer: None,
is_shutdown: false,
external_producers,

Check warning on line 71 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L71

Added line #L71 was not covered by tests
}),
temporality,
}
Expand All @@ -86,7 +91,7 @@
});
}

/// Gathers all metrics from the SDK, calling any
/// Gathers all metrics from the SDK and other [MetricProducer]s, calling any
/// callbacks necessary and returning the results.
///
/// Returns an error if called after shutdown.
Expand All @@ -105,7 +110,19 @@
}
};

Ok(())
let mut errs = vec![];
for producer in &inner.external_producers {
match producer.produce() {
Ok(metrics) => rm.scope_metrics.push(metrics),
Err(err) => errs.push(err),

Check warning on line 117 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113-L117

Added lines #L113 - L117 were not covered by tests
}
}

if errs.is_empty() {
Ok(())

Check warning on line 122 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L121-L122

Added lines #L121 - L122 were not covered by tests
} else {
Err(OTelSdkError::InternalFailure(format!("{:?}", errs)))

Check warning on line 124 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L124

Added line #L124 was not covered by tests
}
}

/// ForceFlush is a no-op, it always returns nil.
Expand All @@ -123,6 +140,7 @@
// Any future call to collect will now return an error.
inner.sdk_producer = None;
inner.is_shutdown = true;
inner.external_producers = Vec::new();

Check warning on line 143 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L143

Added line #L143 was not covered by tests

Ok(())
}
Expand All @@ -136,6 +154,7 @@
#[derive(Default)]
pub struct ManualReaderBuilder {
temporality: Temporality,
producers: Vec<Box<dyn MetricProducer>>,
}

impl fmt::Debug for ManualReaderBuilder {
Expand All @@ -156,8 +175,17 @@
self
}

/// Registers a an external [MetricProducer] with this reader.
///
/// The producer is used as a source of aggregated metric data which is
/// incorporated into metrics collected from the SDK.
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
self.producers.push(Box::new(producer));
self
}

Check warning on line 185 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L182-L185

Added lines #L182 - L185 were not covered by tests

/// Create a new [ManualReader] from this configuration.
pub fn build(self) -> ManualReader {
ManualReader::new(self.temporality)
ManualReader::new(self.temporality, self.producers)

Check warning on line 189 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L189

Added line #L189 was not covered by tests
}
}
37 changes: 33 additions & 4 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
metrics::{exporter::PushMetricExporter, reader::MetricProducer, reader::SdkProducer},
Resource,
};

Expand All @@ -30,6 +30,7 @@
pub struct PeriodicReaderBuilder<E> {
interval: Duration,
exporter: E,
producers: Vec<Box<dyn MetricProducer>>,
}

impl<E> PeriodicReaderBuilder<E>
Expand All @@ -42,7 +43,11 @@
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_INTERVAL);

PeriodicReaderBuilder { interval, exporter }
PeriodicReaderBuilder {
interval,
exporter,
producers: Vec::new(),
}
}

/// Configures the intervening time between exports for a [PeriodicReader].
Expand All @@ -59,6 +64,15 @@
self
}

/// Registers a an external [MetricProducer] with this reader.
///
/// The producer is used as a source of aggregated metric data which is
/// incorporated into metrics collected from the SDK.
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
self.producers.push(Box::new(producer));
self
}

Check warning on line 74 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L71-L74

Added lines #L71 - L74 were not covered by tests

/// Create a [PeriodicReader] with the given config.
pub fn build(self) -> PeriodicReader<E> {
PeriodicReader::new(self.exporter, self.interval)
Expand Down Expand Up @@ -152,6 +166,7 @@
message_sender,
producer: Mutex::new(None),
exporter: exporter_arc.clone(),
external_producers: Vec::new(),
}),
};
let cloned_reader = reader.clone();
Expand Down Expand Up @@ -351,6 +366,7 @@
exporter: Arc<E>,
message_sender: mpsc::Sender<Message>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
external_producers: Vec<Box<dyn MetricProducer>>,
}

impl<E: PushMetricExporter> PeriodicReaderInner<E> {
Expand All @@ -364,23 +380,36 @@
}

fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
let mut errs = vec![];
let producer = self.producer.lock().expect("lock poisoned");
if let Some(p) = producer.as_ref() {
p.upgrade()
.ok_or(OTelSdkError::AlreadyShutdown)?
.produce(rm)?;
Ok(())
} else {
otel_warn!(
name: "PeriodReader.MeterProviderNotRegistered",
message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
This occurs when a periodic reader is created but not associated with a MeterProvider \
by calling `.with_reader(reader)` on MeterProviderBuilder."
);
Err(OTelSdkError::InternalFailure(
errs.push(OTelSdkError::InternalFailure(
"MeterProvider is not registered".into(),
))
}

for producer in &self.external_producers {
match producer.produce() {
Ok(metrics) => rm.scope_metrics.push(metrics),
Err(err) => errs.push(err),

Check warning on line 404 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L402-L404

Added lines #L402 - L404 were not covered by tests
}
}

if errs.is_empty() {
Ok(())
} else {
Err(OTelSdkError::InternalFailure(format!("{:?}", errs)))
}
}

fn collect_and_export(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
Expand Down
15 changes: 13 additions & 2 deletions opentelemetry-sdk/src/metrics/reader.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
//! Interfaces for reading and producing metrics
use crate::error::OTelSdkResult;
use crate::error::{OTelSdkError, OTelSdkResult};
use std::time::Duration;
use std::{fmt, sync::Weak};

use super::{data::ResourceMetrics, instrument::InstrumentKind, pipeline::Pipeline, Temporality};
use super::{
data::{ResourceMetrics, ScopeMetrics},
instrument::InstrumentKind,
pipeline::Pipeline,
Temporality,
};

/// The interface used between the SDK and an exporter.
///
Expand Down Expand Up @@ -65,3 +70,9 @@ pub(crate) trait SdkProducer: fmt::Debug + Send + Sync {
/// Returns aggregated metrics from a single collection.
fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult;
}

/// Produces metrics for a [MetricReader] from an external source.
pub trait MetricProducer: fmt::Debug + Send + Sync {
/// Returns aggregated metrics from an external source.
fn produce(&self) -> Result<ScopeMetrics, OTelSdkError>;
}
Loading