Skip to content

Commit a8bb9f4

Browse files
authored
feat: LogSchema metadata key refacoring (#18099)
* feat: LogSchema metadata key refacoring * fix failing test
1 parent f015b29 commit a8bb9f4

File tree

6 files changed

+40
-37
lines changed

6 files changed

+40
-37
lines changed

lib/codecs/src/decoding/format/syslog.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,9 +483,8 @@ fn insert_fields_from_syslog(
483483

484484
#[cfg(test)]
485485
mod tests {
486-
use vector_core::config::{init_log_schema, log_schema, LogSchema};
487-
488486
use super::*;
487+
use vector_core::config::{init_log_schema, log_schema, LogSchema};
489488

490489
#[test]
491490
fn deserialize_syslog_legacy_namespace() {
@@ -522,8 +521,12 @@ mod tests {
522521

523522
fn init() {
524523
let mut schema = LogSchema::default();
525-
schema.set_message_key(Some(owned_value_path!("legacy_message")));
526-
schema.set_message_key(Some(owned_value_path!("legacy_timestamp")));
524+
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
525+
"legacy_message"
526+
))));
527+
schema.set_message_key(Some(OwnedTargetPath::event(owned_value_path!(
528+
"legacy_timestamp"
529+
))));
527530
init_log_schema(schema, false);
528531
}
529532
}

lib/vector-core/src/config/log_schema.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
use lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
1+
use lookup::lookup_v2::OptionalTargetPath;
22
use lookup::{OwnedTargetPath, OwnedValuePath};
33
use once_cell::sync::{Lazy, OnceCell};
44
use vector_config::configurable_component;
5-
use vrl::path::PathPrefix;
65

76
static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
87
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
@@ -74,7 +73,7 @@ pub struct LogSchema {
7473
/// Generally, this field will be set by Vector to hold event-specific metadata, such as
7574
/// annotations by the `remap` transform when an error or abort is encountered.
7675
#[serde(default = "LogSchema::default_metadata_key")]
77-
metadata_key: OptionalValuePath,
76+
metadata_key: OptionalTargetPath,
7877
}
7978

8079
impl Default for LogSchema {
@@ -106,8 +105,8 @@ impl LogSchema {
106105
OptionalTargetPath::event(SOURCE_TYPE)
107106
}
108107

109-
fn default_metadata_key() -> OptionalValuePath {
110-
OptionalValuePath::new(METADATA)
108+
fn default_metadata_key() -> OptionalTargetPath {
109+
OptionalTargetPath::event(METADATA)
111110
}
112111

113112
pub fn message_key(&self) -> Option<&OwnedValuePath> {
@@ -140,7 +139,7 @@ impl LogSchema {
140139
}
141140

142141
pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
143-
self.metadata_key.path.as_ref()
142+
self.metadata_key.as_ref().map(|key| &key.path)
144143
}
145144

146145
pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
@@ -159,24 +158,28 @@ impl LogSchema {
159158
self.source_type_key.as_ref()
160159
}
161160

162-
pub fn set_message_key(&mut self, path: Option<OwnedValuePath>) {
163-
self.message_key = OptionalTargetPath::from(PathPrefix::Event, path);
161+
pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
162+
self.metadata_key.as_ref()
164163
}
165164

166-
pub fn set_timestamp_key(&mut self, path: Option<OwnedValuePath>) {
167-
self.timestamp_key = OptionalTargetPath::from(PathPrefix::Event, path);
165+
pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
166+
self.message_key = OptionalTargetPath { path };
168167
}
169168

170-
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
171-
self.host_key = OptionalTargetPath::from(PathPrefix::Event, path);
169+
pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
170+
self.timestamp_key = OptionalTargetPath { path };
172171
}
173172

174-
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
175-
self.source_type_key = OptionalTargetPath::from(PathPrefix::Event, path);
173+
pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
174+
self.host_key = OptionalTargetPath { path };
176175
}
177176

178-
pub fn set_metadata_key(&mut self, path: Option<OwnedValuePath>) {
179-
self.metadata_key = OptionalValuePath { path };
177+
pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
178+
self.source_type_key = OptionalTargetPath { path };
179+
}
180+
181+
pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
182+
self.metadata_key = OptionalTargetPath { path };
180183
}
181184

182185
/// Merge two `LogSchema` instances together.
@@ -195,35 +198,35 @@ impl LogSchema {
195198
{
196199
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
197200
} else {
198-
self.set_host_key(other.host_key().cloned());
201+
self.set_host_key(other.host_key_target_path().cloned());
199202
}
200203
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
201204
&& self.message_key() != other.message_key()
202205
{
203206
errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
204207
} else {
205-
self.set_message_key(other.message_key().cloned());
208+
self.set_message_key(other.message_key_target_path().cloned());
206209
}
207210
if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
208211
&& self.timestamp_key() != other.timestamp_key()
209212
{
210213
errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
211214
} else {
212-
self.set_timestamp_key(other.timestamp_key().cloned());
215+
self.set_timestamp_key(other.timestamp_key_target_path().cloned());
213216
}
214217
if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
215218
&& self.source_type_key() != other.source_type_key()
216219
{
217220
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
218221
} else {
219-
self.set_source_type_key(other.source_type_key().cloned());
222+
self.set_source_type_key(other.source_type_key_target_path().cloned());
220223
}
221224
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
222225
&& self.metadata_key() != other.metadata_key()
223226
{
224227
errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
225228
} else {
226-
self.set_metadata_key(other.metadata_key().cloned());
229+
self.set_metadata_key(other.metadata_key_target_path().cloned());
227230
}
228231
}
229232

lib/vector-core/src/config/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,15 +551,14 @@ mod test {
551551
use chrono::Utc;
552552
use lookup::{event_path, owned_value_path, OwnedTargetPath};
553553
use vector_common::btreemap;
554-
use vrl::path::OwnedValuePath;
555554
use vrl::value::Kind;
556555

557556
#[test]
558557
fn test_insert_standard_vector_source_metadata() {
559-
let nested_path = "a.b.c.d".to_string();
560-
561558
let mut schema = LogSchema::default();
562-
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
559+
schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
560+
"a", "b", "c", "d"
561+
))));
563562
init_log_schema(schema, false);
564563

565564
let namespace = LogNamespace::Legacy;

lib/vector-core/src/event/trace.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use vector_common::{
77
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
88
EventDataEq,
99
};
10-
use vrl::path::{PathPrefix, ValuePath};
1110

1211
use super::{
1312
BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata,
@@ -95,12 +94,11 @@ impl TraceEvent {
9594

9695
pub fn maybe_insert<'a, F: FnOnce() -> Value>(
9796
&mut self,
98-
prefix: PathPrefix,
99-
path: Option<impl ValuePath<'a>>,
97+
path: Option<impl TargetPath<'a>>,
10098
value_callback: F,
10199
) -> Option<Value> {
102100
if let Some(path) = path {
103-
return self.0.insert((prefix, path), value_callback());
101+
return self.0.insert(path, value_callback());
104102
}
105103
None
106104
}

src/sources/http_client/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,9 @@ impl http_client::HttpClientContext for HttpClientContext {
350350
}
351351
}
352352
Event::Trace(ref mut trace) => {
353-
if let Some(source_type_key) = log_schema().source_type_key_target_path() {
354-
trace.insert(source_type_key, Bytes::from(HttpClientConfig::NAME));
355-
}
353+
trace.maybe_insert(log_schema().source_type_key_target_path(), || {
354+
Bytes::from(HttpClientConfig::NAME).into()
355+
});
356356
}
357357
}
358358
}

src/transforms/remap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ where
487487
}
488488
}
489489
Event::Trace(ref mut trace) => {
490-
trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || {
490+
trace.maybe_insert(log_schema().metadata_key_target_path(), || {
491491
self.dropped_data(reason, error).into()
492492
});
493493
}

0 commit comments

Comments
 (0)