Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions vero/src/vero/harbor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Harbor integration: the sidecar-specific frontend over the shared
EvaluationEngine, plus Mode B (Harbor-delegated eval). The `harbor` SDK is an
optional extra, imported lazily (only registry enumeration / nested runs need it —
config, dataset compilation, and the sidecar handlers do not).
"""

from vero.harbor.config import HarborConfig
from vero.harbor.dataset import (
build_harbor_dataset,
enumerate_local_task_names,
validate_partition,
)

__all__ = [
"HarborConfig",
"build_harbor_dataset",
"enumerate_local_task_names",
"validate_partition",
]
91 changes: 91 additions & 0 deletions vero/src/vero/harbor/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""FastAPI app for the eval sidecar — the HTTP surface over the (transport-agnostic)
EvaluationSidecar handlers + the admin `finalize` over the Verifier.

Two roles over one app: agent (`/eval`, `/submit`, `/status`; unauthenticated, metered,
redacted) and admin (`/finalize`; bearer-token gated). `vero harbor serve` runs
this under uvicorn in the eval-sidecar container.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from vero.evaluation.engine import EvalRequest
from vero.exceptions import ExperimentBudgetExceeded, InvalidSplitError
from vero.harbor.auth import check_admin
from vero.harbor.server import SubmitDisabledError
from vero.harbor.verifier import NoCandidateError

if TYPE_CHECKING:
from vero.harbor.server import EvaluationSidecar
from vero.harbor.verifier import Verifier


class EvalBody(BaseModel):
    # JSON body accepted by POST /eval. The handler unpacks every field into an
    # EvalRequest via ``EvalRequest(**body.model_dump())``, so the field names here
    # must stay in step with that constructor's keyword arguments.
    dataset_id: str
    split: str
    # Optional selectors; ``None`` is forwarded as-is and interpreted downstream.
    commit: str | None = None
    sample_ids: list[int] | None = None
    num_samples: int | None = None


class SubmitBody(BaseModel):
    # JSON body accepted by POST /submit; ``commit`` is passed straight through to
    # ``sidecar.submit(commit=...)`` and may be omitted (None).
    commit: str | None = None


def create_app(
    *,
    sidecar: EvaluationSidecar,
    verifier: Verifier,
    admin_token: str,
) -> FastAPI:
    """Build the FastAPI application exposing the sidecar and verifier over HTTP.

    Routes registered:
      * ``GET  /health``   -> ``{"ok": True}``
      * ``POST /eval``     -> ``sidecar.evaluate(..., admin=False)`` (agent)
      * ``POST /submit``   -> ``sidecar.submit(commit=...)`` (agent)
      * ``GET  /status``   -> ``sidecar.status()`` (agent)
      * ``POST /finalize`` -> ``verifier.finalize()`` (admin; needs a bearer token
        matching ``admin_token``, otherwise HTTP 403)

    Known domain errors are translated to JSON ``{"error": <message>}`` responses
    with a dedicated status code instead of surfacing as 500s.
    """
    api = FastAPI(title="vero eval sidecar")

    def _error_responder(status_code: int):
        # Factory (rather than a lambda in the loop) so each handler captures its
        # own status code.
        def _respond(request, exc):
            return JSONResponse(status_code=status_code, content={"error": str(exc)})

        return _respond

    # Known errors -> agent-facing status codes.
    known_errors = (
        (ExperimentBudgetExceeded, 429),
        (InvalidSplitError, 400),
        (SubmitDisabledError, 409),
        (NoCandidateError, 409),
    )
    for exc_type, status_code in known_errors:
        api.add_exception_handler(exc_type, _error_responder(status_code))

    @api.get("/health")
    async def health():
        return {"ok": True}

    # --- agent endpoints (unauthenticated; metered + redacted) ---
    @api.post("/eval")
    async def eval_(body: EvalBody):
        request = EvalRequest(**body.model_dump())
        summary = await sidecar.evaluate(request, admin=False)
        return summary.to_dict()

    @api.post("/submit")
    async def submit(body: SubmitBody):
        return await sidecar.submit(commit=body.commit)

    @api.get("/status")
    async def status():
        return sidecar.status().to_dict()

    # --- admin endpoint (bearer-token gated) ---
    @api.post("/finalize")
    async def finalize(authorization: str | None = Header(default=None)):
        if check_admin(authorization, admin_token):
            return await verifier.finalize()
        raise HTTPException(status_code=403, detail="admin token required")

    return api
39 changes: 39 additions & 0 deletions vero/src/vero/harbor/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Admin-token auth for the eval sidecar.

The token gates the admin `finalize` endpoint. It is generated per trial by the
sidecar and written `root:600` on a volume mounted into `main`, so the verifier
(root, shared mode) can read it but the optimizer (`agent.user`) cannot. The
optimizer therefore can only reach the agent endpoints, never `finalize`.
"""

from __future__ import annotations

import secrets
from pathlib import Path

_BEARER = "Bearer "


def generate_token() -> str:
    """Return a fresh URL-safe random token built from 32 bytes of entropy."""
    entropy_bytes = 32
    token = secrets.token_urlsafe(entropy_bytes)
    return token


def write_admin_token(path: Path | str, token: str, *, mode: int = 0o600) -> Path:
"""Write the token to ``path`` with restrictive perms (caller runs as root so the
file is root-owned and unreadable by ``agent.user``)."""
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(token)
p.chmod(mode)
return p


def read_admin_token(path: Path | str) -> str:
return Path(path).read_text().strip()


def check_admin(authorization: str | None, expected_token: str) -> bool:
    """Constant-time check of an ``Authorization: Bearer <token>`` header.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.
        expected_token: The admin token the presented one must equal.

    Returns:
        ``True`` only when the header has the ``Bearer `` prefix and the remainder
        equals ``expected_token``; ``False`` for a missing, malformed or wrong header.
    """
    if not authorization or not authorization.startswith(_BEARER):
        return False
    presented = authorization[len(_BEARER):]
    # compare_digest only accepts ASCII-only ``str``; a client-supplied header with
    # any non-ASCII character would raise TypeError (-> HTTP 500). Comparing the
    # encoded bytes keeps the check constant-time and turns that case into a
    # plain rejection.
    return secrets.compare_digest(
        presented.encode("utf-8"), expected_token.encode("utf-8")
    )
127 changes: 127 additions & 0 deletions vero/src/vero/harbor/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""`vero harbor` CLI.

Thin clients the optimizer and verifier use inside the compiled task:
- agent (in `main`): eval / submit / status -> POST/GET the sidecar over VERO_EVAL_URL
- verifier (in `main`): finalize -> POST /finalize with the admin token,
write /logs/verifier/reward.json
`serve` (sidecar entry) and `build`/`run` (host-side compiler) are added with stage (c).
"""

from __future__ import annotations

import json
import os
from pathlib import Path

import click


def _base_url() -> str:
    """Return the sidecar base URL from ``VERO_EVAL_URL`` without trailing slashes.

    Raises:
        click.ClickException: if the variable is unset or empty.
    """
    configured = os.environ.get("VERO_EVAL_URL")
    if configured:
        return configured.rstrip("/")
    raise click.ClickException("VERO_EVAL_URL is not set (the eval sidecar URL).")


def _request(method: str, path: str, *, payload: dict | None = None, headers: dict | None = None):
    """Send one HTTP request to the sidecar and return the decoded JSON body.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended verbatim to the base URL (should start with ``/``).
        payload: Optional JSON body.
        headers: Optional extra headers.

    Raises:
        click.ClickException: on any response with status >= 400; the message
            carries the status code and response text.
    """
    # Deferred import: httpx is only needed by the commands that talk to the sidecar.
    import httpx

    url = f"{_base_url()}{path}"
    # timeout=None disables httpx's timeouts for this call.
    response = httpx.request(method, url, json=payload, headers=headers or {}, timeout=None)
    if response.status_code < 400:
        return response.json()
    raise click.ClickException(f"{method} {path} -> {response.status_code}: {response.text}")


# Root click group for `vero harbor`; it has no behavior of its own. The docstring
# below is the group's --help text, so edit it as user-facing copy.
@click.group()
def harbor() -> None:
    """Vero ⇄ Harbor: optimization-as-a-Harbor-task commands."""


@harbor.command("serve")
@click.option("--config", "config_path", required=True, help="Path to the ServeConfig JSON.")
def serve_cmd(config_path):
    """Eval-sidecar entrypoint: assemble the engine/sidecar/verifier and serve (uvicorn)."""
    # Deferred import: vero.harbor.serve is loaded only when this command runs.
    from vero.harbor.serve import serve

    # All the work happens in serve(); this command just forwards the config path.
    serve(config_path)


@harbor.command("build")
@click.option("-c", "--config", "config_path", required=True, help="Path to build.yaml.")
@click.option("-o", "--out", required=True, help="Output task directory.")
def build_cmd(config_path, out):
    """Compile a build.yaml into a runnable Harbor optimization task directory."""
    # Deferred import: the build machinery is loaded only when this command runs.
    from vero.harbor.build import BuildConfig, compile_task

    build_config = BuildConfig.from_file(config_path)
    compiled_dir = compile_task(build_config, out)
    # Report whatever compile_task returned, not the raw --out argument.
    click.echo(f"Compiled task -> {compiled_dir}")


@harbor.command("run", context_settings={"ignore_unknown_options": True})
@click.option("-c", "--config", "config_path", required=True, help="Path to build.yaml.")
@click.option("-a", "--agent", required=True, help="Optimizer agent (passed to harbor run).")
@click.option("-m", "--model", default=None, help="Model for the optimizer agent.")
@click.option("-e", "--environment", "provider", default="docker", show_default=True)
@click.argument("extra", nargs=-1, type=click.UNPROCESSED)
def run_cmd(config_path, agent, model, provider, extra):
    """Build to a temp dir, then `harbor run` it (build + run convenience)."""
    import subprocess
    import tempfile

    from vero.harbor.build import BuildConfig, compile_task

    # Load the config first so a bad build.yaml fails before a temp dir is created.
    build_config = BuildConfig.from_file(config_path)
    # NOTE: the mkdtemp() directory is not removed afterwards.
    task_dir = compile_task(build_config, Path(tempfile.mkdtemp()) / "task")

    argv = ["uvx", "harbor", "run", "-p", str(task_dir), "-a", agent, "-e", provider]
    if model:
        argv.extend(["-m", model])
    # Unrecognised options/arguments are forwarded to `harbor run` untouched.
    argv.extend(extra)

    click.echo(f"$ {' '.join(argv)}")
    # Propagate the child's exit status as our own.
    exit_code = subprocess.call(argv)
    raise SystemExit(exit_code)


@harbor.command("eval")
@click.option("--dataset-id", required=True)
@click.option("--split", required=True)
@click.option("--commit", default=None, help="Defaults to the agent repo HEAD.")
@click.option("--num-samples", type=int, default=None)
@click.option("--sample-ids", default=None, help="Comma-separated sample ids.")
def eval_cmd(dataset_id, split, commit, num_samples, sample_ids):
    """Spend one evaluation of your commit on a split (agent)."""
    # Only options the user actually supplied are sent; the rest are left out of
    # the JSON body so the server-side defaults apply.
    payload: dict = {"dataset_id": dataset_id, "split": split}
    if commit:
        payload["commit"] = commit
    if num_samples is not None:
        payload["num_samples"] = num_samples
    if sample_ids:
        try:
            payload["sample_ids"] = [int(part) for part in sample_ids.split(",")]
        except ValueError as err:
            # Report malformed ids as a normal CLI usage error instead of letting
            # a bare ValueError traceback escape (and before any request is sent).
            raise click.BadParameter(
                f"expected comma-separated integers, got {sample_ids!r}",
                param_hint="--sample-ids",
            ) from err
    click.echo(json.dumps(_request("POST", "/eval", payload=payload), indent=2))


@harbor.command("submit")
@click.option("--commit", default=None, help="Defaults to the agent repo HEAD.")
def submit_cmd(commit):
    """Nominate a commit and end the optimization run (agent; if enabled)."""
    # "commit" is always present in the body; it is JSON null when --commit is
    # omitted. The sidecar's JSON reply is pretty-printed to stdout.
    click.echo(json.dumps(_request("POST", "/submit", payload={"commit": commit}), indent=2))


@harbor.command("status")
def status_cmd():
    """Show remaining budget, evaluable splits, and whether submit is enabled (agent)."""
    # Plain GET with no body; the sidecar's JSON reply is pretty-printed to stdout.
    click.echo(json.dumps(_request("GET", "/status"), indent=2))


@harbor.command("finalize")
@click.option("--token-file", required=True, help="Path to the admin token (root:600).")
@click.option("--output", default="/logs/verifier/reward.json", show_default=True)
def finalize_cmd(token_file, output):
    """Verifier: select the best/submitted commit, score on the test split, write reward.json (admin)."""
    from vero.harbor.auth import read_admin_token

    # Authenticate against the admin-only endpoint with the token read from disk.
    auth_header = {"Authorization": f"Bearer {read_admin_token(token_file)}"}
    reward = _request("POST", "/finalize", headers=auth_header)

    # Persist the compact JSON for downstream consumers ...
    destination = Path(output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(reward))
    # ... and echo an indented copy for whoever is watching the logs.
    click.echo(json.dumps(reward, indent=2))
35 changes: 35 additions & 0 deletions vero/src/vero/harbor/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""HarborConfig — the Mode-B configuration.

User-facing config that turns "evaluate my agent on a set of Harbor tasks" into a
`harbor run` invocation. A typed projection of the user-controllable `harbor run`
flags; the per-eval-derived flags (task selection, jobs dir, source/agent resolution)
are filled in by the runner, not here.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class HarborConfig:
task_source: str # registry ref "org/name[@ver]" OR a local path to a task dir/dataset
agent_import_path: str # module path to the candidate agent, e.g. "pkg.mod:Class"
model: str | None = None
environment: str = "modal" # cloud provider (docker allowed for local testing)
n_attempts: int = 1
max_retries: int = 2
reward_key: str | None = None # primary reward; default pass -> reward -> mean
extra_args: list[str] = field(default_factory=list) # passthrough harbor run flags

@property
def is_registry(self) -> bool:
"""Local if the source resolves to an existing path; otherwise a registry ref."""
return not Path(self.task_source).expanduser().exists()

def source_args(self) -> list[str]:
"""`harbor run` source selector: `-d <ref>` (registry) or `-p <path>` (local)."""
if self.is_registry:
return ["-d", self.task_source]
return ["-p", str(Path(self.task_source).expanduser())]
80 changes: 80 additions & 0 deletions vero/src/vero/harbor/dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Build the vero dataset (task-name references + split partition) for Mode B.

A Mode-B vero dataset has no labels — each "sample" is a Harbor task name. A local
task's name is its subdirectory name (the dir containing ``task.toml``), matching what
``harbor run -i/--include-task-name`` filters on; registry task names come from the
registry's task configs.

The split partition is a ``dict[str, list[str]]`` (e.g. ``{"train": [...], "test": [...]}``)
supplied by the benchmark author; this module compiles + validates it into a DatasetDict.
"""

from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from datasets import DatasetDict


def build_harbor_dataset(partition: dict[str, list[str]]) -> DatasetDict:
    """Turn a ``{split: [task_names]}`` mapping into a DatasetDict.

    Every split becomes a Dataset with one column, ``task_name``, holding that
    split's task names in the order given.

    Raises:
        ValueError: if ``partition`` is empty.
    """
    from datasets import Dataset, DatasetDict

    if not partition:
        raise ValueError("Harbor dataset partition is empty.")

    splits = {}
    for split_name, task_names in partition.items():
        # list(...) copies the names so the caller's sequence is not handed over.
        splits[split_name] = Dataset.from_dict({"task_name": list(task_names)})
    return DatasetDict(splits)


def enumerate_local_task_names(task_source: str | Path) -> list[str]:
"""Task names available in a local Harbor task source.

If the path is itself a task dir (contains ``task.toml``), returns ``[dir_name]``;
otherwise returns the names of immediate subdirectories that contain ``task.toml``.
"""
path = Path(task_source).expanduser()
if (path / "task.toml").exists():
return [path.name]
if not path.is_dir():
raise ValueError(f"Local task source is not a directory: {path}")
return sorted(
d.name for d in path.iterdir() if d.is_dir() and (d / "task.toml").exists()
)


async def enumerate_registry_task_names(
    ref: str, *, registry_url: str | None = None
) -> list[str]:
    """Sorted task names of a registry dataset given as ``org/name[@version]``.

    The ``harbor`` SDK is imported inside the function, so it is only required
    when this coroutine is actually called.

    Args:
        ref: Dataset reference; anything after the first ``@`` is the version.
        registry_url: Optional registry URL; when falsy, ``registry=None`` is
            passed to the SDK config.
    """
    from harbor.models.job.config import RegistryDatasetConfig
    from harbor.models.registry import RemoteRegistryInfo

    dataset_name, _sep, dataset_version = ref.partition("@")

    registry = None
    if registry_url:
        registry = RemoteRegistryInfo(url=registry_url)

    dataset_config = RegistryDatasetConfig(
        registry=registry,
        name=dataset_name,
        # An absent or empty version is passed as None.
        version=dataset_version or None,
    )
    task_configs = await dataset_config.get_task_configs()
    # The task name is the final component of each task config's path.
    names = [task_config.path.name for task_config in task_configs]
    names.sort()
    return names


def validate_partition(partition: dict[str, list[str]], available: list[str]) -> None:
    """Check that every task name used in ``partition`` appears in ``available``.

    Raises:
        ValueError: listing (sorted, de-duplicated) the names that are missing.
    """
    known = frozenset(available)
    missing: set[str] = set()
    for names in partition.values():
        missing.update(name for name in names if name not in known)
    if not missing:
        return
    raise ValueError(
        f"Partition references task names not found in the source: {sorted(missing)}"
    )
Loading