|
2 | 2 | # (c) Copyright Instana Inc. 2019
|
3 | 3 |
|
4 | 4 |
|
5 |
| -import opentracing |
| 5 | +from types import SimpleNamespace |
| 6 | +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Tuple |
6 | 7 | import wrapt
|
7 | 8 |
|
8 |
| -from ...log import logger |
9 |
| -from ...singletons import agent, async_tracer |
10 |
| -from ...util.secrets import strip_secrets_from_query |
11 |
| -from ...util.traceutils import tracing_is_off |
| 9 | +from opentelemetry.semconv.trace import SpanAttributes |
| 10 | + |
| 11 | +from instana.log import logger |
| 12 | +from instana.propagators.format import Format |
| 13 | +from instana.singletons import agent |
| 14 | +from instana.util.secrets import strip_secrets_from_query |
| 15 | +from instana.util.traceutils import get_tracer_tuple, tracing_is_off |
12 | 16 |
|
13 | 17 | try:
|
14 | 18 | import aiohttp
|
15 |
| - import asyncio |
16 | 19 |
|
| 20 | + if TYPE_CHECKING: |
| 21 | + from aiohttp.client import ClientSession |
| 22 | + from instana.span.span import InstanaSpan |
17 | 23 |
|
18 |
| - async def stan_request_start(session, trace_config_ctx, params): |
| 24 | + async def stan_request_start( |
| 25 | + session: "ClientSession", trace_config_ctx: SimpleNamespace, params |
| 26 | + ) -> Awaitable[None]: |
19 | 27 | try:
|
20 | 28 | # If we're not tracing, just return
|
21 | 29 | if tracing_is_off():
|
22 |
| - trace_config_ctx.scope = None |
| 30 | + trace_config_ctx.span_context = None |
23 | 31 | return
|
24 | 32 |
|
25 |
| - scope = async_tracer.start_active_span("aiohttp-client", child_of=async_tracer.active_span) |
26 |
| - trace_config_ctx.scope = scope |
| 33 | + tracer, parent_span, _ = get_tracer_tuple() |
| 34 | + parent_context = parent_span.get_span_context() if parent_span else None |
| 35 | + |
| 36 | + span = tracer.start_span("aiohttp-client", span_context=parent_context) |
27 | 37 |
|
28 |
| - async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, params.headers) |
| 38 | + tracer.inject(span.context, Format.HTTP_HEADERS, params.headers) |
29 | 39 |
|
30 |
| - parts = str(params.url).split('?') |
| 40 | + parts = str(params.url).split("?") |
31 | 41 | if len(parts) > 1:
|
32 |
| - cleaned_qp = strip_secrets_from_query(parts[1], agent.options.secrets_matcher, |
33 |
| - agent.options.secrets_list) |
34 |
| - scope.span.set_tag("http.params", cleaned_qp) |
35 |
| - scope.span.set_tag("http.url", parts[0]) |
36 |
| - scope.span.set_tag('http.method', params.method) |
| 42 | + cleaned_qp = strip_secrets_from_query( |
| 43 | + parts[1], agent.options.secrets_matcher, agent.options.secrets_list |
| 44 | + ) |
| 45 | + span.set_attribute("http.params", cleaned_qp) |
| 46 | + span.set_attribute(SpanAttributes.HTTP_URL, parts[0]) |
| 47 | + span.set_attribute(SpanAttributes.HTTP_METHOD, params.method) |
| 48 | + trace_config_ctx.span_context = span |
37 | 49 | except Exception:
|
38 |
| - logger.debug("stan_request_start", exc_info=True) |
| 50 | + logger.debug("aiohttp-client stan_request_start error:", exc_info=True) |
39 | 51 |
|
40 |
| - |
41 |
| - async def stan_request_end(session, trace_config_ctx, params): |
| 52 | + async def stan_request_end( |
| 53 | + session: "ClientSession", trace_config_ctx: SimpleNamespace, params |
| 54 | + ) -> Awaitable[None]: |
42 | 55 | try:
|
43 |
| - scope = trace_config_ctx.scope |
44 |
| - if scope is not None: |
45 |
| - scope.span.set_tag('http.status_code', params.response.status) |
| 56 | + span: "InstanaSpan" = trace_config_ctx.span_context |
| 57 | + if span: |
| 58 | + span.set_attribute( |
| 59 | + SpanAttributes.HTTP_STATUS_CODE, params.response.status |
| 60 | + ) |
46 | 61 |
|
47 |
| - if agent.options.extra_http_headers is not None: |
| 62 | + if agent.options.extra_http_headers: |
48 | 63 | for custom_header in agent.options.extra_http_headers:
|
49 | 64 | if custom_header in params.response.headers:
|
50 |
| - scope.span.set_tag("http.header.%s" % custom_header, params.response.headers[custom_header]) |
| 65 | + span.set_attribute( |
| 66 | + "http.header.%s" % custom_header, |
| 67 | + params.response.headers[custom_header], |
| 68 | + ) |
51 | 69 |
|
52 | 70 | if 500 <= params.response.status:
|
53 |
| - scope.span.mark_as_errored({"http.error": params.response.reason}) |
| 71 | + span.mark_as_errored({"http.error": params.response.reason}) |
54 | 72 |
|
55 |
| - scope.close() |
| 73 | + if span.is_recording(): |
| 74 | + span.end() |
| 75 | + trace_config_ctx = None |
56 | 76 | except Exception:
|
57 |
| - logger.debug("stan_request_end", exc_info=True) |
58 |
| - |
| 77 | + logger.debug("aiohttp-client stan_request_end error:", exc_info=True) |
59 | 78 |
|
60 |
| - async def stan_request_exception(session, trace_config_ctx, params): |
| 79 | + async def stan_request_exception( |
| 80 | + session: "ClientSession", trace_config_ctx: SimpleNamespace, params |
| 81 | + ) -> Awaitable[None]: |
61 | 82 | try:
|
62 |
| - scope = trace_config_ctx.scope |
63 |
| - if scope is not None: |
64 |
| - scope.span.log_exception(params.exception) |
65 |
| - scope.span.set_tag("http.error", str(params.exception)) |
66 |
| - scope.close() |
| 83 | + span: "InstanaSpan" = trace_config_ctx.span_context |
| 84 | + if span: |
| 85 | + span.record_exception(params.exception) |
| 86 | + span.set_attribute("http.error", str(params.exception)) |
| 87 | + if span.is_recording(): |
| 88 | + span.end() |
| 89 | + trace_config_ctx = None |
67 | 90 | except Exception:
|
68 |
| - logger.debug("stan_request_exception", exc_info=True) |
69 |
| - |
70 |
| - |
71 |
| - @wrapt.patch_function_wrapper('aiohttp.client', 'ClientSession.__init__') |
72 |
| - def init_with_instana(wrapped, instance, argv, kwargs): |
| 91 | + logger.debug("aiohttp-client stan_request_exception error:", exc_info=True) |
| 92 | + |
| 93 | + @wrapt.patch_function_wrapper("aiohttp.client", "ClientSession.__init__") |
| 94 | + def init_with_instana( |
| 95 | + wrapped: Callable[..., Awaitable["ClientSession"]], |
| 96 | + instance: aiohttp.client.ClientSession, |
| 97 | + args: Tuple[int, str, Tuple[object, ...]], |
| 98 | + kwargs: Dict[str, Any], |
| 99 | + ) -> object: |
73 | 100 | instana_trace_config = aiohttp.TraceConfig()
|
74 | 101 | instana_trace_config.on_request_start.append(stan_request_start)
|
75 | 102 | instana_trace_config.on_request_end.append(stan_request_end)
|
76 | 103 | instana_trace_config.on_request_exception.append(stan_request_exception)
|
77 |
| - if 'trace_configs' in kwargs: |
78 |
| - kwargs['trace_configs'].append(instana_trace_config) |
| 104 | + if "trace_configs" in kwargs: |
| 105 | + kwargs["trace_configs"].append(instana_trace_config) |
79 | 106 | else:
|
80 |
| - kwargs['trace_configs'] = [instana_trace_config] |
81 |
| - |
82 |
| - return wrapped(*argv, **kwargs) |
| 107 | + kwargs["trace_configs"] = [instana_trace_config] |
83 | 108 |
|
| 109 | + return wrapped(*args, **kwargs) |
84 | 110 |
|
85 | 111 | logger.debug("Instrumenting aiohttp client")
|
86 | 112 | except ImportError:
|
|
0 commit comments