Skip to content

chore(llmobs): span linking for oai agents sdk #13072

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@
NAME = "_ml_obs.name"
DECORATOR = "_ml_obs.decorator"
INTEGRATION = "_ml_obs.integration"

# Event names dispatched via ddtrace.internal.core and handled by the LLMObs
# ToolCallTracker (registered in LLMObs.enable) to link an LLM span's tool
# choice to the tool span that executes it and to the span that later consumes
# the tool call's output.
DISPATCH_ON_TOOL_CALL_OUTPUT_USED = "on_tool_call_output_used"
DISPATCH_ON_LLM_TOOL_CHOICE = "on_llm_tool_choice"
DISPATCH_ON_TOOL_CALL = "on_tool_call"

# Tool call arguments are used to look up the associated tool call info.
# When there are no tool call args (e.g. OpenAI Agents SDK handoffs), this
# empty JSON object string is used as a place-holder lookup key.
OAI_HANDOFF_TOOL_ARG = "{}"
30 changes: 26 additions & 4 deletions ddtrace/llmobs/_integrations/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from typing import Optional
from typing import Tuple

from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.internal.utils.version import parse_version
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY
Expand Down Expand Up @@ -158,6 +162,9 @@ def _llmobs_set_meta_tags_from_chat(span: Span, kwargs: Dict[str, Any], messages
"""Extract prompt/response tags from a chat completion and set them as temporary "_ml_obs.meta.*" tags."""
input_messages = []
for m in kwargs.get("messages", []):
tool_call_id = m.get("tool_call_id")
if tool_call_id:
core.dispatch(DISPATCH_ON_TOOL_CALL_OUTPUT_USED, (tool_call_id, span))
input_messages.append({"content": str(_get_attr(m, "content", "")), "role": str(_get_attr(m, "role", ""))})
parameters = {k: v for k, v in kwargs.items() if k not in ("model", "messages", "tools", "functions")}
span._set_ctx_items({INPUT_MESSAGES: input_messages, METADATA: parameters})
Expand Down Expand Up @@ -199,13 +206,28 @@ def _llmobs_set_meta_tags_from_chat(span: Span, kwargs: Dict[str, Any], messages
continue
tool_calls = _get_attr(choice_message, "tool_calls", []) or []
for tool_call in tool_calls:
tool_args = getattr(tool_call.function, "arguments", "")
tool_name = getattr(tool_call.function, "name", "")
tool_id = getattr(tool_call, "id", "")
tool_call_info = {
"name": getattr(tool_call.function, "name", ""),
"arguments": json.loads(getattr(tool_call.function, "arguments", "")),
"tool_id": getattr(tool_call, "id", ""),
"type": getattr(tool_call, "type", ""),
"name": tool_name,
"arguments": json.loads(tool_args),
"tool_id": tool_id,
"type": "function",
}
tool_calls_info.append(tool_call_info)
core.dispatch(
DISPATCH_ON_LLM_TOOL_CHOICE,
(
tool_id,
tool_name,
tool_args,
{
"trace_id": format_trace_id(span.trace_id),
"span_id": str(span.span_id),
},
),
)
if tool_calls_info:
output_messages.append({"content": content, "role": role, "tool_calls": tool_calls_info})
continue
Expand Down
36 changes: 34 additions & 2 deletions ddtrace/llmobs/_integrations/openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@
from typing import Union
import weakref

from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.formats import format_trace_id
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import NAME
from ddtrace.llmobs._constants import OAI_HANDOFF_TOOL_ARG
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import PARENT_ID_KEY
Expand Down Expand Up @@ -201,11 +206,30 @@ def _llmobs_set_response_attributes(self, span: Span, oai_span: OaiSpanAdapter)
span._set_ctx_item(MODEL_PROVIDER, "openai")

if oai_span.input:
messages = oai_span.llmobs_input_messages()
messages, tool_call_ids = oai_span.llmobs_input_messages()

for tool_call_id in tool_call_ids:
core.dispatch(DISPATCH_ON_TOOL_CALL_OUTPUT_USED, (tool_call_id, span))

span._set_ctx_item(INPUT_MESSAGES, messages)

if oai_span.response and oai_span.response.output:
messages = oai_span.llmobs_output_messages()
messages, tool_call_outputs = oai_span.llmobs_output_messages()

for tool_id, tool_name, tool_args in tool_call_outputs:
core.dispatch(
DISPATCH_ON_LLM_TOOL_CHOICE,
(
tool_id,
tool_name,
tool_args,
{
"trace_id": format_trace_id(span.trace_id),
"span_id": str(span.span_id),
},
),
)

span._set_ctx_item(OUTPUT_MESSAGES, messages)

metadata = oai_span.llmobs_metadata
Expand All @@ -219,12 +243,20 @@ def _llmobs_set_response_attributes(self, span: Span, oai_span: OaiSpanAdapter)
def _llmobs_set_tool_attributes(self, span: Span, oai_span: OaiSpanAdapter) -> None:
    """Tag a tool span with its input/output values and announce the tool call.

    Sets the LLM Obs input/output value ctx items from the OpenAI span adapter
    (falling back to empty strings), then dispatches a tool-call event so the
    tool-call tracker can link this span to the originating LLM tool choice.
    """
    tool_input = oai_span.input or ""
    tool_output = oai_span.output or ""
    span._set_ctx_item(INPUT_VALUE, tool_input)
    span._set_ctx_item(OUTPUT_VALUE, tool_output)
    # Announce the executed tool call for span-linking purposes; note the raw
    # (un-defaulted) input is forwarded as the lookup key.
    core.dispatch(DISPATCH_ON_TOOL_CALL, (oai_span.name, oai_span.input, "function", span))

def _llmobs_set_handoff_attributes(self, span: Span, oai_span: OaiSpanAdapter) -> None:
    """Tag a handoff span and announce it as a synthetic tool call.

    Handoffs between agents surface as synthetic tools named
    "transfer_to_<target_agent>" (lowercased, spaces replaced by underscores).
    Renames the span accordingly, tags the source/target agents as the span's
    input/output values, and dispatches a tool-call event so the tool-call
    tracker can link this handoff to the originating LLM tool choice.
    """
    # Guard against a missing target agent so name construction cannot raise;
    # the original tagging lines below already assumed this fallback.
    to_agent = oai_span.to_agent or ""
    from_agent = oai_span.from_agent or ""
    handoff_tool_name = "transfer_to_{}".format("_".join(to_agent.split(" ")).lower())
    span.name = handoff_tool_name
    # Use the shared ctx-item constants for consistency with
    # _llmobs_set_tool_attributes rather than raw "input_value"/"output_value"
    # string keys, which other readers of these ctx items do not use.
    span._set_ctx_item(INPUT_VALUE, from_agent)
    span._set_ctx_item(OUTPUT_VALUE, to_agent)
    # Handoff tool calls carry no arguments; OAI_HANDOFF_TOOL_ARG is the
    # place-holder lookup key understood by the tool-call tracker.
    core.dispatch(
        DISPATCH_ON_TOOL_CALL,
        (handoff_tool_name, OAI_HANDOFF_TOOL_ARG, "handoff", span),
    )

def _llmobs_set_agent_attributes(self, span: Span, oai_span: OaiSpanAdapter) -> None:
if oai_span.llmobs_metadata:
Expand Down
36 changes: 25 additions & 11 deletions ddtrace/llmobs/_integrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._constants import INPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import OAI_HANDOFF_TOOL_ARG
from ddtrace.llmobs._constants import OUTPUT_TOKENS_METRIC_KEY
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._utils import _get_attr
Expand Down Expand Up @@ -486,19 +487,22 @@ def get_error_data(self) -> Optional[Dict[str, Any]]:
return None
return self.error.get("data")

def llmobs_input_messages(self) -> List[Dict[str, Any]]:
def llmobs_input_messages(self) -> Tuple[List[Dict[str, Any]], List[str]]:
"""Returns processed input messages for LLM Obs LLM spans.

Returns:
A list of processed messages
- A list of processed messages
- A list of tool call IDs for span linking purposes
"""
messages = self.input
processed: List[Dict[str, Any]] = []
tool_call_ids: List[str] = []

if not messages:
return processed
return processed, tool_call_ids

if isinstance(messages, str):
return [{"content": messages, "role": "user"}]
return [{"content": messages, "role": "user"}], tool_call_ids

for item in messages:
processed_item: Dict[str, Union[str, List[Dict[str, str]]]] = {}
Expand Down Expand Up @@ -541,6 +545,7 @@ def llmobs_input_messages(self) -> List[Dict[str, Any]]:
output = json.loads(output)
except json.JSONDecodeError:
output = {"output": output}
tool_call_ids.append(item["call_id"])
processed_item["tool_calls"] = [
{
"tool_id": item["call_id"],
Expand All @@ -550,21 +555,23 @@ def llmobs_input_messages(self) -> List[Dict[str, Any]]:
if processed_item:
processed.append(processed_item)

return processed
return processed, tool_call_ids

def llmobs_output_messages(self) -> List[Dict[str, Any]]:
def llmobs_output_messages(self) -> Tuple[List[Dict[str, Any]], List[Tuple[str, str, str]]]:
"""Returns processed output messages for LLM Obs LLM spans.

Returns:
A list of processed messages
- A list of processed messages
- A list of tool call data (name, id, args) for span linking purposes
"""
if not self.response or not self.response.output:
return []
return [], []

messages: List[Any] = self.response.output
processed: List[Dict[str, Any]] = []
tool_call_outputs: List[Tuple[str, str, str]] = []
if not messages:
return processed
return processed, tool_call_outputs

if not isinstance(messages, list):
messages = [messages]
Expand All @@ -581,6 +588,13 @@ def llmobs_output_messages(self) -> List[Dict[str, Any]]:
message.update({"role": getattr(item, "role", "assistant"), "content": text})
# Handle tool calls
elif hasattr(item, "call_id") and hasattr(item, "arguments"):
tool_call_outputs.append(
(
item.call_id,
getattr(item, "name", ""),
item.arguments if item.arguments else OAI_HANDOFF_TOOL_ARG,
)
)
message.update(
{
"tool_calls": [
Expand All @@ -599,7 +613,7 @@ def llmobs_output_messages(self) -> List[Dict[str, Any]]:
message.update({"content": str(item)})
processed.append(message)

return processed
return processed, tool_call_outputs

def llmobs_trace_input(self) -> Optional[str]:
"""Converts Response span data to an input value for top level trace.
Expand All @@ -611,7 +625,7 @@ def llmobs_trace_input(self) -> Optional[str]:
return None

try:
messages = self.llmobs_input_messages()
messages, _ = self.llmobs_input_messages()
if messages and len(messages) > 0:
return messages[0].get("content")
except (AttributeError, IndexError):
Expand Down
14 changes: 14 additions & 0 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import ANNOTATIONS_CONTEXT_ID
from ddtrace.llmobs._constants import DECORATOR
from ddtrace.llmobs._constants import DISPATCH_ON_LLM_TOOL_CHOICE
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL
from ddtrace.llmobs._constants import DISPATCH_ON_TOOL_CALL_OUTPUT_USED
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PROMPT
Expand All @@ -61,6 +64,7 @@
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import ToolCallTracker
from ddtrace.llmobs._utils import _get_ml_app
from ddtrace.llmobs._utils import _get_session_id
from ddtrace.llmobs._utils import _get_span_name
Expand Down Expand Up @@ -122,6 +126,8 @@ def __init__(self, tracer=None):
self._annotations = []
self._annotation_context_lock = forksafe.RLock()

self._tool_call_tracker = ToolCallTracker()

def _on_span_start(self, span):
if self.enabled and span.span_type == SpanTypes.LLM:
self._activate_llmobs_span(span)
Expand Down Expand Up @@ -303,6 +309,10 @@ def _stop_service(self) -> None:
core.reset_listeners("threading.submit", self._current_trace_context)
core.reset_listeners("threading.execution", self._llmobs_context_provider.activate)

core.reset_listeners(DISPATCH_ON_LLM_TOOL_CHOICE, self._tool_call_tracker.on_llm_tool_choice)
core.reset_listeners(DISPATCH_ON_TOOL_CALL, self._tool_call_tracker.on_tool_call)
core.reset_listeners(DISPATCH_ON_TOOL_CALL_OUTPUT_USED, self._tool_call_tracker.on_tool_call_output_used)

forksafe.unregister(self._child_after_fork)

@classmethod
Expand Down Expand Up @@ -399,6 +409,10 @@ def enable(
core.on("threading.submit", cls._instance._current_trace_context, "llmobs_ctx")
core.on("threading.execution", cls._instance._llmobs_context_provider.activate)

core.on(DISPATCH_ON_LLM_TOOL_CHOICE, cls._instance._tool_call_tracker.on_llm_tool_choice)
core.on(DISPATCH_ON_TOOL_CALL, cls._instance._tool_call_tracker.on_tool_call)
core.on(DISPATCH_ON_TOOL_CALL_OUTPUT_USED, cls._instance._tool_call_tracker.on_tool_call_output_used)

atexit.register(cls.disable)
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)

Expand Down
Loading
Loading