Skip to content

Commit 0e628ff

Browse files
authored
Asynqp consume support (#114)
* Use a callback generator to wrap consumers * Update tests to follow changes * Cleanup
1 parent 2af2325 commit 0e628ff

File tree

2 files changed

+35
-46
lines changed

2 files changed

+35
-46
lines changed

instana/instrumentation/asynqp.py

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
from __future__ import absolute_import
22

3-
import sys
4-
53
import opentracing
64
import wrapt
75

@@ -65,32 +63,36 @@ def get_with_instana(wrapped, instance, argv, kwargs):
6563

6664
return msg
6765

68-
@wrapt.patch_function_wrapper('asynqp.queue','Consumers.deliver')
69-
def deliver_with_instana(wrapped, instance, argv, kwargs):
70-
71-
ctx = None
72-
msg = argv[1]
73-
if msg.headers is not None:
74-
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
75-
76-
with async_tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
77-
host, port = argv[1].sender.protocol.transport._sock.getsockname()
78-
79-
try:
80-
scope.span.set_tag("exchange", msg.exchange_name)
81-
scope.span.set_tag("sort", "consume")
82-
scope.span.set_tag("address", host + ":" + str(port) )
83-
scope.span.set_tag("key", msg.routing_key)
84-
85-
rv = wrapped(*argv, **kwargs)
86-
except Exception as e:
87-
scope.span.log_kv({'message': e})
88-
scope.span.set_tag("error", True)
89-
ec = scope.span.tags.get('ec', 0)
90-
scope.span.set_tag("ec", ec+1)
91-
raise
92-
else:
93-
return rv
66+
@wrapt.patch_function_wrapper('asynqp.queue','Queue.consume')
67+
def consume_with_instana(wrapped, instance, argv, kwargs):
68+
def callback_generator(original_callback):
69+
def callback_with_instana(*argv, **kwargs):
70+
ctx = None
71+
msg = argv[0]
72+
if msg.headers is not None:
73+
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
74+
75+
with async_tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
76+
host, port = msg.sender.protocol.transport._sock.getsockname()
77+
78+
try:
79+
scope.span.set_tag("exchange", msg.exchange_name)
80+
scope.span.set_tag("sort", "consume")
81+
scope.span.set_tag("address", host + ":" + str(port) )
82+
scope.span.set_tag("key", msg.routing_key)
83+
84+
original_callback(*argv, **kwargs)
85+
except Exception as e:
86+
scope.span.log_kv({'message': e})
87+
scope.span.set_tag("error", True)
88+
ec = scope.span.tags.get('ec', 0)
89+
scope.span.set_tag("ec", ec+1)
90+
raise
91+
return callback_with_instana
92+
93+
cb = argv[0]
94+
argv = (callback_generator(cb),)
95+
return wrapped(*argv, **kwargs)
9496

9597
logger.debug("Instrumenting asynqp")
9698
except ImportError:

tests/test_asynqp.py

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,27 +218,24 @@ def test():
218218
self.loop.run_until_complete(test())
219219

220220
spans = self.recorder.queued_spans()
221-
self.assertEqual(5, len(spans))
221+
self.assertEqual(4, len(spans))
222222

223223
publish1_span = spans[0]
224-
consume1_span = spans[1]
225-
publish2_span = spans[2]
226-
consume2_span = spans[3]
227-
test_span = spans[4]
224+
publish2_span = spans[1]
225+
consume1_span = spans[2]
226+
test_span = spans[3]
228227

229228
self.assertIsNone(async_tracer.active_span)
230229

231230
# Same traceId
232231
self.assertEqual(test_span.t, publish1_span.t)
233232
self.assertEqual(test_span.t, publish2_span.t)
234233
self.assertEqual(test_span.t, consume1_span.t)
235-
self.assertEqual(test_span.t, consume2_span.t)
236234

237235
# Parent relationships
238236
self.assertEqual(publish1_span.p, test_span.s)
239-
self.assertEqual(publish2_span.p, test_span.s)
240237
self.assertEqual(consume1_span.p, publish1_span.s)
241-
self.assertEqual(consume2_span.p, publish2_span.s)
238+
self.assertEqual(publish2_span.p, consume1_span.s)
242239

243240
# publish
244241
self.assertEqual('test.exchange', publish1_span.data.rabbitmq.exchange)
@@ -266,21 +263,11 @@ def test():
266263
self.assertTrue(type(consume1_span.stack) is list)
267264
self.assertGreater(len(consume1_span.stack), 0)
268265

269-
self.assertEqual('test.exchange', consume2_span.data.rabbitmq.exchange)
270-
self.assertEqual('consume', consume2_span.data.rabbitmq.sort)
271-
self.assertIsNotNone(consume2_span.data.rabbitmq.address)
272-
self.assertEqual('another.key', consume2_span.data.rabbitmq.key)
273-
self.assertIsNotNone(consume2_span.stack)
274-
self.assertTrue(type(consume2_span.stack) is list)
275-
self.assertGreater(len(consume2_span.stack), 0)
276-
277266
# Error logging
278267
self.assertFalse(test_span.error)
279268
self.assertIsNone(test_span.ec)
280269
self.assertFalse(consume1_span.error)
281270
self.assertIsNone(consume1_span.ec)
282-
self.assertFalse(consume2_span.error)
283-
self.assertIsNone(consume2_span.ec)
284271
self.assertFalse(publish1_span.error)
285272
self.assertIsNone(publish1_span.ec)
286273
self.assertFalse(publish2_span.error)

0 commit comments

Comments
 (0)