4
4
import os
5
5
import threading
6
6
import time
7
- import six
8
- import unittest
7
+ from typing import Generator
9
8
10
- from google .cloud .pubsub_v1 import PublisherClient , SubscriberClient
9
+ import pytest
10
+ import six
11
11
from google .api_core .exceptions import AlreadyExists
12
+ from google .cloud .pubsub_v1 import PublisherClient , SubscriberClient
12
13
from google .cloud .pubsub_v1 .publisher import exceptions
14
+ from opentelemetry .trace import SpanKind
15
+
13
16
from instana .singletons import agent , tracer
17
+ from instana .span .span import get_current_span
14
18
from tests .test_utils import _TraceContextMixin
15
19
16
20
# Use PubSub Emulator exposed at :8085
17
21
os .environ ["PUBSUB_EMULATOR_HOST" ] = "localhost:8085"
18
22
19
23
20
- class TestPubSubPublish (unittest .TestCase , _TraceContextMixin ):
21
- @classmethod
22
- def setUpClass (cls ):
23
- cls .publisher = PublisherClient ()
24
+ class TestPubSubPublish (_TraceContextMixin ):
25
+ publisher = PublisherClient ()
24
26
25
- def setUp (self ):
26
- self .recorder = tracer .recorder
27
+ @pytest .fixture (autouse = True )
28
+ def _resource (self ) -> Generator [None , None , None ]:
29
+ self .recorder = tracer .span_processor
27
30
self .recorder .clear_spans ()
28
31
29
- self .project_id = ' test-project'
30
- self .topic_name = ' test-topic'
32
+ self .project_id = " test-project"
33
+ self .topic_name = " test-topic"
31
34
32
35
# setup topic_path & topic
33
36
self .topic_path = self .publisher .topic_path (self .project_id , self .topic_name )
@@ -36,89 +39,91 @@ def setUp(self):
36
39
except AlreadyExists :
37
40
self .publisher .delete_topic (request = {"topic" : self .topic_path })
38
41
self .publisher .create_topic (request = {"name" : self .topic_path })
39
-
40
- def tearDown (self ):
42
+ yield
41
43
self .publisher .delete_topic (request = {"topic" : self .topic_path })
42
44
agent .options .allow_exit_as_root = False
43
45
44
- def test_publish (self ):
46
+ def test_publish (self ) -> None :
45
47
# publish a single message
46
- with tracer .start_active_span ( ' test' ):
47
- future = self .publisher .publish (self . topic_path ,
48
- b' Test Message' ,
49
- origin = "instana" )
48
+ with tracer .start_as_current_span ( " test" ):
49
+ future = self .publisher .publish (
50
+ self . topic_path , b" Test Message" , origin = "instana"
51
+ )
50
52
time .sleep (2.0 ) # for sanity
51
53
result = future .result ()
52
- self . assertIsInstance (result , six .string_types )
54
+ assert isinstance (result , six .string_types )
53
55
54
56
spans = self .recorder .queued_spans ()
55
57
gcps_span , test_span = spans [0 ], spans [1 ]
56
58
57
- self .assertEqual (2 , len (spans ))
58
- self .assertIsNone (tracer .active_span )
59
- self .assertEqual ('gcps' , gcps_span .n )
60
- self .assertEqual (2 , gcps_span .k ) # EXIT
59
+ assert len (spans ) == 2
60
+
61
+ current_span = get_current_span ()
62
+ assert not current_span .is_recording ()
63
+ assert gcps_span .n == "gcps"
64
+ assert gcps_span .k is SpanKind .CLIENT
61
65
62
- self . assertEqual ( 'publish' , gcps_span .data [' gcps' ][ 'op' ])
63
- self .assertEqual ( self . topic_name , gcps_span .data [' gcps' ][ ' top' ])
66
+ assert gcps_span .data [" gcps" ][ "op" ] == "publish"
67
+ assert self .topic_name == gcps_span .data [" gcps" ][ " top" ]
64
68
65
69
# Trace Context Propagation
66
70
self .assertTraceContextPropagated (test_span , gcps_span )
67
71
68
72
# Error logging
69
73
self .assertErrorLogging (spans )
70
74
71
- def test_publish_as_root_exit_span (self ):
75
+ def test_publish_as_root_exit_span (self ) -> None :
72
76
agent .options .allow_exit_as_root = True
73
77
# publish a single message
74
- future = self .publisher .publish (self . topic_path ,
75
- b' Test Message' ,
76
- origin = "instana" )
78
+ future = self .publisher .publish (
79
+ self . topic_path , b" Test Message" , origin = "instana"
80
+ )
77
81
time .sleep (2.0 ) # for sanity
78
82
result = future .result ()
79
- self . assertIsInstance (result , six .string_types )
83
+ assert isinstance (result , six .string_types )
80
84
81
85
spans = self .recorder .queued_spans ()
82
- self . assertEqual ( 1 , len (spans ))
86
+ assert len (spans ) == 1
83
87
gcps_span = spans [0 ]
84
88
85
- self .assertIsNone (tracer .active_span )
86
- self .assertEqual ('gcps' , gcps_span .n )
87
- self .assertEqual (2 , gcps_span .k ) # EXIT
89
+ current_span = get_current_span ()
90
+ assert not current_span .is_recording ()
91
+ assert gcps_span .n == "gcps"
92
+ assert gcps_span .k is SpanKind .CLIENT
88
93
89
- self . assertEqual ( 'publish' , gcps_span .data [' gcps' ][ 'op' ])
90
- self .assertEqual ( self . topic_name , gcps_span .data [' gcps' ][ ' top' ])
94
+ assert gcps_span .data [" gcps" ][ "op" ] == "publish"
95
+ assert self .topic_name == gcps_span .data [" gcps" ][ " top" ]
91
96
92
97
# Error logging
93
98
self .assertErrorLogging (spans )
94
99
95
100
96
101
class AckCallback (object ):
97
- def __init__ (self ):
102
+ def __init__ (self ) -> None :
98
103
self .calls = 0
99
104
self .lock = threading .Lock ()
100
105
101
- def __call__ (self , message ):
106
+ def __call__ (self , message ) -> None :
102
107
message .ack ()
103
108
# Only increment the number of calls **after** finishing.
104
109
with self .lock :
105
110
self .calls += 1
106
111
107
112
108
- class TestPubSubSubscribe (unittest . TestCase , _TraceContextMixin ):
113
+ class TestPubSubSubscribe (_TraceContextMixin ):
109
114
@classmethod
110
- def setUpClass (cls ):
115
+ def setup_class (cls ) -> None :
111
116
cls .publisher = PublisherClient ()
112
117
cls .subscriber = SubscriberClient ()
113
118
114
- def setUp ( self ):
115
-
116
- self .recorder = tracer .recorder
119
+ @ pytest . fixture ( autouse = True )
120
+ def _resource ( self ) -> Generator [ None , None , None ]:
121
+ self .recorder = tracer .span_processor
117
122
self .recorder .clear_spans ()
118
123
119
- self .project_id = ' test-project'
120
- self .topic_name = ' test-topic'
121
- self .subscription_name = ' test-subscription'
124
+ self .project_id = " test-project"
125
+ self .topic_name = " test-topic"
126
+ self .subscription_name = " test-subscription"
122
127
123
128
# setup topic_path & topic
124
129
self .topic_path = self .publisher .topic_path (self .project_id , self .topic_name )
@@ -130,29 +135,33 @@ def setUp(self):
130
135
131
136
# setup subscription path & attach subscription
132
137
self .subscription_path = self .subscriber .subscription_path (
133
- self .project_id , self .subscription_name )
138
+ self .project_id ,
139
+ self .subscription_name ,
140
+ )
134
141
try :
135
142
self .subscriber .create_subscription (
136
143
request = {"name" : self .subscription_path , "topic" : self .topic_path }
137
144
)
138
145
except AlreadyExists :
139
- self .subscriber .delete_subscription (request = {"subscription" : self .subscription_path })
146
+ self .subscriber .delete_subscription (
147
+ request = {"subscription" : self .subscription_path }
148
+ )
140
149
self .subscriber .create_subscription (
141
150
request = {"name" : self .subscription_path , "topic" : self .topic_path }
142
151
)
143
-
144
- def tearDown (self ):
152
+ yield
145
153
self .publisher .delete_topic (request = {"topic" : self .topic_path })
146
- self .subscriber .delete_subscription (request = { "subscription" : self . subscription_path })
147
-
148
- def test_subscribe ( self ):
154
+ self .subscriber .delete_subscription (
155
+ request = { "subscription" : self . subscription_path }
156
+ )
149
157
150
- with tracer .start_active_span ('test' ):
158
+ def test_subscribe (self ) -> None :
159
+ with tracer .start_as_current_span ("test" ):
151
160
# Publish a message
152
- future = self .publisher .publish (self . topic_path ,
153
- b"Test Message to PubSub" ,
154
- origin = "instana" )
155
- self . assertIsInstance (future .result (), six .string_types )
161
+ future = self .publisher .publish (
162
+ self . topic_path , b"Test Message to PubSub" , origin = "instana"
163
+ )
164
+ assert isinstance (future .result (), six .string_types )
156
165
157
166
time .sleep (2.0 ) # for sanity
158
167
@@ -171,15 +180,16 @@ def test_subscribe(self):
171
180
consumer_span = spans [1 ]
172
181
test_span = spans [2 ]
173
182
174
- self .assertEqual (3 , len (spans ))
175
- self .assertIsNone (tracer .active_span )
176
- self .assertEqual ('publish' , producer_span .data ['gcps' ]['op' ])
177
- self .assertEqual ('consume' , consumer_span .data ['gcps' ]['op' ])
178
- self .assertEqual (self .topic_name , producer_span .data ['gcps' ]['top' ])
179
- self .assertEqual (self .subscription_name , consumer_span .data ['gcps' ]['sub' ])
183
+ assert len (spans ) == 3
184
+ current_span = get_current_span ()
185
+ assert not current_span .is_recording ()
186
+ assert producer_span .data ["gcps" ]["op" ] == "publish"
187
+ assert consumer_span .data ["gcps" ]["op" ] == "consume"
188
+ assert self .topic_name == producer_span .data ["gcps" ]["top" ]
189
+ assert self .subscription_name == consumer_span .data ["gcps" ]["sub" ]
180
190
181
- self . assertEqual ( 2 , producer_span .k ) # EXIT
182
- self . assertEqual ( 1 , consumer_span .k ) # ENTRY
191
+ assert producer_span .k is SpanKind . CLIENT
192
+ assert consumer_span .k is SpanKind . SERVER
183
193
184
194
# Trace Context Propagation
185
195
self .assertTraceContextPropagated (producer_span , consumer_span )
0 commit comments