Skip to content

Commit 6f5b1bf

Browse files
authored
clear context on Laminar.force_flush() (#215)
* clear context on Laminar.force_flush() * tests to verify context is properly cleared
1 parent d35c357 commit 6f5b1bf

File tree

4 files changed

+118
-11
lines changed

4 files changed

+118
-11
lines changed

src/lmnr/opentelemetry_lib/tracing/__init__.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
)
1313
from lmnr.opentelemetry_lib.tracing.context import (
1414
attach_context,
15+
clear_context,
1516
pop_span_context as ctx_pop_span_context,
1617
get_current_context,
1718
get_token_stack,
1819
push_span_context as ctx_push_span_context,
19-
_isolated_token_stack,
20-
_isolated_token_stack_storage,
2120
set_token_stack,
2221
)
2322

@@ -230,14 +229,7 @@ def clear(cls):
230229
if isinstance(cls.instance._span_processor, LaminarSpanProcessor):
231230
cls.instance._span_processor.clear()
232231
# Clear the isolated context state for clean test state
233-
try:
234-
_isolated_token_stack.set([])
235-
except LookupError:
236-
pass
237-
if hasattr(_isolated_token_stack_storage, "token_stack"):
238-
_isolated_token_stack_storage.token_stack = []
239-
# Reset the isolated context to a fresh state
240-
attach_context(Context())
232+
clear_context()
241233

242234
def shutdown(self):
243235
if self._tracer_provider is None:
@@ -254,6 +246,9 @@ def force_reinit_processor(self):
254246
if isinstance(self._span_processor, LaminarSpanProcessor):
255247
self._span_processor.force_flush()
256248
self._span_processor.force_reinit()
249+
# Clear the isolated context to prevent subsequent invocations
250+
# (e.g., in Lambda) from continuing traces from previous invocations
251+
clear_context()
257252
else:
258253
self._logger.warning(
259254
"Not using LaminarSpanProcessor, cannot force reinit processor"

src/lmnr/opentelemetry_lib/tracing/context.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,29 @@ def push_span_context(context: Context) -> None:
141141
token_stack = get_token_stack().copy()
142142
token_stack.append(token)
143143
set_token_stack(token_stack)
144+
145+
146+
def clear_context() -> None:
147+
"""Clear the isolated context and token stack.
148+
149+
This is primarily used during force_flush operations in Lambda-like
150+
environments to ensure subsequent invocations don't continue traces
151+
from previous invocations.
152+
153+
Warning: This should only be called when you're certain no spans are
154+
actively being processed, as it will reset all context state.
155+
"""
156+
# Clear the token stack first
157+
try:
158+
_isolated_token_stack.set([])
159+
except LookupError:
160+
pass
161+
162+
# Clear thread-local storage if it exists
163+
if hasattr(_isolated_token_stack_storage, "token_stack"):
164+
_isolated_token_stack_storage.token_stack = []
165+
166+
# Reset the context to a fresh empty context
167+
# This doesn't require manually detaching tokens since we're
168+
# intentionally resetting everything to a clean state
169+
_ISOLATED_RUNTIME_CONTEXT._current_context.set(Context())

src/lmnr/sdk/laminar.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,9 @@ def flush(cls) -> bool:
892892

893893
@classmethod
894894
def force_flush(cls):
895-
"""Force flush the internal tracer.
895+
"""Force flush the internal tracer. WARNING: Any active spans are
896+
removed from context; that is, spans started afterwards will start
897+
a new trace.
896898
897899
Actually shuts down the span processor and re-initializes it as long
898900
as it is a LaminarSpanProcessor. This is not recommended in production

tests/test_context.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from opentelemetry.trace.span import INVALID_SPAN_ID
2+
from lmnr import Laminar, observe
3+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
4+
5+
6+
def test_clear_context_observe(span_exporter: InMemorySpanExporter):
7+
@observe()
8+
def inner():
9+
Laminar.set_trace_user_id("test_user_id_2")
10+
return "inner"
11+
12+
@observe()
13+
def outer():
14+
Laminar.set_trace_user_id("test_user_id_1")
15+
Laminar.force_flush()
16+
return inner()
17+
18+
outer()
19+
spans = span_exporter.get_finished_spans()
20+
assert len(spans) == 2
21+
outer_span = [s for s in spans if s.name == "outer"][0]
22+
inner_span = [s for s in spans if s.name == "inner"][0]
23+
assert (
24+
outer_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_1"
25+
)
26+
assert (
27+
inner_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_2"
28+
)
29+
30+
assert inner_span.parent is None or inner_span.parent.span_id == INVALID_SPAN_ID
31+
assert (
32+
inner_span.get_span_context().trace_id != outer_span.get_span_context().trace_id
33+
)
34+
35+
36+
def test_clear_context_start_as_current_span(span_exporter: InMemorySpanExporter):
37+
with Laminar.start_as_current_span("outer"):
38+
Laminar.set_trace_user_id("test_user_id_1")
39+
Laminar.force_flush()
40+
with Laminar.start_as_current_span("inner"):
41+
Laminar.set_trace_user_id("test_user_id_2")
42+
pass
43+
44+
spans = span_exporter.get_finished_spans()
45+
assert len(spans) == 2
46+
outer_span = [s for s in spans if s.name == "outer"][0]
47+
inner_span = [s for s in spans if s.name == "inner"][0]
48+
assert (
49+
outer_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_1"
50+
)
51+
assert (
52+
inner_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_2"
53+
)
54+
55+
assert inner_span.parent is None or inner_span.parent.span_id == INVALID_SPAN_ID
56+
assert (
57+
inner_span.get_span_context().trace_id != outer_span.get_span_context().trace_id
58+
)
59+
60+
61+
def test_clear_context_start_active_span(span_exporter: InMemorySpanExporter):
62+
span = Laminar.start_active_span("outer")
63+
Laminar.set_trace_user_id("test_user_id_1")
64+
Laminar.force_flush()
65+
span2 = Laminar.start_active_span("inner")
66+
Laminar.set_trace_user_id("test_user_id_2")
67+
span2.end()
68+
span.end()
69+
70+
spans = span_exporter.get_finished_spans()
71+
assert len(spans) == 2
72+
outer_span = [s for s in spans if s.name == "outer"][0]
73+
inner_span = [s for s in spans if s.name == "inner"][0]
74+
assert (
75+
outer_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_1"
76+
)
77+
assert (
78+
inner_span.attributes["lmnr.association.properties.user_id"] == "test_user_id_2"
79+
)
80+
81+
assert inner_span.parent is None or inner_span.parent.span_id == INVALID_SPAN_ID
82+
assert (
83+
inner_span.get_span_context().trace_id != outer_span.get_span_context().trace_id
84+
)

0 commit comments

Comments
 (0)