Skip to content

Env over compling time config #1323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ The Opentelemetry Rust SDK comes with an error type `openetelemetry::Error`. For

For users that want to implement their own exporters. It's RECOMMENDED to wrap all errors from the exporter into a crate-level error type, and implement `ExporterError` trait.

### Priority of configurations
OpenTelemetry supports multiple ways to configure the API, SDK and other components. The priority of configurations is as follows:

- Environment variables
- Compiling time configurations provided in the source code



## Style Guide

* Run `cargo clippy --all` - this will catch common mistakes and improve
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Bump MSRV to 1.65 [#1318](https://github.com/open-telemetry/opentelemetry-rust/pull/1318)
- Bump MSRV to 1.64 [#1203](https://github.com/open-telemetry/opentelemetry-rust/pull/1203)
- Prioritize environment variables over compiling time variables [#1323](https://github.com/open-telemetry/opentelemetry-rust/pull/1323)

## v0.19.0

Expand Down
19 changes: 9 additions & 10 deletions opentelemetry-jaeger/src/exporter/agent.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! # UDP Jaeger Agent Client
use crate::exporter::addrs_and_family;
use crate::exporter::address_family;
use crate::exporter::runtime::JaegerTraceRuntime;
use crate::exporter::thrift::{
agent::{self, TAgentSyncClient},
jaeger,
};
use crate::exporter::transport::{TBufferChannel, TNoopChannel};
use std::fmt;
use std::net::{ToSocketAddrs, UdpSocket};
use std::net::{SocketAddr, UdpSocket};
use thrift::{
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
transport::{ReadHalf, TIoChannel, WriteHalf},
Expand Down Expand Up @@ -43,20 +43,19 @@ pub(crate) struct AgentSyncClientUdp {

impl AgentSyncClientUdp {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
agent_endpoint: T,
pub(crate) fn new(
max_packet_size: usize,
auto_split: bool,
agent_address: Vec<SocketAddr>,
) -> thrift::Result<Self> {
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
TCompactOutputProtocol::new(write),
);

let (addrs, family) = addrs_and_family(&agent_endpoint)?;
let conn = UdpSocket::bind(family)?;
conn.connect(addrs.as_slice())?;
let conn = UdpSocket::bind(address_family(agent_address.as_slice()))?;
conn.connect(agent_address.as_slice())?;

Ok(AgentSyncClientUdp {
conn,
Expand Down Expand Up @@ -102,19 +101,19 @@ pub(crate) struct AgentAsyncClientUdp<R: JaegerTraceRuntime> {

impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
agent_endpoint: T,
pub(crate) fn new(
max_packet_size: usize,
runtime: R,
auto_split: bool,
agent_address: Vec<SocketAddr>,
) -> thrift::Result<Self> {
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
TCompactOutputProtocol::new(write),
);

let conn = runtime.create_socket(agent_endpoint)?;
let conn = runtime.create_socket(agent_address.as_slice())?;

Ok(AgentAsyncClientUdp {
runtime,
Expand Down
91 changes: 50 additions & 41 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use std::borrow::BorrowMut;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::{env, net};

use opentelemetry::trace::TraceError;
use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
use opentelemetry_sdk::{
self,
trace::{BatchConfig, Config, TracerProvider},
};

use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp};
use crate::exporter::config::{
build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig,
TransformationConfig,
};
use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
use crate::{Error, Exporter, JaegerTraceRuntime};
use opentelemetry::trace::TraceError;
use opentelemetry_sdk::trace::{BatchSpanProcessor, Tracer};
use opentelemetry_sdk::{
self,
trace::{BatchConfig, Config, TracerProvider},
};
use std::borrow::BorrowMut;
use std::sync::Arc;
use std::{env, net};

/// The max size of UDP packet we want to send, synced with jaeger-agent
const UDP_PACKET_MAX_LENGTH: usize = 65_000;
Expand Down Expand Up @@ -78,38 +81,23 @@ pub struct AgentPipeline {
transformation_config: TransformationConfig,
trace_config: Option<Config>,
batch_config: Option<BatchConfig>,
agent_endpoint: Result<Vec<net::SocketAddr>, crate::Error>,
agent_endpoint: Option<String>,
max_packet_size: usize,
auto_split_batch: bool,
}

impl Default for AgentPipeline {
fn default() -> Self {
let mut pipeline = AgentPipeline {
AgentPipeline {
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
agent_endpoint: Ok(vec![format!(
agent_endpoint: Some(format!(
"{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
)
.parse()
.unwrap()]),
)),
max_packet_size: UDP_PACKET_MAX_LENGTH,
auto_split_batch: false,
};

let endpoint = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
(Ok(host), Ok(port)) => Some(format!("{}:{}", host.trim(), port.trim())),
(Ok(host), _) => Some(format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim())),
(_, Ok(port)) => Some(format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim())),
(_, _) => None,
};

if let Some(endpoint) = endpoint {
pipeline = pipeline.with_endpoint(endpoint);
}

pipeline
}
}

Expand Down Expand Up @@ -147,16 +135,9 @@ impl AgentPipeline {
/// Any valid socket address can be used.
///
/// Default to be `127.0.0.1:6831`.
pub fn with_endpoint<T: net::ToSocketAddrs>(self, agent_endpoint: T) -> Self {
pub fn with_endpoint<T: Into<String>>(self, agent_endpoint: T) -> Self {
AgentPipeline {
agent_endpoint: agent_endpoint
.to_socket_addrs()
.map(|addrs| addrs.collect())
.map_err(|io_err| crate::Error::ConfigError {
pipeline_name: "agent",
config_name: "endpoint",
reason: io_err.to_string(),
}),
agent_endpoint: Some(agent_endpoint.into()),
..self
}
}
Expand Down Expand Up @@ -391,10 +372,10 @@ impl AgentPipeline {
R: JaegerTraceRuntime,
{
let agent = AgentAsyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
runtime,
self.auto_split_batch,
self.resolve_endpoint()?,
)
.map_err::<Error, _>(Into::into)?;
Ok(Arc::new(AsyncUploader::Agent(
Expand All @@ -404,13 +385,38 @@ impl AgentPipeline {

fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
self.auto_split_batch,
self.resolve_endpoint()?,
)
.map_err::<Error, _>(Into::into)?;
Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
}

// resolve the agent endpoint from the environment variables or the builder
// if only one of the environment variables is set, the other one will be set to the default value
// if no environment variable is set, the builder value will be used.
fn resolve_endpoint(self) -> Result<Vec<net::SocketAddr>, TraceError> {
let endpoint_str = match (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) {
(Ok(host), Ok(port)) => format!("{}:{}", host.trim(), port.trim()),
(Ok(host), _) => format!("{}:{DEFAULT_AGENT_ENDPOINT_PORT}", host.trim()),
(_, Ok(port)) => format!("{DEFAULT_AGENT_ENDPOINT_HOST}:{}", port.trim()),
(_, _) => self.agent_endpoint.unwrap_or(format!(
"{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
)),
};
endpoint_str
.to_socket_addrs()
.map(|addrs| addrs.collect())
.map_err(|io_err| {
Error::ConfigError {
pipeline_name: "agent",
config_name: "endpoint",
reason: io_err.to_string(),
}
.into()
})
}
}

#[cfg(test)]
Expand All @@ -429,9 +435,12 @@ mod tests {
("127.0.0.1:1001", true),
];
for (socket_str, is_ok) in test_cases.into_iter() {
let pipeline = AgentPipeline::default().with_endpoint(socket_str);
let resolved_endpoint = AgentPipeline::default()
.with_endpoint(socket_str)
.resolve_endpoint();
assert_eq!(
pipeline.agent_endpoint.is_ok(),
resolved_endpoint.is_ok(),
// if is_ok is true, use socket_str, otherwise use the default endpoint
is_ok,
"endpoint string {}",
socket_str
Expand Down
Loading