Add ALM Data Pipeline tutorial and stages #1419

mohammadaaftabv wants to merge 25 commits into NVIDIA-NeMo:main
Conversation
Force-pushed: 66abf28 to 0125f32
Greptile Summary

This PR adds a complete ALM (Audio Language Model) data curation pipeline with two main processing stages, manifest I/O, comprehensive tests, documentation, and benchmarking support.

Key Changes

Core Stages:
Tutorial & Documentation:
Testing:
Benchmarking:
Architecture

The pipeline follows NeMo Curator patterns by inheriting from the standard stage base classes.

Issues Noted in Previous Reviews

Several issues have been flagged in previous review threads and should be addressed before merge (see the "Previous Threads" section). The most critical are tuple-order inconsistencies in the overlap stage, brittle test assertions, and threshold semantics that may not match the documentation.

Confidence Score: 3/5
Important Files Changed
Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Input JSONL Manifests<br/>audio segments with<br/>diarization metadata] --> B[ALMManifestReader<br/>CompositeStage]
B --> C[FilePartitioningStage<br/>discover & partition files]
C --> D[ALMManifestReaderStage<br/>read line-by-line, no Pandas]
D --> E[ALMDataBuilderStage<br/>create training windows]
E --> F{Quality Filters}
F -->|sample rate < 16kHz| G[Reject entry]
F -->|bandwidth < 8kHz| H[Reject segment]
F -->|speakers not 2-5| I[Reject window]
F -->|duration not 108-132s| J[Reject window]
F -->|all filters pass| K[Accept window]
K --> L[ALMDataOverlapStage<br/>filter overlapping windows]
L --> M{Overlap Check}
M -->|overlap ratio >= threshold| N[Keep window closer<br/>to target duration]
M -->|overlap ratio < threshold| O[Keep both windows]
N --> P[ALMManifestWriterStage<br/>write JSONL output]
O --> P
P --> Q[Output JSONL<br/>filtered windows +<br/>stats + durations]
style A fill:#e1f5ff
style Q fill:#d4edda
style G fill:#f8d7da
style H fill:#f8d7da
style I fill:#f8d7da
style J fill:#f8d7da
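The quality-filter branch of the flowchart can be sketched as plain predicates. Only the thresholds (16 kHz sample rate, 8 kHz bandwidth, 2–5 speakers, 108–132 s window duration) come from the diagram; the function name and dict keys below are illustrative, not the PR's actual API:

```python
# Illustrative sketch of the flowchart's quality filters; names and dict
# keys are assumptions, only the numeric thresholds come from the diagram.

def accept_window(window: dict) -> bool:
    """Return True if a candidate training window passes all filters."""
    if window["sample_rate"] < 16_000:            # reject entry: sample rate < 16 kHz
        return False
    if window["bandwidth"] < 8_000:               # reject segment: bandwidth < 8 kHz
        return False
    if not 2 <= window["num_speakers"] <= 5:      # reject window: speakers not 2-5
        return False
    if not 108.0 <= window["duration"] <= 132.0:  # reject window: duration not 108-132 s
        return False
    return True

windows = [
    {"sample_rate": 16_000, "bandwidth": 8_000, "num_speakers": 3, "duration": 120.0},
    {"sample_rate": 8_000,  "bandwidth": 8_000, "num_speakers": 3, "duration": 120.0},
    {"sample_rate": 16_000, "bandwidth": 8_000, "num_speakers": 6, "duration": 120.0},
]
accepted = [w for w in windows if accept_window(w)]
```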
Last reviewed commit: 09c1ad5
ayushdg left a comment:
Few comments:
- Can you add a benchmarking script to benchmarks and share a representative dataset that can be used to run an ALM pipeline?
- You are already logging many statistics in the stages here; is it possible to also use `_log_metrics`, like done in some of the text stages, to log some of these timing metrics so they can be tracked better to catch regressions?
https://github.com/mohammadaaftabv/Curator/tree/alm_data_build/tests/fixtures/audio/alm is the representative dataset. I am assuming that by benchmarks you mean the result of running both processors on the representative data; in that case, ALM data build should build 181 windows based on the config in the test file, and ALM data overlap applied to the resulting 181 windows, allowing at most 50% overlap, will give 3035.5 seconds of total output. All of this is covered in the test cases here.
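The overlap rule summarized in the flowchart (overlap ratio at or above the threshold keeps only the window closer to the target duration, otherwise both survive) can be sketched as follows. The tuple layout `(start, end)`, the ratio definition, and the 120 s target are assumptions for illustration:

```python
# Sketch of the overlap rule from the flowchart. The (start, end) tuple
# layout, the ratio-over-shorter-window definition, and the default target
# duration are assumptions, not the PR's exact semantics.

def overlap_ratio(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Overlap length divided by the shorter window's length."""
    inter = max(0.0, min(a[1], b[1]) - max(a[0], b[0]))
    shorter = min(a[1] - a[0], b[1] - b[0])
    return inter / shorter if shorter > 0 else 0.0

def pick(a, b, threshold=0.5, target=120.0):
    """Return the surviving window(s) for a candidate pair."""
    if overlap_ratio(a, b) >= threshold:
        # keep the window whose duration is closer to the target
        closer = min((a, b), key=lambda w: abs((w[1] - w[0]) - target))
        return [closer]
    return [a, b]
```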
Added `_log_metrics` calls to both stages, following the pattern in the text stages. Now tracking:
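For readers unfamiliar with the pattern being referenced, here is a minimal sketch of the kind of per-stage timing collection a `_log_metrics` hook enables. The class and method shapes are hypothetical stand-ins, not the actual NeMo Curator implementation:

```python
# Hypothetical sketch of per-stage timing collection; the real _log_metrics
# hook lives in the text stages, this only shows the general shape.
import time


class MetricsMixin:
    def __init__(self) -> None:
        self._metrics: dict[str, float] = {}

    def _log_metrics(self, name: str, value: float) -> None:
        # accumulate so repeated calls sum up per metric name
        self._metrics[name] = self._metrics.get(name, 0.0) + value

    def timed(self, name, fn, *args):
        """Run fn(*args), recording its wall-clock time under `name`."""
        start = time.perf_counter()
        result = fn(*args)
        self._log_metrics(name, time.perf_counter() - start)
        return result


m = MetricsMixin()
out = m.timed("square", lambda x: x * x, 7)
```

Accumulating rather than overwriting lets a stage that processes many entries report a single total per metric, which is what makes regressions easy to spot across runs.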
tutorials/audio/alm/main.py (outdated)

```python
# Calculate statistics
# Stage 1 output: total_dur_list_window contains the original window count
stage1_windows = sum(len(e.get("total_dur_list_window", e.get("windows", []))) for e in output_entries)
```
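The `e.get(...)` chain in that line falls back from `total_dur_list_window` to `windows`, and to an empty list when neither key is present. A self-contained illustration of the counting behavior (the sample entries are made up):

```python
# Made-up sample entries showing the fallback behavior of the counting line.
output_entries = [
    {"total_dur_list_window": [1.0, 2.0, 3.0]},  # preferred key present -> 3
    {"windows": [{"id": 0}, {"id": 1}]},         # fallback key only -> 2
    {},                                          # neither key -> 0
]
stage1_windows = sum(
    len(e.get("total_dur_list_window", e.get("windows", []))) for e in output_entries
)
```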
I guess these make sense, but also take a look at `Task._metadata` and `Task._stage_perf_stats` to see if there are things that are relevant.
```python
self._drop_fields_set = {f.strip() for f in self.drop_fields.split(",") if f.strip()}
self._drop_fields_top_level_set = {f.strip() for f in self.drop_fields_top_level.split(",") if f.strip()}

def process_dataset_entry(self, data_entry: dict[str, Any]) -> list[AudioBatch]:
```
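The comma-splitting in those set comprehensions tolerates stray whitespace and empty items. A quick standalone check of that behavior (the field names here are invented):

```python
# Same parsing idiom as the snippet above; field names are invented examples.
drop_fields = "audio_filepath, , text ,speaker_id"
drop_set = {f.strip() for f in drop_fields.split(",") if f.strip()}
```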
Is it intentional that we operate on a single manifest entry at a time? Can any of this be vectorized? Same question for the other stages.
Yes, this is intentional: it follows the `LegacySpeechStage` pattern used by all other audio stages (`GetAudioDurationStage`, `PreserveByValueStage`, etc.), where `process()` iterates over `task.data` and calls `process_dataset_entry()` per entry.
Parallelism is handled at the executor level instead. In benchmark testing (10,000 entries, XennaExecutor on 8-core i9-9900KF), the autoscaler allocated 4 workers to the Builder stage, achieving ~1,460 entries/sec aggregate throughput (365 entries/sec/worker) with 86% CPU utilization. The Overlap stage ran 3 workers at ~5,650 entries/sec. Full pipeline completed in 90s.
If we want batch-level optimization in the future, it would need to happen at the LegacySpeechStage base class level, which would affect all audio stages.
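The per-entry pattern described in this reply can be sketched as follows. The base-class internals are reduced to their essential loop, and everything except the `process()`/`process_dataset_entry()` split is a simplification, so treat this as an illustration rather than the real `LegacySpeechStage`:

```python
# Simplified illustration of the per-entry pattern: process() walks task.data
# and delegates each entry to process_dataset_entry(). The AudioBatch and
# EntryStage definitions here are stand-ins, not NeMo Curator's real classes.
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AudioBatch:
    data: list[dict[str, Any]] = field(default_factory=list)


class EntryStage:
    def process(self, task: AudioBatch) -> list[AudioBatch]:
        out: list[AudioBatch] = []
        for entry in task.data:
            out.extend(self.process_dataset_entry(entry))
        return out

    def process_dataset_entry(self, entry: dict[str, Any]) -> list[AudioBatch]:
        # a real stage might drop, keep, or fan out an entry; here we just tag it
        entry = {**entry, "seen": True}
        return [AudioBatch(data=[entry])]


batches = EntryStage().process(AudioBatch(data=[{"id": 1}, {"id": 2}]))
```

Under this shape, parallelism comes from running many such stages concurrently at the executor level, which matches the worker-scaling numbers quoted above.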
```python
    }
)

return [AudioBatch(data=[result])]
```
Each time we return a Task you must pass its parent task's metadata and stage_perf_stats. In such a fan-out implementation this becomes hard to reason about.
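The concern about propagating parent state through a fan-out can be sketched with a toy `Task`. The real `Task._metadata`/`_stage_perf_stats` machinery is more involved; this only shows the invariant being asked for, namely that every child carries a copy of the parent's bookkeeping:

```python
# Toy illustration of fan-out metadata propagation; Task here is a stand-in,
# not NeMo Curator's actual Task class.
from dataclasses import dataclass, field


@dataclass
class Task:
    data: dict
    metadata: dict = field(default_factory=dict)
    stage_perf: list = field(default_factory=list)


def fan_out(parent: Task, pieces: list[dict]) -> list[Task]:
    """Each child task must carry the parent's metadata and perf history."""
    return [
        Task(data=p, metadata=dict(parent.metadata), stage_perf=list(parent.stage_perf))
        for p in pieces
    ]


parent = Task(data={"raw": "x"}, metadata={"source": "manifest_0"}, stage_perf=["reader"])
children = fan_out(parent, [{"win": 1}, {"win": 2}])
```

Copying (`dict(...)`, `list(...)`) rather than sharing references keeps one child's later mutations from silently leaking into its siblings, which is part of why fan-outs are hard to reason about when propagation is left implicit.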
Yeah, the `_stage_perf` stats are supposedly propagated via the base `LegacySpeechStage`. I would be curious to look at the benchmark results for this PR, though, to get an even better understanding of how the existing audio curation code could be refactored.
Additional Comments (3)
Without validation, invalid configs can cause confusing runtime behavior.
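A minimal example of the kind of eager validation that comment is asking for: fail fast at construction time instead of misbehaving mid-run. The parameter names and bounds below are invented for illustration (the 108–132 s defaults echo the flowchart's duration filter):

```python
# Hypothetical config validation sketch; parameter names are invented, and
# the duration bounds merely echo the flowchart's 108-132 s filter.
def validate_config(overlap_threshold: float, min_duration: float, max_duration: float) -> None:
    """Raise ValueError on impossible settings instead of failing later at runtime."""
    if not 0.0 <= overlap_threshold <= 1.0:
        raise ValueError(f"overlap_threshold must be in [0, 1], got {overlap_threshold}")
    if min_duration <= 0 or max_duration <= min_duration:
        raise ValueError(
            f"need 0 < min_duration < max_duration, got {min_duration}, {max_duration}"
        )
```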
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
Additional Comments (5)
Add new NeMo Curator stages for ALM (Audio Language Model) data curation:
Add complete tutorial with:
Tested with sample data:
Description
Usage
# Add snippet demonstrating usage

Checklist