Skip to content

Commit 3ee22ce

Browse files
authored
Re-use memory via aggregate functions (#1266)
This simplifies memory reuse on subsequent collection cycles. Also small ergonomics changes: * Move `MetricsProducer` config to builders to match other config * Remove the need for separate `Delta*` and `Cumulative*` aggregate types * Return error earlier if readers are shut down * Log warning if two instruments have the same name with different casing * Log warning if view is created with empty criteria
1 parent bfb61de commit 3ee22ce

File tree

21 files changed

+1123
-1108
lines changed

21 files changed

+1123
-1108
lines changed

opentelemetry-prometheus/src/config.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use core::fmt;
22
use once_cell::sync::OnceCell;
33
use opentelemetry::metrics::{MetricsError, Result};
4-
use opentelemetry_sdk::metrics::{reader::AggregationSelector, ManualReaderBuilder};
4+
use opentelemetry_sdk::metrics::{
5+
reader::{AggregationSelector, MetricProducer},
6+
ManualReaderBuilder,
7+
};
58
use std::sync::{Arc, Mutex};
69

710
use crate::{Collector, PrometheusExporter};
@@ -14,8 +17,8 @@ pub struct ExporterBuilder {
1417
without_units: bool,
1518
without_counter_suffixes: bool,
1619
namespace: Option<String>,
17-
aggregation: Option<Box<dyn AggregationSelector>>,
1820
disable_scope_info: bool,
21+
reader: ManualReaderBuilder,
1922
}
2023

2124
impl fmt::Debug for ExporterBuilder {
@@ -26,7 +29,6 @@ impl fmt::Debug for ExporterBuilder {
2629
.field("without_units", &self.without_units)
2730
.field("without_counter_suffixes", &self.without_counter_suffixes)
2831
.field("namespace", &self.namespace)
29-
.field("aggregation", &self.aggregation.is_some())
3032
.field("disable_scope_info", &self.disable_scope_info)
3133
.finish()
3234
}
@@ -108,17 +110,22 @@ impl ExporterBuilder {
108110
///
109111
/// [DefaultAggregationSelector]: opentelemetry_sdk::metrics::reader::DefaultAggregationSelector
110112
pub fn with_aggregation_selector(mut self, agg: impl AggregationSelector + 'static) -> Self {
111-
self.aggregation = Some(Box::new(agg));
113+
self.reader = self.reader.with_aggregation_selector(agg);
114+
self
115+
}
116+
117+
/// Registers an external [MetricProducer] with this reader.
118+
///
119+
/// The producer is used as a source of aggregated metric data which is
120+
/// incorporated into metrics collected from the SDK.
121+
pub fn with_producer(mut self, producer: impl MetricProducer + 'static) -> Self {
122+
self.reader = self.reader.with_producer(producer);
112123
self
113124
}
114125

115126
/// Creates a new [PrometheusExporter] from this configuration.
116127
pub fn build(self) -> Result<PrometheusExporter> {
117-
let mut reader = ManualReaderBuilder::new();
118-
if let Some(selector) = self.aggregation {
119-
reader = reader.with_aggregation_selector(selector)
120-
}
121-
let reader = Arc::new(reader.build());
128+
let reader = Arc::new(self.reader.build());
122129

123130
let collector = Collector {
124131
reader: Arc::clone(&reader),

opentelemetry-prometheus/src/lib.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ use opentelemetry::{
102102
use opentelemetry_sdk::{
103103
metrics::{
104104
data::{self, ResourceMetrics, Temporality},
105-
reader::{AggregationSelector, MetricProducer, MetricReader, TemporalitySelector},
105+
reader::{AggregationSelector, MetricReader, TemporalitySelector},
106106
Aggregation, InstrumentKind, ManualReader, Pipeline,
107107
},
108108
Resource, Scope,
@@ -166,10 +166,6 @@ impl MetricReader for PrometheusExporter {
166166
self.reader.register_pipeline(pipeline)
167167
}
168168

169-
fn register_producer(&self, producer: Box<dyn MetricProducer>) {
170-
self.reader.register_producer(producer)
171-
}
172-
173169
fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
174170
self.reader.collect(rm)
175171
}

opentelemetry-sdk/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
## vNext
44

5+
### Added
6+
7+
- Log warning if two instruments have the same name with different (#1266)
8+
casing
9+
- Log warning if view is created with empty criteria (#1266)
10+
511
### Changed
612

713
- Default Resource (the one used when no other Resource is explicitly provided) now includes `TelemetryResourceDetector`,
@@ -14,6 +20,8 @@
1420
- Add in memory span exporter [#1216](https://github.com/open-telemetry/opentelemetry-rust/pull/1216)
1521
- Add in memory log exporter [#1231](https://github.com/open-telemetry/opentelemetry-rust/pull/1231)
1622
- Add `Sync` bound to the `SpanExporter` and `LogExporter` traits [#1240](https://github.com/open-telemetry/opentelemetry-rust/pull/1240)
23+
- Move `MetricsProducer` config to builders to match other config (#1266)
24+
- Return error earlier if readers are shut down (#1266)
1725

1826
### Removed
1927

opentelemetry-sdk/benches/metric.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use opentelemetry_sdk::{
1010
metrics::{
1111
data::{ResourceMetrics, Temporality},
1212
new_view,
13-
reader::{AggregationSelector, MetricProducer, MetricReader, TemporalitySelector},
13+
reader::{AggregationSelector, MetricReader, TemporalitySelector},
1414
Aggregation, Instrument, InstrumentKind, ManualReader, MeterProvider, Pipeline, Stream,
1515
View,
1616
},
@@ -37,10 +37,6 @@ impl MetricReader for SharedReader {
3737
self.0.register_pipeline(pipeline)
3838
}
3939

40-
fn register_producer(&self, producer: Box<dyn MetricProducer>) {
41-
self.0.register_producer(producer)
42-
}
43-
4440
fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
4541
self.0.collect(rm)
4642
}

opentelemetry-sdk/src/metrics/aggregation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub enum Aggregation {
2929
/// An aggregation that summarizes a set of measurements as the last one made.
3030
LastValue,
3131

32-
/// An aggregation that summarizes a set of measurements as an histogram with
32+
/// An aggregation that summarizes a set of measurements as a histogram with
3333
/// explicitly defined buckets.
3434
ExplicitBucketHistogram {
3535
/// The increasing bucket boundary values.

opentelemetry-sdk/src/metrics/data/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ pub struct Metric {
5151
pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
5252
/// Support downcasting
5353
fn as_any(&self) -> &dyn any::Any;
54+
/// Support downcasting during aggregation
55+
fn as_mut(&mut self) -> &mut dyn any::Any;
5456
}
5557

5658
/// A measurement of the current value of an instrument.
@@ -64,6 +66,9 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
6466
fn as_any(&self) -> &dyn any::Any {
6567
self
6668
}
69+
fn as_mut(&mut self) -> &mut dyn any::Any {
70+
self
71+
}
6772
}
6873

6974
/// Represents the sum of all measurements of values from an instrument.
@@ -82,6 +87,9 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
8287
fn as_any(&self) -> &dyn any::Any {
8388
self
8489
}
90+
fn as_mut(&mut self) -> &mut dyn any::Any {
91+
self
92+
}
8593
}
8694

8795
/// DataPoint is a single data point in a time series.
@@ -126,6 +134,9 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Histogram<T> {
126134
fn as_any(&self) -> &dyn any::Any {
127135
self
128136
}
137+
fn as_mut(&mut self) -> &mut dyn any::Any {
138+
self
139+
}
129140
}
130141

131142
/// A single histogram data point in a time series.

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ use opentelemetry::{
1010
use crate::{
1111
attributes::AttributeSet,
1212
instrumentation::Scope,
13-
metrics::data::Temporality,
14-
metrics::{aggregation::Aggregation, internal::Aggregator},
13+
metrics::{aggregation::Aggregation, internal::Measure},
1514
};
1615

17-
pub(crate) const EMPTY_AGG_MSG: &str = "no aggregators for observable instrument";
16+
pub(crate) const EMPTY_MEASURE_MSG: &str = "no aggregators for observable instrument";
1817

1918
/// The identifier of a group of instruments that all perform the same function.
2019
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
@@ -218,52 +217,61 @@ impl Stream {
218217
}
219218
}
220219

221-
/// the identifying properties of a stream.
220+
/// The identifying properties of an instrument.
222221
#[derive(Debug, PartialEq, Eq, Hash)]
223-
pub(crate) struct StreamId {
224-
/// The human-readable identifier of the stream.
222+
pub(crate) struct InstrumentId {
223+
/// The human-readable identifier of the instrument.
225224
pub(crate) name: Cow<'static, str>,
226225
/// Describes the purpose of the data.
227226
pub(crate) description: Cow<'static, str>,
227+
/// Defines the functional group of the instrument.
228+
pub(crate) kind: InstrumentKind,
228229
/// the unit of measurement recorded.
229230
pub(crate) unit: Unit,
230-
/// The stream uses for an instrument.
231-
pub(crate) aggregation: String,
232-
/// Monotonic is the monotonicity of an instruments data type. This field is
233-
/// not used for all data types, so a zero value needs to be understood in the
234-
/// context of Aggregation.
235-
pub(crate) monotonic: bool,
236-
/// Temporality is the temporality of a stream's data type. This field is
237-
/// not used by some data types.
238-
pub(crate) temporality: Option<Temporality>,
239-
/// Number is the number type of the stream.
231+
/// Number is the underlying data type of the instrument.
240232
pub(crate) number: Cow<'static, str>,
241233
}
242234

235+
impl InstrumentId {
236+
/// Instrument names are considered case-insensitive ASCII.
237+
///
238+
/// Standardize the instrument name to always be lowercase so it can be compared
239+
/// via hash.
240+
///
241+
/// See [naming syntax] for full requirements.
242+
///
243+
/// [naming syntax]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/api.md#instrument-name-syntax
244+
pub(crate) fn normalize(&mut self) {
245+
if self.name.chars().any(|c| c.is_ascii_uppercase()) {
246+
self.name = self.name.to_ascii_lowercase().into();
247+
}
248+
}
249+
}
250+
243251
pub(crate) struct InstrumentImpl<T> {
244-
pub(crate) aggregators: Vec<Arc<dyn Aggregator<T>>>,
252+
pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
245253
}
246254

247-
impl<T: Copy> SyncCounter<T> for InstrumentImpl<T> {
255+
impl<T: Copy + 'static> SyncCounter<T> for InstrumentImpl<T> {
248256
fn add(&self, val: T, attrs: &[KeyValue]) {
249-
for agg in &self.aggregators {
250-
agg.aggregate(val, AttributeSet::from(attrs))
257+
for measure in &self.measures {
258+
measure.call(val, AttributeSet::from(attrs))
251259
}
252260
}
253261
}
254262

255-
impl<T: Copy> SyncUpDownCounter<T> for InstrumentImpl<T> {
263+
impl<T: Copy + 'static> SyncUpDownCounter<T> for InstrumentImpl<T> {
256264
fn add(&self, val: T, attrs: &[KeyValue]) {
257-
for agg in &self.aggregators {
258-
agg.aggregate(val, AttributeSet::from(attrs))
265+
for measure in &self.measures {
266+
measure.call(val, AttributeSet::from(attrs))
259267
}
260268
}
261269
}
262270

263-
impl<T: Copy> SyncHistogram<T> for InstrumentImpl<T> {
271+
impl<T: Copy + 'static> SyncHistogram<T> for InstrumentImpl<T> {
264272
fn record(&self, val: T, attrs: &[KeyValue]) {
265-
for agg in &self.aggregators {
266-
agg.aggregate(val, AttributeSet::from(attrs))
273+
for measure in &self.measures {
274+
measure.call(val, AttributeSet::from(attrs))
267275
}
268276
}
269277
}
@@ -306,7 +314,7 @@ impl<T> Eq for ObservableId<T> {}
306314
#[derive(Clone)]
307315
pub(crate) struct Observable<T> {
308316
pub(crate) id: ObservableId<T>,
309-
aggregators: Vec<Arc<dyn Aggregator<T>>>,
317+
measures: Vec<Arc<dyn Measure<T>>>,
310318
}
311319

312320
impl<T> Observable<T> {
@@ -316,7 +324,7 @@ impl<T> Observable<T> {
316324
name: Cow<'static, str>,
317325
description: Cow<'static, str>,
318326
unit: Unit,
319-
aggregators: Vec<Arc<dyn Aggregator<T>>>,
327+
measures: Vec<Arc<dyn Measure<T>>>,
320328
) -> Self {
321329
Self {
322330
id: ObservableId {
@@ -329,7 +337,7 @@ impl<T> Observable<T> {
329337
},
330338
_marker: marker::PhantomData,
331339
},
332-
aggregators,
340+
measures,
333341
}
334342
}
335343

@@ -340,8 +348,8 @@ impl<T> Observable<T> {
340348
/// any aggregators. Also, an error is returned if scope defines a Meter other
341349
/// than the observable it was created by.
342350
pub(crate) fn registerable(&self, scope: &Scope) -> Result<()> {
343-
if self.aggregators.is_empty() {
344-
return Err(MetricsError::Other(EMPTY_AGG_MSG.into()));
351+
if self.measures.is_empty() {
352+
return Err(MetricsError::Other(EMPTY_MEASURE_MSG.into()));
345353
}
346354
if &self.id.inner.scope != scope {
347355
return Err(MetricsError::Other(format!(
@@ -356,8 +364,8 @@ impl<T> Observable<T> {
356364

357365
impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
358366
fn observe(&self, measurement: T, attrs: &[KeyValue]) {
359-
for agg in &self.aggregators {
360-
agg.aggregate(measurement, AttributeSet::from(attrs))
367+
for measure in &self.measures {
368+
measure.call(measurement, AttributeSet::from(attrs))
361369
}
362370
}
363371

0 commit comments

Comments
 (0)