Skip to content

Commit 804f325

Browse files
frailltMindaugas Vinkelis
authored and
Mindaugas Vinkelis
committed
Fix issue where dupliate attributes are not always removed
1 parent 66f29b8 commit 804f325

File tree

10 files changed

+286
-252
lines changed

10 files changed

+286
-252
lines changed

opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@
112112
"value": {
113113
"intValue": "100"
114114
}
115+
},
116+
{
117+
"key": "number/int",
118+
"value": {
119+
"intValue": "100"
120+
}
115121
}
116122
],
117123
"droppedAttributesCount": 0

opentelemetry-prometheus/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@ rustdoc-args = ["--cfg", "docsrs"]
2121

2222
[dependencies]
2323
once_cell = { workspace = true }
24-
opentelemetry = { version = "0.24", default-features = false, features = ["metrics"] }
25-
opentelemetry_sdk = { version = "0.24", default-features = false, features = ["metrics"] }
24+
opentelemetry = { default-features = false, features = ["metrics"], path = "../opentelemetry" }
25+
opentelemetry_sdk = { default-features = false, features = ["metrics"], path = "../opentelemetry-sdk" }
2626
prometheus = "0.13"
2727
protobuf = "2.14"
2828

2929
[dev-dependencies]
30-
opentelemetry-semantic-conventions = { version = "0.16" }
30+
opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" }
3131
http-body-util = { workspace = true }
3232
hyper = { workspace = true, features = ["full"] }
3333
hyper-util = { workspace = true, features = ["full"] }

opentelemetry-sdk/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ futures-util = { workspace = true, features = ["std", "sink", "async-await-macro
2020
once_cell = { workspace = true }
2121
percent-encoding = { version = "2.0", optional = true }
2222
rand = { workspace = true, features = ["std", "std_rng","small_rng"], optional = true }
23+
rustc-hash = "2.0"
2324
glob = { version = "0.3.1", optional =true}
2425
serde = { workspace = true, features = ["derive", "rc"], optional = true }
2526
serde_json = { workspace = true, optional = true }
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::hash::{Hash, Hasher};
2+
3+
use opentelemetry::{Key, KeyValue, Value};
4+
use rustc_hash::FxHasher;
5+
6+
/// A unique set of attributes that can be used as instrument identifiers.
7+
///
8+
/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
9+
/// HashMap keys and other de-duplication methods.
10+
#[derive(Clone, Default, Debug, PartialEq, Eq)]
11+
pub(crate) struct AttributeSet(Vec<KeyValue>, u64);
12+
13+
impl From<&[KeyValue]> for AttributeSet {
14+
fn from(values: &[KeyValue]) -> Self {
15+
let mut vec = Vec::from_iter(values.into_iter().cloned());
16+
vec.sort_by(|a, b| a.key.cmp(&b.key));
17+
18+
// we cannot use vec.dedup_by because it will remove last duplicate not first
19+
if vec.len() > 1 {
20+
let mut i = vec.len() - 1;
21+
while i != 0 {
22+
let is_same = unsafe { vec.get_unchecked(i - 1).key == vec.get_unchecked(i).key };
23+
if is_same {
24+
vec.remove(i - 1);
25+
}
26+
i -= 1;
27+
}
28+
}
29+
30+
let hash = calculate_hash(&vec);
31+
AttributeSet(vec, hash)
32+
}
33+
}
34+
35+
fn calculate_hash(values: &[KeyValue]) -> u64 {
36+
let mut hasher = FxHasher::default();
37+
values.iter().fold(&mut hasher, |mut hasher, item| {
38+
item.hash(&mut hasher);
39+
hasher
40+
});
41+
hasher.finish()
42+
}
43+
44+
impl AttributeSet {
45+
pub(crate) fn new_empty() -> Self {
46+
AttributeSet(vec![], 0)
47+
}
48+
49+
pub(crate) fn attribs_cloned(&self) -> Vec<KeyValue> {
50+
self.0.clone()
51+
}
52+
53+
/// Iterate over key value pairs in the set
54+
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
55+
self.0.iter().map(|kv| (&kv.key, &kv.value))
56+
}
57+
58+
/// Returns the underlying Vec of KeyValue pairs
59+
pub(crate) fn into_vec(self) -> Vec<KeyValue> {
60+
self.0
61+
}
62+
}
63+
64+
impl Hash for AttributeSet {
65+
fn hash<H: Hasher>(&self, state: &mut H) {
66+
state.write_u64(self.1)
67+
}
68+
}

opentelemetry-sdk/src/metrics/internal/histogram.rs

Lines changed: 54 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::collections::HashSet;
21
use std::sync::atomic::Ordering;
3-
use std::sync::Arc;
42
use std::{sync::Mutex, time::SystemTime};
53

64
use crate::metrics::data::HistogramDataPoint;
@@ -197,40 +195,36 @@ impl<T: Number<T>> Histogram<T> {
197195
}
198196
}
199197

200-
let mut trackers = match self.value_map.trackers.write() {
201-
Ok(v) => v,
202-
Err(_) => return (0, None),
198+
let Ok(mut trackers) = self.value_map.trackers.write() else {
199+
return (0, None);
203200
};
204201

205-
let mut seen = HashSet::new();
206202
for (attrs, tracker) in trackers.drain() {
207-
if seen.insert(Arc::as_ptr(&tracker)) {
208-
if let Ok(b) = tracker.buckets.lock() {
209-
h.data_points.push(HistogramDataPoint {
210-
attributes: attrs.clone(),
211-
start_time: start,
212-
time: t,
213-
count: b.count,
214-
bounds: self.bounds.clone(),
215-
bucket_counts: b.counts.clone(),
216-
sum: if self.record_sum {
217-
b.total
218-
} else {
219-
T::default()
220-
},
221-
min: if self.record_min_max {
222-
Some(b.min)
223-
} else {
224-
None
225-
},
226-
max: if self.record_min_max {
227-
Some(b.max)
228-
} else {
229-
None
230-
},
231-
exemplars: vec![],
232-
});
233-
}
203+
if let Ok(b) = tracker.buckets.lock() {
204+
h.data_points.push(HistogramDataPoint {
205+
attributes: attrs.into_vec(),
206+
start_time: start,
207+
time: t,
208+
count: b.count,
209+
bounds: self.bounds.clone(),
210+
bucket_counts: b.counts.clone(),
211+
sum: if self.record_sum {
212+
b.total
213+
} else {
214+
T::default()
215+
},
216+
min: if self.record_min_max {
217+
Some(b.min)
218+
} else {
219+
None
220+
},
221+
max: if self.record_min_max {
222+
Some(b.max)
223+
} else {
224+
None
225+
},
226+
exemplars: vec![],
227+
});
234228
}
235229
}
236230

@@ -306,44 +300,40 @@ impl<T: Number<T>> Histogram<T> {
306300
}
307301
}
308302

309-
let trackers = match self.value_map.trackers.write() {
310-
Ok(v) => v,
311-
Err(_) => return (0, None),
303+
let Ok(trackers) = self.value_map.trackers.read() else {
304+
return (0, None);
312305
};
313306

314307
// TODO: This will use an unbounded amount of memory if there
315308
// are unbounded number of attribute sets being aggregated. Attribute
316309
// sets that become "stale" need to be forgotten so this will not
317310
// overload the system.
318-
let mut seen = HashSet::new();
319311
for (attrs, tracker) in trackers.iter() {
320-
if seen.insert(Arc::as_ptr(tracker)) {
321-
if let Ok(b) = tracker.buckets.lock() {
322-
h.data_points.push(HistogramDataPoint {
323-
attributes: attrs.clone(),
324-
start_time: start,
325-
time: t,
326-
count: b.count,
327-
bounds: self.bounds.clone(),
328-
bucket_counts: b.counts.clone(),
329-
sum: if self.record_sum {
330-
b.total
331-
} else {
332-
T::default()
333-
},
334-
min: if self.record_min_max {
335-
Some(b.min)
336-
} else {
337-
None
338-
},
339-
max: if self.record_min_max {
340-
Some(b.max)
341-
} else {
342-
None
343-
},
344-
exemplars: vec![],
345-
});
346-
}
312+
if let Ok(b) = tracker.buckets.lock() {
313+
h.data_points.push(HistogramDataPoint {
314+
attributes: attrs.attribs_cloned(),
315+
start_time: start,
316+
time: t,
317+
count: b.count,
318+
bounds: self.bounds.clone(),
319+
bucket_counts: b.counts.clone(),
320+
sum: if self.record_sum {
321+
b.total
322+
} else {
323+
T::default()
324+
},
325+
min: if self.record_min_max {
326+
Some(b.min)
327+
} else {
328+
None
329+
},
330+
max: if self.record_min_max {
331+
Some(b.max)
332+
} else {
333+
None
334+
},
335+
exemplars: vec![],
336+
});
347337
}
348338
}
349339

opentelemetry-sdk/src/metrics/internal/last_value.rs

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{
2-
collections::HashSet,
3-
sync::{atomic::Ordering, Arc, Mutex},
2+
sync::{atomic::Ordering, Mutex},
43
time::SystemTime,
54
};
65

@@ -54,23 +53,17 @@ impl<T: Number<T>> LastValue<T> {
5453
});
5554
}
5655

57-
let mut trackers = match self.value_map.trackers.write() {
58-
Ok(v) => v,
59-
_ => return,
56+
let Ok(mut trackers) = self.value_map.trackers.write() else {
57+
return;
6058
};
6159

62-
let mut seen = HashSet::new();
63-
for (attrs, tracker) in trackers.drain() {
64-
if seen.insert(Arc::as_ptr(&tracker)) {
65-
dest.push(DataPoint {
66-
attributes: attrs.clone(),
67-
start_time: Some(prev_start),
68-
time: Some(t),
69-
value: tracker.get_value(),
70-
exemplars: vec![],
71-
});
72-
}
73-
}
60+
dest.extend(trackers.drain().map(|(attrs, tracker)| DataPoint {
61+
attributes: attrs.into_vec(),
62+
start_time: Some(prev_start),
63+
time: Some(t),
64+
value: tracker.get_value(),
65+
exemplars: vec![],
66+
}));
7467

7568
// The delta collection cycle resets.
7669
if let Ok(mut start) = self.start.lock() {
@@ -106,22 +99,16 @@ impl<T: Number<T>> LastValue<T> {
10699
});
107100
}
108101

109-
let trackers = match self.value_map.trackers.write() {
110-
Ok(v) => v,
111-
_ => return,
102+
let Ok(trackers) = self.value_map.trackers.read() else {
103+
return;
112104
};
113105

114-
let mut seen = HashSet::new();
115-
for (attrs, tracker) in trackers.iter() {
116-
if seen.insert(Arc::as_ptr(tracker)) {
117-
dest.push(DataPoint {
118-
attributes: attrs.clone(),
119-
start_time: Some(prev_start),
120-
time: Some(t),
121-
value: tracker.get_value(),
122-
exemplars: vec![],
123-
});
124-
}
125-
}
106+
dest.extend(trackers.iter().map(|(attrs, tracker)| DataPoint {
107+
attributes: attrs.attribs_cloned(),
108+
start_time: Some(prev_start),
109+
time: Some(t),
110+
value: tracker.get_value(),
111+
exemplars: vec![],
112+
}));
126113
}
127114
}

0 commit comments

Comments
 (0)