1
+ import os
2
+ from contextlib import contextmanager
3
+ from typing import Dict , Generator , Optional
4
+
1
5
from _pytest .pytester import Pytester
2
6
from opentelemetry import trace
3
7
9
13
from . import SpanRecorder
10
14
11
15
16
+ @contextmanager
17
+ def environment (** overrides : Optional [str ]) -> Generator [None , None , None ]:
18
+ original : Dict [str , Optional [str ]] = {}
19
+ for key , value in overrides .items ():
20
+ original [key ] = os .environ .pop (key , None )
21
+ if value is not None :
22
+ os .environ [key ] = value
23
+
24
+ try :
25
+ yield
26
+ finally :
27
+ for key , value in original .items ():
28
+ if value is None :
29
+ os .environ .pop (key , None )
30
+ else :
31
+ os .environ [key ] = value
32
+
33
+
34
+ def test_environment_manipulation ():
35
+ assert 'VALUE' not in os .environ
36
+ with environment (VALUE = 'outer' ):
37
+ assert os .environ ['VALUE' ] == 'outer'
38
+ with environment (VALUE = 'inner' ):
39
+ assert os .environ ['VALUE' ] == 'inner'
40
+ with environment (VALUE = None ):
41
+ assert 'VALUE' not in os .environ
42
+ with environment (VALUE = 'once more' ):
43
+ assert os .environ ['VALUE' ] == 'once more'
44
+ assert 'VALUE' not in os .environ
45
+ assert os .environ ['VALUE' ] == 'inner'
46
+ assert os .environ ['VALUE' ] == 'outer'
47
+ assert 'VALUE' not in os .environ
48
+
49
+
12
50
def test_getting_no_trace_id (pytester : Pytester ) -> None :
13
51
config = pytester .parseconfig ()
14
52
context = OpenTelemetryPlugin .get_trace_parent (config )
@@ -31,6 +69,24 @@ def test_getting_trace_id_from_command_line(pytester: Pytester) -> None:
31
69
assert parent .span_id == 0xFEDCBA0987654321
32
70
33
71
72
+ def test_getting_trace_id_from_environment_variable (pytester : Pytester ) -> None :
73
+ config = pytester .parseconfig ()
74
+
75
+ with environment (
76
+ TRACEPARENT = '00-1234567890abcdef1234567890abcdef-fedcba0987654321-01'
77
+ ):
78
+ context = OpenTelemetryPlugin .get_trace_parent (config )
79
+
80
+ assert context
81
+
82
+ parent_span = next (iter (context .values ()))
83
+ assert isinstance (parent_span , trace .Span )
84
+
85
+ parent = parent_span .get_span_context ()
86
+ assert parent .trace_id == 0x1234567890ABCDEF1234567890ABCDEF
87
+ assert parent .span_id == 0xFEDCBA0987654321
88
+
89
+
34
90
def test_getting_trace_id_from_worker_input (pytester : Pytester ) -> None :
35
91
config = pytester .parseconfig ()
36
92
setattr (
0 commit comments