Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
java_version: ['17']
nextflow_version: ['25.10']
nextflow_version: ['26.04']

steps:
- name: Environment
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ A proof-of-concept pipeline for performing hyperparameter optimization of machin
## Requirements

* Unix-like operating system (Linux, macOS, etc)
* Java >=11
* Java >=17
* [Conda](https://docs.conda.io/en/latest/) or [Docker](https://docs.docker.com/)


## Quickstart

1. Install Nextflow (version 23.10 or higher):
1. Install Nextflow (version 26.04 or higher):

```bash
curl -s https://get.nextflow.io | bash
Expand Down
155 changes: 127 additions & 28 deletions main.nf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env nextflow

nextflow.enable.moduleBinaries = true
nextflow.enable.types = true

include { FETCH_DATASET } from './modules/fetch_dataset'
include { SPLIT_TRAIN_TEST } from './modules/split_train_test'
Expand All @@ -14,24 +15,25 @@ include { EVALUATE } from './modules/evaluate'
* Pipeline parameters. They can be overridden on the command line,
* e.g. `--fetch_datasets some_value`.
*/
params.fetch_datasets = null
params.train_test_splits = null
params.train_models = null
params.pretrained_models = null
params.outdir = 'results'
// Typed pipeline parameters. Every parameter is nullable; the entry workflow
// compares each one against null to decide which branch to run.
params {
// datasets to fetch and split, when specified
fetch_datasets: String?
// JSON file of custom train/test splits; each entry supplies dataset_name,
// meta, data_train and data_test
train_test_splits: Path?
// comma-separated list of model types to train (tokenized on ',')
train_models: String?
// JSON file of pretrained models; each entry supplies dataset_name,
// model_type and model
pretrained_models: Path?
}

/*
* entry workflow
*/
workflow {
main:
log.info """\
H Y P E R O P T P I P E L I N E
=================================
fetch_datasets : ${params.fetch_datasets}
train_test_splits : ${params.train_test_splits}
train_models : ${params.train_models}
pretrained_models : ${params.pretrained_models}
outdir : ${params.outdir}
""".stripIndent()

// fetch and split datasets if specified
Expand All @@ -44,10 +46,15 @@ workflow {
// otherwise load custom train/test splits
else if( params.train_test_splits != null ) {
ch_datasets = channel.empty()
ch_train_test_splits = channel.of(file(params.train_test_splits))
.flatMap { json -> json.splitJson() }
ch_train_test_splits = channel.of(params.train_test_splits)
.flatMap { json -> json.splitJson() as List<Map> }
.map { r ->
tuple(r.dataset_name, file(r.meta), file(r.data_train), file(r.data_test))
record(
dataset_name: r.dataset_name as String,
meta: file(r.meta, checkIfExists: true),
data_train: file(r.data_train, checkIfExists: true),
data_test: file(r.data_test, checkIfExists: true)
)
}
}

Expand All @@ -56,16 +63,25 @@ workflow {
}

// separate training and test data
ch_train_datasets = ch_train_test_splits.map { dataset_name, meta, data_train, data_test ->
tuple(dataset_name, meta, data_train)
ch_train_datasets = ch_train_test_splits.map { r ->
record(dataset_name: r.dataset_name, meta: r.meta, data: r.data_train)
}
ch_test_datasets = ch_train_test_splits.map { dataset_name, meta, data_train, data_test ->
tuple(dataset_name, meta, data_test)
ch_test_datasets = ch_train_test_splits.map { r ->
record(dataset_name: r.dataset_name, meta: r.meta, data: r.data_test)
}

// visualize train/test datasets
VISUALIZE_TRAIN(ch_train_datasets)
VISUALIZE_TEST(ch_test_datasets)
ch_train_plots = VISUALIZE_TRAIN(ch_train_datasets).map { r ->
record(dataset_name: r.dataset_name, plot_train: r.plot)
}
ch_test_plots = VISUALIZE_TEST(ch_test_datasets).map { r ->
record(dataset_name: r.dataset_name, plot_test: r.plot)
}

// combine train/test data with train/test plots
ch_splits = ch_train_test_splits
.join(ch_train_plots, by: 'dataset_name')
.join(ch_test_plots, by: 'dataset_name')

// print warning if both training and pre-trained model are enabled
if( params.train_models != null && params.pretrained_models != null ) {
Expand All @@ -75,15 +91,24 @@ workflow {
// train new models if specified
if( params.train_models != null ) {
model_types = params.train_models.tokenize(',')
(ch_models, ch_train_logs) = TRAIN(ch_train_datasets, model_types)
ch_train_inputs = ch_train_datasets.flatMap { r ->
model_types.collect { model_type ->
r + record(model_type: model_type)
}
}
ch_models = TRAIN(ch_train_inputs)
}

// otherwise load pretrained models if specified
else if( params.pretrained_models != null ) {
ch_models = channel.of(file(params.pretrained_models))
.flatMap { json -> json.splitJson() }
ch_models = channel.of(params.pretrained_models)
.flatMap { json -> json.splitJson() as List<Map> }
.map { r ->
tuple(r.dataset_name, r.model_type, file(r.model))
record(
dataset_name: r.dataset_name as String,
model_type: r.model_type as String,
model: file(r.model, checkIfExists: true)
)
}
}

Expand All @@ -92,19 +117,93 @@ workflow {
}

// evaluate each model against test dataset
ch_evaluate_inputs = ch_models.combine(ch_test_datasets, by: 0)
(ch_scores, ch_test_logs) = EVALUATE(ch_evaluate_inputs)
ch_evaluate_inputs = ch_models.join(ch_test_datasets, by: 'dataset_name')
ch_evals = EVALUATE(ch_evaluate_inputs).map { r ->
record(
model_type: r.model_type,
dataset_name: r.dataset_name,
score_name: r.score.name,
score: r.score.value,
logs: r.logs,
)
}

// report the best model for each dataset based on evaluation score
ch_scores
.max { it -> fromJson(it[2]).value }
.subscribe { dataset_name, model_type, score_file ->
def score = fromJson(score_file)
printf "The best model for dataset '${dataset_name}' was '${model_type}' (${score.name} = %.3f)\n", score.value
ch_evals
.map { r -> tuple(r.dataset_name, r) }
.groupBy()
.subscribe { dataset_name, evals ->
def best = evals.max { r -> r.score }
printf "The best model for dataset '${dataset_name}' was '${best.model_type}' (${best.score_name} = %.3f)\n", best.score
}

publish:
datasets = ch_datasets
splits = ch_splits
trained_models = params.train_models != null ? ch_models : channel.empty()
evals = ch_evals
}

/*
* Workflow outputs
*/
// Publish targets for the channels assigned in the entry workflow's
// `publish:` section. Each target derives a per-record directory from the
// record's fields (the `path` closure) and names an index file for the
// target (the `index` block).
output {
// dataset records, one directory per dataset name
datasets: Channel<Dataset> {
path { r -> "datasets/${r.dataset_name}/" }
index {
path 'datasets/index.json'
}
}

// train/test split records (data plus plots), one directory per dataset name
splits: Channel<TrainTestSplit> {
path { r -> "splits/${r.dataset_name}/" }
index {
path 'splits/index.json'
}
}

// models trained by this run, one directory per dataset/model-type pair;
// the workflow assigns an empty channel here when train_models is not set
trained_models: Channel<Model> {
path { r -> "trained_models/${r.dataset_name}-${r.model_type}/" }
index {
path 'trained_models/index.json'
}
}

// evaluation records, one directory per dataset/model-type pair
evals: Channel<Eval> {
path { r -> "evals/${r.dataset_name}-${r.model_type}/" }
index {
path 'evals/index.json'
}
}
}

/*
* Types
*/
// A dataset record: a name plus its metadata file and data file.
// These are the same fields as the record emitted by FETCH_DATASET.
record Dataset {
dataset_name: String
meta: Path
data: Path
}

// A dataset split into training and test data, together with the plot
// produced for each half. The plot fields are joined onto the split records
// by dataset_name in the entry workflow.
record TrainTestSplit {
dataset_name: String
meta: Path
data_train: Path
plot_train: Path
data_test: Path
plot_test: Path
}

// A model of a given type for a given dataset.
// NOTE(review): TRAIN's output record also carries a `logs` file that is not
// declared here, while records loaded from pretrained_models have no logs —
// confirm whether training logs are meant to be published.
record Model {
model_type: String
model: Path
dataset_name: String
}

def fromJson(file) {
return new groovy.json.JsonSlurper().parse(file)
record Eval {
model_type: String
dataset_name: String
score_name: String
score: Float
}
33 changes: 26 additions & 7 deletions modules/evaluate/main.nf
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
nextflow.enable.types = true

process EVALUATE {
publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${model_type}.${file}" }
tag "${dataset_name}/${model_type}"

input:
tuple val(dataset_name), val(model_type), path(model_file), path(meta_file), path(data_file)
record(
model_type: String,
model: Path,
dataset_name: String,
data: Path,
meta: Path
)

output:
tuple val(dataset_name), val(model_type), path('score.json'), emit: scores
tuple val(dataset_name), val(model_type), stdout, emit: logs
record(
model_type: model_type,
dataset_name: dataset_name,
score: new groovy.json.JsonSlurper().parse(file('score.json')) as Map,
// score: fromJson(file('score.json')) as Map,
Comment on lines +19 to +20
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Update when nextflow-io/nextflow#7148 is merged and Nextflow 26.04.2 is released

logs: file('evaluate.log'),
)

script:
"""
evaluate.py \
--model ${model_file} \
--data ${data_file} \
--meta ${meta_file}
--model ${model} \
--data ${data} \
--meta ${meta} \
> evaluate.log
"""
}

/*
* Load data from a JSON file
*/
// Parses `file` with Groovy's JsonSlurper and returns the parsed result.
// NOTE(review): EVALUATE's output block currently inlines the same
// JsonSlurper call and keeps the call to this helper commented out —
// confirm when the helper can be used there instead.
def fromJson(file: Path) {
return new groovy.json.JsonSlurper().parse(file)
}
12 changes: 8 additions & 4 deletions modules/fetch_dataset/main.nf
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
nextflow.enable.types = true

process FETCH_DATASET {
publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${file}" }
tag "${dataset_name}"
tag dataset_name

input:
val(dataset_name)
dataset_name: String

output:
tuple val(dataset_name), path('meta.json'), path('data.txt')
record(
dataset_name: dataset_name,
meta: file('meta.json'),
data: file('data.txt'),
)

script:
"""
Expand Down
19 changes: 14 additions & 5 deletions modules/split_train_test/main.nf
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
nextflow.enable.types = true

process SPLIT_TRAIN_TEST {
publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${file}" }
tag "${dataset_name}"
tag dataset_name

input:
tuple val(dataset_name), path(meta_file), path(data_file)
record(
dataset_name: String,
meta: Path,
data: Path
)

output:
tuple val(dataset_name), path(meta_file), path('train.txt'), path('test.txt')
record(
dataset_name: dataset_name,
meta: meta,
data_train: file('train.txt'),
data_test: file('test.txt'),
)

script:
"""
split-train-test.py --data ${data_file}
split-train-test.py --data ${data}
"""
}
25 changes: 17 additions & 8 deletions modules/train/main.nf
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
nextflow.enable.types = true

process TRAIN {
publishDir params.outdir, mode: 'copy', saveAs: { file -> "${dataset_name}.${model_type}.${file}" }
tag "${dataset_name}/${model_type}"

input:
tuple val(dataset_name), path(meta_file), path(data_file)
each model_type
record(
dataset_name: String,
meta: Path,
data: Path,
model_type: String
)

output:
tuple val(dataset_name), val(model_type), path('model.pkl'), emit: models
tuple val(dataset_name), val(model_type), stdout, emit: logs
record(
dataset_name: dataset_name,
model_type: model_type,
model: file('model.pkl'),
logs: file('train.log'),
)

script:
"""
train.py \
--data ${data_file} \
--meta ${meta_file} \
--data ${data} \
--meta ${meta} \
--scaler standard \
--model-type ${model_type}
--model-type ${model_type} \
> train.log
"""
}
Loading
Loading