Skip to content

Commit 7576c42

Browse files
CagriYoncapvital
authored andcommitted
refactor(pubsub): added pubsub otel instrumentation
1 parent d408cb3 commit 7576c42

File tree

2 files changed

+77
-50
lines changed

2 files changed

+77
-50
lines changed

src/instana/__init__.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,10 @@ def boot_agent():
193193
# from instana.instrumentation.aws import lambda_inst # noqa: F401
194194
from instana.instrumentation import celery # noqa: F401
195195
from instana.instrumentation.django import middleware # noqa: F401
196-
197-
# from instana.instrumentation.google.cloud import (
198-
# pubsub, # noqa: F401
199-
# storage, # noqa: F401
200-
# )
196+
from instana.instrumentation.google.cloud import (
197+
pubsub, # noqa: F401
198+
storage, # noqa: F401
199+
)
201200
from instana.instrumentation.tornado import (
202201
client, # noqa: F401
203202
server, # noqa: F401

src/instana/instrumentation/google/cloud/pubsub.py

Lines changed: 73 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,50 @@
22
# (c) Copyright Instana Inc. 2021
33

44

5-
import json
5+
from typing import TYPE_CHECKING, Any, Callable, Dict, Tuple
6+
67
import wrapt
7-
from opentracing import Format
88

9-
from ....log import logger
10-
from ....singletons import tracer
11-
from ....util.traceutils import get_tracer_tuple, tracing_is_off
9+
from instana.log import logger
10+
from instana.propagators.format import Format
11+
from instana.singletons import tracer
12+
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
13+
14+
if TYPE_CHECKING:
15+
from instana.span.span import InstanaSpan
1216

1317
try:
1418
from google.cloud import pubsub_v1
1519

16-
17-
def _set_publisher_tags(span, topic_path):
18-
span.set_tag('gcps.op', 'publish')
20+
def _set_publisher_attributes(
21+
span: "InstanaSpan",
22+
topic_path: str,
23+
) -> None:
24+
span.set_attribute("gcps.op", "publish")
1925
# Fully qualified identifier is in the form of
2026
# `projects/{project_id}/topic/{topic_name}`
21-
project_id, topic_name = topic_path.split('/')[1::2]
22-
span.set_tag('gcps.projid', project_id)
23-
span.set_tag('gcps.top', topic_name)
24-
25-
26-
def _set_consumer_tags(span, subscription_path):
27-
span.set_tag('gcps.op', 'consume')
27+
project_id, topic_name = topic_path.split("/")[1::2]
28+
span.set_attribute("gcps.projid", project_id)
29+
span.set_attribute("gcps.top", topic_name)
30+
31+
def _set_consumer_attributes(
32+
span: "InstanaSpan",
33+
subscription_path: str,
34+
) -> None:
35+
span.set_attribute("gcps.op", "consume")
2836
# Fully qualified identifier is in the form of
2937
# `projects/{project_id}/subscriptions/{subscription_name}`
30-
project_id, subscription_id = subscription_path.split('/')[1::2]
31-
span.set_tag('gcps.projid', project_id)
32-
span.set_tag('gcps.sub', subscription_id)
33-
34-
35-
@wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'PublisherClient.publish')
36-
def publish_with_instana(wrapped, instance, args, kwargs):
38+
project_id, subscription_id = subscription_path.split("/")[1::2]
39+
span.set_attribute("gcps.projid", project_id)
40+
span.set_attribute("gcps.sub", subscription_id)
41+
42+
@wrapt.patch_function_wrapper("google.cloud.pubsub_v1", "PublisherClient.publish")
43+
def publish_with_instana(
44+
wrapped: Callable[..., object],
45+
instance: pubsub_v1.PublisherClient,
46+
args: Tuple[object, ...],
47+
kwargs: Dict[str, Any],
48+
) -> object:
3749
"""References:
3850
- PublisherClient.publish(topic_path, messages, metadata)
3951
"""
@@ -42,59 +54,75 @@ def publish_with_instana(wrapped, instance, args, kwargs):
4254
return wrapped(*args, **kwargs)
4355

4456
tracer, parent_span, _ = get_tracer_tuple()
57+
parent_context = parent_span.get_span_context() if parent_span else None
4558

46-
with tracer.start_active_span('gcps-producer', child_of=parent_span) as scope:
59+
with tracer.start_as_current_span(
60+
"gcps-producer", span_context=parent_context
61+
) as span:
4762
# trace continuity, inject to the span context
48-
headers = dict()
49-
tracer.inject(scope.span.context, Format.TEXT_MAP, headers, disable_w3c_trace_context=True)
63+
headers = {}
64+
tracer.inject(
65+
span.context,
66+
Format.TEXT_MAP,
67+
headers,
68+
disable_w3c_trace_context=True,
69+
)
70+
71+
headers = {key: str(value) for key, value in headers.items()}
5072

5173
# update the metadata dict with instana trace attributes
5274
kwargs.update(headers)
5375

54-
_set_publisher_tags(scope.span, topic_path=args[0])
76+
_set_publisher_attributes(span, topic_path=args[0])
5577

5678
try:
5779
rv = wrapped(*args, **kwargs)
58-
except Exception as e:
59-
scope.span.log_exception(e)
60-
raise
80+
except Exception as exc:
81+
span.record_exception(exc)
6182
else:
6283
return rv
6384

64-
65-
@wrapt.patch_function_wrapper('google.cloud.pubsub_v1', 'SubscriberClient.subscribe')
66-
def subscribe_with_instana(wrapped, instance, args, kwargs):
67-
85+
@wrapt.patch_function_wrapper(
86+
"google.cloud.pubsub_v1", "SubscriberClient.subscribe"
87+
)
88+
def subscribe_with_instana(
89+
wrapped: Callable[..., object],
90+
instance: pubsub_v1.SubscriberClient,
91+
args: Tuple[object, ...],
92+
kwargs: Dict[str, Any],
93+
) -> object:
6894
"""References:
6995
- SubscriberClient.subscribe(subscription_path, callback)
7096
- callback(message) is called from the subscription future
7197
"""
7298

7399
def callback_with_instana(message):
74100
if message.attributes:
75-
parent_span = tracer.extract(Format.TEXT_MAP, message.attributes, disable_w3c_trace_context=True)
101+
parent_context = tracer.extract(
102+
Format.TEXT_MAP, message.attributes, disable_w3c_trace_context=True
103+
)
76104
else:
77-
parent_span = None
105+
parent_context = None
78106

79-
with tracer.start_active_span('gcps-consumer', child_of=parent_span) as scope:
80-
_set_consumer_tags(scope.span, subscription_path=args[0])
107+
with tracer.start_as_current_span(
108+
"gcps-consumer", span_context=parent_context
109+
) as span:
110+
_set_consumer_attributes(span, subscription_path=args[0])
81111
try:
82112
callback(message)
83-
except Exception as e:
84-
scope.span.log_exception(e)
85-
raise
113+
except Exception as exc:
114+
span.record_exception(exc)
86115

87116
# Handle callback appropriately from args or kwargs
88-
if 'callback' in kwargs:
89-
callback = kwargs.get('callback')
90-
kwargs['callback'] = callback_with_instana
117+
if "callback" in kwargs:
118+
callback = kwargs.get("callback")
119+
kwargs["callback"] = callback_with_instana
91120
return wrapped(*args, **kwargs)
92121
else:
93122
subscription, callback, *args = args
94123
args = (subscription, callback_with_instana, *args)
95124
return wrapped(*args, **kwargs)
96125

97-
98-
logger.debug('Instrumenting Google Cloud Pub/Sub')
126+
logger.debug("Instrumenting Google Cloud Pub/Sub")
99127
except ImportError:
100128
pass

0 commit comments

Comments
 (0)