Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M hietala/adding function tracing conveniency #39368

2 changes: 2 additions & 0 deletions sdk/ai/azure-ai-projects/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features added

* Added trace_function decorator which can be used to trace functions

### Bugs Fixed

### Breaking Changes
Expand Down
13 changes: 13 additions & 0 deletions sdk/ai/azure-ai-projects/azure/ai/projects/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) Python Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from ._trace_function import trace_function

__all__ = [
"trace_function",
]
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import functools
import asyncio
from typing import Any, Callable, Optional, Dict

try:
# pylint: disable = no-name-in-module
from opentelemetry import trace as opentelemetry_trace

tracer = opentelemetry_trace.get_tracer(__name__)
_tracing_library_available = True
except ModuleNotFoundError:
_tracing_library_available = False

if _tracing_library_available:

def trace_function(span_name: Optional[str] = None):
"""
A decorator for tracing function calls using OpenTelemetry.

This decorator handles various data types for function parameters and return values,
and records them as attributes in the trace span. The supported data types include:
- Basic data types: str, int, float, bool
- Collections: list, dict, tuple, set

Special handling for collections:
- If a collection (list, dict, tuple, set) contains nested collections, the entire collection
is converted to a string before being recorded as an attribute.
- Sets and dictionaries are always converted to strings to ensure compatibility with span attributes.

Object types are omitted, and the corresponding parameter is not traced.

:param span_name: The name of the span. If not provided, the function name is used.
:type span_name: Optional[str]
:return: The decorated function with tracing enabled.
:rtype: Callable
"""

def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
"""
Wrapper function for asynchronous functions.

:param args: Positional arguments passed to the function.
:type args: Tuple[Any]
:return: The result of the decorated asynchronous function.
:rtype: Any
"""
name = span_name if span_name else func.__name__
with tracer.start_as_current_span(name) as span:
try:
# Sanitize parameters and set them as attributes
sanitized_params = sanitize_parameters(func, *args, **kwargs)
span.set_attributes(sanitized_params)
result = await func(*args, **kwargs)
sanitized_result = sanitize_for_attributes(result)
if sanitized_result is not None:
if isinstance(sanitized_result, (list, dict, tuple, set)):
if any(isinstance(i, (list, dict, tuple, set)) for i in sanitized_result):
sanitized_result = str(sanitized_result)
span.set_attribute("code.function.return.value", sanitized_result) # type: ignore
return result
except Exception as e:
span.record_exception(e)
span.set_attribute("error.type", e.__class__.__qualname__) # type: ignore
raise

@functools.wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
"""
Wrapper function for synchronous functions.

:param args: Positional arguments passed to the function.
:type args: Tuple[Any]
:return: The result of the decorated synchronous function.
:rtype: Any
"""
name = span_name if span_name else func.__name__
with tracer.start_as_current_span(name) as span:
try:
# Sanitize parameters and set them as attributes
sanitized_params = sanitize_parameters(func, *args, **kwargs)
span.set_attributes(sanitized_params)
result = func(*args, **kwargs)
sanitized_result = sanitize_for_attributes(result)
if sanitized_result is not None:
if isinstance(sanitized_result, (list, dict, tuple, set)):
if any(isinstance(i, (list, dict, tuple, set)) for i in sanitized_result):
sanitized_result = str(sanitized_result)
span.set_attribute("code.function.return.value", sanitized_result) # type: ignore
return result
except Exception as e:
span.record_exception(e)
span.set_attribute("error.type", e.__class__.__qualname__) # type: ignore
raise

# Determine if the function is async
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper

return decorator

else:
# Define a no-op decorator if OpenTelemetry is not available
def trace_function(span_name: Optional[str] = None): # pylint: disable=unused-argument
"""
A no-op decorator for tracing function calls when OpenTelemetry is not available.

:param span_name: Not used in this version.
:type span_name: Optional[str]
:return: The original function.
:rtype: Callable
"""

def decorator(func: Callable) -> Callable:
return func

return decorator


def sanitize_parameters(func, *args, **kwargs) -> Dict[str, Any]:
"""
Sanitize function parameters to include only built-in data types.

:param func: The function being decorated.
:type func: Callable
:param args: Positional arguments passed to the function.
:type args: Tuple[Any]
:return: A dictionary of sanitized parameters.
:rtype: Dict[str, Any]
"""
import inspect

params = inspect.signature(func).parameters
sanitized_params = {}

for i, (name, param) in enumerate(params.items()):
if param.default == inspect.Parameter.empty and i < len(args):
value = args[i]
else:
value = kwargs.get(name, param.default)

sanitized_value = sanitize_for_attributes(value)
# Check if the collection has nested collections
if isinstance(sanitized_value, (list, dict, tuple, set)):
if any(isinstance(i, (list, dict, tuple, set)) for i in sanitized_value):
sanitized_value = str(sanitized_value)
if sanitized_value is not None:
sanitized_params["code.function.parameter." + name] = sanitized_value

return sanitized_params


# pylint: disable=R0911
def sanitize_for_attributes(value: Any, is_recursive: bool = False) -> Any:
"""
Sanitize a value to be used as an attribute.

:param value: The value to sanitize.
:type value: Any
:param is_recursive: Indicates if the function is being called recursively. Default is False.
:type is_recursive: bool
:return: The sanitized value or None if the value is not a supported type.
:rtype: Any
"""
if isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [
sanitize_for_attributes(item, True)
for item in value
if isinstance(item, (str, int, float, bool, list, dict, tuple, set))
]
if isinstance(value, dict):
retval = {
k: sanitize_for_attributes(v, True)
for k, v in value.items()
if isinstance(v, (str, int, float, bool, list, dict, tuple, set))
}
# dict to compatible with span attribute, so return it as a string
if is_recursive:
return retval
return str(retval)
if isinstance(value, tuple):
return tuple(
sanitize_for_attributes(item, True)
for item in value
if isinstance(item, (str, int, float, bool, list, dict, tuple, set))
)
if isinstance(value, set):
retval_set = {
sanitize_for_attributes(item, True)
for item in value
if isinstance(item, (str, int, float, bool, list, dict, tuple, set))
}
if is_recursive:
return retval_set
return str(retval_set)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import asyncio
import os
import sys
from typing import Any, Callable, Set
import json
import datetime
from typing import Any, Callable, Set, Optional
from azure.ai.projects.telemetry import trace_function


# Add parent directory to sys.path to import user_functions
Expand All @@ -31,9 +34,33 @@ async def send_email_async(recipient: str, subject: str, body: str) -> str:
return send_email(recipient, subject, body)


# The trace_func decorator will trace the function call and enable adding additional attributes
# to the span in the function implementation. Note that this will trace the function parameters and their values.
@trace_function()
async def fetch_current_datetime_async(format: Optional[str] = None) -> str:
"""
Get the current time as a JSON string, optionally formatted.

:param format (Optional[str]): The format in which to return the current time. Defaults to None, which uses a standard format.
:return: The current time in JSON format.
:rtype: str
"""
await asyncio.sleep(1)
current_time = datetime.datetime.now()

# Use the provided format if available, else use a default format
if format:
time_format = format
else:
time_format = "%Y-%m-%d %H:%M:%S"

time_json = json.dumps({"current_time": current_time.strftime(time_format)})
return time_json


# Statically defined user functions for fast reference with send_email as async but the rest as sync
user_async_functions: Set[Callable[..., Any]] = {
fetch_current_datetime,
fetch_current_datetime_async,
fetch_weather,
send_email_async,
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import os, time, json
from azure.ai.projects import AIProjectClient
from azure.ai.projects.telemetry import trace_function
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import FunctionTool, RequiredFunctionToolCall, SubmitToolOutputsAction, ToolOutput
from opentelemetry import trace
Expand All @@ -49,9 +50,9 @@
tracer = trace.get_tracer(__name__)


# The tracer.start_as_current_span decorator will trace the function call and enable adding additional attributes
# The trace_func decorator will trace the function call and enable adding additional attributes
# to the span in the function implementation. Note that this will trace the function parameters and their values.
@tracer.start_as_current_span("fetch_weather") # type: ignore
@trace_function()
def fetch_weather(location: str) -> str:
"""
Fetches the weather information for the specified location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import FunctionTool, RequiredFunctionToolCall, SubmitToolOutputsAction, ToolOutput
from azure.ai.projects.telemetry import trace_function
from opentelemetry import trace

project_client = AIProjectClient.from_connection_string(
Expand All @@ -50,9 +51,9 @@
tracer = trace.get_tracer(__name__)


# The tracer.start_as_current_span decorator will trace the function call and enable adding additional attributes
# The trace_func decorator will trace the function call and enable adding additional attributes
# to the span in the function implementation. Note that this will trace the function parameters and their values.
@tracer.start_as_current_span("fetch_weather") # type: ignore
@trace_function()
def fetch_weather(location: str) -> str:
"""
Fetches the weather information for the specified location.
Expand Down
Loading