|
2 | 2 | # (c) Copyright Instana Inc. 2019
|
3 | 3 |
|
4 | 4 |
|
| 5 | +import time |
| 6 | +from contextlib import contextmanager |
| 7 | +from typing import Any, Callable, Dict, Iterator, Tuple |
| 8 | + |
5 | 9 | import wrapt
|
6 |
| -from opentracing.scope_managers.constants import ACTIVE_ATTR |
7 |
| -from opentracing.scope_managers.contextvars import no_parent_scope |
| 10 | +from opentelemetry.trace import use_span |
| 11 | +from opentelemetry.trace.status import StatusCode |
8 | 12 |
|
9 |
| -from ..configurator import config |
10 |
| -from ..log import logger |
11 |
| -from ..singletons import async_tracer |
| 13 | +from instana.configurator import config |
| 14 | +from instana.log import logger |
| 15 | +from instana.span.span import InstanaSpan |
| 16 | +from instana.util.traceutils import get_tracer_tuple, tracing_is_off |
12 | 17 |
|
13 | 18 | try:
|
14 | 19 | import asyncio
|
15 | 20 |
|
16 | 21 | @wrapt.patch_function_wrapper("asyncio", "ensure_future")
|
17 |
| - def ensure_future_with_instana(wrapped, instance, argv, kwargs): |
18 |
| - if config["asyncio_task_context_propagation"]["enabled"] is False: |
19 |
| - with no_parent_scope(): |
20 |
| - return wrapped(*argv, **kwargs) |
21 |
| - |
22 |
| - scope = async_tracer.scope_manager.active |
23 |
| - task = wrapped(*argv, **kwargs) |
24 |
| - |
25 |
| - if scope is not None: |
26 |
| - setattr(task, ACTIVE_ATTR, scope) |
| 22 | + def ensure_future_with_instana( |
| 23 | + wrapped: Callable[..., asyncio.ensure_future], |
| 24 | + instance: object, |
| 25 | + argv: Tuple[object, Tuple[object, ...]], |
| 26 | + kwargs: Dict[str, Any], |
| 27 | + ) -> object: |
| 28 | + if ( |
| 29 | + not config["asyncio_task_context_propagation"]["enabled"] |
| 30 | + or tracing_is_off() |
| 31 | + ): |
| 32 | + return wrapped(*argv, **kwargs) |
27 | 33 |
|
28 |
| - return task |
| 34 | + with _start_as_current_async_span() as span: |
| 35 | + try: |
| 36 | + span.set_status(StatusCode.OK) |
| 37 | + return wrapped(*argv, **kwargs) |
| 38 | + except Exception as exc: |
| 39 | + logger.debug(f"asyncio ensure_future_with_instana error: {exc}") |
29 | 40 |
|
30 | 41 | if hasattr(asyncio, "create_task"):
|
31 | 42 |
|
32 | 43 | @wrapt.patch_function_wrapper("asyncio", "create_task")
|
33 |
| - def create_task_with_instana(wrapped, instance, argv, kwargs): |
34 |
| - if config["asyncio_task_context_propagation"]["enabled"] is False: |
35 |
| - with no_parent_scope(): |
| 44 | + def create_task_with_instana( |
| 45 | + wrapped: Callable[..., asyncio.create_task], |
| 46 | + instance: object, |
| 47 | + argv: Tuple[object, Tuple[object, ...]], |
| 48 | + kwargs: Dict[str, Any], |
| 49 | + ) -> object: |
| 50 | + if ( |
| 51 | + not config["asyncio_task_context_propagation"]["enabled"] |
| 52 | + or tracing_is_off() |
| 53 | + ): |
| 54 | + return wrapped(*argv, **kwargs) |
| 55 | + |
| 56 | + with _start_as_current_async_span() as span: |
| 57 | + try: |
| 58 | + span.set_status(StatusCode.OK) |
36 | 59 | return wrapped(*argv, **kwargs)
|
| 60 | + except Exception as exc: |
| 61 | + logger.debug(f"asyncio create_task_with_instana error: {exc}") |
37 | 62 |
|
38 |
| - scope = async_tracer.scope_manager.active |
39 |
| - task = wrapped(*argv, **kwargs) |
| 63 | + @contextmanager |
| 64 | + def _start_as_current_async_span() -> Iterator[InstanaSpan]: |
| 65 | + """ |
| 66 | + Creates and yield a special InstanaSpan to only propagate the Asyncio |
| 67 | + context. |
| 68 | + """ |
| 69 | + tracer, parent_span, _ = get_tracer_tuple() |
| 70 | + parent_context = parent_span.get_span_context() if parent_span else None |
40 | 71 |
|
41 |
| - if scope is not None: |
42 |
| - setattr(task, ACTIVE_ATTR, scope) |
| 72 | + _time = time.time_ns() |
43 | 73 |
|
44 |
| - return task |
| 74 | + span = InstanaSpan( |
| 75 | + name="asyncio", |
| 76 | + context=parent_context, |
| 77 | + span_processor=tracer.span_processor, |
| 78 | + start_time=_time, |
| 79 | + end_time=_time, |
| 80 | + ) |
| 81 | + with use_span( |
| 82 | + span, |
| 83 | + end_on_exit=False, |
| 84 | + record_exception=False, |
| 85 | + set_status_on_exception=False, |
| 86 | + ) as span: |
| 87 | + yield span |
45 | 88 |
|
46 | 89 | logger.debug("Instrumenting asyncio")
|
47 | 90 | except ImportError:
|
|
0 commit comments