Skip to content

Commit 976e3fd

Browse files
Merge pull request #3 from de-sh/otel-metrics-traces-flattening
refactor: `LogSource` enum
2 parents 4692bb9 + cca61b7 commit 976e3fd

File tree

7 files changed

+142
-72
lines changed

7 files changed

+142
-72
lines changed

src/event/format/json.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use serde_json::Value;
2929
use std::{collections::HashMap, sync::Arc};
3030
use tracing::error;
3131

32-
use super::{EventFormat, Metadata, Tags};
32+
use super::{EventFormat, LogSource, Metadata, Tags};
3333
use crate::{
3434
metadata::SchemaVersion,
3535
utils::{arrow::get_field, json::flatten_json_body},
@@ -52,7 +52,7 @@ impl EventFormat for Event {
5252
static_schema_flag: Option<&String>,
5353
time_partition: Option<&String>,
5454
schema_version: SchemaVersion,
55-
log_source: &str,
55+
log_source: &LogSource,
5656
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
5757
let data = flatten_json_body(
5858
self.data,

src/event/format/mod.rs

+34-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,38 @@ type Tags = String;
4343
type Metadata = String;
4444
type EventSchema = Vec<Arc<Field>>;
4545

46+
/// Source of the logs, used to perform special processing for certain sources
47+
#[derive(Default, Debug, Clone, PartialEq, Eq)]
48+
pub enum LogSource {
49+
// AWS Kinesis sends logs in the format of a json array
50+
Kinesis,
51+
// OpenTelemetry sends logs according to the specification as explained here
52+
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/logs/v1
53+
OtelLogs,
54+
// OpenTelemetry sends metrics according to the specification as explained here
55+
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/metrics/v1
56+
OtelMetrics,
57+
// OpenTelemetry sends traces according to the specification as explained here
58+
// https://github.com/open-telemetry/opentelemetry-proto/tree/v1.0.0/opentelemetry/proto/trace/v1
59+
OtelTraces,
60+
#[default]
61+
// Json object or array
62+
Json,
63+
Custom(String),
64+
}
65+
66+
impl From<&str> for LogSource {
67+
fn from(s: &str) -> Self {
68+
match s {
69+
"kinesis" => LogSource::Kinesis,
70+
"otel-logs" => LogSource::OtelLogs,
71+
"otel-metrics" => LogSource::OtelMetrics,
72+
"otel-traces" => LogSource::OtelTraces,
73+
custom => LogSource::Custom(custom.to_owned()),
74+
}
75+
}
76+
}
77+
4678
// Global Trait for event format
4779
// This trait is implemented by all the event formats
4880
pub trait EventFormat: Sized {
@@ -54,7 +86,7 @@ pub trait EventFormat: Sized {
5486
static_schema_flag: Option<&String>,
5587
time_partition: Option<&String>,
5688
schema_version: SchemaVersion,
57-
log_source: &str,
89+
log_source: &LogSource,
5890
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
5991

6092
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -65,7 +97,7 @@ pub trait EventFormat: Sized {
6597
static_schema_flag: Option<&String>,
6698
time_partition: Option<&String>,
6799
schema_version: SchemaVersion,
68-
log_source: &str,
100+
log_source: &LogSource,
69101
) -> Result<(RecordBatch, bool), AnyError> {
70102
let (data, mut schema, is_first, tags, metadata) = self.to_data(
71103
storage_schema,

src/handlers/http/ingest.rs

+70-30
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
23+
use crate::event::format::LogSource;
2324
use crate::event::{
2425
self,
2526
error::EventError,
2627
format::{self, EventFormat},
2728
};
2829
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
29-
use crate::handlers::{
30-
LOG_SOURCE_KEY, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS, LOG_SOURCE_OTEL_TRACES,
31-
STREAM_NAME_HEADER_KEY,
32-
};
30+
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3331
use crate::metadata::error::stream_info::MetadataError;
3432
use crate::metadata::{SchemaVersion, STREAM_INFO};
3533
use crate::option::{Mode, CONFIG};
@@ -95,7 +93,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9593
metadata: String::default(),
9694
};
9795
// For internal streams, use old schema
98-
event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")?
96+
event.into_recordbatch(&schema, None, None, SchemaVersion::V0, &LogSource::default())?
9997
};
10098
event::Event {
10199
rb,
@@ -127,8 +125,8 @@ pub async fn handle_otel_logs_ingestion(
127125
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
128126
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
129127
};
130-
let log_source = log_source.to_str().unwrap();
131-
if log_source != LOG_SOURCE_OTEL_LOGS {
128+
let log_source = LogSource::from(log_source.to_str().unwrap());
129+
if log_source != LogSource::OtelLogs {
132130
return Err(PostError::Invalid(anyhow::anyhow!(
133131
"Please use x-p-log-source: otel-logs for ingesting otel logs"
134132
)));
@@ -142,7 +140,7 @@ pub async fn handle_otel_logs_ingestion(
142140
let mut json = flatten_otel_logs(&logs);
143141
for record in json.iter_mut() {
144142
let body: Bytes = serde_json::to_vec(record).unwrap().into();
145-
push_logs(&stream_name, &req, &body, log_source).await?;
143+
push_logs(&stream_name, &req, &body, &log_source).await?;
146144
}
147145

148146
Ok(HttpResponse::Ok().finish())
@@ -161,8 +159,8 @@ pub async fn handle_otel_metrics_ingestion(
161159
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
162160
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
163161
};
164-
let log_source = log_source.to_str().unwrap();
165-
if log_source != LOG_SOURCE_OTEL_METRICS {
162+
let log_source = LogSource::from(log_source.to_str().unwrap());
163+
if log_source != LogSource::OtelMetrics {
166164
return Err(PostError::Invalid(anyhow::anyhow!(
167165
"Please use x-p-log-source: otel-metrics for ingesting otel metrics"
168166
)));
@@ -175,7 +173,7 @@ pub async fn handle_otel_metrics_ingestion(
175173
let mut json = flatten_otel_metrics(metrics);
176174
for record in json.iter_mut() {
177175
let body: Bytes = serde_json::to_vec(record).unwrap().into();
178-
push_logs(&stream_name, &req, &body, log_source).await?;
176+
push_logs(&stream_name, &req, &body, &log_source).await?;
179177
}
180178

181179
Ok(HttpResponse::Ok().finish())
@@ -195,8 +193,8 @@ pub async fn handle_otel_traces_ingestion(
195193
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
196194
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
197195
};
198-
let log_source = log_source.to_str().unwrap();
199-
if log_source != LOG_SOURCE_OTEL_TRACES {
196+
let log_source = LogSource::from(log_source.to_str().unwrap());
197+
if log_source != LogSource::OtelTraces {
200198
return Err(PostError::Invalid(anyhow::anyhow!(
201199
"Please use x-p-log-source: otel-traces for ingesting otel traces"
202200
)));
@@ -209,7 +207,7 @@ pub async fn handle_otel_traces_ingestion(
209207
let mut json = flatten_otel_traces(&traces);
210208
for record in json.iter_mut() {
211209
let body: Bytes = serde_json::to_vec(record).unwrap().into();
212-
push_logs(&stream_name, &req, &body, log_source).await?;
210+
push_logs(&stream_name, &req, &body, &log_source).await?;
213211
}
214212

215213
Ok(HttpResponse::Ok().finish())
@@ -371,7 +369,7 @@ mod tests {
371369
use serde_json::json;
372370

373371
use crate::{
374-
event,
372+
event::{self, format::LogSource},
375373
handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS},
376374
metadata::SchemaVersion,
377375
};
@@ -420,7 +418,7 @@ mod tests {
420418
None,
421419
None,
422420
SchemaVersion::V0,
423-
"",
421+
&LogSource::default(),
424422
)
425423
.unwrap();
426424

@@ -471,7 +469,7 @@ mod tests {
471469
None,
472470
None,
473471
SchemaVersion::V0,
474-
"",
472+
&LogSource::default(),
475473
)
476474
.unwrap();
477475

@@ -505,8 +503,16 @@ mod tests {
505503

506504
let req = TestRequest::default().to_http_request();
507505

508-
let (rb, _) =
509-
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
506+
let (rb, _) = into_event_batch(
507+
&req,
508+
&json,
509+
schema,
510+
None,
511+
None,
512+
SchemaVersion::V0,
513+
&LogSource::default(),
514+
)
515+
.unwrap();
510516

511517
assert_eq!(rb.num_rows(), 1);
512518
assert_eq!(rb.num_columns(), 5);
@@ -538,7 +544,16 @@ mod tests {
538544

539545
let req = TestRequest::default().to_http_request();
540546

541-
assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
547+
assert!(into_event_batch(
548+
&req,
549+
&json,
550+
schema,
551+
None,
552+
None,
553+
SchemaVersion::V0,
554+
&LogSource::default()
555+
)
556+
.is_err());
542557
}
543558

544559
#[test]
@@ -556,8 +571,16 @@ mod tests {
556571

557572
let req = TestRequest::default().to_http_request();
558573

559-
let (rb, _) =
560-
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
574+
let (rb, _) = into_event_batch(
575+
&req,
576+
&json,
577+
schema,
578+
None,
579+
None,
580+
SchemaVersion::V0,
581+
&LogSource::default(),
582+
)
583+
.unwrap();
561584

562585
assert_eq!(rb.num_rows(), 1);
563586
assert_eq!(rb.num_columns(), 3);
@@ -576,7 +599,7 @@ mod tests {
576599
None,
577600
None,
578601
SchemaVersion::V0,
579-
""
602+
&LogSource::default()
580603
)
581604
.is_err())
582605
}
@@ -608,7 +631,7 @@ mod tests {
608631
None,
609632
None,
610633
SchemaVersion::V0,
611-
"",
634+
&LogSource::default(),
612635
)
613636
.unwrap();
614637

@@ -665,7 +688,7 @@ mod tests {
665688
None,
666689
None,
667690
SchemaVersion::V0,
668-
"",
691+
&LogSource::default(),
669692
)
670693
.unwrap();
671694

@@ -715,8 +738,16 @@ mod tests {
715738
);
716739
let req = TestRequest::default().to_http_request();
717740

718-
let (rb, _) =
719-
into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();
741+
let (rb, _) = into_event_batch(
742+
&req,
743+
&json,
744+
schema,
745+
None,
746+
None,
747+
SchemaVersion::V0,
748+
&LogSource::default(),
749+
)
750+
.unwrap();
720751

721752
assert_eq!(rb.num_rows(), 3);
722753
assert_eq!(rb.num_columns(), 6);
@@ -765,7 +796,16 @@ mod tests {
765796
.into_iter(),
766797
);
767798

768-
assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
799+
assert!(into_event_batch(
800+
&req,
801+
&json,
802+
schema,
803+
None,
804+
None,
805+
SchemaVersion::V0,
806+
&LogSource::default()
807+
)
808+
.is_err());
769809
}
770810

771811
#[test]
@@ -800,7 +840,7 @@ mod tests {
800840
None,
801841
None,
802842
SchemaVersion::V0,
803-
"",
843+
&LogSource::default(),
804844
)
805845
.unwrap();
806846

@@ -881,7 +921,7 @@ mod tests {
881921
None,
882922
None,
883923
SchemaVersion::V1,
884-
"",
924+
&LogSource::default(),
885925
)
886926
.unwrap();
887927

0 commit comments

Comments
 (0)