Skip to content

Commit 9e73113

Browse files
Merge pull request #2 from de-sh/otel-metrics-traces-flattening
refactor: suggestions for code readability
2 parents 95a915f + f8045bb commit 9e73113

File tree

6 files changed

+77
-89
lines changed

6 files changed

+77
-89
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ humantime-serde = "1.1"
6464
itertools = "0.13.0"
6565
num_cpus = "1.15"
6666
once_cell = "1.17.1"
67+
opentelemetry-proto = "0.27.0"
6768
prometheus = { version = "0.13", features = ["process"] }
6869
rand = "0.8.5"
69-
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
70+
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
7071
regex = "1.7.3"
7172
relative-path = { version = "1.7", features = ["serde"] }
7273
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -106,7 +107,6 @@ prost = "0.13.3"
106107
prometheus-parse = "0.2.5"
107108
sha2 = "0.10.8"
108109
tracing = "0.1.41"
109-
opentelemetry-proto = "0.27.0"
110110

111111
[build-dependencies]
112112
cargo_toml = "0.20.1"

src/handlers/http/ingest.rs

+40-45
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ use arrow_schema::Schema;
4141
use bytes::Bytes;
4242
use chrono::Utc;
4343
use http::StatusCode;
44+
use nom::AsBytes;
45+
use opentelemetry_proto::tonic::logs::v1::LogsData;
46+
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
47+
use opentelemetry_proto::tonic::trace::v1::TracesData;
4448
use serde_json::Value;
4549
use std::collections::HashMap;
4650
use std::sync::Arc;
@@ -112,23 +116,20 @@ pub async fn handle_otel_logs_ingestion(
112116
req: HttpRequest,
113117
body: Bytes,
114118
) -> Result<HttpResponse, PostError> {
115-
if let Some((_, stream_name)) = req
116-
.headers()
117-
.iter()
118-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
119-
{
120-
let stream_name = stream_name.to_str().unwrap().to_owned();
121-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
122-
123-
//custom flattening required for otel logs
124-
let mut json = flatten_otel_logs(&body);
125-
for record in json.iter_mut() {
126-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
127-
push_logs(&stream_name, &req, &body).await?;
128-
}
129-
} else {
119+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
130120
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
121+
};
122+
let stream_name = stream_name.to_str().unwrap().to_owned();
123+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
124+
125+
//custom flattening required for otel logs
126+
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
127+
let mut json = flatten_otel_logs(&logs);
128+
for record in json.iter_mut() {
129+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
130+
push_logs(&stream_name, &req, &body).await?;
131131
}
132+
132133
Ok(HttpResponse::Ok().finish())
133134
}
134135

@@ -139,23 +140,20 @@ pub async fn handle_otel_metrics_ingestion(
139140
req: HttpRequest,
140141
body: Bytes,
141142
) -> Result<HttpResponse, PostError> {
142-
if let Some((_, stream_name)) = req
143-
.headers()
144-
.iter()
145-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
146-
{
147-
let stream_name = stream_name.to_str().unwrap().to_owned();
148-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
149-
150-
//custom flattening required for otel metrics
151-
let mut json = flatten_otel_metrics(&body);
152-
for record in json.iter_mut() {
153-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
154-
push_logs(&stream_name, &req, &body).await?;
155-
}
156-
} else {
143+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
157144
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
145+
};
146+
let stream_name = stream_name.to_str().unwrap().to_owned();
147+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
148+
149+
//custom flattening required for otel metrics
150+
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
151+
let mut json = flatten_otel_metrics(metrics);
152+
for record in json.iter_mut() {
153+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
154+
push_logs(&stream_name, &req, &body).await?;
158155
}
156+
159157
Ok(HttpResponse::Ok().finish())
160158
}
161159

@@ -166,23 +164,20 @@ pub async fn handle_otel_traces_ingestion(
166164
req: HttpRequest,
167165
body: Bytes,
168166
) -> Result<HttpResponse, PostError> {
169-
if let Some((_, stream_name)) = req
170-
.headers()
171-
.iter()
172-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
173-
{
174-
let stream_name = stream_name.to_str().unwrap().to_owned();
175-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
176-
177-
//custom flattening required for otel traces
178-
let mut json = flatten_otel_traces(&body);
179-
for record in json.iter_mut() {
180-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
181-
push_logs(&stream_name, &req, &body).await?;
182-
}
183-
} else {
167+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
184168
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
169+
};
170+
let stream_name = stream_name.to_str().unwrap().to_owned();
171+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
172+
173+
//custom flattening required for otel traces
174+
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
175+
let mut json = flatten_otel_traces(&traces);
176+
for record in json.iter_mut() {
177+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
178+
push_logs(&stream_name, &req, &body).await?;
185179
}
180+
186181
Ok(HttpResponse::Ok().finish())
187182
}
188183

src/handlers/http/modal/utils/ingest_utils.rs

+32-28
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ use actix_web::HttpRequest;
2020
use arrow_schema::Field;
2121
use bytes::Bytes;
2222
use chrono::{DateTime, NaiveDateTime, Utc};
23+
use nom::AsBytes;
24+
use opentelemetry_proto::tonic::{
25+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
26+
};
2327
use serde_json::Value;
2428
use std::{
2529
collections::{BTreeMap, HashMap},
@@ -47,39 +51,39 @@ pub async fn flatten_and_push_logs(
4751
body: Bytes,
4852
stream_name: &str,
4953
) -> Result<(), PostError> {
50-
//flatten logs
51-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
52-
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
53-
let log_source: String = log_source.to_str().unwrap().to_owned();
54-
match log_source.as_str() {
55-
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
56-
57-
//custom flattening required for otel logs
58-
LOG_SOURCE_OTEL_LOGS => {
59-
json = flatten_otel_logs(&body);
60-
}
61-
62-
//custom flattening required for otel metrics
63-
LOG_SOURCE_OTEL_METRICS => {
64-
json = flatten_otel_metrics(&body);
65-
}
66-
67-
//custom flattening required for otel traces
68-
LOG_SOURCE_OTEL_TRACES => {
69-
json = flatten_otel_traces(&body);
70-
}
71-
_ => {
72-
tracing::warn!("Unknown log source: {}", log_source);
73-
push_logs(stream_name, &req, &body).await?;
74-
}
54+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
55+
push_logs(stream_name, &req, &body).await?;
56+
return Ok(());
57+
};
58+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
59+
match log_source.to_str().unwrap() {
60+
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
61+
//custom flattening required for otel logs
62+
LOG_SOURCE_OTEL_LOGS => {
63+
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
64+
json = flatten_otel_logs(&logs);
65+
}
66+
//custom flattening required for otel metrics
67+
LOG_SOURCE_OTEL_METRICS => {
68+
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
69+
json = flatten_otel_metrics(metrics);
70+
}
71+
//custom flattening required for otel traces
72+
LOG_SOURCE_OTEL_TRACES => {
73+
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
74+
json = flatten_otel_traces(&traces);
7575
}
76-
for record in json.iter_mut() {
77-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
76+
log_source => {
77+
tracing::warn!("Unknown log source: {}", log_source);
7878
push_logs(stream_name, &req, &body).await?;
7979
}
80-
} else {
80+
}
81+
82+
for record in json.iter_mut() {
83+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
8184
push_logs(stream_name, &req, &body).await?;
8285
}
86+
8387
Ok(())
8488
}
8589

src/otel/logs.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*
1717
*/
1818

19-
use bytes::Bytes;
2019
use opentelemetry_proto::tonic::logs::v1::LogRecord;
2120
use opentelemetry_proto::tonic::logs::v1::LogsData;
2221
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
@@ -125,11 +124,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<BTreeMap<String, Value>> {
125124

126125
/// this function performs the custom flattening of the otel logs
127126
/// and returns a `Vec` of `BTreeMap` of the flattened json
128-
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
129-
let body_str = std::str::from_utf8(body).unwrap();
130-
let message: LogsData = serde_json::from_str(body_str).unwrap();
127+
pub fn flatten_otel_logs(message: &LogsData) -> Vec<BTreeMap<String, Value>> {
131128
let mut vec_otel_json = Vec::new();
132-
133129
for record in &message.resource_logs {
134130
let mut resource_log_json = BTreeMap::new();
135131

src/otel/metrics.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
use std::collections::BTreeMap;
2020

21-
use bytes::Bytes;
2221
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
2322
use opentelemetry_proto::tonic::metrics::v1::{
2423
exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar,
@@ -386,9 +385,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<BTreeMap<String, V
386385

387386
/// this function performs the custom flattening of the otel metrics
388387
/// and returns a `Vec` of `BTreeMap` of the flattened json
389-
pub fn flatten_otel_metrics(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
390-
let body_str = std::str::from_utf8(body).unwrap();
391-
let message: MetricsData = serde_json::from_str(body_str).unwrap();
388+
pub fn flatten_otel_metrics(message: MetricsData) -> Vec<BTreeMap<String, Value>> {
392389
let mut vec_otel_json = Vec::new();
393390
for record in &message.resource_metrics {
394391
let mut resource_metrics_json = BTreeMap::new();

src/otel/traces.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*
1717
*/
1818

19-
use bytes::Bytes;
20-
2119
use opentelemetry_proto::tonic::trace::v1::span::Event;
2220
use opentelemetry_proto::tonic::trace::v1::span::Link;
2321
use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
@@ -71,9 +69,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<BTreeMap<String, Value>> {
7169

7270
/// this function performs the custom flattening of the otel traces event
7371
/// and returns a `Vec` of `BTreeMap` of the flattened json
74-
pub fn flatten_otel_traces(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
75-
let body_str = std::str::from_utf8(body).unwrap();
76-
let message: TracesData = serde_json::from_str(body_str).unwrap();
72+
pub fn flatten_otel_traces(message: &TracesData) -> Vec<BTreeMap<String, Value>> {
7773
let mut vec_otel_json = Vec::new();
7874

7975
for record in &message.resource_spans {

0 commit comments

Comments
 (0)