Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::Environment

use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site},
dogstatsd::{DogStatsD, DogStatsDConfig},
Expand Down Expand Up @@ -204,7 +203,7 @@ async fn start_dogstatsd(
Some(dd_api_key) => {
#[allow(clippy::expect_used)]
let metrics_flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)),
api_key: dd_api_key,
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
Some(Site::new(dd_site).expect("Failed to parse site")),
Expand Down
71 changes: 0 additions & 71 deletions crates/dogstatsd/src/api_key.rs

This file was deleted.

49 changes: 12 additions & 37 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::aggregator::Aggregator;
use crate::api_key::ApiKeyFactory;
use crate::datadog::{DdApi, MetricsIntakeUrlPrefix, RetryStrategy};
use reqwest::{Response, StatusCode};
use std::sync::{Arc, Mutex};
Expand All @@ -11,18 +10,12 @@ use tracing::{debug, error};

#[derive(Clone)]
pub struct Flusher {
// Allow accepting a future so the API key resolution is deferred until the flush happens
api_key_factory: Arc<ApiKeyFactory>,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
https_proxy: Option<String>,
timeout: Duration,
retry_strategy: RetryStrategy,
dd_api: DdApi,
aggregator: Arc<Mutex<Aggregator>>,
dd_api: Option<DdApi>,
}

pub struct FlusherConfig {
pub api_key_factory: Arc<ApiKeyFactory>,
pub api_key: String,
pub aggregator: Arc<Mutex<Aggregator>>,
pub metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
pub https_proxy: Option<String>,
Expand All @@ -33,35 +26,19 @@ pub struct FlusherConfig {
#[allow(clippy::await_holding_lock)]
impl Flusher {
pub fn new(config: FlusherConfig) -> Self {
let dd_api = DdApi::new(
config.api_key,
config.metrics_intake_url_prefix,
config.https_proxy,
config.timeout,
config.retry_strategy,
);
Flusher {
api_key_factory: Arc::clone(&config.api_key_factory),
metrics_intake_url_prefix: config.metrics_intake_url_prefix,
https_proxy: config.https_proxy,
timeout: config.timeout,
retry_strategy: config.retry_strategy,
dd_api,
aggregator: config.aggregator,
dd_api: None,
}
}

async fn get_dd_api(&mut self) -> &DdApi {
if self.dd_api.is_none() {
let api_key = self.api_key_factory.get_api_key().await;
self.dd_api = Some(DdApi::new(
api_key.to_string(),
self.metrics_intake_url_prefix.clone(),
self.https_proxy.clone(),
self.timeout,
self.retry_strategy.clone(),
));
}

#[allow(clippy::expect_used)]
self.dd_api
.as_ref()
.expect("dd_api should have been initialized")
}

/// Flush metrics from the aggregator
pub async fn flush(
&mut self,
Expand Down Expand Up @@ -98,9 +75,7 @@ impl Flusher {
let series_copy = series.clone();
let distributions_copy = distributions.clone();

let dd_api = self.get_dd_api().await;

let dd_api_clone = dd_api.clone();
let dd_api_clone = self.dd_api.clone();
let series_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
Expand All @@ -118,7 +93,7 @@ impl Flusher {
(failed, had_shipping_error)
});

let dd_api_clone = dd_api.clone();
let dd_api_clone = self.dd_api.clone();
let distributions_handle = tokio::spawn(async move {
let mut failed = Vec::new();
let mut had_shipping_error = false;
Expand Down
1 change: 0 additions & 1 deletion crates/dogstatsd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#![cfg_attr(not(test), deny(clippy::unimplemented))]

pub mod aggregator;
pub mod api_key;
pub mod constants;
pub mod datadog;
pub mod dogstatsd;
Expand Down
5 changes: 1 addition & 4 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use dogstatsd::metric::SortedTags;
use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{DdDdUrl, MetricsIntakeUrlPrefix, MetricsIntakeUrlPrefixOverride},
dogstatsd::{DogStatsD, DogStatsDConfig},
Expand Down Expand Up @@ -40,10 +39,8 @@ async fn dogstatsd_server_ships_series() {

let _ = start_dogstatsd(&metrics_aggr).await;

let api_key_factory = ApiKeyFactory::new("mock-api-key");

let mut metrics_flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(api_key_factory),
api_key: "mock-api-key".to_string(),
aggregator: Arc::clone(&metrics_aggr),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
None,
Expand Down
Loading