@@ -2,8 +2,8 @@ use crate::config;
2
2
use crate :: http:: get_client;
3
3
use crate :: logs:: aggregator:: Aggregator ;
4
4
use crate :: FLUSH_RETRY_COUNT ;
5
- use futures:: future:: join_all;
6
5
use dogstatsd:: api_key:: ApiKeyFactory ;
6
+ use futures:: future:: join_all;
7
7
use reqwest:: header:: HeaderMap ;
8
8
use std:: error:: Error ;
9
9
use std:: time:: Instant ;
@@ -95,7 +95,6 @@ impl Flusher {
95
95
96
96
async fn create_request ( & self , data : Vec < u8 > ) -> reqwest:: RequestBuilder {
97
97
let url = format ! ( "{}/api/v2/logs" , self . endpoint) ;
98
- let body = self . compress ( data) ;
99
98
let headers = self . get_headers ( ) . await ;
100
99
self . client
101
100
. post ( & url)
@@ -146,6 +145,35 @@ impl Flusher {
146
145
}
147
146
}
148
147
}
148
+
149
+ async fn get_headers ( & self ) -> & HeaderMap {
150
+ self . headers
151
+ . get_or_init ( move || async move {
152
+ let api_key = self . api_key_factory . get_api_key ( ) . await ;
153
+ let mut headers = HeaderMap :: new ( ) ;
154
+ headers. insert (
155
+ "DD-API-KEY" ,
156
+ api_key. parse ( ) . expect ( "failed to parse header" ) ,
157
+ ) ;
158
+ headers. insert (
159
+ "DD-PROTOCOL" ,
160
+ "agent-json" . parse ( ) . expect ( "failed to parse header" ) ,
161
+ ) ;
162
+ headers. insert (
163
+ "Content-Type" ,
164
+ "application/json" . parse ( ) . expect ( "failed to parse header" ) ,
165
+ ) ;
166
+
167
+ if self . config . logs_config_use_compression {
168
+ headers. insert (
169
+ "Content-Encoding" ,
170
+ "zstd" . parse ( ) . expect ( "failed to parse header" ) ,
171
+ ) ;
172
+ }
173
+ headers
174
+ } )
175
+ . await
176
+ }
149
177
}
150
178
151
179
#[ allow( clippy:: module_name_repetitions) ]
@@ -157,15 +185,15 @@ pub struct LogsFlusher {
157
185
158
186
impl LogsFlusher {
159
187
pub fn new (
160
- api_key : String ,
188
+ api_key_factory : Arc < ApiKeyFactory > ,
161
189
aggregator : Arc < Mutex < Aggregator > > ,
162
190
config : Arc < config:: Config > ,
163
191
) -> Self {
164
192
let mut flushers = Vec :: new ( ) ;
165
193
166
194
// Create primary flusher
167
195
flushers. push ( Flusher :: new (
168
- api_key . clone ( ) ,
196
+ Arc :: clone ( & api_key_factory ) ,
169
197
config. logs_config_logs_dd_url . clone ( ) ,
170
198
aggregator. clone ( ) ,
171
199
config. clone ( ) ,
@@ -175,7 +203,7 @@ impl LogsFlusher {
175
203
for endpoint in & config. logs_config_additional_endpoints {
176
204
let endpoint_url = format ! ( "https://{}:{}" , endpoint. host, endpoint. port) ;
177
205
flushers. push ( Flusher :: new (
178
- endpoint . api_key . clone ( ) ,
206
+ Arc :: clone ( & api_key_factory ) ,
179
207
endpoint_url,
180
208
aggregator. clone ( ) ,
181
209
config. clone ( ) ,
@@ -252,33 +280,4 @@ impl LogsFlusher {
252
280
encoder. write_all ( data) ?;
253
281
encoder. finish ( ) . map_err ( |e| Box :: new ( e) as Box < dyn Error > )
254
282
}
255
-
256
- async fn get_headers ( & self ) -> & HeaderMap {
257
- self . headers
258
- . get_or_init ( move || async move {
259
- let api_key = self . api_key_factory . get_api_key ( ) . await ;
260
- let mut headers = HeaderMap :: new ( ) ;
261
- headers. insert (
262
- "DD-API-KEY" ,
263
- api_key. parse ( ) . expect ( "failed to parse header" ) ,
264
- ) ;
265
- headers. insert (
266
- "DD-PROTOCOL" ,
267
- "agent-json" . parse ( ) . expect ( "failed to parse header" ) ,
268
- ) ;
269
- headers. insert (
270
- "Content-Type" ,
271
- "application/json" . parse ( ) . expect ( "failed to parse header" ) ,
272
- ) ;
273
-
274
- if self . config . logs_config_use_compression {
275
- headers. insert (
276
- "Content-Encoding" ,
277
- "zstd" . parse ( ) . expect ( "failed to parse header" ) ,
278
- ) ;
279
- }
280
- headers
281
- } )
282
- . await
283
- }
284
283
}
0 commit comments