Skip to content

[test] Add DatadogTagsProcessor to observability pipelines spec #802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .generated-info
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"spec_repo_commit": "0f24b8e",
"generated": "2025-07-18 16:58:06.791"
"spec_repo_commit": "5b64e98",
"generated": "2025-07-18 21:28:10.698"
}
75 changes: 75 additions & 0 deletions .generator/schemas/v2/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25557,6 +25557,7 @@ components:
- $ref: '#/components/schemas/ObservabilityPipelineEnrichmentTableProcessor'
- $ref: '#/components/schemas/ObservabilityPipelineReduceProcessor'
- $ref: '#/components/schemas/ObservabilityPipelineThrottleProcessor'
- $ref: '#/components/schemas/ObservabilityPipelineDatadogTagsProcessor'
ObservabilityPipelineConfigSourceItem:
description: A data source for the pipeline.
oneOf:
Expand Down Expand Up @@ -25666,6 +25667,80 @@ components:
type: string
x-enum-varnames:
- DATADOG_LOGS
ObservabilityPipelineDatadogTagsProcessor:
description: The `datadog_tags` processor includes or excludes specific Datadog
tags in your logs.
properties:
action:
$ref: '#/components/schemas/ObservabilityPipelineDatadogTagsProcessorAction'
id:
description: The unique identifier for this component. Used to reference
this component in other parts of the pipeline (for example, as the `input`
to downstream components).
example: datadog-tags-processor
type: string
include:
description: A Datadog search query used to determine which logs this processor
targets.
example: service:my-service
type: string
inputs:
description: A list of component IDs whose output is used as the `input`
for this component.
example:
- datadog-agent-source
items:
type: string
type: array
keys:
description: A list of tag keys.
example:
- env
- service
- version
items:
type: string
type: array
mode:
$ref: '#/components/schemas/ObservabilityPipelineDatadogTagsProcessorMode'
type:
$ref: '#/components/schemas/ObservabilityPipelineDatadogTagsProcessorType'
required:
- id
- type
- include
- mode
- action
- keys
- inputs
type: object
ObservabilityPipelineDatadogTagsProcessorAction:
description: The action to take on tags with matching keys.
enum:
- include
- exclude
example: include
type: string
x-enum-varnames:
- INCLUDE
- EXCLUDE
ObservabilityPipelineDatadogTagsProcessorMode:
description: The processing mode.
enum:
- filter
example: filter
type: string
x-enum-varnames:
- FILTER
ObservabilityPipelineDatadogTagsProcessorType:
default: datadog_tags
description: The processor type. The value should always be `datadog_tags`.
enum:
- datadog_tags
example: datadog_tags
type: string
x-enum-varnames:
- DATADOG_TAGS
ObservabilityPipelineDecoding:
description: The decoding format used to interpret incoming logs.
enum:
Expand Down
8 changes: 8 additions & 0 deletions src/datadogV2/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3712,6 +3712,14 @@ pub mod model_observability_pipeline_throttle_processor;
pub use self::model_observability_pipeline_throttle_processor::ObservabilityPipelineThrottleProcessor;
pub mod model_observability_pipeline_throttle_processor_type;
pub use self::model_observability_pipeline_throttle_processor_type::ObservabilityPipelineThrottleProcessorType;
pub mod model_observability_pipeline_datadog_tags_processor;
pub use self::model_observability_pipeline_datadog_tags_processor::ObservabilityPipelineDatadogTagsProcessor;
pub mod model_observability_pipeline_datadog_tags_processor_action;
pub use self::model_observability_pipeline_datadog_tags_processor_action::ObservabilityPipelineDatadogTagsProcessorAction;
pub mod model_observability_pipeline_datadog_tags_processor_mode;
pub use self::model_observability_pipeline_datadog_tags_processor_mode::ObservabilityPipelineDatadogTagsProcessorMode;
pub mod model_observability_pipeline_datadog_tags_processor_type;
pub use self::model_observability_pipeline_datadog_tags_processor_type::ObservabilityPipelineDatadogTagsProcessorType;
pub mod model_observability_pipeline_config_processor_item;
pub use self::model_observability_pipeline_config_processor_item::ObservabilityPipelineConfigProcessorItem;
pub mod model_observability_pipeline_kafka_source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub enum ObservabilityPipelineConfigProcessorItem {
ObservabilityPipelineThrottleProcessor(
Box<crate::datadogV2::model::ObservabilityPipelineThrottleProcessor>,
),
ObservabilityPipelineDatadogTagsProcessor(
Box<crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessor>,
),
UnparsedObject(crate::datadog::UnparsedObject),
}

Expand Down Expand Up @@ -213,6 +216,14 @@ impl<'de> Deserialize<'de> for ObservabilityPipelineConfigProcessorItem {
return Ok(ObservabilityPipelineConfigProcessorItem::ObservabilityPipelineThrottleProcessor(_v));
}
}
if let Ok(_v) = serde_json::from_value::<
Box<crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessor>,
>(value.clone())
{
if !_v._unparsed {
return Ok(ObservabilityPipelineConfigProcessorItem::ObservabilityPipelineDatadogTagsProcessor(_v));
}
}

return Ok(ObservabilityPipelineConfigProcessorItem::UnparsedObject(
crate::datadog::UnparsedObject { value },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2019-Present Datadog, Inc.
use serde::de::{Error, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use serde_with::skip_serializing_none;
use std::fmt::{self, Formatter};

/// The `datadog_tags` processor includes or excludes specific Datadog tags in your logs.
#[non_exhaustive]
#[skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ObservabilityPipelineDatadogTagsProcessor {
/// The action to take on tags with matching keys.
#[serde(rename = "action")]
pub action: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorAction,
/// The unique identifier for this component. Used to reference this component in other parts of the pipeline (for example, as the `input` to downstream components).
#[serde(rename = "id")]
pub id: String,
/// A Datadog search query used to determine which logs this processor targets.
#[serde(rename = "include")]
pub include: String,
/// A list of component IDs whose output is used as the `input` for this component.
#[serde(rename = "inputs")]
pub inputs: Vec<String>,
/// A list of tag keys.
#[serde(rename = "keys")]
pub keys: Vec<String>,
/// The processing mode.
#[serde(rename = "mode")]
pub mode: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorMode,
/// The processor type. The value should always be `datadog_tags`.
#[serde(rename = "type")]
pub type_: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorType,
#[serde(flatten)]
pub additional_properties: std::collections::BTreeMap<String, serde_json::Value>,
#[serde(skip)]
#[serde(default)]
pub(crate) _unparsed: bool,
}

impl ObservabilityPipelineDatadogTagsProcessor {
pub fn new(
action: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorAction,
id: String,
include: String,
inputs: Vec<String>,
keys: Vec<String>,
mode: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorMode,
type_: crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorType,
) -> ObservabilityPipelineDatadogTagsProcessor {
ObservabilityPipelineDatadogTagsProcessor {
action,
id,
include,
inputs,
keys,
mode,
type_,
additional_properties: std::collections::BTreeMap::new(),
_unparsed: false,
}
}

pub fn additional_properties(
mut self,
value: std::collections::BTreeMap<String, serde_json::Value>,
) -> Self {
self.additional_properties = value;
self
}
}

impl<'de> Deserialize<'de> for ObservabilityPipelineDatadogTagsProcessor {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct ObservabilityPipelineDatadogTagsProcessorVisitor;
impl<'a> Visitor<'a> for ObservabilityPipelineDatadogTagsProcessorVisitor {
type Value = ObservabilityPipelineDatadogTagsProcessor;

fn expecting(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str("a mapping")
}

fn visit_map<M>(self, mut map: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'a>,
{
let mut action: Option<
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorAction,
> = None;
let mut id: Option<String> = None;
let mut include: Option<String> = None;
let mut inputs: Option<Vec<String>> = None;
let mut keys: Option<Vec<String>> = None;
let mut mode: Option<
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorMode,
> = None;
let mut type_: Option<
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorType,
> = None;
let mut additional_properties: std::collections::BTreeMap<
String,
serde_json::Value,
> = std::collections::BTreeMap::new();
let mut _unparsed = false;

while let Some((k, v)) = map.next_entry::<String, serde_json::Value>()? {
match k.as_str() {
"action" => {
action = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
if let Some(ref _action) = action {
match _action {
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorAction::UnparsedObject(_action) => {
_unparsed = true;
},
_ => {}
}
}
}
"id" => {
id = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
}
"include" => {
include = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
}
"inputs" => {
inputs = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
}
"keys" => {
keys = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
}
"mode" => {
mode = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
if let Some(ref _mode) = mode {
match _mode {
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorMode::UnparsedObject(_mode) => {
_unparsed = true;
},
_ => {}
}
}
}
"type" => {
type_ = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
if let Some(ref _type_) = type_ {
match _type_ {
crate::datadogV2::model::ObservabilityPipelineDatadogTagsProcessorType::UnparsedObject(_type_) => {
_unparsed = true;
},
_ => {}
}
}
}
&_ => {
if let Ok(value) = serde_json::from_value(v.clone()) {
additional_properties.insert(k, value);
}
}
}
}
let action = action.ok_or_else(|| M::Error::missing_field("action"))?;
let id = id.ok_or_else(|| M::Error::missing_field("id"))?;
let include = include.ok_or_else(|| M::Error::missing_field("include"))?;
let inputs = inputs.ok_or_else(|| M::Error::missing_field("inputs"))?;
let keys = keys.ok_or_else(|| M::Error::missing_field("keys"))?;
let mode = mode.ok_or_else(|| M::Error::missing_field("mode"))?;
let type_ = type_.ok_or_else(|| M::Error::missing_field("type_"))?;

let content = ObservabilityPipelineDatadogTagsProcessor {
action,
id,
include,
inputs,
keys,
mode,
type_,
additional_properties,
_unparsed,
};

Ok(content)
}
}

deserializer.deserialize_any(ObservabilityPipelineDatadogTagsProcessorVisitor)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2019-Present Datadog, Inc.

use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[non_exhaustive]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ObservabilityPipelineDatadogTagsProcessorAction {
INCLUDE,
EXCLUDE,
UnparsedObject(crate::datadog::UnparsedObject),
}

impl ToString for ObservabilityPipelineDatadogTagsProcessorAction {
fn to_string(&self) -> String {
match self {
Self::INCLUDE => String::from("include"),
Self::EXCLUDE => String::from("exclude"),
Self::UnparsedObject(v) => v.value.to_string(),
}
}
}

impl Serialize for ObservabilityPipelineDatadogTagsProcessorAction {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
Self::UnparsedObject(v) => v.serialize(serializer),
_ => serializer.serialize_str(self.to_string().as_str()),
}
}
}

impl<'de> Deserialize<'de> for ObservabilityPipelineDatadogTagsProcessorAction {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s: String = String::deserialize(deserializer)?;
Ok(match s.as_str() {
"include" => Self::INCLUDE,
"exclude" => Self::EXCLUDE,
_ => Self::UnparsedObject(crate::datadog::UnparsedObject {
value: serde_json::Value::String(s.into()),
}),
})
}
}
Loading
Loading