2 changes: 2 additions & 0 deletions ddtrace/llmobs/_constants.py
@@ -110,6 +110,8 @@
PROXY_REQUEST = "llmobs.proxy_request"

EXPERIMENT_ID_KEY = "_ml_obs.experiment_id"
EXPERIMENT_RUN_ID_KEY = "_ml_obs.experiment_run_id"
EXPERIMENT_RUN_ITERATION_KEY = "_ml_obs.experiment_run_iteration"
EXPERIMENT_EXPECTED_OUTPUT = "_ml_obs.meta.input.expected_output"
EXPERIMENTS_INPUT = "_ml_obs.meta.input"
EXPERIMENTS_OUTPUT = "_ml_obs.meta.output"
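The two new constants are baggage keys scoped to a single experiment run. A minimal sketch of the intended flow, mirroring the `LLMObs._experiment()` change later in this diff (the helper name here is hypothetical, not part of the change):

```python
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ITERATION_KEY


def tag_experiment_run(span, run_id, run_iteration):
    # Baggage items live on the span context, so spans created under the
    # experiment can be attributed to one specific run and iteration.
    span.context.set_baggage_item(EXPERIMENT_RUN_ID_KEY, run_id)
    span.context.set_baggage_item(EXPERIMENT_RUN_ITERATION_KEY, run_iteration)
```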
44 changes: 32 additions & 12 deletions ddtrace/llmobs/_experiment.py
@@ -1,5 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
import itertools
import sys
import traceback
from typing import TYPE_CHECKING
@@ -297,6 +298,12 @@ def as_dataframe(self) -> None:
return pd.DataFrame(data=records_list, columns=pd.MultiIndex.from_tuples(column_tuples))


class _ExperimentRunInfo:
    def __init__(self, run_iteration: int):
        self._id = uuid.uuid4()
        self._run_iteration = run_iteration


class Experiment:
def __init__(
self,
@@ -316,6 +323,7 @@ def __init__(
]
]
] = None,
runs: Optional[int] = None,
) -> None:
self.name = name
self._task = task
@@ -326,6 +334,7 @@ def __init__(
self._tags: Dict[str, str] = tags or {}
self._tags["ddtrace.version"] = str(ddtrace.__version__)
self._config: Dict[str, JSONType] = config or {}
self._runs: int = runs or 1
self._llmobs_instance = _llmobs_instance

if not project_name:
@@ -358,18 +367,23 @@ def run(self, jobs: int = 1, raise_errors: bool = False, sample_size: Optional[i
self._config,
convert_tags_dict_to_list(self._tags),
self._description,
self._runs,
)
self._id = experiment_id
self._tags["experiment_id"] = str(experiment_id)
self._run_name = experiment_run_name
task_results = self._run_task(jobs, raise_errors, sample_size)
evaluations = self._run_evaluators(task_results, raise_errors=raise_errors)
summary_evals = self._run_summary_evaluators(task_results, evaluations, raise_errors)
experiment_results = self._merge_results(task_results, evaluations, summary_evals)
experiment_evals = self._generate_metrics_from_exp_results(experiment_results)
self._llmobs_instance._dne_client.experiment_eval_post(
self._id, experiment_evals, convert_tags_dict_to_list(self._tags)
)
for run_iteration in range(self._runs):
run = _ExperimentRunInfo(run_iteration)
self._tags["run_id"] = str(run._id)
self._tags["run_iteration"] = str(run._run_iteration)
task_results = self._run_task(jobs, run, raise_errors, sample_size)
evaluations = self._run_evaluators(task_results, raise_errors=raise_errors)
summary_evals = self._run_summary_evaluators(task_results, evaluations, raise_errors)
experiment_results = self._merge_results(task_results, evaluations, summary_evals)
experiment_evals = self._generate_metrics_from_exp_results(experiment_results)
self._llmobs_instance._dne_client.experiment_eval_post(
self._id, experiment_evals, convert_tags_dict_to_list(self._tags)
)

return experiment_results

@@ -378,11 +392,13 @@ def url(self) -> str:
# FIXME: will not work for subdomain orgs
return f"{_get_base_url()}/llm/experiments/{self._id}"

def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Optional[TaskResult]:
def _process_record(self, idx_record: Tuple[int, DatasetRecord], run: _ExperimentRunInfo) -> Optional[TaskResult]:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
return None
idx, record = idx_record
with self._llmobs_instance._experiment(name=self._task.__name__, experiment_id=self._id) as span:
with self._llmobs_instance._experiment(
name=self._task.__name__, experiment_id=self._id, run_id=str(run._id), run_iteration=run._run_iteration
) as span:
span_context = self._llmobs_instance.export_span(span=span)
if span_context:
span_id = span_context.get("span_id", "")
@@ -422,7 +438,9 @@ def _process_record(self, idx_record: Tuple[int, DatasetRecord]) -> Optional[Tas
},
}

def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional[int] = None) -> List[TaskResult]:
def _run_task(
self, jobs: int, run: _ExperimentRunInfo, raise_errors: bool = False, sample_size: Optional[int] = None
) -> List[TaskResult]:
if not self._llmobs_instance or not self._llmobs_instance.enabled:
return []
if sample_size is not None and sample_size < len(self._dataset):
@@ -441,7 +459,9 @@ def _run_task(self, jobs: int, raise_errors: bool = False, sample_size: Optional
subset_dataset = self._dataset
task_results = []
with ThreadPoolExecutor(max_workers=jobs) as executor:
for result in executor.map(self._process_record, enumerate(subset_dataset)):
for result in executor.map(
self._process_record, enumerate(subset_dataset), itertools.repeat(run, len(subset_dataset))
):
if not result:
continue
task_results.append(result)
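Note on the `executor.map` change above: `map()` zips its iterable arguments and stops at the shortest one, so pairing `enumerate(subset_dataset)` with `itertools.repeat(run, len(subset_dataset))` hands every worker the same run info alongside its own record. A standalone sketch of the pattern (names are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import itertools

records = ["r0", "r1", "r2"]
run_info = {"run_id": "abc123", "iteration": 0}  # stand-in for _ExperimentRunInfo


def process(idx_record, run):
    # Receives one (index, record) pair plus the shared run info.
    idx, record = idx_record
    return f"{record} (record {idx}) processed in run {run['run_id']}, iteration {run['iteration']}"


with ThreadPoolExecutor(max_workers=2) as executor:
    # map() pairs the i-th record with the i-th repeated run value.
    for result in executor.map(process, enumerate(records), itertools.repeat(run_info, len(records))):
        print(result)
```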
14 changes: 14 additions & 0 deletions ddtrace/llmobs/_llmobs.py
@@ -57,6 +57,8 @@
from ddtrace.llmobs._constants import EXPERIMENT_CSV_FIELD_MAX_SIZE
from ddtrace.llmobs._constants import EXPERIMENT_EXPECTED_OUTPUT
from ddtrace.llmobs._constants import EXPERIMENT_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ID_KEY
from ddtrace.llmobs._constants import EXPERIMENT_RUN_ITERATION_KEY
from ddtrace.llmobs._constants import EXPERIMENTS_INPUT
from ddtrace.llmobs._constants import EXPERIMENTS_OUTPUT
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
@@ -781,6 +783,7 @@ def experiment(
]
]
] = None,
runs: Optional[int] = 1,
) -> Experiment:
"""Initializes an Experiment to run a task on a Dataset and evaluators.

@@ -797,6 +800,8 @@
to produce a single value.
Must accept parameters ``inputs``, ``outputs``, ``expected_outputs``,
``evaluators_results``.
        :param runs: The number of times to run the experiment; the task is executed against every dataset record
            once per run.
"""
if not callable(task):
raise TypeError("task must be a callable function.")
@@ -837,6 +842,7 @@
config=config,
_llmobs_instance=cls._instance,
summary_evaluators=summary_evaluators,
runs=runs,
)

@classmethod
@@ -1306,6 +1312,8 @@ def _experiment(
session_id: Optional[str] = None,
ml_app: Optional[str] = None,
experiment_id: Optional[str] = None,
run_id: Optional[str] = None,
run_iteration: Optional[int] = None,
) -> Span:
"""
Trace an LLM experiment, only used internally by the experiments SDK.
@@ -1324,6 +1332,12 @@
if experiment_id:
span.context.set_baggage_item(EXPERIMENT_ID_KEY, experiment_id)

if run_id:
span.context.set_baggage_item(EXPERIMENT_RUN_ID_KEY, run_id)

        if run_iteration is not None:
            span.context.set_baggage_item(EXPERIMENT_RUN_ITERATION_KEY, run_iteration)

return span

@classmethod
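Taken together, the `_llmobs.py` changes expose the new `runs` parameter on the public `experiment()` entry point. A hypothetical usage sketch (the task, evaluator, and dataset-fetching helper below are illustrative, not part of this diff):

```python
from ddtrace.llmobs import LLMObs


def my_task(inputs, config):
    # Stand-in task; a real task would call a model or pipeline.
    return inputs


def exact_match(inputs, outputs, expected_outputs):
    return outputs == expected_outputs


dataset = LLMObs.pull_dataset(name="my-dataset")  # assumed dataset-fetching helper
experiment = LLMObs.experiment(
    name="multi-run-example",
    task=my_task,
    dataset=dataset,
    evaluators=[exact_match],
    runs=3,  # execute the task over every dataset record three times
)
results = experiment.run(jobs=4)
```

Per the loop added in `Experiment.run()`, each of the three runs gets its own `run_id` and `run_iteration` tags, and evaluations are posted once per run.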
2 changes: 2 additions & 0 deletions ddtrace/llmobs/_writer.py
@@ -611,6 +611,7 @@ def experiment_create(
exp_config: Optional[Dict[str, JSONType]] = None,
tags: Optional[List[str]] = None,
description: Optional[str] = None,
runs: Optional[int] = 1,
) -> Tuple[str, str]:
path = "/api/unstable/llm-obs/v1/experiments"
resp = self.request(
@@ -628,6 +629,7 @@
"config": exp_config or {},
"metadata": {"tags": cast(JSONType, tags or [])},
"ensure_unique": True,
"run_count": runs,
},
}
},
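With the `_writer.py` change, the experiment-create request body carries the requested run count. A sketch of the payload shape; only the fields visible in the hunk are confirmed, and the outer `data`/`attributes` wrapper is an assumption about the surrounding structure:

```python
payload = {
    "data": {  # assumed wrapper; not visible in the hunk
        "type": "experiments",  # assumed
        "attributes": {
            "config": {},              # exp_config or {}
            "metadata": {"tags": []},  # tags serialized as a list
            "ensure_unique": True,
            "run_count": 3,            # new: forwarded from the `runs` argument
        },
    }
}
```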
17 changes: 3 additions & 14 deletions setup.cfg
@@ -1,21 +1,10 @@
[bdist_wheel]
universal=1

[codespell]
skip = *.json,*.h,*.cpp,*.c,.riot,.tox,.mypy_cache,.git,*ddtrace/vendor,tests/contrib/openai/cassettes/*,tests/contrib/langchain/cassettes/*,ddtrace/appsec/_iast/_taint_tracking/_vendor/*
exclude-file = .codespellignorelines
ignore-words-list = asend,dne,fo,medias,ment,nin,ot,setttings,statics,ba,spawnve,doas

# DEV: We use `conftest.py` as a local pytest plugin to configure hooks for collection
[tool:pytest]
# --cov-report is intentionally empty else pytest-cov will default to generating a report
addopts =
--cov=ddtrace/
--cov=tests/
--cov-append
--cov-report=
--durations=10
--junitxml=test-results/junit.xml
# DEV: The default is `test_*\.py` which will miss `test.py` files
python_files = test*\.py
asyncio_mode = auto

[flake8]
max-line-length = 120