Skip to content

OTLP tonic metadata from env variable #1377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add `grpcio` metrics exporter (#1202)
- Allow specifying OTLP HTTP headers from env variable (#1290)
- Support custom channels in topic exporters [#1335](https://github.com/open-telemetry/opentelemetry-rust/pull/1335)
- Allow specifying OTLP Tonic metadata from env variable (#1377)

### Changed

Expand Down
39 changes: 5 additions & 34 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use super::default_headers;
use super::{default_headers, parse_header_string};

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -316,46 +316,17 @@ fn resolve_endpoint(

#[allow(clippy::mutable_key_type)] // http headers are not mutated
fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
for pair in input.split_terminator(',') {
if pair.trim().is_empty() {
continue;
}
if let Some((k, v)) = pair.trim().split_once('=') {
if !k.trim().is_empty() && !v.trim().is_empty() {
if let (Ok(key), Ok(value)) = (
HeaderName::from_str(k.trim()),
HeaderValue::from_str(v.trim()),
) {
headers.insert(key, value);
}
}
for (key, value) in parse_header_string(input) {
if let (Ok(key), Ok(value)) = (HeaderName::from_str(key), HeaderValue::from_str(value)) {
headers.insert(key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I love the cleanup. I wonder if one more little step here could eliminate the for loop by leveraging the std::iter::Extend trait that HashMap implements? Something like:

headers.extend(
    parse_header_string(input)
        .filter_map(|(k,v)| { 
           let key = HeaderName::from_str(k)?;
           let value = HeaderName::from_str(v)?;
           Some((key, value))
        )
    );

(I may have the .filter_map lambda totally wrong, but hopefully the gist comes across.

Copy link
Contributor Author

@harscoet harscoet Nov 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, I updated it

fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
    headers.extend(parse_header_string(input).filter_map(|(key, value)| {
        Some((
            HeaderName::from_str(key).ok()?,
            HeaderValue::from_str(value).ok()?,
        ))
    }));
}

}
}
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
use crate::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
use std::sync::Mutex;

// Make sure env tests are not running concurrently
static ENV_LOCK: Mutex<()> = Mutex::new(());

fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
fn test_append_signal_path_to_generic_env() {
Expand Down
59 changes: 59 additions & 0 deletions opentelemetry-otlp/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,62 @@ impl<B: HasExportConfig> WithExportConfig for B {
self
}
}

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, &str)> {
value
.split_terminator(',')
.map(str::trim)
.filter_map(|pair| {
if pair.is_empty() {
None
} else {
pair.split_once('=')
.map(|(key, value)| (key.trim(), value.trim()))
.filter(|(key, value)| !key.is_empty() && !value.is_empty())
}
})
}

#[cfg(test)]
mod tests {
// Make sure env tests are not running concurrently
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
pub(crate) fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn test_parse_header_string() {
let test_cases = vec![
// Format: (input_str, expected_headers)
("k1=v1", vec![("k1", "v1")]),
("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
];

for (input_str, expected_headers) in test_cases {
assert_eq!(
super::parse_header_string(input_str).collect::<Vec<_>>(),
expected_headers,
)
}
}
}
108 changes: 105 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::env;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::time::Duration;

use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::codec::CompressionEncoding;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::service::Interceptor;
use tonic::transport::Channel;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;

use super::default_headers;
use super::{default_headers, parse_header_string};
use crate::exporter::Compression;
use crate::{
ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TIMEOUT,
OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
};

#[cfg(feature = "logs")]
Expand Down Expand Up @@ -213,11 +215,17 @@ impl TonicExporterBuilder {
signal_endpoint_path: &str,
signal_timeout_var: &str,
signal_compression_var: &str,
signal_headers_var: &str,
) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
let tonic_config = self.tonic_config;
let compression = resolve_compression(&tonic_config, signal_compression_var)?;

let metadata = tonic_config.metadata.unwrap_or_default();
let headers_from_env = parse_headers_from_env(signal_headers_var);
let metadata = merge_metadata_with_headers_from_env(
tonic_config.metadata.unwrap_or_default(),
headers_from_env,
);

let add_metadata = move |mut req: tonic::Request<()>| {
for key_and_value in metadata.iter() {
match key_and_value {
Expand Down Expand Up @@ -294,6 +302,7 @@ impl TonicExporterBuilder {
"/v1/logs",
crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
)?;

let client = TonicLogsClient::new(channel, interceptor, compression);
Expand All @@ -316,6 +325,7 @@ impl TonicExporterBuilder {
"/v1/metrics",
crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
)?;

let client = TonicMetricsClient::new(channel, interceptor, compression);
Expand All @@ -339,6 +349,7 @@ impl TonicExporterBuilder {
"/v1/traces",
crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
)?;

let client = TonicTracesClient::new(channel, interceptor, compression);
Expand All @@ -347,11 +358,44 @@ impl TonicExporterBuilder {
}
}

fn merge_metadata_with_headers_from_env(
metadata: MetadataMap,
headers_from_env: HeaderMap,
) -> MetadataMap {
if headers_from_env.is_empty() {
metadata
} else {
let mut existing_headers: HeaderMap = metadata.into_headers();
existing_headers.extend(headers_from_env);

MetadataMap::from_headers(existing_headers)
}
}

fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap {
env::var(signal_headers_var)
.or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
.map(|input| {
parse_header_string(&input)
.filter_map(|(key, value)| {
Some((
HeaderName::from_str(key).ok()?,
HeaderValue::from_str(value).ok()?,
))
})
.collect::<HeaderMap>()
})
.unwrap_or_default()
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
#[cfg(feature = "gzip-tonic")]
use crate::exporter::Compression;
use crate::TonicExporterBuilder;
use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::metadata::{MetadataMap, MetadataValue};

#[test]
Expand Down Expand Up @@ -393,4 +437,62 @@ mod tests {
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}

#[test]
fn test_parse_headers_from_env() {
run_env_test(
vec![
(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
(OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
],
|| {
assert_eq!(
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
HeaderMap::from_iter([
(
HeaderName::from_static("k1"),
HeaderValue::from_static("v1")
),
(
HeaderName::from_static("k2"),
HeaderValue::from_static("v2")
),
])
);

assert_eq!(
super::parse_headers_from_env("EMPTY_ENV"),
HeaderMap::from_iter([(
HeaderName::from_static("k3"),
HeaderValue::from_static("v3")
)])
);
},
)
}

#[test]
fn test_merge_metadata_with_headers_from_env() {
run_env_test(
vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
|| {
let headers_from_env =
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
metadata.insert("k1", "v0".parse().unwrap());

let result =
super::merge_metadata_with_headers_from_env(metadata, headers_from_env);

assert_eq!(
result.get("foo").unwrap(),
MetadataValue::from_static("bar")
);
assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
},
);
}
}