Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions .github/workflows/gpu-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# * training-smoke — Nano SFT pipeline (convert -> train 5 -> export -> t2i)
# * generator-regression — vision_sft_nano loss vs goldens (4-GPU subset)
# * inference-smoke — Nano multi-modality inference (t2vs + policy + forward_dynamics)
# * reasoner-regression — llava_ov_datapacker loss vs goldens (4-GPU subset)
# * reasoner-regression — llava_ov loss vs goldens (4-GPU subset)
#
# Requires:
# * a self-hosted runner labelled [self-hosted, gpu, h200] with 8 GPUs,
Expand Down Expand Up @@ -121,9 +121,11 @@ jobs:

# One inference call over t2vs (+sound), action policy, and forward_dynamics; checks each output.
# MAX_GPUS defaults to 8. -s streams the live process log.
# Reuse the same input-asset cache dir as the unittest job.
- name: Nano inference smoke (t2vs + action policy + forward_dynamics, 8 GPU)
run: |
export LD_LIBRARY_PATH=
export COSMOS_DOWNLOAD_CACHE_DIR="$RUNNER_WORKSPACE/cosmos_input_cache"
uv run --all-extras --group=cu128-train python -m pytest -v -s \
tests/nano_inference_smoke_test.py --num-gpus=8 --levels=2 -o addopts=

Expand Down Expand Up @@ -151,12 +153,12 @@ jobs:
- name: Sync environment (cu128-train)
run: uv sync --all-extras --group=cu128-train

# Reasoner (llava_ov_datapacker) loss vs the h100 goldens. -s streams the live log.
- name: Reasoner regression (llava_ov_datapacker, 4-GPU subset)
# Reasoner (llava_ov) loss vs the h100 goldens. -s streams the live log.
- name: Reasoner regression (llava_ov, 4-GPU subset)
run: |
export LD_LIBRARY_PATH=
uv run --all-extras --group=cu128-train python -m pytest -v -s \
tests/launch_regression_test.py -k llava_ov_datapacker \
tests/launch_regression_test.py -k llava_ov \
--num-gpus=4 --levels=2 -o addopts=

# The h100_inputs fixture removes its DCP stage on teardown; clear the
Expand Down Expand Up @@ -193,9 +195,12 @@ jobs:
# is absent (via RunIf / pytest.skip guards), so this is green without
# internal credentials; provide the credential file on the runner to
# exercise them. New tests are picked up automatically (no markers/lists).
# Cache downloaded input assets in a persistent dir (outside the repo tree,
# so the cleanup step keeps it) and reuse it across runs.
- name: Unit tests
run: |
export LD_LIBRARY_PATH=
export COSMOS_DOWNLOAD_CACHE_DIR="$RUNNER_WORKSPACE/cosmos_input_cache"
uv run --all-extras --group=cu128-train python -m pytest -v -s \
cosmos_framework/ -o addopts=

Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [Training (Supervised Fine-Tuning)](./docs/training.md)
- [JSONL Dataset](./docs/dataset_jsonl.md)
- [Inference](./docs/inference.md)
- [Policy Server](./docs/action_policy_droid_server.md)
- Reference
- [Code Structure](./docs/code_structure.md)
- [Environment Variables](./docs/environment_variables.md)
Expand Down Expand Up @@ -82,6 +83,10 @@ python -m cosmos_framework.scripts.inference \
--seed=0
```

## Policy Server

See [Policy Server](./docs/action_policy_droid_server.md) for the full guide.

## Reference

| Topic | What it covers |
Expand All @@ -90,4 +95,5 @@ python -m cosmos_framework.scripts.inference \
| [Code Structure](./docs/code_structure.md) | Repository layout and a per-subpackage tour of `cosmos_framework/` — where each concern lives and where to add new code. |
| [Training](./docs/training.md) | Launching multi-GPU and multi-node runs; parallelism strategies; mixed precision; resuming. |
| [Inference (from a trained checkpoint)](./docs/inference.md) | Loading a trained checkpoint into one of the inference backends. |
| [Policy Server](./docs/action_policy_droid_server.md) | Running the server-client pipeline for Cosmos3-Nano-Policy-DROID. |
| [FAQ](./docs/faq.md) | Troubleshooting (OOM, NCCL hangs, slow training), environment variables, and common pitfalls. |
5 changes: 5 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ def init_torch_test():

_WHITELIST_ENV_VARS = {
"LD_LIBRARY_PATH",
# TransformerEngine sets this variable as a side effect of being imported (via
# NANO_MODEL_CONFIG / SUPER_MODEL_CONFIG), so any SFT experiment config test that
# imports a model config ends up defining it. Whitelisting avoids a spurious
# teardown error that is unrelated to the test logic.
"NVTE_CUDA_INCLUDE_DIR",
"QT_QPA_FONTDIR",
"QT_QPA_PLATFORM_PLUGIN_PATH",
"TORCHINDUCTOR_CACHE_DIR",
Expand Down
1 change: 0 additions & 1 deletion cosmos_framework/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: OpenMDW-1.1

6 changes: 2 additions & 4 deletions cosmos_framework/auxiliary/guardrail/common/presets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
from cosmos_framework.auxiliary.guardrail.common.core import GuardrailRunner
from cosmos_framework.auxiliary.guardrail.face_blur_filter.face_blur_filter import RetinaFaceFilter
from cosmos_framework.auxiliary.guardrail.qwen3guard.qwen3guard import Qwen3Guard
from cosmos_framework.auxiliary.guardrail.video_content_safety_filter.video_content_safety_filter import (
VideoContentSafetyFilter,
)
from cosmos_framework.utils import log


Expand All @@ -27,7 +24,8 @@ def create_video_guardrail_runner(offload_model_to_cpu: bool = False) -> Guardra
"""Create the video guardrail runner."""
return GuardrailRunner(
safety_models=[
# VideoContentSafetyFilter(offload_model_to_cpu=offload_model_to_cpu), # Too many false positives
# VideoContentSafetyFilter(offload_model_to_cpu=offload_model_to_cpu)
# Too many false positives, add back when fixed
],
postprocessors=[RetinaFaceFilter(offload_model_to_cpu=offload_model_to_cpu)],
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Copyright (c) 2019
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: OpenMDW-1.1

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: OpenMDW-1.1

15 changes: 14 additions & 1 deletion cosmos_framework/callbacks/compile_tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""Training callback that defers AOT compilation of the VAE tokenizer.

The actual compilation logic lives in
:meth:`~projects.cosmos3.vfm.tokenizers.wan2pt2_vae_4x16x16.Wan2pt2VAEInterface.compile_encode`.
:meth:`~cosmos_framework.model.vfm.tokenizers.wan2pt2_vae_4x16x16.Wan2pt2VAEInterface.compile_encode`.
This module provides a :class:`CompileTokenizer` callback that invokes it
at the right point during training (after ``compile_after_iterations``
steps, to avoid NCCL timeouts during CUDA/cuDNN warm-up).
Expand All @@ -21,6 +21,7 @@
"""

from collections.abc import Sequence
from typing import Literal

import torch

Expand All @@ -43,6 +44,10 @@ def __init__(
enabled: bool = False,
compile_after_iterations: int = 3,
warmup_resolutions: Sequence[str] | None = None,
backend: Literal["cudagraphs", "inductor"] = "inductor",
mode: Literal["reduce-overhead", "max-autotune"] | None = "reduce-overhead",
fullgraph: bool = False,
dynamic: bool = False,
):
"""
Args:
Expand All @@ -60,6 +65,10 @@ def __init__(
self.compile_after_iterations: int = compile_after_iterations
self.skip_counter: int = 0
self.warmup_resolutions: Sequence[str] | None = warmup_resolutions
self.backend: Literal["cudagraphs", "inductor"] = backend
self.mode: Literal["reduce-overhead", "max-autotune"] | None = mode
self.fullgraph: bool = fullgraph
self.dynamic: bool = dynamic

if self.enabled:
if self.warmup_resolutions is None:
Expand Down Expand Up @@ -101,6 +110,10 @@ def on_training_step_start(
tokenizer.compile_encode(
self.warmup_resolutions,
output_dir=self.config.job.path_local,
backend=self.backend,
mode=self.mode,
fullgraph=self.fullgraph,
dynamic=self.dynamic,
)

self.skip_counter += 1
224 changes: 224 additions & 0 deletions cosmos_framework/callbacks/cosmos_dataloader_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: OpenMDW-1.1

"""Checkpoint / resume callbacks for ``CosmosDataLoader``.

Two public classes:

* ``CosmosDataLoaderStateCallback`` — for a single ``CosmosDataLoader`` whose
distributor is a ``MapDistributor``. Saves per-worker ``(epoch, index)`` to
the DCP checkpoint and, on resume, sets ``COSMOS_DL_STATE_*`` env vars so
that ``MapDistributor.stream`` fast-forwards each worker to the saved
position.

* ``JointCosmosDataLoaderStateCallback`` — for ``JointCosmosDataLoader``.
Persists the outer ``global_id`` (dataset-selection sequence cursor) plus
inner per-dataset per-worker state via one ``CosmosDataLoaderStateCallback``
per inner loader.

Usage (single loader)::

exp["trainer"]["callbacks"]["dataloader_state"] = CosmosDataLoaderStateCallback()

Usage (joint loader)::

joint_loader = JointCosmosDataLoader(dataloaders={...}, seed=42)
exp["dataloader_train"] = joint_loader
exp["trainer"]["callbacks"]["dataloader_state"] = JointCosmosDataLoaderStateCallback(
outer_loader=joint_loader,
)
"""

from __future__ import annotations

import os
from dataclasses import dataclass
from typing import Any

import torch

from cosmos_framework.model._base import ImaginaireModel
from cosmos_framework.utils import log
from cosmos_framework.utils.callback import Callback


@dataclass
class _WorkerState:
epoch: int = 0
index: int = 0


class CosmosDataLoaderStateCallback(Callback):
    """Checkpoint/resume for a single ``CosmosDataLoader(MapDistributor)``.

    Tracks the highest-seen ``(epoch, index)`` per worker from batch metadata
    fields ``sample_worker_id``, ``sample_epoch``, ``sample_index`` (injected
    by ``MapDistributor``).

    On ``state_dict()`` the per-worker positions are serialised into the DCP
    checkpoint (``checkpoint_component = "dataloader"``).

    On ``load_state_dict()`` the positions are written to env vars::

        COSMOS_DL_STATE_WORKER_{id}_EPOCH
        COSMOS_DL_STATE_WORKER_{id}_INDEX

    (or ``COSMOS_DL_STATE_{name}_WORKER_{id}_*`` when ``name`` is set, for
    multi-loader namespacing). ``MapDistributor.stream`` pops these on first
    iteration and resumes from ``index + 1``.
    """

    checkpoint_component: str = "dataloader"

    def __init__(self, name: str = "", distributor_type: str | None = None) -> None:
        """Create an empty tracker.

        Args:
            name: Optional namespace inserted into the env-var prefix; an empty
                string selects the un-namespaced ``COSMOS_DL_STATE_`` prefix.
            distributor_type: Accepted and ignored (see the comment below).
        """
        # distributor_type is accepted but unused — it exists only so that Hydra
        # struct-merging over the legacy DataLoaderStateCallback entry (which
        # carries distributor_type="${data_setting.distributor_type}") does not
        # raise an unexpected-keyword-argument error at instantiation time.
        super().__init__()
        self.name = name
        # Left as None here; the logging hook stays silent until it is set.
        self.config: Any = None
        # worker id -> most advanced position seen so far.
        self.state: dict[int, _WorkerState] = {}

    @property
    def _env_prefix(self) -> str:
        """Return the env-var prefix, namespaced by ``self.name`` when it is non-empty."""
        return f"COSMOS_DL_STATE_{self.name}_" if self.name else "COSMOS_DL_STATE_"

    def _update_state_from_batch(self, data_batch: dict[str, torch.Tensor]) -> None:
        """Advance the per-worker positions using the metadata of ``data_batch``.

        Batches without a ``sample_worker_id`` entry are ignored. A worker's
        stored position is only ever replaced by a strictly later
        ``(epoch, index)`` pair, compared epoch first and index second.

        Args:
            data_batch: Batch mapping; when position metadata is present,
                ``sample_worker_id``, ``sample_epoch`` and ``sample_index`` must
                convert (via ``.tolist()``) to sequences of equal length.

        Raises:
            ValueError: If the three metadata sequences differ in length
                (``zip(..., strict=True)``).
        """
        if "sample_worker_id" not in data_batch:
            return  # IterableDistributor / no position metadata
        worker_ids = data_batch["sample_worker_id"].tolist()
        epochs = data_batch["sample_epoch"].tolist()
        indices = data_batch["sample_index"].tolist()
        for worker_id, epoch, index in zip(worker_ids, epochs, indices, strict=True):
            cur = self.state.get(worker_id)
            # First sighting of a worker and a forward move are stored the same way.
            if cur is None or epoch > cur.epoch or (epoch == cur.epoch and index > cur.index):
                self.state[worker_id] = _WorkerState(epoch=epoch, index=index)

    def on_training_step_batch_end(
        self,
        model: ImaginaireModel,
        data_batch: dict[str, torch.Tensor],
        output_batch: dict[str, torch.Tensor],
        loss: torch.Tensor,
        iteration: int = 0,
    ) -> None:
        """Record the positions carried by ``data_batch``; other arguments are unused."""
        self._update_state_from_batch(data_batch)

    def on_training_step_end(
        self,
        model: ImaginaireModel,
        data_batch: dict[str, torch.Tensor],
        output_batch: dict[str, torch.Tensor],
        loss: torch.Tensor,
        iteration: int = 0,
    ) -> None:
        """Log every tracked worker position each ``config.trainer.logging_iter`` iterations.

        Does nothing while ``self.config`` is unset (falsy).
        """
        if self.config and iteration % self.config.trainer.logging_iter == 0:
            msg = "\n" + "".join(
                f"worker {wid}: epoch={s.epoch}, index={s.index}\n" for wid, s in self.state.items()
            )
            log.info(msg)

    def has_checkpoint_state(self) -> bool:
        """Always return ``True``."""
        return True

    def state_dict(self) -> dict[int, dict[str, int]]:
        """Return ``{worker_id: {"epoch": ..., "index": ...}}`` for every tracked worker.

        Each entry is also logged at info level.
        """
        result: dict[int, dict[str, int]] = {}
        for worker_id, s in self.state.items():
            result[worker_id] = {"epoch": s.epoch, "index": s.index}
            log.info(f"Saved CosmosDataLoader state for worker {worker_id}: epoch={s.epoch}, index={s.index}")
        return result

    def load_state_dict(self, state_dict: dict[int, dict[str, int]]) -> None:
        """Restore positions and export them as environment variables.

        An empty or missing ``state_dict`` leaves both ``self.state`` and the
        environment untouched. Otherwise ``self.state`` is rebuilt from scratch
        and, for every worker, ``{prefix}WORKER_{id}_EPOCH`` and
        ``{prefix}WORKER_{id}_INDEX`` are set in ``os.environ``.

        Args:
            state_dict: Mapping in the shape produced by :meth:`state_dict`.

        Raises:
            KeyError: If a per-worker entry lacks ``"epoch"`` or ``"index"``.
            ValueError: If a worker id cannot be converted to ``int``.
        """
        if not state_dict:
            log.info("No CosmosDataLoader state found in checkpoint")
            return

        pfx = self._env_prefix
        self.state = {}
        for raw_worker_id, per_worker in state_dict.items():
            # Normalise the key to int so the restored entry shares its key with
            # the ids later read from batch metadata; a serialisation round trip
            # that stringified the keys would otherwise leave two entries per
            # worker. (Assumes batch ids are ints — TODO confirm.)
            worker_id = int(raw_worker_id)
            epoch = per_worker["epoch"]
            index = per_worker["index"]
            self.state[worker_id] = _WorkerState(epoch=epoch, index=index)
            os.environ[f"{pfx}WORKER_{worker_id}_EPOCH"] = str(epoch)
            os.environ[f"{pfx}WORKER_{worker_id}_INDEX"] = str(index)
            log.info(f"Loaded CosmosDataLoader state for worker {worker_id}: epoch={epoch}, index={index}")


class JointCosmosDataLoaderStateCallback(Callback):
    """Checkpoint/resume for ``JointCosmosDataLoader``.

    Manages two levels of state in a single DCP checkpoint entry:

    1. **Outer** ``global_id`` — how many batches the outer loader has yielded.
       Restored via ``outer_loader.set_start_iteration(global_id)`` so the
       deterministic dataset-selection sequence resumes from the right step.

    2. **Inner** per-dataset, per-worker ``(epoch, index)`` — one
       ``CosmosDataLoaderStateCallback`` per inner loader, keyed by name.

    The ``checkpoint_component = "dataloader"`` class attribute ensures the DCP
    checkpointer's ``_DataloaderWrapper`` discovers exactly this callback. Do
    **not** also register standalone ``CosmosDataLoaderStateCallback`` instances
    for the inner loaders — this class already handles them all.
    """

    checkpoint_component: str = "dataloader"

    def __init__(self, outer_loader: Any) -> None:
        """Build one inner callback per name listed in ``outer_loader._names``.

        Args:
            outer_loader: The joint loader. This class reads its ``_names`` and
                ``_global_id`` attributes and calls ``set_start_iteration``.

        Raises:
            ValueError: If an inner loader is named ``"global_id"``, which is
                the key reserved for the outer cursor in the saved state.
        """
        super().__init__()
        self._outer = outer_loader
        self._inner: dict[str, CosmosDataLoaderStateCallback] = {
            name: CosmosDataLoaderStateCallback(name=name)
            for name in outer_loader._names
        }
        # state_dict() stores the outer cursor and the inner entries side by
        # side in one mapping; a loader with this name would overwrite the cursor.
        if "global_id" in self._inner:
            raise ValueError(
                "JointCosmosDataLoaderStateCallback: inner loader name 'global_id' "
                "collides with the reserved checkpoint key"
            )
        # Left as None here; the logging hook stays silent until it is set.
        self.config: Any = None

    def _update_state_from_batch(self, batch: dict) -> None:
        """Forward ``batch`` to the inner callback selected by ``batch["dataset_name"]``.

        Batches whose ``dataset_name`` is missing or unknown are ignored.
        """
        # NOTE(review): assumes ``dataset_name`` is a single hashable value (e.g.
        # a str), not a per-sample list — confirm against the loader's output.
        name = batch.get("dataset_name")
        if name in self._inner:
            self._inner[name]._update_state_from_batch(batch)

    def on_training_step_batch_end(
        self,
        model: Any,
        data_batch: dict,
        output_batch: dict,
        loss: Any,
        iteration: int = 0,
    ) -> None:
        """Record the positions carried by ``data_batch``; other arguments are unused."""
        self._update_state_from_batch(data_batch)

    def on_training_step_end(
        self,
        model: Any,
        data_batch: dict,
        output_batch: dict,
        loss: Any,
        iteration: int = 0,
    ) -> None:
        """Log the outer cursor and all inner positions every ``logging_iter`` iterations.

        Does nothing while ``self.config`` is unset (falsy).
        """
        if self.config and iteration % self.config.trainer.logging_iter == 0:
            parts = [f"\nJointCosmosDataLoader global_id={self._outer._global_id}\n"]
            parts.extend(
                f" [{name}] worker {wid}: epoch={s.epoch}, index={s.index}\n"
                for name, cb in self._inner.items()
                for wid, s in cb.state.items()
            )
            log.info("".join(parts))

    def has_checkpoint_state(self) -> bool:
        """Always return ``True``."""
        return True

    def state_dict(self) -> dict:
        """Return the outer ``global_id`` plus each inner callback's state keyed by name."""
        return {
            "global_id": self._outer._global_id,
            **{name: cb.state_dict() for name, cb in self._inner.items()},
        }

    def load_state_dict(self, state: dict) -> None:
        """Restore the outer cursor, then every inner callback present in ``state``.

        A missing ``"global_id"`` entry is treated as ``0``. Inner loaders with
        no entry in ``state`` are left untouched.

        Args:
            state: Mapping in the shape produced by :meth:`state_dict`.
        """
        global_id = state.get("global_id", 0)
        self._outer.set_start_iteration(global_id)
        log.info(f"JointCosmosDataLoaderStateCallback: resumed outer global_id={global_id}")
        for name, cb in self._inner.items():
            if name in state:
                cb.load_state_dict(state[name])
Loading
Loading