@@ -11,6 +11,11 @@ mod env;
11
11
pub ( crate ) mod transport;
12
12
mod uploader;
13
13
14
+ // Linting isn't detecting that it's used seems like linting bug.
15
+ #[ allow( unused_imports) ]
16
+ #[ cfg( feature = "surf_collector_client" ) ]
17
+ use std:: convert:: TryFrom ;
18
+
14
19
use self :: runtime:: JaegerTraceRuntime ;
15
20
use self :: thrift:: jaeger;
16
21
use agent:: AgentAsyncClientUdp ;
@@ -110,6 +115,10 @@ impl trace::SpanExporter for Exporter {
110
115
#[ derive( Debug ) ]
111
116
pub struct PipelineBuilder {
112
117
agent_endpoint : Vec < net:: SocketAddr > ,
118
+ // There are many variations in which it's read unclear which is causing not to be.
119
+ #[ allow( dead_code) ]
120
+ #[ cfg( feature = "collector_client" ) ]
121
+ collector_timeout : Duration ,
113
122
#[ cfg( any( feature = "collector_client" , feature = "wasm_collector_client" ) ) ]
114
123
collector_endpoint : Option < Result < http:: Uri , http:: uri:: InvalidUri > > ,
115
124
#[ cfg( any( feature = "collector_client" , feature = "wasm_collector_client" ) ) ]
@@ -131,6 +140,8 @@ impl Default for PipelineBuilder {
131
140
fn default ( ) -> Self {
132
141
let builder_defaults = PipelineBuilder {
133
142
agent_endpoint : vec ! [ DEFAULT_AGENT_ENDPOINT . parse( ) . unwrap( ) ] ,
143
+ #[ cfg( feature = "collector_client" ) ]
144
+ collector_timeout : env:: DEFAULT_COLLECTOR_TIMEOUT ,
134
145
#[ cfg( any( feature = "collector_client" , feature = "wasm_collector_client" ) ) ]
135
146
collector_endpoint : None ,
136
147
#[ cfg( any( feature = "collector_client" , feature = "wasm_collector_client" ) ) ]
@@ -173,6 +184,18 @@ impl PipelineBuilder {
173
184
}
174
185
}
175
186
187
+ /// Assign the collector timeout
188
+ ///
189
+ /// E.g. "10s"
190
+ #[ cfg( feature = "collector_client" ) ]
191
+ #[ cfg_attr( docsrs, doc( cfg( feature = "collector_client" ) ) ) ]
192
+ pub fn with_collector_timeout ( self , collector_timeout : Duration ) -> Self {
193
+ PipelineBuilder {
194
+ collector_timeout,
195
+ ..self
196
+ }
197
+ }
198
+
176
199
/// Assign the collector endpoint.
177
200
///
178
201
/// E.g. "http://localhost:14268/api/traces"
@@ -519,7 +542,8 @@ impl PipelineBuilder {
519
542
520
543
#[ cfg( feature = "isahc_collector_client" ) ]
521
544
let client = self . client . unwrap_or ( {
522
- let mut builder = isahc:: HttpClient :: builder ( ) ;
545
+ let mut builder = isahc:: HttpClient :: builder ( ) . timeout ( self . collector_timeout ) ;
546
+
523
547
if let ( Some ( username) , Some ( password) ) =
524
548
( self . collector_username , self . collector_password )
525
549
{
@@ -546,12 +570,13 @@ impl PipelineBuilder {
546
570
) ) ]
547
571
let client = self . client . unwrap_or ( {
548
572
#[ cfg( feature = "reqwest_collector_client" ) ]
549
- let mut builder = reqwest:: ClientBuilder :: new ( ) ;
573
+ let mut builder = reqwest:: ClientBuilder :: new ( ) . timeout ( self . collector_timeout ) ;
550
574
#[ cfg( all(
551
575
not( feature = "reqwest_collector_client" ) ,
552
576
feature = "reqwest_blocking_collector_client"
553
577
) ) ]
554
- let mut builder = reqwest:: blocking:: ClientBuilder :: new ( ) ;
578
+ let mut builder =
579
+ reqwest:: blocking:: ClientBuilder :: new ( ) . timeout ( self . collector_timeout ) ;
555
580
if let ( Some ( username) , Some ( password) ) =
556
581
( self . collector_username , self . collector_password )
557
582
{
@@ -573,13 +598,18 @@ impl PipelineBuilder {
573
598
not( feature = "reqwest_blocking_collector_client" )
574
599
) ) ]
575
600
let client = self . client . unwrap_or ( {
601
+ let client = surf:: Client :: try_from (
602
+ surf:: Config :: new ( ) . set_timeout ( Some ( self . collector_timeout ) ) ,
603
+ )
604
+ . unwrap_or_else ( |_| surf:: Client :: new ( ) ) ;
605
+
576
606
let client = if let ( Some ( username) , Some ( password) ) =
577
607
( self . collector_username , self . collector_password )
578
608
{
579
609
let auth = surf:: http:: auth:: BasicAuth :: new ( username, password) ;
580
- surf :: Client :: new ( ) . with ( BasicAuthMiddleware ( auth) )
610
+ client . with ( BasicAuthMiddleware ( auth) )
581
611
} else {
582
- surf :: Client :: new ( )
612
+ client
583
613
} ;
584
614
585
615
Box :: new ( client)
@@ -830,6 +860,32 @@ impl ExportError for Error {
830
860
}
831
861
}
832
862
863
+ #[ cfg( test) ]
864
+ #[ cfg( all( feature = "collector_client" ) ) ]
865
+ mod timeout_env_tests {
866
+ use crate :: exporter:: env;
867
+ use crate :: exporter:: PipelineBuilder ;
868
+ use std:: time:: Duration ;
869
+
870
+ #[ test]
871
+ fn test_collector_defaults ( ) {
872
+ // No Env Variable
873
+ std:: env:: remove_var ( env:: ENV_TIMEOUT ) ;
874
+ let builder = PipelineBuilder :: default ( ) ;
875
+ assert_eq ! ( env:: DEFAULT_COLLECTOR_TIMEOUT , builder. collector_timeout) ;
876
+
877
+ // Bad Env Variable
878
+ std:: env:: set_var ( env:: ENV_TIMEOUT , "a" ) ;
879
+ let builder = PipelineBuilder :: default ( ) ;
880
+ assert_eq ! ( env:: DEFAULT_COLLECTOR_TIMEOUT , builder. collector_timeout) ;
881
+
882
+ // Good Env Variable
883
+ std:: env:: set_var ( env:: ENV_TIMEOUT , "777" ) ;
884
+ let builder = PipelineBuilder :: default ( ) ;
885
+ assert_eq ! ( Duration :: from_millis( 777 ) , builder. collector_timeout) ;
886
+ }
887
+ }
888
+
833
889
#[ cfg( test) ]
834
890
#[ cfg( all( feature = "collector_client" , feature = "rt-tokio" ) ) ]
835
891
mod collector_client_tests {
0 commit comments