Skip to content

Commit d0500cd

Browse files
CagriYoncapvital
authored andcommitted
refactor(cloudstorage): added cloud storage otel instrumentation
1 parent 049fc3e commit d0500cd

File tree

1 file changed

+90
-56
lines changed
  • src/instana/instrumentation/google/cloud

1 file changed

+90
-56
lines changed

src/instana/instrumentation/google/cloud/storage.py

Lines changed: 90 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,41 @@
55
import wrapt
66
import re
77

8-
from ....log import logger
9-
from .collectors import _storage_api
10-
from ....util.traceutils import get_tracer_tuple, tracing_is_off
8+
from typing import Any, Callable, Dict, Tuple, Union
9+
from instana.log import logger
10+
from instana.instrumentation.google.cloud.collectors import _storage_api
11+
from instana.util.traceutils import get_tracer_tuple, tracing_is_off
1112

1213
try:
1314
from google.cloud import storage
1415

15-
logger.debug('Instrumenting google-cloud-storage')
16+
logger.debug("Instrumenting google-cloud-storage")
1617

17-
def _collect_tags(api_request):
18+
def _collect_attributes(
19+
api_request: Dict[str, Any],
20+
) -> Dict[str, Any]:
1821
"""
1922
Extract span tags from Google Cloud Storage API request. Returns None if the request is not
2023
supported.
2124
2225
:param: dict
2326
:return: dict or None
2427
"""
25-
method, path = api_request.get('method', None), api_request.get('path', None)
28+
method, path = api_request.get("method", None), api_request.get("path", None)
2629

2730
if method not in _storage_api:
2831
return
2932

3033
try:
31-
params = api_request.get('query_params', {})
32-
data = api_request.get('data', {})
34+
params = api_request.get("query_params", {})
35+
data = api_request.get("data", {})
3336

3437
if path in _storage_api[method]:
3538
# check is any of string keys matches the path exactly
3639
return _storage_api[method][path](params, data)
3740
else:
3841
# look for a regex that matches the string
39-
for (matcher, collect) in _storage_api[method].items():
42+
for matcher, collect in _storage_api[method].items():
4043
if not isinstance(matcher, re.Pattern):
4144
continue
4245

@@ -46,108 +49,139 @@ def _collect_tags(api_request):
4649

4750
return collect(params, data, m)
4851
except Exception:
49-
logger.debug("instana.instrumentation.google.cloud.storage._collect_tags: ", exc_info=True)
50-
51-
def execute_with_instana(wrapped, instance, args, kwargs):
52+
logger.debug(
53+
"instana.instrumentation.google.cloud.storage._collect_attributes: ",
54+
exc_info=True,
55+
)
56+
57+
def execute_with_instana(
58+
wrapped: Callable[..., object],
59+
instance: Union[storage.Batch, storage._http.Connection],
60+
args: Tuple[object, ...],
61+
kwargs: Dict[str, Any],
62+
) -> object:
5263
# batch requests are traced with finish_batch_with_instana()
5364
# also return early if we're not tracing
5465
if isinstance(instance, storage.Batch) or tracing_is_off():
5566
return wrapped(*args, **kwargs)
5667

5768
tracer, parent_span, _ = get_tracer_tuple()
58-
tags = _collect_tags(kwargs)
59-
60-
# don't trace if the call is not instrumented
61-
if tags is None:
62-
logger.debug('uninstrumented Google Cloud Storage API request: %s' % kwargs)
63-
return wrapped(*args, **kwargs)
64-
65-
with tracer.start_active_span('gcs', child_of=parent_span) as scope:
66-
for (k, v) in tags.items():
67-
scope.span.set_tag(k, v)
69+
parent_context = parent_span.get_span_context() if parent_span else None
6870

71+
with tracer.start_as_current_span("gcs", span_context=parent_context) as span:
6972
try:
73+
attributes = _collect_attributes(kwargs)
74+
75+
# don't trace if the call is not instrumented
76+
if attributes is None:
77+
logger.debug(
78+
f"uninstrumented Google Cloud Storage API request: {kwargs}"
79+
)
80+
return wrapped(*args, **kwargs)
81+
span.set_attributes(attributes)
7082
kv = wrapped(*args, **kwargs)
71-
except Exception as e:
72-
scope.span.log_exception(e)
73-
raise
83+
except Exception as exc:
84+
span.record_exception(exc)
7485
else:
7586
return kv
7687

77-
def download_with_instana(wrapped, instance, args, kwargs):
88+
def download_with_instana(
89+
wrapped: Callable[..., object],
90+
instance: storage.Blob,
91+
args: Tuple[object, ...],
92+
kwargs: Dict[str, Any],
93+
) -> object:
7894
# return early if we're not tracing
7995
if tracing_is_off():
8096
return wrapped(*args, **kwargs)
8197

8298
tracer, parent_span, _ = get_tracer_tuple()
99+
parent_context = parent_span.get_span_context() if parent_span else None
83100

84-
with tracer.start_active_span('gcs', child_of=parent_span) as scope:
85-
scope.span.set_tag('gcs.op', 'objects.get')
86-
scope.span.set_tag('gcs.bucket', instance.bucket.name)
87-
scope.span.set_tag('gcs.object', instance.name)
101+
with tracer.start_as_current_span("gcs", span_context=parent_context) as span:
102+
span.set_attribute("gcs.op", "objects.get")
103+
span.set_attribute("gcs.bucket", instance.bucket.name)
104+
span.set_attribute("gcs.object", instance.name)
88105

89-
start = len(args) > 4 and args[4] or kwargs.get('start', None)
106+
start = len(args) > 4 and args[4] or kwargs.get("start", None)
90107
if start is None:
91-
start = ''
108+
start = ""
92109

93-
end = len(args) > 5 and args[5] or kwargs.get('end', None)
110+
end = len(args) > 5 and args[5] or kwargs.get("end", None)
94111
if end is None:
95-
end = ''
112+
end = ""
96113

97-
if start != '' or end != '':
98-
scope.span.set_tag('gcs.range', '-'.join((start, end)))
114+
if start != "" or end != "":
115+
span.set_attribute("gcs.range", "-".join((start, end)))
99116

100117
try:
101118
kv = wrapped(*args, **kwargs)
102119
except Exception as e:
103-
scope.span.log_exception(e)
104-
raise
120+
span.record_exception(e)
105121
else:
106122
return kv
107123

108-
def upload_with_instana(wrapped, instance, args, kwargs):
124+
def upload_with_instana(
125+
wrapped: Callable[..., object],
126+
instance: storage.Blob,
127+
args: Tuple[object, ...],
128+
kwargs: Dict[str, Any],
129+
) -> object:
109130
# return early if we're not tracing
110131
if tracing_is_off():
111132
return wrapped(*args, **kwargs)
112133

113134
tracer, parent_span, _ = get_tracer_tuple()
135+
parent_context = parent_span.get_span_context() if parent_span else None
114136

115-
with tracer.start_active_span('gcs', child_of=parent_span) as scope:
116-
scope.span.set_tag('gcs.op', 'objects.insert')
117-
scope.span.set_tag('gcs.bucket', instance.bucket.name)
118-
scope.span.set_tag('gcs.object', instance.name)
137+
with tracer.start_as_current_span("gcs", span_context=parent_context) as span:
138+
span.set_attribute("gcs.op", "objects.insert")
139+
span.set_attribute("gcs.bucket", instance.bucket.name)
140+
span.set_attribute("gcs.object", instance.name)
119141

120142
try:
121143
kv = wrapped(*args, **kwargs)
122144
except Exception as e:
123-
scope.span.log_exception(e)
124-
raise
145+
span.record_exception(e)
125146
else:
126147
return kv
127148

128-
def finish_batch_with_instana(wrapped, instance, args, kwargs):
149+
def finish_batch_with_instana(
150+
wrapped: Callable[..., object],
151+
instance: storage.Batch,
152+
args: Tuple[object, ...],
153+
kwargs: Dict[str, Any],
154+
) -> object:
129155
# return early if we're not tracing
130156
if tracing_is_off():
131157
return wrapped(*args, **kwargs)
132158

133159
tracer, parent_span, _ = get_tracer_tuple()
160+
parent_context = parent_span.get_span_context() if parent_span else None
134161

135-
with tracer.start_active_span('gcs', child_of=parent_span) as scope:
136-
scope.span.set_tag('gcs.op', 'batch')
137-
scope.span.set_tag('gcs.projectId', instance._client.project)
138-
scope.span.set_tag('gcs.numberOfOperations', len(instance._requests))
162+
with tracer.start_as_current_span("gcs", span_context=parent_context) as span:
163+
span.set_attribute("gcs.op", "batch")
164+
span.set_attribute("gcs.projectId", instance._client.project)
165+
span.set_attribute("gcs.numberOfOperations", len(instance._requests))
139166

140167
try:
141168
kv = wrapped(*args, **kwargs)
142169
except Exception as e:
143-
scope.span.log_exception(e)
144-
raise
170+
span.record_exception(e)
145171
else:
146172
return kv
147173

148-
wrapt.wrap_function_wrapper('google.cloud.storage._http', 'Connection.api_request', execute_with_instana)
149-
wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_download', download_with_instana)
150-
wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_upload', upload_with_instana)
151-
wrapt.wrap_function_wrapper('google.cloud.storage.batch', 'Batch.finish', finish_batch_with_instana)
174+
wrapt.wrap_function_wrapper(
175+
"google.cloud.storage._http", "Connection.api_request", execute_with_instana
176+
)
177+
wrapt.wrap_function_wrapper(
178+
"google.cloud.storage.blob", "Blob._do_download", download_with_instana
179+
)
180+
wrapt.wrap_function_wrapper(
181+
"google.cloud.storage.blob", "Blob._do_upload", upload_with_instana
182+
)
183+
wrapt.wrap_function_wrapper(
184+
"google.cloud.storage.batch", "Batch.finish", finish_batch_with_instana
185+
)
152186
except ImportError:
153187
pass

0 commit comments

Comments
 (0)