 from azimuth.startup import startup_tasks
 from azimuth.task_manager import TaskManager
 from azimuth.types import DatasetSplitName, ModuleOptions, SupportedModule
-from azimuth.utils.cluster import default_cluster
 from azimuth.utils.conversion import JSONResponseIgnoreNan
 from azimuth.utils.logs import set_logger_config
 from azimuth.utils.validation import assert_not_none
@@ -147,9 +146,7 @@ def start_app(config_path: Optional[str], load_config_history: bool, debug: bool
     if azimuth_config.dataset is None:
         raise ValueError("No dataset has been specified in the config.")

-    local_cluster = default_cluster(large=azimuth_config.large_dask_cluster)
-
-    run_startup_tasks(azimuth_config, local_cluster)
+    run_startup_tasks(azimuth_config)
     task_manager = assert_not_none(_task_manager)
     task_manager.client.run(set_logger_config, level)

@@ -321,25 +318,20 @@ def load_dataset_split_managers_from_config(
     }


-def initialize_managers(azimuth_config: AzimuthConfig, cluster: SpecCluster):
-    """Initialize DatasetSplitManagers and TaskManagers.
-
+def initialize_managers_and_config(
+    azimuth_config: AzimuthConfig, cluster: Optional[SpecCluster] = None
+):
+    """Initialize DatasetSplitManagers and Config.

     Args:
-        azimuth_config: Configuration
-        cluster: Dask cluster to use.
+        azimuth_config: Config
+        cluster: Dask cluster to use, if different than default.
     """
     global _task_manager, _dataset_split_managers, _azimuth_config
-    _azimuth_config = azimuth_config
-    if _task_manager is not None:
-        task_history = _task_manager.current_tasks
-    else:
-        task_history = {}
-
-    _task_manager = TaskManager(azimuth_config, cluster=cluster)
-
-    _task_manager.current_tasks = task_history
+    if not _task_manager:
+        _task_manager = TaskManager(cluster, azimuth_config.large_dask_cluster)

+    _azimuth_config = azimuth_config
     _dataset_split_managers = load_dataset_split_managers_from_config(azimuth_config)


@@ -361,6 +353,7 @@ def run_validation_module(pipeline_index=None):
         _, task = task_manager.get_task(
             task_name=SupportedModule.Validation,
             dataset_split_name=dataset_split,
+            config=config,
             mod_options=ModuleOptions(pipeline_index=pipeline_index),
         )
         # Will raise exceptions as needed.
@@ -373,15 +366,14 @@ def run_validation_module(pipeline_index=None):
         run_validation_module(pipeline_index)


-def run_startup_tasks(azimuth_config: AzimuthConfig, cluster: SpecCluster):
+def run_startup_tasks(azimuth_config: AzimuthConfig, cluster: Optional[SpecCluster] = None):
     """Initialize managers, run validation and startup tasks.

     Args:
         azimuth_config: Config
-        cluster: Cluster
-
+        cluster: Dask cluster to use, if different than default.
     """
-    initialize_managers(azimuth_config, cluster)
+    initialize_managers_and_config(azimuth_config, cluster)

     task_manager = assert_not_none(get_task_manager())
     # Validate that everything is in order **before** the startup tasks.
@@ -393,5 +385,5 @@ def run_startup_tasks(azimuth_config: AzimuthConfig, cluster: SpecCluster):
     azimuth_config.save()  # Save only after the validation modules ran successfully

     global _startup_tasks, _ready_flag
-    _startup_tasks = startup_tasks(_dataset_split_managers, task_manager)
+    _startup_tasks = startup_tasks(_dataset_split_managers, task_manager, azimuth_config)
     _ready_flag = Event()
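A minimal usage sketch of the refactored entry point (import paths and the config-loading call are assumptions, not shown in this diff): the Dask cluster argument is now optional, so callers only pass a SpecCluster when the default cluster created inside TaskManager is not suitable.

from distributed import LocalCluster

from azimuth.app import run_startup_tasks  # module path assumed
from azimuth.config import AzimuthConfig  # module path assumed

# Hypothetical config loading; AzimuthConfig is assumed to be a pydantic model here.
config = AzimuthConfig.parse_file("config.json")

# Common case: no cluster argument; TaskManager builds the default Dask cluster itself.
run_startup_tasks(config)

# Override case: supply an explicit SpecCluster (LocalCluster is one) only when the default won't do.
run_startup_tasks(config, cluster=LocalCluster(n_workers=2))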