feat(evaluator): publish_to_intake entrypoint (D9) #456
publish_to_intake(result, *, platform, experiment_id, workspace=None, ...) is the explicit, post-run consumer of AgentEvalResult. Per trial it POSTs the ATIF trajectory, resolves the root span via the traces query-back (§3.5 option 1), then POSTs one evaluator-result per metric output. All request shapes come from the D8 mapping module; wire calls go through the generated SDK intake resources.

- Async, bounded-concurrent across trials (asyncio.gather + semaphore).
- References an existing Experiment; never creates one (caller pre-creates via the Experiments SDK). Agent identity is taken as arguments since it lives on the run target, which AgentEvalResult does not carry (§3.9 #6).
- Returns a PublishReport (per-trial session/span ids + result counts).
- Raises on HTTP/validation failure; the local bundle is never touched.

Unit-tested with a fake async client (round-trip, multi-trial, score-less trial, workspace resolution, span-resolution failure, ingest-failure propagation). Manually validated end-to-end against live Intake + ClickHouse.

Refs: AALGO-290
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: Sandy Chapman <schapman@nvidia.com>
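The per-trial flow described in this commit message (ingest, resolve the root span, then one evaluator-result write per metric output, fanned out with asyncio.gather under a semaphore) can be sketched roughly as follows. This is a minimal illustration, not the code in publish.py: `FakeIntake`, `TrialReport`, `publish_all`, and every method name on the fake are hypothetical stand-ins for the generated SDK intake resources and the real report types.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeIntake:
    """Hypothetical in-memory stand-in for the SDK intake resources."""

    calls: list[str] = field(default_factory=list)
    in_flight: int = 0
    peak_in_flight: int = 0

    async def ingest_atif(self, trial_id: str) -> str:
        # Track how many ingests overlap so the concurrency bound is observable.
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        await asyncio.sleep(0.01)  # simulated network latency
        self.in_flight -= 1
        self.calls.append(f"ingest:{trial_id}")
        return f"session-{trial_id}"

    async def resolve_root_span(self, session_id: str) -> str:
        self.calls.append(f"traces:{session_id}")
        return f"span-{session_id}"

    async def create_evaluator_result(self, session_id: str, span_id: str, name: str, value: object) -> None:
        self.calls.append(f"result:{session_id}:{name}")


@dataclass
class TrialReport:
    trial_id: str
    session_id: str
    span_id: str
    results_written: int


async def publish_all(intake: FakeIntake, scores_by_trial: dict, max_concurrency: int = 2) -> list[TrialReport]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def publish_trial(trial_id: str) -> TrialReport:
        async with semaphore:  # at most max_concurrency trials publish at once
            session_id = await intake.ingest_atif(trial_id)
            span_id = await intake.resolve_root_span(session_id)
            written = 0
            for name, value in scores_by_trial[trial_id].items():
                await intake.create_evaluator_result(session_id, span_id, name, value)
                written += 1
            return TrialReport(trial_id, session_id, span_id, written)

    # gather preserves input order, so reports line up with the trials.
    return await asyncio.gather(*(publish_trial(t) for t in scores_by_trial))


scores = {"t1": {"accuracy": 1.0, "passed": True}, "t2": {}, "t3": {"accuracy": 0.5}}
intake = FakeIntake()
reports = asyncio.run(publish_all(intake, scores, max_concurrency=2))
```

A score-less trial (`t2` here) still gets its trajectory ingested and its root span resolved; it simply reports zero results written.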
📝 Walkthrough
Adds skipped-output handling to intake mapping, publishes eval results with partial failure reporting, and expands unit and live integration coverage.
Changes
Intake publish flow
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
🧹 Nitpick comments (1)
plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py (1)
138-138: 🩺 Stability & Availability | 🔵 Trivial
Use `asyncio.TaskGroup` here.
`asyncio.gather` lets other trial publishes keep running after the first exception, so a failed `POST` can leave orphaned in-flight writes. Python 3.11+ is already required, so structured cancellation is available.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py` at line 138, The trial publishing fan-out in publish() still uses asyncio.gather, which allows other _publish_trial tasks to keep running after one fails. Replace that gather call with asyncio.TaskGroup so all in-flight trial publishes are structurally cancelled on the first exception, and update the surrounding logic in publish/_publish_trial to collect results from the TaskGroup without changing the existing publish flow.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: d476ccbf-3112-47f9-952f-ec1099a6f6e9
📒 Files selected for processing (4)
- plugins/nemo-evaluator/pyproject.toml
- plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py
- plugins/nemo-evaluator/tests/intake/test_publish.py
- plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py
arpitsardhana
left a comment
There is potential for Intake corruption if some trials of an experiment get persisted while others do not.
…ke (D9)

Stands up ClickHouse (Docker) + the platform (auth, entities, intake) via session fixtures, creates an Experiment, runs publish_to_intake, and asserts the full round-trip back through the public Intake API: experiment_context propagation (experiment_id + test_case_id), root-span resolution, and every evaluator-result field/data_type (NUMERIC/BOOLEAN/TEXT, including false->0.0).

Marked 'integration' (registered in the plugin pyproject), so it runs under make test-integration / -m integration and is excluded from the unit suite; it skips cleanly when Docker is unavailable, mirroring the intake conftest's _docker_available() gate. Verified end-to-end locally (~18s).

Refs: AALGO-290
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Signed-off-by: Sandy Chapman <schapman@nvidia.com>
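The data_type assertions mentioned above (NUMERIC/BOOLEAN/TEXT, including false->0.0) suggest a value classification along the following lines. This is only a guess at the shape: the real logic lives in the D8 mapping module, which is not shown here, and `to_evaluator_value` plus the exact dictionary layout are hypothetical. The field names `data_type`, `value`, and `string_value` are taken from the PR description.

```python
from typing import Any


def to_evaluator_value(value: Any) -> dict[str, Any]:
    """Classify a metric output into an assumed evaluator-result value shape."""
    # bool must be tested before int/float because bool is a subclass of int.
    if isinstance(value, bool):
        return {"data_type": "BOOLEAN", "value": 1.0 if value else 0.0, "string_value": None}
    if isinstance(value, (int, float)):
        return {"data_type": "NUMERIC", "value": float(value), "string_value": None}
    if isinstance(value, str):
        return {"data_type": "TEXT", "value": None, "string_value": value}
    raise TypeError(f"unsupported metric output type: {type(value).__name__}")


examples = [to_evaluator_value(v) for v in (False, True, 3, 0.25, "needs review")]
```

The ordering of the `isinstance` checks matters: without the early `bool` branch, `False` would be classified as NUMERIC rather than BOOLEAN.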
78b2a3d to 69e2ac1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py (1)
135-160: 🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
`skipped` leaks omissions from failed trials into the report.
`skipped` is shared across trials and extended mid-trial (Line 160). If a trial raises after appending some omissions (e.g. eval-create fails on a later score), those entries persist and surface in `report.skipped`, even though the trial is reported as failed. Collect omissions locally and merge only from succeeded trials.

Proposed fix: return skipped from each trial
```diff
-    semaphore = asyncio.Semaphore(max_concurrency)
-    skipped: list[SkippedScore] = []
-
-    async def _publish_trial(trial: AgentEvalTrial) -> PublishedTrial:
+    semaphore = asyncio.Semaphore(max_concurrency)
+
+    async def _publish_trial(trial: AgentEvalTrial) -> tuple[PublishedTrial, list[SkippedScore]]:
         async with semaphore:
             ...
             written = 0
+            trial_skipped: list[SkippedScore] = []
             for score in scores_by_trial.get(trial.id, []):
                 rows, omitted = mapping.score_to_evaluator_results(score, session_id=session_id, span_id=span_id)
                 for row in rows:
                     row["workspace"] = resolved_workspace
                     await platform.intake.evaluator_results.create(**row)
                     written += 1
-                skipped.extend(SkippedScore(trial_id=trial.id, name=item.name, reason=item.reason) for item in omitted)
+                trial_skipped.extend(SkippedScore(trial_id=trial.id, name=item.name, reason=item.reason) for item in omitted)
-            return PublishedTrial(...)
+            return PublishedTrial(...), trial_skipped
```
Then accumulate `skipped` only from `PublishedTrial` outcomes when unpacking `outcomes`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py` around lines 135 - 160, The `skipped` list in `_publish_trial` is currently shared across trials and gets extended before the trial fully succeeds, which can leak omissions from failed trials into the final report. Move omission collection to a per-trial/local variable in `_publish_trial`, return it with `PublishedTrial`, and only merge skipped entries into the top-level `skipped` list when unpacking successful `outcomes` so failed trials do not contribute to `report.skipped`.
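The "collect locally, merge only from successes" pattern from this finding can be illustrated with a self-contained sketch. It assumes the fan-out uses `asyncio.gather(..., return_exceptions=True)` so that failed trials come back as exception objects; that detail, the simplified `SkippedScore` fields, and the `"boom"` failure trigger are assumptions made for the example, not the code in publish.py.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SkippedScore:
    trial_id: str
    name: str
    reason: str


async def publish_trial(trial_id: str, outputs: list[tuple[str, object]]) -> tuple[int, list[SkippedScore]]:
    """Return (results_written, skipped) for one trial; raise if a write fails."""
    written = 0
    trial_skipped: list[SkippedScore] = []  # local, so a failure discards it
    for name, value in outputs:
        if value is None:
            trial_skipped.append(SkippedScore(trial_id, name, "no value"))
        elif value == "boom":
            raise RuntimeError(f"{trial_id}: evaluator-result create failed")
        else:
            await asyncio.sleep(0)  # stands in for the real POST
            written += 1
    return written, trial_skipped


async def publish_all(trials: dict[str, list[tuple[str, object]]]) -> tuple[list[SkippedScore], list[str]]:
    outcomes = await asyncio.gather(
        *(publish_trial(trial_id, outputs) for trial_id, outputs in trials.items()),
        return_exceptions=True,
    )
    skipped: list[SkippedScore] = []
    failed: list[str] = []
    for trial_id, outcome in zip(trials, outcomes):
        if isinstance(outcome, BaseException):
            failed.append(trial_id)  # omissions from this trial are dropped
            continue
        _, trial_skipped = outcome
        skipped.extend(trial_skipped)
    return skipped, failed


trials = {
    "ok": [("accuracy", 0.9), ("notes", None)],
    "bad": [("notes", None), ("accuracy", "boom")],
}
skipped, failed = asyncio.run(publish_all(trials))
```

Trial `bad` records an omission before it fails, but because the list is local to the coroutine, only the omission from `ok` reaches the merged `skipped` list.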
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 1f12023e-d2f2-4f77-a8ff-aec5aa41af5c
📒 Files selected for processing (6)
- plugins/nemo-evaluator/pyproject.toml
- plugins/nemo-evaluator/src/nemo_evaluator/intake/mapping.py
- plugins/nemo-evaluator/src/nemo_evaluator/intake/publish.py
- plugins/nemo-evaluator/tests/intake/test_mapping.py
- plugins/nemo-evaluator/tests/intake/test_publish.py
- plugins/nemo-evaluator/tests/integration/test_publish_to_intake.py
✅ Files skipped from review due to trivial changes (1)
- plugins/nemo-evaluator/pyproject.toml
What
Adds `publish_to_intake` (D9, AALGO-290): the explicit, post-run consumer of `AgentEvalResult` that writes a completed agent evaluation to Intake. Builds on the D8 mapping module (#443).

Per trial (concurrent, bounded by a semaphore): `POST /ingest/atif` → resolve the root span via the `traces` query-back (design §3.5 option 1) → one `POST /evaluator-results` per metric output. Returns a `PublishReport` (per-trial session/span ids + result counts).

Design notes
- …`*CreateParams` TypedDicts); all wire calls go through the generated SDK `intake` resources (`client.intake.ingest.atif`, `.traces`, `.evaluator_results`). It imports the SDK client types, never the Intake service.
- …`exist_ok=True`). Ingest 400s on an unknown experiment and we surface it.
- …`agent_name`/`agent_version`/`model_name`) because it lives on the run target, which `AgentEvalResult` doesn't carry (§3.9 #6).

Testing
- …`tests/intake/test_publish.py`, fake async client): round-trip, multi-trial, score-less trial, workspace resolution, span-resolution failure, ingest-failure propagation.
- …`tests/integration/test_publish_to_intake.py`): stands up ClickHouse (Docker) + `auth,entities,intake` via session fixtures, runs the real round-trip, and asserts every field read back through the public Intake API: `experiment_context.{experiment_id,test_case_id}`, `root_span_id`, and each evaluator-result `name`/`data_type`/`value`/`string_value`/`session_id`/`span_id`. Marked `integration` (excluded from the unit suite; skips when Docker is absent). Verified end-to-end locally (~18s).
- `ruff`, `ty`, and 28 unit tests green.

Notes / follow-ups
- …`/evaluator-results` mints a fresh id per call; ask X1); MVP stance is write-once per result.
- …`publish.py` inline or refactor into adapter layers is a separate, open discussion (tracked, not decided here).

Refs: AALGO-290.
🤖 Generated with Claude Code
Summary by CodeRabbit