Add Retry to OTLP Exporter #2727

Draft pull request: 8 commits into base branch main.

2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/mod.rs
@@ -322,7 +322,7 @@ impl OtlpHttpClient {
        logs: LogBatch<'_>,
    ) -> opentelemetry_sdk::logs::LogResult<(Vec<u8>, &'static str)> {
        use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
-       let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
+       let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource);
        let req = ExportLogsServiceRequest { resource_logs };

        match self.protocol {

55 changes: 47 additions & 8 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
@@ -12,6 +12,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope

use super::BoxInterceptor;

+use super::retry::{retry_with_exponential_backoff, RetryPolicy};
+
pub(crate) struct TonicLogsClient {
    inner: Mutex<Option<ClientInner>>,
    #[allow(dead_code)]

@@ -69,19 +71,56 @@ impl LogExporter for TonicLogsClient {
            None => return Err(OTelSdkError::AlreadyShutdown),
        };

-       let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
+       let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource);

        otel_debug!(name: "TonicsLogsClient.CallingExport");

-       client
+       // First attempt without retry
+       let result = client
            .export(Request::from_parts(
-               metadata,
-               extensions,
-               ExportLogsServiceRequest { resource_logs },
+               metadata.clone(),
+               extensions.clone(),
+               ExportLogsServiceRequest {
+                   resource_logs: resource_logs.clone()
+               },
            ))
-           .await
-           .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?;
-       Ok(())
+           .await;
+
+       // If the first attempt succeeds, return success
+       if result.is_ok() {
+           return Ok(());
+       }
+
+       // If the first attempt fails, try with retry
+       otel_debug!(name: "TonicsLogsClient.FirstAttemptFailed.Retrying");
+
+       let policy = RetryPolicy {
+           max_retries: 10,
+           initial_delay_ms: 100,
+           max_delay_ms: 1600,
+           jitter_ms: 100,
+       };
+
+       // Now use retry_with_exponential_backoff for subsequent attempts
+       retry_with_exponential_backoff(
+           policy,
+           "TonicsLogsClient.export",
+           || async {
+               client
+                   .clone()
+                   .export(Request::from_parts(
+                       metadata.clone(),
+                       extensions.clone(),
+                       ExportLogsServiceRequest {
+                           resource_logs: resource_logs.clone()
+                       },
+                   ))
+                   .await
+                   .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))
+           }
+       )
+       .await
+       .map(|_| ()) // Convert successful response to () as required by OTelSdkResult
    }

    fn shutdown(&self) -> OTelSdkResult {

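For a sense of scale, the policy hard-coded above retries up to 10 times, doubling the delay from 100 ms and capping it at 1600 ms, with up to 100 ms of jitter added before the cap is applied, so a request that never succeeds spends a bit over 11 seconds in backoff after the failed first attempt. Below is a standalone sketch, not part of the diff, that reproduces the schedule by mirroring the doubling-and-cap rule from retry.rs further down:

// Standalone sketch (not part of the diff): prints the jitter-free sleep
// schedule implied by the RetryPolicy used in the export path above.
fn main() {
    let (initial_delay_ms, max_delay_ms, max_retries) = (100u64, 1600u64, 10u32);
    let mut delay = initial_delay_ms;
    let mut total = 0u64;
    for attempt in 1..=max_retries {
        let sleep_ms = delay.min(max_delay_ms); // retry.rs caps delay + jitter at max_delay_ms
        total += sleep_ms;
        println!("retry {attempt}: sleep ~{sleep_ms} ms (plus up to 100 ms jitter)");
        delay = (delay * 2).min(max_delay_ms); // exponential backoff, capped
    }
    println!("worst-case backoff total: ~{total} ms");
}
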
8 changes: 5 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
@@ -28,6 +28,11 @@ pub(crate) mod metrics;
#[cfg(feature = "trace")]
pub(crate) mod trace;

+// For now, we are not exposing the retry policy. Only work with grpc-tonic since retry takes a hard dependency on tokio
+// while we sort out an abstraction for the async runtime which can be used by all exporters.
+#[cfg(feature = "grpc-tonic")]
+mod retry;
+
/// Configuration for [tonic]
///
/// [tonic]: https://github.com/hyperium/tonic

@@ -498,9 +503,6 @@
    #[test]
    #[cfg(feature = "gzip-tonic")]
    fn test_with_gzip_compression() {
-       // metadata should merge with the current one with priority instead of just replacing it
-       let mut metadata = MetadataMap::new();
-       metadata.insert("foo", "bar".parse().unwrap());
        let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
        assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
    }

229 changes: 229 additions & 0 deletions opentelemetry-otlp/src/exporter/tonic/retry.rs
@@ -0,0 +1,229 @@
//! This module provides functionality for retrying operations with exponential backoff and jitter.
//!
//! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum
//! number of retries, initial delay, maximum delay, and jitter.
//!
//! The `Sleep` trait abstracts the sleep functionality, allowing different implementations for
//! various async runtimes such as Tokio and async-std, as well as a synchronous implementation.
//!
//! The `retry_with_exponential_backoff` function retries the given operation according to the
//! specified retry policy, using exponential backoff and jitter to determine the delay between
//! retries. The function logs errors and retries the operation until it succeeds or the maximum
//! number of retries is reached.

use std::future::Future;
use std::time::{Duration, SystemTime};
use opentelemetry::otel_warn;

/// Configuration for retry policy.
#[derive(Debug)]
pub(super) struct RetryPolicy {
    /// Maximum number of retry attempts.
    pub max_retries: usize,
    /// Initial delay in milliseconds before the first retry.
    pub initial_delay_ms: u64,
    /// Maximum delay in milliseconds between retries.
    pub max_delay_ms: u64,
    /// Maximum jitter in milliseconds to add to the delay.
    pub jitter_ms: u64,
}

// Generates a random jitter value up to max_jitter
fn generate_jitter(max_jitter: u64) -> u64 {
    let now = SystemTime::now();
    let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos();
    nanos as u64 % (max_jitter + 1)
}

// /// Trait to abstract the sleep functionality.
// pub trait Sleep {
//     /// The future returned by the sleep function.
//     type SleepFuture: Future<Output = ()>;

//     /// Sleeps for the specified duration.
//     fn sleep(duration: Duration) -> Self::SleepFuture;
// }

// /// Implementation of the Sleep trait for tokio::time::Sleep
// #[cfg(feature = "rt-tokio")]
// impl Sleep for tokio::time::Sleep {
//     type SleepFuture = tokio::time::Sleep;

//     fn sleep(duration: Duration) -> Self::SleepFuture {
//     }
// }

// #[cfg(feature = "rt-async-std")]
// /// There is no direct equivalent to `tokio::time::Sleep` in `async-std`.
// /// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep`
// /// trait for it, boxing the future returned by `async_std::task::sleep` to fit
// /// the trait's associated type requirements.
// #[derive(Debug)]
// pub struct AsyncStdSleep;

// /// Implementation of the Sleep trait for async-std
// #[cfg(feature = "rt-async-std")]
// impl Sleep for AsyncStdSleep {
//     type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

//     fn sleep(duration: Duration) -> Self::SleepFuture {
//         Box::pin(async_std::task::sleep(duration))
//     }
// }

// /// Implement the Sleep trait for synchronous sleep
// #[derive(Debug)]
// pub struct StdSleep;

// impl Sleep for StdSleep {
//     type SleepFuture = std::future::Ready<()>;

//     fn sleep(duration: Duration) -> Self::SleepFuture {
//         std::thread::sleep(duration);
//         std::future::ready(())
//     }
// }

/// Retries the given operation with exponential backoff and jitter.
///
/// # Arguments
///
/// * `policy` - The retry policy configuration.
/// * `operation_name` - The name of the operation being retried.
/// * `operation` - The operation to be retried.
///
/// # Returns
///
/// A `Result` containing the operation's result or an error if the maximum retries are reached.
pub(super) async fn retry_with_exponential_backoff<F, Fut, T, E>(
    policy: RetryPolicy,
    operation_name: &str,
    mut operation: F,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    E: std::fmt::Debug,
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt = 0;
    let mut delay = policy.initial_delay_ms;

    loop {
        match operation().await {
            Ok(result) => return Ok(result), // Return the result if the operation succeeds
            Err(err) if attempt < policy.max_retries => {
                attempt += 1;
                // Log the error and retry after a delay with jitter
                otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err));
                let jitter = generate_jitter(policy.jitter_ms);
                let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms);

                // Retry currently only supports tokio::time::sleep (for use with gRPC/tonic). This
                // should be replaced with a more generic sleep function that works with async-std
                // and a synchronous runtime in the future.
                tokio::time::sleep(Duration::from_millis(delay_with_jitter)).await;

                delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff
            }
            Err(err) => return Err(err), // Return the error if max retries are reached
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::timeout;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    // Test to ensure generate_jitter returns a value within the expected range
    #[tokio::test]
    async fn test_generate_jitter() {
        let max_jitter = 100;
        let jitter = generate_jitter(max_jitter);
        assert!(jitter <= max_jitter);
    }

    // Test to ensure retry_with_exponential_backoff succeeds on the first attempt
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_success() {
        let policy = RetryPolicy {
            max_retries: 3,
            initial_delay_ms: 100,
            max_delay_ms: 1600,
            jitter_ms: 100,
        };

        let result = retry_with_exponential_backoff(policy, "test_operation", || {
            Box::pin(async { Ok::<_, ()>("success") })
        }).await;

        assert_eq!(result, Ok("success"));
    }

    // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_retries() {
        let policy = RetryPolicy {
            max_retries: 3,
            initial_delay_ms: 100,
            max_delay_ms: 1600,
            jitter_ms: 100,
        };

        let attempts = AtomicUsize::new(0);

        let result = retry_with_exponential_backoff(policy, "test_operation", || {
            let attempt = attempts.fetch_add(1, Ordering::SeqCst);
            Box::pin(async move {
                if attempt < 2 {
                    Err::<&str, &str>("error") // Fail the first two attempts
                } else {
                    Ok::<&str, &str>("success") // Succeed on the third attempt
                }
            })
        }).await;

        assert_eq!(result, Ok("success"));
        assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 3 attempts
    }

    // Test to ensure retry_with_exponential_backoff fails after max retries
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_failure() {
        let policy = RetryPolicy {
            max_retries: 3,
            initial_delay_ms: 100,
            max_delay_ms: 1600,
            jitter_ms: 100,
        };

        let attempts = AtomicUsize::new(0);

        let result = retry_with_exponential_backoff(policy, "test_operation", || {
            attempts.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Err::<(), _>("error") }) // Always fail
        }).await;

        assert_eq!(result, Err("error"));
        assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries)
    }

    // Test to ensure retry_with_exponential_backoff respects the timeout
    #[tokio::test]
    async fn test_retry_with_exponential_backoff_timeout() {
        let policy = RetryPolicy {
            max_retries: 12, // Increase the number of retries
            initial_delay_ms: 100,
            max_delay_ms: 1600,
            jitter_ms: 100,
        };

        let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || {
            Box::pin(async { Err::<(), _>("error") }) // Always fail
        })).await;

        assert!(result.is_err()); // Ensure the operation times out
    }
}
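
The commented-out `Sleep` trait above, together with the note beside `tokio::time::sleep` in the retry loop, points at the planned runtime abstraction. As a purely illustrative sketch, assuming hypothetical names (`Sleep`, `TokioSleep`) and a tokio dependency, and not code from this PR, the shape could be:

// Illustrative sketch only; the trait name, impl, and wiring are assumptions.
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

trait Sleep {
    fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}

struct TokioSleep;

impl Sleep for TokioSleep {
    fn sleep(duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        Box::pin(tokio::time::sleep(duration))
    }
}

#[tokio::main]
async fn main() {
    // A retry helper generic over `S: Sleep` could call `S::sleep(delay).await`
    // instead of depending on tokio::time::sleep directly.
    TokioSleep::sleep(Duration::from_millis(10)).await;
}

An async-std or synchronous runtime would then only need to supply its own `sleep` implementation.
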
@@ -1,8 +1,5 @@
use anyhow::Result;
-use opentelemetry_proto::tonic::{
-    common::v1::KeyValue,
-    logs::v1::{LogRecord, LogsData, ResourceLogs},
-};
+use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs};
use std::fs::File;

// Given two ResourceLogs, assert that they are equal except for the timestamps

@@ -13,7 +13,7 @@
//! Only a single test suite can run at once, as each container has statically mapped ports, but
//! this works nicely with the way cargo executes the suite.
//!
-//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests
+//! To skip integration tests with cargo, you can run `cargo test --lib`, which will run unit tests
//! only.
//!
#![cfg(unix)]

8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/logs.rs
@@ -164,8 +164,8 @@ pub mod tonic {
        }
    }

-   pub fn group_logs_by_resource_and_scope(
-       logs: LogBatch<'_>,
+   pub fn group_logs_by_resource_and_scope<'a>(
+       logs: &'a LogBatch<'a>,
        resource: &ResourceAttributesWithSchema,
    ) -> Vec<ResourceLogs> {
        // Group logs by target or instrumentation name

@@ -273,7 +273,7 @@ mod tests {
        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

        let grouped_logs =
-           crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
+           crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

        assert_eq!(grouped_logs.len(), 1);
        let resource_logs = &grouped_logs[0];

@@ -293,7 +293,7 @@
        let log_batch = LogBatch::new(&logs);
        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
        let grouped_logs =
-           crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource);
+           crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource);

        assert_eq!(grouped_logs.len(), 1);
        let resource_logs = &grouped_logs[0];

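The signature change here, together with the matching `&logs` and `&batch` call-site updates in the http and tonic exporters, means grouping no longer consumes the batch, which is what lets the exporter keep the data available across retry attempts. A toy, standalone sketch of that pattern, using stand-in types rather than the OTLP ones:

// Toy sketch with stand-in types (not OTLP code): borrowing the batch keeps it
// available for every retry attempt instead of being consumed by the first one.
fn group(batch: &[String]) -> usize {
    batch.len() // stand-in for group_logs_by_resource_and_scope
}

fn main() {
    let batch = vec!["log-1".to_string(), "log-2".to_string()];
    for attempt in 1..=3 {
        // `batch` is only borrowed, so the next attempt can group it again.
        println!("attempt {attempt}: grouped {} records", group(&batch));
    }
}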