From 4d2897ea836cb088631cad61a871353b05c16b83 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 23 Jan 2025 08:57:28 +0530
Subject: [PATCH] refactor: ingestion data flow (#1100)

* refactor: kinesis message construction may panic
* refactor: replace `BTreeMap` with `serde_json::Map`
* refactor: get rid of clone
* refactor: use `Value` for JSON data
* refactor: `HeaderMap::get` and `let Some else`
* refactor: ingest utils don't need http context anymore
* refactor: more descriptive error variants
* refactor: PUT stream header extraction
* refactor: use Path and Json extractor
* don't extract where not required
* refactor: serde `date_list`
* refactor: serde `DefaultPrivilege`
* refactor: serde `Dashboard`
* refactor: serde `Filter`
* refactor: move up `p_timestamp` addition to recordbatch
* refactor: refer over clone
* fix: don't hog write privileges
* refactor: DRY stream writer creation
* refactor: serde `StreamType`
---
 src/catalog/mod.rs | 5 +-
 src/event/format/mod.rs | 29 ++-
 src/event/mod.rs | 40 ++---
 src/event/writer/mem_writer.rs | 6 +-
 src/event/writer/mod.rs | 138 ++++-----------
 src/handlers/http/cluster/mod.rs | 22 +--
 src/handlers/http/ingest.rs | 166 +++++++++---------
 src/handlers/http/kinesis.rs | 17 +-
 src/handlers/http/logstream.rs | 135 +++++++-------
 .../http/modal/ingest/ingestor_logstream.rs | 27 +--
 .../http/modal/ingest/ingestor_role.rs | 14 +-
 .../http/modal/query/querier_ingest.rs | 10 +-
 .../http/modal/query/querier_logstream.rs | 28 +--
 src/handlers/http/modal/query/querier_role.rs | 12 +-
 src/handlers/http/modal/utils/ingest_utils.rs | 75 ++++----
 .../http/modal/utils/logstream_utils.rs | 111 ++++++------
 src/handlers/http/role.rs | 13 +-
 src/handlers/http/users/dashboards.rs | 52 +++---
 src/handlers/http/users/filters.rs | 54 +++---
 src/kafka.rs | 7 +-
 src/metadata.rs | 2 +-
 src/otel/logs.rs | 26 +--
 src/otel/metrics.rs | 82 +++++----
 src/otel/otel_utils.rs | 33 ++--
 src/otel/traces.rs | 47 +++--
 src/storage/mod.rs | 12 +-
 src/storage/object_storage.rs | 6 +-
 src/utils/arrow/mod.rs | 39 +++-
 src/validator.rs | 7 +-
 29 files changed, 597 insertions(+), 618 deletions(-)

diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index 0a07855f3..aa46afb4f 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -33,7 +33,6 @@ use crate::{
     query::PartialTimeFilter,
     storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
 };
-use bytes::Bytes;
 use chrono::{DateTime, Local, NaiveTime, Utc};
 use relative_path::RelativePathBuf;
 use std::io::Error as IOError;
@@ -412,13 +411,11 @@ pub async fn get_first_event(
             base_path_without_preceding_slash(),
             stream_name
         );
-        // Convert dates vector to Bytes object
-        let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
         let ingestor_first_event_at =
            handlers::http::cluster::send_retention_cleanup_request(
                &url,
                ingestor.clone(),
-               dates_bytes,
+               &dates,
            )
            .await?;
         if !ingestor_first_event_at.is_empty() {
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 4032c92fa..9d83d2d48 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -19,6 +19,7 @@
 use std::{
     collections::{HashMap, HashSet},
+    fmt::Display,
     sync::Arc,
 };
 
@@ -29,7 +30,10 @@ use chrono::DateTime;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
-use crate::{metadata::SchemaVersion, utils::arrow::get_field};
+use crate::{
+    metadata::SchemaVersion,
+    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+};
 
 use super::DEFAULT_TIMESTAMP_KEY;
 
@@ -73,6 +77,20 @@ impl From<&str> for 
LogSource { } } +impl Display for LogSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + LogSource::Kinesis => "kinesis", + LogSource::OtelLogs => "otel-logs", + LogSource::OtelMetrics => "otel-metrics", + LogSource::OtelTraces => "otel-traces", + LogSource::Json => "json", + LogSource::Pmeta => "pmeta", + LogSource::Custom(custom) => custom, + }) + } +} + // Global Trait for event format // This trait is implemented by all the event formats pub trait EventFormat: Sized { @@ -126,7 +144,14 @@ pub trait EventFormat: Sized { } new_schema = update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); - let rb = Self::decode(data, new_schema.clone())?; + + let mut rb = Self::decode(data, new_schema.clone())?; + rb = replace_columns( + rb.schema(), + &rb, + &[0], + &[Arc::new(get_timestamp_array(rb.num_rows()))], + ); Ok((rb, is_first)) } diff --git a/src/event/mod.rs b/src/event/mod.rs index 2e9bc7359..0c2c1f6b9 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -28,7 +28,7 @@ use tracing::error; use self::error::EventError; pub use self::writer::STREAM_WRITERS; -use crate::{handlers::http::ingest::PostError, metadata, storage::StreamType}; +use crate::{metadata, storage::StreamType}; use chrono::NaiveDateTime; use std::collections::HashMap; @@ -49,7 +49,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub async fn process(&self) -> Result<(), EventError> { + pub async fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); @@ -69,10 +69,10 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } - Self::process_event( + STREAM_WRITERS.append_to_local( &self.stream_name, &key, - self.rb.clone(), + &self.rb, self.parsed_timestamp, &self.custom_partition_values, &self.stream_type, @@ -98,44 +98,24 @@ impl Event { Ok(()) } - pub fn process_unchecked(&self) -> Result<(), PostError> { + pub fn process_unchecked(&self) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); - Self::process_event( + STREAM_WRITERS.append_to_local( &self.stream_name, &key, - self.rb.clone(), + &self.rb, self.parsed_timestamp, &self.custom_partition_values, &self.stream_type, - ) - .map_err(PostError::Event) + )?; + + Ok(()) } pub fn clear(&self, stream_name: &str) { STREAM_WRITERS.clear(stream_name); } - - // event process all events after the 1st event. Concatenates record batches - // and puts them in memory store for each event. 
- fn process_event( - stream_name: &str, - schema_key: &str, - rb: RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), EventError> { - STREAM_WRITERS.append_to_local( - stream_name, - schema_key, - rb, - parsed_timestamp, - custom_partition_values.clone(), - stream_type, - )?; - Ok(()) - } } pub fn get_schema_key(fields: &[Arc]) -> String { diff --git a/src/event/writer/mem_writer.rs b/src/event/writer/mem_writer.rs index 561f2c4e5..d24077333 100644 --- a/src/event/writer/mem_writer.rs +++ b/src/event/writer/mem_writer.rs @@ -50,7 +50,7 @@ impl Default for MemWriter { } impl MemWriter { - pub fn push(&mut self, schema_key: &str, rb: RecordBatch) { + pub fn push(&mut self, schema_key: &str, rb: &RecordBatch) { if !self.schema_map.contains(schema_key) { self.schema_map.insert(schema_key.to_owned()); self.schema = Schema::try_merge([self.schema.clone(), (*rb.schema()).clone()]).unwrap(); @@ -97,7 +97,7 @@ pub struct MutableBuffer { } impl MutableBuffer { - fn push(&mut self, rb: RecordBatch) -> Option> { + fn push(&mut self, rb: &RecordBatch) -> Option> { if self.rows + rb.num_rows() >= N { let left = N - self.rows; let right = rb.num_rows() - left; @@ -121,7 +121,7 @@ impl MutableBuffer { Some(inner) } else { self.rows += rb.num_rows(); - self.inner.push(rb); + self.inner.push(rb.clone()); None } } diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs index 9efbc3fcc..895cd59ed 100644 --- a/src/event/writer/mod.rs +++ b/src/event/writer/mod.rs @@ -22,20 +22,18 @@ mod mem_writer; use std::{ collections::HashMap, - sync::{Arc, Mutex, RwLock, RwLockWriteGuard}, + sync::{Arc, Mutex, RwLock}, }; use crate::{ option::{Mode, CONFIG}, storage::StreamType, - utils, }; use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; -use arrow_array::{RecordBatch, TimestampMillisecondArray}; +use arrow_array::RecordBatch; use arrow_schema::Schema; use chrono::NaiveDateTime; -use chrono::Utc; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -52,21 +50,14 @@ impl Writer { &mut self, stream_name: &str, schema_key: &str, - rb: RecordBatch, + rb: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { - let rb = utils::arrow::replace_columns( - rb.schema(), - &rb, - &[0], - &[Arc::new(get_timestamp_array(rb.num_rows()))], - ); - self.disk.push( stream_name, schema_key, - &rb, + rb, parsed_timestamp, custom_partition_values, )?; @@ -74,7 +65,7 @@ impl Writer { Ok(()) } - fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> { + fn push_mem(&mut self, schema_key: &str, rb: &RecordBatch) -> Result<(), StreamWriterError> { self.mem.push(schema_key, rb); Ok(()) } @@ -84,62 +75,54 @@ impl Writer { pub struct WriterTable(RwLock>>); impl WriterTable { - // append to a existing stream + // Concatenates record batches and puts them in memory store for each event. 
pub fn append_to_local( &self, stream_name: &str, schema_key: &str, - record: RecordBatch, + record: &RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { - let hashmap_guard = self.read().unwrap(); + if !self.read().unwrap().contains_key(stream_name) { + // Gets write privileges only for inserting a writer + self.write() + .unwrap() + .insert(stream_name.to_owned(), Mutex::new(Writer::default())); + } + + // Updates the writer with only read privileges + self.handle_existing_writer( + stream_name, + schema_key, + record, + parsed_timestamp, + custom_partition_values, + stream_type, + )?; - match hashmap_guard.get(stream_name) { - Some(stream_writer) => { - self.handle_existing_writer( - stream_writer, - stream_name, - schema_key, - record, - parsed_timestamp, - &custom_partition_values, - stream_type, - )?; - } - None => { - drop(hashmap_guard); - let map = self.write().unwrap(); - // check for race condition - // if map contains entry then just - self.handle_missing_writer( - map, - stream_name, - schema_key, - record, - parsed_timestamp, - &custom_partition_values, - stream_type, - )?; - } - }; Ok(()) } - #[allow(clippy::too_many_arguments)] + /// Update writer for stream when it already exists fn handle_existing_writer( &self, - stream_writer: &Mutex, stream_name: &str, schema_key: &str, - record: RecordBatch, + record: &RecordBatch, parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, stream_type: &StreamType, ) -> Result<(), StreamWriterError> { + let hashmap_guard = self.read().unwrap(); + let mut writer = hashmap_guard + .get(stream_name) + .expect("Stream exists") + .lock() + .unwrap(); if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - stream_writer.lock().unwrap().push( + writer.push( stream_name, schema_key, record, @@ -147,61 +130,12 @@ impl WriterTable { custom_partition_values, )?; } else { - stream_writer - .lock() - .unwrap() - .push_mem(stream_name, record)?; + writer.push_mem(stream_name, record)?; } Ok(()) } - #[allow(clippy::too_many_arguments)] - fn handle_missing_writer( - &self, - mut map: RwLockWriteGuard>>, - stream_name: &str, - schema_key: &str, - record: RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), StreamWriterError> { - match map.get(stream_name) { - Some(writer) => { - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - writer.lock().unwrap().push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - } else { - writer.lock().unwrap().push_mem(stream_name, record)?; - } - } - None => { - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - let mut writer = Writer::default(); - writer.push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - map.insert(stream_name.to_owned(), Mutex::new(writer)); - } else { - let mut writer = Writer::default(); - writer.push_mem(schema_key, record)?; - map.insert(stream_name.to_owned(), Mutex::new(writer)); - } - } - } - Ok(()) - } - pub fn clear(&self, stream_name: &str) { let map = self.write().unwrap(); if let Some(writer) = map.get(stream_name) { @@ -243,10 +177,6 @@ impl WriterTable { } } -fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { - TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), 
size) -} - pub mod errors { #[derive(Debug, thiserror::Error)] diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 4e936c79d..91bdf288c 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -35,7 +35,8 @@ use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use crate::HTTP_CLIENT; use actix_web::http::header::{self, HeaderMap}; -use actix_web::{HttpRequest, Responder}; +use actix_web::web::Path; +use actix_web::Responder; use bytes::Bytes; use chrono::Utc; use http::{header as http_header, StatusCode}; @@ -321,19 +322,13 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), // forward the put role request to all ingestors to keep them in sync pub async fn sync_role_update_with_ingestors( name: String, - body: Vec, + privileges: Vec, ) -> Result<(), RoleError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { error!("Fatal: failed to get ingestor info: {:?}", err); RoleError::Anyhow(err) })?; - let roles = to_vec(&body).map_err(|err| { - error!("Fatal: failed to serialize roles: {:?}", err); - RoleError::SerdeError(err) - })?; - let roles = Bytes::from(roles); - for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { warn!("Ingestor {} is not live", ingestor.domain_name); @@ -350,7 +345,7 @@ pub async fn sync_role_update_with_ingestors( .put(url) .header(header::AUTHORIZATION, &ingestor.token) .header(header::CONTENT_TYPE, "application/json") - .body(roles.clone()) + .json(&privileges) .send() .await .map_err(|err| { @@ -538,7 +533,7 @@ pub async fn send_stream_delete_request( pub async fn send_retention_cleanup_request( url: &str, ingestor: IngestorMetadata, - body: Bytes, + dates: &Vec, ) -> Result { let mut first_event_at: String = String::default(); if !utils::check_liveness(&ingestor.domain_name).await { @@ -548,7 +543,7 @@ pub async fn send_retention_cleanup_request( .post(url) .header(header::CONTENT_TYPE, "application/json") .header(header::AUTHORIZATION, ingestor.token) - .body(body) + .json(dates) .send() .await .map_err(|err| { @@ -676,9 +671,8 @@ pub async fn get_ingestor_info() -> anyhow::Result { Ok(arr) } -pub async fn remove_ingestor(req: HttpRequest) -> Result { - let domain_name: String = req.match_info().get("ingestor").unwrap().parse().unwrap(); - let domain_name = to_url_string(domain_name); +pub async fn remove_ingestor(ingestor: Path) -> Result { + let domain_name = to_url_string(ingestor.into_inner()); if check_liveness(&domain_name).await { return Err(PostError::Invalid(anyhow::anyhow!( diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 8b437f862..b3da07761 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -37,13 +37,13 @@ use crate::otel::traces::flatten_otel_traces; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; +use actix_web::web::{Json, Path}; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; use arrow_schema::Schema; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use nom::AsBytes; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::metrics::v1::MetricsData; use opentelemetry_proto::tonic::trace::v1::TracesData; @@ -54,32 +54,27 @@ use std::sync::Arc; // Handler for POST 
/api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist -pub async fn ingest(req: HttpRequest, body: Bytes) -> Result { - if let Some((_, stream_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - { - let stream_name = stream_name.to_str().unwrap().to_owned(); - let internal_stream_names = STREAM_INFO.list_internal_streams(); - if internal_stream_names.contains(&stream_name) { - return Err(PostError::Invalid(anyhow::anyhow!( - "The stream {} is reserved for internal use and cannot be ingested into", - stream_name - ))); - } - create_stream_if_not_exists( - &stream_name, - &StreamType::UserDefined.to_string(), - LogSource::default(), - ) - .await?; +pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result { + let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { + return Err(PostError::Header(ParseHeaderError::MissingStreamName)); + }; - flatten_and_push_logs(req, body, &stream_name).await?; - Ok(HttpResponse::Ok().finish()) - } else { - Err(PostError::Header(ParseHeaderError::MissingStreamName)) + let stream_name = stream_name.to_str().unwrap().to_owned(); + let internal_stream_names = STREAM_INFO.list_internal_streams(); + if internal_stream_names.contains(&stream_name) { + return Err(PostError::InternalStream(stream_name)); } + create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::default()) + .await?; + + let log_source = req + .headers() + .get(LOG_SOURCE_KEY) + .and_then(|h| h.to_str().ok()) + .map_or(LogSource::default(), LogSource::from); + flatten_and_push_logs(json, &stream_name, &log_source).await?; + + Ok(HttpResponse::Ok().finish()) } pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> { @@ -118,7 +113,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // creates if stream does not exist pub async fn handle_otel_logs_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -129,25 +124,16 @@ pub async fn handle_otel_logs_ingestion( }; let log_source = LogSource::from(log_source.to_str().unwrap()); if log_source != LogSource::OtelLogs { - return Err(PostError::Invalid(anyhow::anyhow!( - "Please use x-p-log-source: otel-logs for ingesting otel logs" - ))); + return Err(PostError::IncorrectLogSource(LogSource::OtelLogs)); } let stream_name = stream_name.to_str().unwrap().to_owned(); - create_stream_if_not_exists( - &stream_name, - &StreamType::UserDefined.to_string(), - LogSource::OtelLogs, - ) - .await?; + create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs).await?; //custom flattening required for otel logs - let logs: LogsData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_logs(&logs); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + let logs: LogsData = serde_json::from_value(json)?; + for record in flatten_otel_logs(&logs) { + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -158,7 +144,7 @@ pub async fn handle_otel_logs_ingestion( // creates if stream does not exist pub async fn handle_otel_metrics_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let 
Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -168,24 +154,20 @@ pub async fn handle_otel_metrics_ingestion( }; let log_source = LogSource::from(log_source.to_str().unwrap()); if log_source != LogSource::OtelMetrics { - return Err(PostError::Invalid(anyhow::anyhow!( - "Please use x-p-log-source: otel-metrics for ingesting otel metrics" - ))); + return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics)); } let stream_name = stream_name.to_str().unwrap().to_owned(); create_stream_if_not_exists( &stream_name, - &StreamType::UserDefined.to_string(), + StreamType::UserDefined, LogSource::OtelMetrics, ) .await?; //custom flattening required for otel metrics - let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_metrics(metrics); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + let metrics: MetricsData = serde_json::from_value(json)?; + for record in flatten_otel_metrics(metrics) { + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -196,7 +178,7 @@ pub async fn handle_otel_metrics_ingestion( // creates if stream does not exist pub async fn handle_otel_traces_ingestion( req: HttpRequest, - body: Bytes, + Json(json): Json, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -207,24 +189,16 @@ pub async fn handle_otel_traces_ingestion( }; let log_source = LogSource::from(log_source.to_str().unwrap()); if log_source != LogSource::OtelTraces { - return Err(PostError::Invalid(anyhow::anyhow!( - "Please use x-p-log-source: otel-traces for ingesting otel traces" - ))); + return Err(PostError::IncorrectLogSource(LogSource::OtelTraces)); } let stream_name = stream_name.to_str().unwrap().to_owned(); - create_stream_if_not_exists( - &stream_name, - &StreamType::UserDefined.to_string(), - LogSource::OtelTraces, - ) - .await?; + create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces) + .await?; //custom flattening required for otel traces - let traces: TracesData = serde_json::from_slice(body.as_bytes())?; - let mut json = flatten_otel_traces(&traces); - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &body, &log_source).await?; + let traces: TracesData = serde_json::from_value(json)?; + for record in flatten_otel_traces(&traces) { + push_logs(&stream_name, record, &log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -233,14 +207,15 @@ pub async fn handle_otel_traces_ingestion( // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn post_event( + req: HttpRequest, + stream_name: Path, + Json(json): Json, +) -> Result { + let stream_name = stream_name.into_inner(); let internal_stream_names = STREAM_INFO.list_internal_streams(); if internal_stream_names.contains(&stream_name) { - return Err(PostError::Invalid(anyhow::anyhow!( - "Stream {} is an internal stream and cannot be ingested into", - stream_name - ))); + return 
Err(PostError::InternalStream(stream_name)); } if !STREAM_INFO.stream_exists(&stream_name) { // For distributed deployments, if the stream not found in memory map, @@ -256,7 +231,13 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result { let mut stream_exists = false; @@ -347,6 +328,18 @@ pub enum PostError { StreamError(#[from] StreamError), #[error("Error: {0}")] JsonFlattenError(#[from] JsonFlattenError), + #[error( + "Use the endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" + )] + OtelNotSupported, + #[error("The stream {0} is reserved for internal use and cannot be ingested into")] + InternalStream(String), + #[error(r#"Please use "x-p-log-source: {0}" for ingesting otel logs"#)] + IncorrectLogSource(LogSource), + #[error("Ingestion is not allowed in Query mode")] + IngestionNotAllowed, + #[error("Missing field for time partition in json: {0}")] + MissingTimePartition(String), } impl actix_web::ResponseError for PostError { @@ -369,6 +362,11 @@ impl actix_web::ResponseError for PostError { PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::OtelNotSupported => StatusCode::BAD_REQUEST, + PostError::InternalStream(_) => StatusCode::BAD_REQUEST, + PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST, + PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST, + PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, } } @@ -427,7 +425,7 @@ mod tests { }); let (rb, _) = - into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 4); @@ -454,7 +452,7 @@ mod tests { }); let (rb, _) = - into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -484,7 +482,7 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -514,7 +512,7 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err()); + assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); } #[test] @@ -530,7 +528,7 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 1); @@ -570,7 +568,7 @@ mod tests { ]); let (rb, _) = - into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); @@ -617,7 +615,7 @@ mod tests { ]); let (rb, _) = - into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); 
@@ -664,7 +662,7 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); @@ -711,7 +709,7 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err()); + assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); } #[test] @@ -750,7 +748,7 @@ mod tests { .unwrap(); let (rb, _) = into_event_batch( - &flattened_json, + flattened_json, HashMap::default(), false, None, @@ -838,7 +836,7 @@ mod tests { .unwrap(); let (rb, _) = into_event_batch( - &flattened_json, + flattened_json, HashMap::default(), false, None, diff --git a/src/handlers/http/kinesis.rs b/src/handlers/http/kinesis.rs index 084f686cd..e2f245f73 100644 --- a/src/handlers/http/kinesis.rs +++ b/src/handlers/http/kinesis.rs @@ -17,15 +17,13 @@ */ use base64::{engine::general_purpose::STANDARD, Engine as _}; -use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; use std::str; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -struct Message { +pub struct Message { records: Vec, request_id: String, timestamp: u64, @@ -59,16 +57,14 @@ struct Data { // "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a", // "timestamp": "1704964113659" // } -pub fn flatten_kinesis_logs(body: &Bytes) -> Vec> { - let body_str = std::str::from_utf8(body).unwrap(); - let message: Message = serde_json::from_str(body_str).unwrap(); - let mut vec_kinesis_json: Vec> = Vec::new(); +pub fn flatten_kinesis_logs(message: Message) -> Vec { + let mut vec_kinesis_json = Vec::new(); for record in message.records.iter() { let bytes = STANDARD.decode(record.data.clone()).unwrap(); let json_string: String = String::from_utf8(bytes).unwrap(); let json: serde_json::Value = serde_json::from_str(&json_string).unwrap(); - let mut kinesis_json: BTreeMap = match serde_json::from_value(json) { + let mut kinesis_json: Map = match serde_json::from_value(json) { Ok(value) => value, Err(error) => panic!("Failed to deserialize JSON: {}", error), }; @@ -82,7 +78,8 @@ pub fn flatten_kinesis_logs(body: &Bytes) -> Vec> { Value::String(message.timestamp.to_string()), ); - vec_kinesis_json.push(kinesis_json); + vec_kinesis_json.push(Value::Object(kinesis_json)); } + vec_kinesis_json } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 7eac4e822..e106a2de9 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -43,6 +43,7 @@ use crate::{event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use actix_web::http::StatusCode; +use actix_web::web::{Json, Path}; use actix_web::{web, HttpRequest, Responder}; use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; @@ -58,8 +59,8 @@ use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } @@ -113,9 +114,8 @@ pub async fn list(req: HttpRequest) -> 
Result { Ok(web::Json(res)) } -pub async fn detect_schema(body: Bytes) -> Result { - let body_val: Value = serde_json::from_slice(&body)?; - let log_records: Vec = match body_val { +pub async fn detect_schema(Json(json): Json) -> Result { + let log_records: Vec = match json { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], _ => { @@ -133,8 +133,8 @@ pub async fn detect_schema(body: Bytes) -> Result { Ok((web::Json(schema), StatusCode::OK)) } -pub async fn schema(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn schema(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); match STREAM_INFO.schema(&stream_name) { Ok(_) => {} @@ -157,8 +157,8 @@ pub async fn schema(req: HttpRequest) -> Result { } } -pub async fn get_alert(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_alert(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); let alerts = metadata::STREAM_INFO .read() @@ -190,24 +190,26 @@ pub async fn get_alert(req: HttpRequest) -> Result Ok((web::Json(alerts), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); - create_update_stream(&req, &body, &stream_name).await?; + create_update_stream(req.headers(), &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } pub async fn put_alert( - req: HttpRequest, - body: web::Json, + stream_name: Path, + Json(mut json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); - let mut body = body.into_inner(); - remove_id_from_alerts(&mut body); - - let alerts: Alerts = match serde_json::from_value(body) { + remove_id_from_alerts(&mut json); + let alerts: Alerts = match serde_json::from_value(json) { Ok(alerts) => alerts, Err(err) => { return Err(StreamError::BadAlertJson { @@ -265,8 +267,8 @@ pub async fn put_alert( )) } -pub async fn get_retention(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_retention(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -295,10 +297,10 @@ pub async fn get_retention(req: HttpRequest) -> Result, + stream_name: Path, + Json(json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -314,9 +316,7 @@ pub async fn put_retention( } } - let body = body.into_inner(); - - let retention: Retention = match serde_json::from_value(body) { + let retention: Retention = match serde_json::from_value(json) { Ok(retention) => retention, Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), }; @@ -361,8 +361,11 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result Result { - let stream_name: String = 
req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stats( + req: HttpRequest, + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -380,8 +383,9 @@ pub async fn get_stats(req: HttpRequest) -> Result let query_string = req.query_string(); if !query_string.is_empty() { - let date_key = query_string.split('=').collect::>()[0]; - let date_value = query_string.split('=').collect::>()[1]; + let tokens = query_string.split('=').collect::>(); + let date_key = tokens[0]; + let date_value = tokens[1]; if date_key != "date" { return Err(StreamError::Custom { msg: "Invalid query parameter".to_string(), @@ -492,11 +496,11 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: bool, schema: Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - if stream_type != StreamType::Internal.to_string() { + if stream_type != StreamType::Internal { validator::stream_name(&stream_name, stream_type)?; } // Proceed to create log stream if it doesn't exist @@ -546,8 +550,8 @@ pub async fn create_stream( Ok(()) } -pub async fn get_stream_info(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stream_info(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { if CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { @@ -596,10 +600,10 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, + stream_name: Path, + Json(json): Json, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, //check if it exists in the storage @@ -624,8 +628,7 @@ pub async fn put_stream_hot_tier( return Err(StreamError::HotTierNotEnabled(stream_name)); } - let body = body.into_inner(); - let mut hottier: StreamHotTier = match serde_json::from_value(body) { + let mut hottier: StreamHotTier = match serde_json::from_value(json) { Ok(hottier) => hottier, Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), }; @@ -657,8 +660,8 @@ pub async fn put_stream_hot_tier( )) } -pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn get_stream_hot_tier(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -692,8 +695,10 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete_stream_hot_tier( + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { // For query mode, if the stream not found in memory map, @@ -730,12 +735,9 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result Result<(), StreamError> { - if let Ok(stream_exists) = create_stream_if_not_exists( - INTERNAL_STREAM_NAME, - &StreamType::Internal.to_string(), - 
LogSource::Pmeta, - ) - .await + if let Ok(stream_exists) = + create_stream_if_not_exists(INTERNAL_STREAM_NAME, StreamType::Internal, LogSource::Pmeta) + .await { if stream_exists { return Ok(()); @@ -903,23 +905,24 @@ pub mod error { mod tests { use crate::handlers::http::logstream::error::StreamError; use crate::handlers::http::logstream::get_stats; - use crate::handlers::http::modal::utils::logstream_utils::fetch_headers_from_put_stream_request; + use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; use actix_web::test::TestRequest; + use actix_web::web; use anyhow::bail; - #[actix_web::test] - #[should_panic] - async fn get_stats_panics_without_logstream() { - let req = TestRequest::default().to_http_request(); - let _ = get_stats(req).await; - } + + // TODO: Fix this test with routes + // #[actix_web::test] + // #[should_panic] + // async fn get_stats_panics_without_logstream() { + // let req = TestRequest::default().to_http_request(); + // let _ = get_stats(req).await; + // } #[actix_web::test] async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - let req = TestRequest::default() - .param("logstream", "test") - .to_http_request(); + let req = TestRequest::default().to_http_request(); - match get_stats(req).await { + match get_stats(req, web::Path::from("test".to_string())).await { Err(StreamError::StreamNotFound(_)) => Ok(()), _ => bail!("expected StreamNotFound error"), } @@ -928,7 +931,7 @@ mod tests { #[actix_web::test] async fn header_without_log_source() { let req = TestRequest::default().to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Json); } @@ -937,19 +940,19 @@ mod tests { let mut req = TestRequest::default() .insert_header(("X-P-Log-Source", "pmeta")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Pmeta); req = TestRequest::default() .insert_header(("X-P-Log-Source", "otel-logs")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::OtelLogs); req = TestRequest::default() .insert_header(("X-P-Log-Source", "kinesis")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. } = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Kinesis); } @@ -958,7 +961,7 @@ mod tests { let req = TestRequest::default() .insert_header(("X-P-Log-Source", "teststream")) .to_http_request(); - let (_, _, _, _, _, _, log_source) = fetch_headers_from_put_stream_request(&req); + let PutStreamHeaders { log_source, .. 
} = req.headers().into(); assert_eq!(log_source, crate::event::format::LogSource::Json); } } diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 3f0e5292d..40efce1a4 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -16,7 +16,10 @@ * */ -use actix_web::{HttpRequest, Responder}; +use actix_web::{ + web::{Json, Path}, + HttpRequest, Responder, +}; use bytes::Bytes; use http::StatusCode; use tracing::warn; @@ -36,10 +39,10 @@ use crate::{ }; pub async fn retention_cleanup( - req: HttpRequest, - body: Bytes, + stream_name: Path, + Json(date_list): Json>, ) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let stream_name = stream_name.into_inner(); let storage = CONFIG.storage().get_object_store(); // if the stream not found in memory map, //check if it exists in the storage @@ -52,15 +55,14 @@ pub async fn retention_cleanup( return Err(StreamError::StreamNotFound(stream_name.clone())); } - let date_list: Vec = serde_json::from_slice(&body).unwrap(); let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; let first_event_at: Option = res.unwrap_or_default(); Ok((first_event_at, StatusCode::OK)) } -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -80,10 +82,13 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - create_update_stream(&req, &body, &stream_name).await?; +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); + create_update_stream(req.headers(), &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } diff --git a/src/handlers/http/modal/ingest/ingestor_role.rs b/src/handlers/http/modal/ingest/ingestor_role.rs index 499157136..d48b9efdf 100644 --- a/src/handlers/http/modal/ingest/ingestor_role.rs +++ b/src/handlers/http/modal/ingest/ingestor_role.rs @@ -16,8 +16,10 @@ * */ -use actix_web::{web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + web::{self, Json}, + HttpResponse, Responder, +}; use crate::{ handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, @@ -27,14 +29,16 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); let _ = storage::put_staging_metadata(&metadata); - mut_roles().insert(name.clone(), privileges.clone()); + mut_roles().insert(name.clone(), privileges); Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/query/querier_ingest.rs 
b/src/handlers/http/modal/query/querier_ingest.rs index 1eff3999a..6b74edafb 100644 --- a/src/handlers/http/modal/query/querier_ingest.rs +++ b/src/handlers/http/modal/query/querier_ingest.rs @@ -16,16 +16,14 @@ * */ +use actix_web::HttpResponse; + use crate::handlers::http::ingest::PostError; -use actix_web::{HttpRequest, HttpResponse}; -use bytes::Bytes; // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist #[allow(unused)] -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - Err(PostError::Invalid(anyhow::anyhow!( - "Ingestion is not allowed in Query mode" - ))) +pub async fn post_event() -> Result { + Err(PostError::IngestionNotAllowed) } diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 58277f7b8..622383964 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -19,7 +19,10 @@ use core::str; use std::fs; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{ + web::{self, Path}, + HttpRequest, Responder, +}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; @@ -49,8 +52,8 @@ use crate::{ storage::{StorageDir, StreamType}, }; -pub async fn delete(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); +pub async fn delete(stream_name: Path) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage @@ -106,20 +109,25 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - +pub async fn put_stream( + req: HttpRequest, + stream_name: Path, + body: Bytes, +) -> Result { + let stream_name = stream_name.into_inner(); let _ = CREATE_STREAM_LOCK.lock().await; - let headers = create_update_stream(&req, &body, &stream_name).await?; + let headers = create_update_stream(req.headers(), &body, &stream_name).await?; sync_streams_with_ingestors(headers, body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } -pub async fn get_stats(req: HttpRequest) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - +pub async fn get_stats( + req: HttpRequest, + stream_name: Path, +) -> Result { + let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index b9930579c..b8b6f4639 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -16,8 +16,10 @@ * */ -use actix_web::{web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + web::{self, Json}, + HttpResponse, Responder, +}; use crate::{ handlers::http::{ @@ -30,9 +32,11 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut 
metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 8a13bcefd..3a2b9c797 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,10 +16,7 @@ * */ -use actix_web::HttpRequest; -use anyhow::anyhow; use arrow_schema::Field; -use bytes::Bytes; use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; use serde_json::Value; @@ -27,12 +24,12 @@ use std::{collections::HashMap, sync::Arc}; use crate::{ event::{ - format::{self, EventFormat, LogSource}, + format::{json, EventFormat, LogSource}, Event, }, - handlers::{ - http::{ingest::PostError, kinesis}, - LOG_SOURCE_KEY, + handlers::http::{ + ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, }, metadata::{SchemaVersion, STREAM_INFO}, storage::StreamType, @@ -40,32 +37,23 @@ use crate::{ }; pub async fn flatten_and_push_logs( - req: HttpRequest, - body: Bytes, + json: Value, stream_name: &str, + log_source: &LogSource, ) -> Result<(), PostError> { - let log_source = req - .headers() - .get(LOG_SOURCE_KEY) - .map(|h| h.to_str().unwrap_or("")) - .map(LogSource::from) - .unwrap_or_default(); - match log_source { LogSource::Kinesis => { - let json = kinesis::flatten_kinesis_logs(&body); - for record in json.iter() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name, &body, &LogSource::default()).await?; + let message: Message = serde_json::from_value(json)?; + let json = flatten_kinesis_logs(message); + for record in json { + push_logs(stream_name, record, &LogSource::default()).await?; } } LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::Invalid(anyhow!( - "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces" - ))); + return Err(PostError::OtelNotSupported); } _ => { - push_logs(stream_name, &body, &log_source).await?; + push_logs(stream_name, json, log_source).await?; } } Ok(()) @@ -73,7 +61,7 @@ pub async fn flatten_and_push_logs( pub async fn push_logs( stream_name: &str, - body: &Bytes, + json: Value, log_source: &LogSource, ) -> Result<(), PostError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; @@ -81,11 +69,10 @@ pub async fn push_logs( let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; let schema_version = STREAM_INFO.get_schema_version(stream_name)?; - let body_val: Value = serde_json::from_slice(body)?; let data = if time_partition.is_some() || custom_partition.is_some() { convert_array_to_object( - body_val, + json, time_partition.as_ref(), time_partition_limit, custom_partition.as_ref(), @@ -94,7 +81,7 @@ pub async fn push_logs( )? 
} else { vec![convert_to_array(convert_array_to_object( - body_val, + json, None, None, None, @@ -124,7 +111,7 @@ pub async fn push_logs( .schema .clone(); let (rb, is_first_event) = into_event_batch( - &value, + value, schema, static_schema_flag, time_partition.as_ref(), @@ -149,27 +136,28 @@ pub async fn push_logs( } pub fn into_event_batch( - body: &Value, + data: Value, schema: HashMap>, static_schema_flag: bool, time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let event = format::json::Event { - data: body.to_owned(), - }; - let (rb, is_first) = - event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?; + let (rb, is_first) = json::Event { data }.into_recordbatch( + &schema, + static_schema_flag, + time_partition, + schema_version, + )?; Ok((rb, is_first)) } pub fn get_custom_partition_values( - body: &Value, + json: &Value, custom_partition_list: &[&str], ) -> HashMap { let mut custom_partition_values: HashMap = HashMap::new(); for custom_partition_field in custom_partition_list { - let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); let custom_partition_value = match custom_partition_value { e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), Value::String(s) => s, @@ -183,13 +171,10 @@ pub fn get_custom_partition_values( custom_partition_values } -fn get_parsed_timestamp(body: &Value, time_partition: &str) -> Result { - let current_time = body.get(time_partition).ok_or_else(|| { - anyhow!( - "Missing field for time partition from json: {:?}", - time_partition - ) - })?; +fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { + let current_time = json + .get(time_partition) + .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?; let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; Ok(parsed_time.naive_utc()) @@ -217,7 +202,7 @@ mod tests { let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); let parsed = get_parsed_timestamp(&json, "timestamp"); - matches!(parsed, Err(PostError::Invalid(_))); + matches!(parsed, Err(PostError::MissingTimePartition(_))); } #[test] diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 011ad5a4e..cdc338ad8 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -18,7 +18,7 @@ use std::{collections::HashMap, num::NonZeroU32, sync::Arc}; -use actix_web::{http::header::HeaderMap, HttpRequest}; +use actix_web::http::header::HeaderMap; use arrow_schema::{Field, Schema}; use bytes::Bytes; use http::StatusCode; @@ -38,11 +38,11 @@ use crate::{ }; pub async fn create_update_stream( - req: &HttpRequest, + headers: &HeaderMap, body: &Bytes, stream_name: &str, ) -> Result { - let ( + let PutStreamHeaders { time_partition, time_partition_limit, custom_partition, @@ -50,7 +50,7 @@ pub async fn create_update_stream( update_stream_flag, stream_type, log_source, - ) = fetch_headers_from_put_stream_request(req); + } = headers.into(); if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag { return Err(StreamError::Custom { @@ -75,7 +75,7 @@ pub async fn create_update_stream( if update_stream_flag { return update_stream( - req, + headers, stream_name, &time_partition, static_schema_flag, @@ -114,16 +114,16 @@ pub async fn 
create_update_stream( &custom_partition, static_schema_flag, schema, - &stream_type, + stream_type, log_source, ) .await?; - Ok(req.headers().clone()) + Ok(headers.clone()) } async fn update_stream( - req: &HttpRequest, + headers: &HeaderMap, stream_name: &str, time_partition: &str, static_schema_flag: bool, @@ -148,11 +148,11 @@ async fn update_stream( if !time_partition_limit.is_empty() { let time_partition_days = validate_time_partition_limit(time_partition_limit)?; update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days).await?; - return Ok(req.headers().clone()); + return Ok(headers.clone()); } validate_and_update_custom_partition(stream_name, custom_partition).await?; - Ok(req.headers().clone()) + Ok(headers.clone()) } async fn validate_and_update_custom_partition( @@ -168,49 +168,47 @@ async fn validate_and_update_custom_partition( Ok(()) } -pub fn fetch_headers_from_put_stream_request( - req: &HttpRequest, -) -> (String, String, String, bool, bool, String, LogSource) { - let mut time_partition = String::default(); - let mut time_partition_limit = String::default(); - let mut custom_partition = String::default(); - let mut static_schema_flag = false; - let mut update_stream_flag = false; - let mut stream_type = StreamType::UserDefined.to_string(); - let mut log_source = LogSource::default(); - req.headers().iter().for_each(|(key, value)| { - if key == TIME_PARTITION_KEY { - time_partition = value.to_str().unwrap().to_string(); - } - if key == TIME_PARTITION_LIMIT_KEY { - time_partition_limit = value.to_str().unwrap().to_string(); - } - if key == CUSTOM_PARTITION_KEY { - custom_partition = value.to_str().unwrap().to_string(); - } - if key == STATIC_SCHEMA_FLAG && value.to_str().unwrap() == "true" { - static_schema_flag = true; - } - if key == UPDATE_STREAM_KEY && value.to_str().unwrap() == "true" { - update_stream_flag = true; - } - if key == STREAM_TYPE_KEY { - stream_type = value.to_str().unwrap().to_string(); - } - if key == LOG_SOURCE_KEY { - log_source = LogSource::from(value.to_str().unwrap()); - } - }); +#[derive(Debug, Default)] +pub struct PutStreamHeaders { + pub time_partition: String, + pub time_partition_limit: String, + pub custom_partition: String, + pub static_schema_flag: bool, + pub update_stream_flag: bool, + pub stream_type: StreamType, + pub log_source: LogSource, +} - ( - time_partition, - time_partition_limit, - custom_partition, - static_schema_flag, - update_stream_flag, - stream_type, - log_source, - ) +impl From<&HeaderMap> for PutStreamHeaders { + fn from(headers: &HeaderMap) -> Self { + PutStreamHeaders { + time_partition: headers + .get(TIME_PARTITION_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + time_partition_limit: headers + .get(TIME_PARTITION_LIMIT_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + custom_partition: headers + .get(CUSTOM_PARTITION_KEY) + .map_or("", |v| v.to_str().unwrap()) + .to_string(), + static_schema_flag: headers + .get(STATIC_SCHEMA_FLAG) + .is_some_and(|v| v.to_str().unwrap() == "true"), + update_stream_flag: headers + .get(UPDATE_STREAM_KEY) + .is_some_and(|v| v.to_str().unwrap() == "true"), + stream_type: headers + .get(STREAM_TYPE_KEY) + .map(|v| StreamType::from(v.to_str().unwrap())) + .unwrap_or_default(), + log_source: headers + .get(LOG_SOURCE_KEY) + .map_or(LogSource::default(), |v| v.to_str().unwrap().into()), + } + } } pub fn validate_time_partition_limit( @@ -394,11 +392,11 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: bool, schema: 
Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name - if stream_type != StreamType::Internal.to_string() { + if stream_type != StreamType::Internal { validator::stream_name(&stream_name, stream_type)?; } // Proceed to create log stream if it doesn't exist @@ -486,7 +484,10 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag; - let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); + let stream_type = stream_metadata + .stream_type + .map(|s| StreamType::from(s.as_str())) + .unwrap_or_default(); let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; metadata::STREAM_INFO.add_stream( diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index 757f4f170..711eade9b 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -16,8 +16,11 @@ * */ -use actix_web::{http::header::ContentType, web, HttpResponse, Responder}; -use bytes::Bytes; +use actix_web::{ + http::header::ContentType, + web::{self, Json}, + HttpResponse, Responder, +}; use http::StatusCode; use crate::{ @@ -31,9 +34,11 @@ use crate::{ // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one -pub async fn put(name: web::Path, body: Bytes) -> Result { +pub async fn put( + name: web::Path, + Json(privileges): Json>, +) -> Result { let name = name.into_inner(); - let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); diff --git a/src/handlers/http/users/dashboards.rs b/src/handlers/http/users/dashboards.rs index 354689834..f95de4559 100644 --- a/src/handlers/http/users/dashboards.rs +++ b/src/handlers/http/users/dashboards.rs @@ -23,7 +23,11 @@ use crate::{ users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS}, utils::{get_hash, get_user_from_request}, }; -use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{ + http::header::ContentType, + web::{self, Json, Path}, + HttpRequest, HttpResponse, Responder, +}; use bytes::Bytes; use rand::distributions::DistString; @@ -38,24 +42,26 @@ pub async fn list(req: HttpRequest) -> Result { Ok((web::Json(dashboards), StatusCode::OK)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + dashboard_id: Path, +) -> Result { let user_id = get_user_from_request(&req)?; - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + let dashboard_id = dashboard_id.into_inner(); - if let Some(dashboard) = DASHBOARDS.get_dashboard(dashboard_id, &get_hash(&user_id)) { + if let Some(dashboard) = DASHBOARDS.get_dashboard(&dashboard_id, &get_hash(&user_id)) { return Ok((web::Json(dashboard), StatusCode::OK)); } Err(DashboardError::Metadata("Dashboard does not exist")) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(mut dashboard): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let mut dashboard: Dashboard = serde_json::from_slice(&body)?; let dashboard_id = 
get_hash(Utc::now().timestamp_micros().to_string().as_str()); dashboard.dashboard_id = Some(dashboard_id.clone()); dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); @@ -84,18 +90,18 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn update( + req: HttpRequest, + dashboard_id: Path, + Json(mut dashboard): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + let dashboard_id = dashboard_id.into_inner(); - if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() { return Err(DashboardError::Metadata("Dashboard does not exist")); } - let mut dashboard: Dashboard = serde_json::from_slice(&body)?; dashboard.dashboard_id = Some(dashboard_id.to_string()); dashboard.user_id = Some(user_id.clone()); dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); @@ -117,21 +123,21 @@ pub async fn update(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn delete( + req: HttpRequest, + dashboard_id: Path, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let dashboard_id = req - .match_info() - .get("dashboard_id") - .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; - if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + let dashboard_id = dashboard_id.into_inner(); + if DASHBOARDS.get_dashboard(&dashboard_id, &user_id).is_none() { return Err(DashboardError::Metadata("Dashboard does not exist")); } let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id)); let store = CONFIG.storage().get_object_store(); store.delete_object(&path).await?; - DASHBOARDS.delete_dashboard(dashboard_id); + DASHBOARDS.delete_dashboard(&dashboard_id); Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/users/filters.rs b/src/handlers/http/users/filters.rs index e8f00c901..c3e2ed905 100644 --- a/src/handlers/http/users/filters.rs +++ b/src/handlers/http/users/filters.rs @@ -23,7 +23,11 @@ use crate::{ users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS}, utils::{get_hash, get_user_from_request}, }; -use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{ + http::header::ContentType, + web::{self, Json, Path}, + HttpRequest, HttpResponse, Responder, +}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; @@ -35,24 +39,26 @@ pub async fn list(req: HttpRequest) -> Result { Ok((web::Json(filters), StatusCode::OK)) } -pub async fn get(req: HttpRequest) -> Result { +pub async fn get( + req: HttpRequest, + filter_id: Path, +) -> Result { let user_id = get_user_from_request(&req)?; - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter_id = filter_id.into_inner(); - if let Some(filter) = FILTERS.get_filter(filter_id, &get_hash(&user_id)) { + if let Some(filter) = FILTERS.get_filter(&filter_id, &get_hash(&user_id)) { return Ok((web::Json(filter), StatusCode::OK)); } Err(FiltersError::Metadata("Filter does not exist")) } -pub async fn post(req: HttpRequest, body: Bytes) -> Result { +pub async fn post( + req: HttpRequest, + Json(mut filter): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let mut filter: Filter = serde_json::from_slice(&body)?; let 
filter_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); filter.filter_id = Some(filter_id.clone()); filter.user_id = Some(user_id.clone()); @@ -72,18 +78,18 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn update( + req: HttpRequest, + filter_id: Path, + Json(mut filter): Json, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; - if FILTERS.get_filter(filter_id, &user_id).is_none() { + let filter_id = filter_id.into_inner(); + if FILTERS.get_filter(&filter_id, &user_id).is_none() { return Err(FiltersError::Metadata("Filter does not exist")); } - let mut filter: Filter = serde_json::from_slice(&body)?; - filter.filter_id = Some(filter_id.to_string()); + filter.filter_id = Some(filter_id.clone()); filter.user_id = Some(user_id.clone()); filter.version = Some(CURRENT_FILTER_VERSION.to_string()); FILTERS.update(&filter); @@ -101,15 +107,15 @@ pub async fn update(req: HttpRequest, body: Bytes) -> Result Result { +pub async fn delete( + req: HttpRequest, + filter_id: Path, +) -> Result { let mut user_id = get_user_from_request(&req)?; user_id = get_hash(&user_id); - let filter_id = req - .match_info() - .get("filter_id") - .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter_id = filter_id.into_inner(); let filter = FILTERS - .get_filter(filter_id, &user_id) + .get_filter(&filter_id, &user_id) .ok_or(FiltersError::Metadata("Filter does not exist"))?; let path = filter_path( @@ -120,7 +126,7 @@ pub async fn delete(req: HttpRequest) -> Result { let store = CONFIG.storage().get_object_store(); store.delete_object(&path).await?; - FILTERS.delete_filter(filter_id); + FILTERS.delete_filter(&filter_id); Ok(HttpResponse::Ok().finish()) } diff --git a/src/kafka.rs b/src/kafka.rs index 8293b4e1f..9ba697a97 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -181,12 +181,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { let stream_name = msg.topic(); // stream should get created only if there is an incoming event, not before that - create_stream_if_not_exists( - stream_name, - &StreamType::UserDefined.to_string(), - LogSource::default(), - ) - .await?; + create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::default()).await?; let schema = resolve_schema(stream_name)?; let event = format::json::Event { diff --git a/src/metadata.rs b/src/metadata.rs index c3ff0fce7..5c18aa329 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -271,7 +271,7 @@ impl StreamInfo { custom_partition: String, static_schema_flag: bool, static_schema: HashMap>, - stream_type: &str, + stream_type: StreamType, schema_version: SchemaVersion, log_source: LogSource, ) { diff --git a/src/otel/logs.rs b/src/otel/logs.rs index fcdffe1af..969758d5a 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -20,8 +20,8 @@ use opentelemetry_proto::tonic::logs::v1::LogRecord; use opentelemetry_proto::tonic::logs::v1::LogsData; use opentelemetry_proto::tonic::logs::v1::ScopeLogs; use opentelemetry_proto::tonic::logs::v1::SeverityNumber; +use serde_json::Map; use serde_json::Value; -use std::collections::BTreeMap; use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; @@ -31,8 +31,8 @@ use super::otel_utils::insert_attributes; /// there is a mapping of severity number to severity text provided in proto /// this function 
fetches the severity text from the severity number /// and adds it to the flattened json -fn flatten_severity(severity_number: i32) -> BTreeMap { - let mut severity_json: BTreeMap = BTreeMap::new(); +fn flatten_severity(severity_number: i32) -> Map { + let mut severity_json: Map = Map::new(); severity_json.insert( "severity_number".to_string(), Value::Number(severity_number.into()), @@ -46,10 +46,10 @@ fn flatten_severity(severity_number: i32) -> BTreeMap { } /// this function flattens the `LogRecord` object -/// and returns a `BTreeMap` of the flattened json +/// and returns a `Map` of the flattened json /// this function is called recursively for each log record object in the otel logs -pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { - let mut log_record_json: BTreeMap = BTreeMap::new(); +pub fn flatten_log_record(log_record: &LogRecord) -> Map { + let mut log_record_json: Map = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -95,10 +95,10 @@ pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap { } /// this function flattens the `ScopeLogs` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { let mut vec_scope_log_json = Vec::new(); - let mut scope_log_json = BTreeMap::new(); + let mut scope_log_json = Map::new(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); @@ -128,11 +128,11 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { } /// this function performs the custom flattening of the otel logs -/// and returns a `Vec` of `BTreeMap` of the flattened json -pub fn flatten_otel_logs(message: &LogsData) -> Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_logs { - let mut resource_log_json = BTreeMap::new(); + let mut resource_log_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_log_json, &resource.attributes); @@ -158,5 +158,5 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec> { vec_otel_json.extend(vec_resource_logs_json); } - vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index f5aa1c072..aa621b03e 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -16,14 +16,12 @@ * */ -use std::collections::BTreeMap; - use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum, Summary, }; -use serde_json::Value; +use serde_json::{Map, Value}; use super::otel_utils::{ convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, @@ -31,10 +29,10 @@ use super::otel_utils::{ /// otel metrics event has json array for exemplar /// this function flatten the exemplar json array -/// and returns a `BTreeMap` of the exemplar json +/// and returns a `Map` of the exemplar json /// this function is reused in all json objects that have exemplar -fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { - let mut 
exemplar_json = BTreeMap::new(); +fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { + let mut exemplar_json = Map::new(); for exemplar in exemplars { insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); exemplar_json.insert( @@ -73,13 +71,13 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> BTreeMap { /// otel metrics event has json array for number data points /// this function flatten the number data points json array -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points -fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { +fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { data_points .iter() .map(|data_point| { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -122,12 +120,12 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_gauge(gauge: &Gauge) -> Vec> { let mut vec_gauge_json = Vec::new(); let data_points_json = flatten_number_data_points(&gauge.data_points); for data_point_json in data_points_json { - let mut gauge_json = BTreeMap::new(); + let mut gauge_json = Map::new(); for (key, value) in &data_point_json { gauge_json.insert(key.clone(), value.clone()); } @@ -139,18 +137,18 @@ fn flatten_gauge(gauge: &Gauge) -> Vec> { /// otel metrics event has json object for sum /// each sum object has json array for data points /// this function flatten the sum json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_sum(sum: &Sum) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_sum(sum: &Sum) -> Vec> { let mut vec_sum_json = Vec::new(); let data_points_json = flatten_number_data_points(&sum.data_points); for data_point_json in data_points_json { - let mut sum_json = BTreeMap::new(); + let mut sum_json = Map::new(); for (key, value) in &data_point_json { sum_json.insert(key.clone(), value.clone()); } vec_sum_json.push(sum_json); } - let mut sum_json = BTreeMap::new(); + let mut sum_json = Map::new(); sum_json.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); sum_json.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); for data_point_json in &mut vec_sum_json { @@ -164,11 +162,11 @@ fn flatten_sum(sum: &Sum) -> Vec> { /// otel metrics event has json object for histogram /// each histogram object has json array for data points /// this function flatten the histogram json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_histogram(histogram: &Histogram) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_histogram(histogram: &Histogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -216,7 +214,7 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { insert_number_if_some(&mut data_point_json, "max", &data_point.max); data_points_json.push(data_point_json); } - let mut histogram_json = BTreeMap::new(); + let mut histogram_json = Map::new(); 
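Editor's aside on the BTreeMap -> serde_json::Map change that runs through these otel hunks: serde_json::Map<String, Value> is the map type that Value::Object wraps, so a flattened record can become a JSON value without re-collecting it. A minimal, self-contained sketch; the field names below are invented for the example and are not from the patch:

```rust
use serde_json::{Map, Value};

fn main() {
    // Build a flattened record the same way the otel flatteners do.
    let mut point = Map::new();
    point.insert("metric_name".to_string(), Value::String("latency".into()));
    point.insert("value".to_string(), Value::from(42.0));

    // With BTreeMap<String, Value> this step required collecting into a new
    // map; with serde_json::Map it is a plain wrap, which is what lets the
    // new `flatten_otel_*` functions return Vec<Value> via Value::Object.
    let record = Value::Object(point);
    assert_eq!(record["metric_name"], "latency");
}
```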
histogram_json.extend(flatten_aggregation_temporality( histogram.aggregation_temporality, )); @@ -230,9 +228,9 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { /// otel metrics event has json object for buckets /// this function flatten the buckets json object -/// and returns a `BTreeMap` of the flattened json -fn flatten_buckets(bucket: &Buckets) -> BTreeMap { - let mut bucket_json = BTreeMap::new(); +/// and returns a `Map` of the flattened json +fn flatten_buckets(bucket: &Buckets) -> Map { + let mut bucket_json = Map::new(); bucket_json.insert("offset".to_string(), Value::Number(bucket.offset.into())); bucket_json.insert( "bucket_count".to_string(), @@ -250,11 +248,11 @@ fn flatten_buckets(bucket: &Buckets) -> BTreeMap { /// otel metrics event has json object for exponential histogram /// each exponential histogram object has json array for data points /// this function flatten the exponential histogram json object -/// and returns a `Vec` of `BTreeMap` for each data point -fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &exp_histogram.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -299,7 +297,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { +/// and returns a `Vec` of `Map` for each data point +fn flatten_summary(summary: &Summary) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { - let mut data_point_json = BTreeMap::new(); + let mut data_point_json = Map::new(); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -379,11 +377,11 @@ fn flatten_summary(summary: &Summary) -> Vec> { /// this function flattens the `Metric` object /// each metric object has json object for gauge, sum, histogram, exponential histogram, summary /// this function flatten the metric json object -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each metric record object in the otel metrics event -pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { +pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); - let mut metric_json = BTreeMap::new(); + let mut metric_json = Map::new(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { @@ -428,11 +426,11 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_metrics { - let mut resource_metrics_json = BTreeMap::new(); + let mut resource_metrics_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_metrics_json, &resource.attributes); resource_metrics_json.insert( @@ -442,7 +440,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec } let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { - let mut scope_metrics_json = BTreeMap::new(); + let mut 
scope_metrics_json = Map::new(); for metrics_record in &scope_metric.metrics { vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } @@ -481,15 +479,15 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec } vec_otel_json.extend(vec_scope_metrics_json); } - vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } /// otel metrics event has json object for aggregation temporality /// there is a mapping of aggregation temporality to its description provided in proto /// this function fetches the description from the aggregation temporality /// and adds it to the flattened json -fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap { - let mut aggregation_temporality_json = BTreeMap::new(); +fn flatten_aggregation_temporality(aggregation_temporality: i32) -> Map { + let mut aggregation_temporality_json = Map::new(); aggregation_temporality_json.insert( "aggregation_temporality".to_string(), Value::Number(aggregation_temporality.into()), @@ -508,8 +506,8 @@ fn flatten_aggregation_temporality(aggregation_temporality: i32) -> BTreeMap BTreeMap { - let mut data_point_flags_json = BTreeMap::new(); +fn flatten_data_point_flags(flags: u32) -> Map { + let mut data_point_flags_json = Map::new(); data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into())); let description = match flags { 0 => "DATA_POINT_FLAGS_DO_NOT_USE", diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index fabed184f..4eb1fa2a6 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -18,11 +18,11 @@ use chrono::DateTime; use opentelemetry_proto::tonic::common::v1::{any_value::Value as OtelValue, AnyValue, KeyValue}; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; + // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte -pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); +pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { + let mut value_json: Map = Map::new(); match value { OtelValue::StringValue(str_val) => { value_json.insert(key.to_string(), Value::String(str_val)); @@ -86,16 +86,13 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> BTreeMap BTreeMap { +pub fn collect_json_from_anyvalue(key: &String, value: AnyValue) -> Map { collect_json_from_value(key, value.value.unwrap()) } //traverse through Value by calling function ollect_json_from_any_value -pub fn collect_json_from_values( - values: &Option, - key: &String, -) -> BTreeMap { - let mut value_json: BTreeMap = BTreeMap::new(); +pub fn collect_json_from_values(values: &Option, key: &String) -> Map { + let mut value_json: Map = Map::new(); for value in values.iter() { value_json = collect_json_from_anyvalue(key, value.clone()); @@ -112,8 +109,8 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes(attributes: &Vec) -> BTreeMap { - let mut attributes_json: BTreeMap = BTreeMap::new(); +pub fn flatten_attributes(attributes: &Vec) -> Map { + let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; let value = &attribute.value; @@ -125,17 +122,13 @@ pub fn flatten_attributes(attributes: &Vec) -> BTreeMap attributes_json } -pub fn insert_if_some( - map: &mut BTreeMap, - key: &str, - option: &Option, -) { +pub fn insert_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) 
= option { map.insert(key.to_string(), Value::String(value.to_string())); } } -pub fn insert_number_if_some(map: &mut BTreeMap, key: &str, option: &Option) { +pub fn insert_number_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) = option { if let Some(number) = serde_json::Number::from_f64(*value) { map.insert(key.to_string(), Value::Number(number)); @@ -143,13 +136,13 @@ pub fn insert_number_if_some(map: &mut BTreeMap, key: &str, optio } } -pub fn insert_bool_if_some(map: &mut BTreeMap, key: &str, option: &Option) { +pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Option) { if let Some(value) = option { map.insert(key.to_string(), Value::Bool(*value)); } } -pub fn insert_attributes(map: &mut BTreeMap, attributes: &Vec) { +pub fn insert_attributes(map: &mut Map, attributes: &Vec) { let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 8ba137b33..e1ce3406a 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -22,17 +22,16 @@ use opentelemetry_proto::tonic::trace::v1::ScopeSpans; use opentelemetry_proto::tonic::trace::v1::Span; use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; -use serde_json::Value; -use std::collections::BTreeMap; +use serde_json::{Map, Value}; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::insert_attributes; /// this function flattens the `ScopeSpans` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { let mut vec_scope_span_json = Vec::new(); - let mut scope_span_json = BTreeMap::new(); + let mut scope_span_json = Map::new(); for span in &scope_span.spans { let span_record_json = flatten_span_record(span); @@ -69,12 +68,12 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { } /// this function performs the custom flattening of the otel traces event -/// and returns a `Vec` of `BTreeMap` of the flattened json -pub fn flatten_otel_traces(message: &TracesData) -> Vec> { +/// and returns a `Vec` of `Value::Object` of the flattened json +pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_otel_json = Vec::new(); for record in &message.resource_spans { - let mut resource_span_json = BTreeMap::new(); + let mut resource_span_json = Map::new(); if let Some(resource) = &record.resource { insert_attributes(&mut resource_span_json, &resource.attributes); @@ -104,17 +103,17 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec> vec_otel_json.extend(vec_resource_spans_json); } - vec_otel_json + vec_otel_json.into_iter().map(Value::Object).collect() } /// otel traces has json array of events /// this function flattens the `Event` object -/// and returns a `Vec` of `BTreeMap` of the flattened json -fn flatten_events(events: &[Event]) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_events(events: &[Event]) -> Vec> { events .iter() .map(|event| { - let mut event_json = BTreeMap::new(); + let mut event_json = Map::new(); event_json.insert( "event_time_unix_nano".to_string(), Value::String( @@ -134,12 +133,12 @@ fn flatten_events(events: &[Event]) -> Vec> { /// otel traces has json array of links /// this function flattens the `Link` object -/// and returns a `Vec` of `BTreeMap` of the flattened 
json -fn flatten_links(links: &[Link]) -> Vec> { +/// and returns a `Vec` of `Map` of the flattened json +fn flatten_links(links: &[Link]) -> Vec> { links .iter() .map(|link| { - let mut link_json = BTreeMap::new(); + let mut link_json = Map::new(); link_json.insert( "link_span_id".to_string(), Value::String(hex::encode(&link.span_id)), @@ -163,8 +162,8 @@ fn flatten_links(links: &[Link]) -> Vec> { /// there is a mapping of status code to status description provided in proto /// this function fetches the status description from the status code /// and adds it to the flattened json -fn flatten_status(status: &Status) -> BTreeMap { - let mut status_json = BTreeMap::new(); +fn flatten_status(status: &Status) -> Map { + let mut status_json = Map::new(); status_json.insert( "span_status_message".to_string(), Value::String(status.message.clone()), @@ -191,8 +190,8 @@ fn flatten_status(status: &Status) -> BTreeMap { /// there is a mapping of flags to flags description provided in proto /// this function fetches the flags description from the flags /// and adds it to the flattened json -fn flatten_flags(flags: u32) -> BTreeMap { - let mut flags_json = BTreeMap::new(); +fn flatten_flags(flags: u32) -> Map { + let mut flags_json = Map::new(); flags_json.insert("span_flags".to_string(), Value::Number(flags.into())); let description = match flags { 0 => "SPAN_FLAGS_DO_NOT_USE", @@ -213,8 +212,8 @@ fn flatten_flags(flags: u32) -> BTreeMap { /// there is a mapping of kind to kind description provided in proto /// this function fetches the kind description from the kind /// and adds it to the flattened json -fn flatten_kind(kind: i32) -> BTreeMap { - let mut kind_json = BTreeMap::new(); +fn flatten_kind(kind: i32) -> Map { + let mut kind_json = Map::new(); kind_json.insert("span_kind".to_string(), Value::Number(kind.into())); let description = match kind { 0 => "SPAN_KIND_UNSPECIFIED", @@ -234,12 +233,12 @@ fn flatten_kind(kind: i32) -> BTreeMap { } /// this function flattens the `Span` object -/// and returns a `Vec` of `BTreeMap` of the flattened json +/// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each span record object in the otel traces event -fn flatten_span_record(span_record: &Span) -> Vec> { +fn flatten_span_record(span_record: &Span) -> Vec> { let mut span_records_json = Vec::new(); - let mut span_record_json = BTreeMap::new(); + let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(), Value::String(hex::encode(&span_record.trace_id)), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 454a17ebe..85c46dade 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -144,13 +144,23 @@ pub struct StreamInfo { pub log_source: LogSource, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, Default)] pub enum StreamType { #[default] UserDefined, Internal, } +impl From<&str> for StreamType { + fn from(stream_type: &str) -> Self { + match stream_type { + "UserDefined" => Self::UserDefined, + "Internal" => Self::Internal, + t => panic!("Unexpected stream type: {t}"), + } + } +} + impl std::fmt::Display for StreamType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 05c046179..16d526c8a 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,8 
+21,8 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - Owner, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + Owner, StreamType, ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::event::format::LogSource; @@ -156,7 +156,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { custom_partition: &str, static_schema_flag: bool, schema: Arc, - stream_type: &str, + stream_type: StreamType, log_source: LogSource, ) -> Result { let format = ObjectStoreFormat { diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index b3105eeee..2cbdbf0a5 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -42,8 +42,9 @@ use std::sync::Arc; -use arrow_array::{Array, RecordBatch}; +use arrow_array::{Array, RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; +use chrono::Utc; use itertools::Itertools; pub mod batch_adapter; @@ -125,6 +126,19 @@ pub fn get_field<'a>( .find(|field| field.name() == name) } +/// Constructs an array of the current timestamp. +/// +/// # Arguments +/// +/// * `size` - The number of rows for which timestamp values are to be added. +/// +/// # Returns +/// +/// A column in arrow, containing the current timestamp in millis. +pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { + TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -132,7 +146,7 @@ mod tests { use arrow_array::{Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; - use super::{record_batches_to_json, replace_columns}; + use super::*; #[test] fn check_replace() { @@ -170,4 +184,25 @@ mod tests { let batches = record_batches_to_json(&rb).unwrap(); assert_eq!(batches, vec![]); } + + #[test] + fn test_timestamp_array_has_correct_size_and_value() { + let size = 5; + let now = Utc::now().timestamp_millis(); + + let array = get_timestamp_array(size); + + assert_eq!(array.len(), size); + for i in 0..size { + assert!(array.value(i) >= now); + } + } + + #[test] + fn test_timestamp_array_with_zero_size() { + let array = get_timestamp_array(0); + + assert_eq!(array.len(), 0); + assert!(array.is_empty()); + } } diff --git a/src/validator.rs b/src/validator.rs index bfa1dae02..bcbaefea6 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -77,7 +77,10 @@ pub fn alert(alerts: &Alerts) -> Result<(), AlertValidationError> { Ok(()) } -pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNameValidationError> { +pub fn stream_name( + stream_name: &str, + stream_type: StreamType, +) -> Result<(), StreamNameValidationError> { if stream_name.is_empty() { return Err(StreamNameValidationError::EmptyName); } @@ -102,7 +105,7 @@ pub fn stream_name(stream_name: &str, stream_type: &str) -> Result<(), StreamNam )); } - if stream_type == StreamType::Internal.to_string() { + if stream_type == StreamType::Internal { return Err(StreamNameValidationError::InternalStream( stream_name.to_owned(), ));
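Editor's aside for reviewers: a self-contained sketch of the typed StreamType conversions introduced in src/storage/mod.rs and used by the validator above. The enum body and the From<&str> impl (including the panic on unrecognized input) mirror the hunks; the Display arms and the main scaffolding are assumptions added only for illustration.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamType {
    #[default]
    UserDefined,
    Internal,
}

impl From<&str> for StreamType {
    fn from(stream_type: &str) -> Self {
        match stream_type {
            "UserDefined" => Self::UserDefined,
            "Internal" => Self::Internal,
            t => panic!("Unexpected stream type: {t}"),
        }
    }
}

// Assumed Display arms; the patch adds a Display impl but the hunk is cut off here.
impl fmt::Display for StreamType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StreamType::UserDefined => write!(f, "UserDefined"),
            StreamType::Internal => write!(f, "Internal"),
        }
    }
}

fn main() {
    // Stored metadata and headers carry the type as a string; handlers now
    // convert once and compare enum values instead of comparing strings.
    let stream_type = StreamType::from("Internal");
    assert_eq!(stream_type, StreamType::Internal);
    assert_eq!(stream_type.to_string(), "Internal");
}
```

One point worth flagging in review: From<&str> panics on anything other than the two known names, and PutStreamHeaders feeds it the raw STREAM_TYPE_KEY header value, so a malformed client header can now panic in the handler rather than producing a 400-style error.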