diff --git a/examples/parallel/plot_streaming_pipeline.py b/examples/parallel/plot_streaming_pipeline.py
index e38fbc94..dca13673 100644
--- a/examples/parallel/plot_streaming_pipeline.py
+++ b/examples/parallel/plot_streaming_pipeline.py
@@ -3,9 +3,27 @@
 ===================================================================
 
 This demonstrates how to stream data in parallel in a Neuraxle pipeline.
+The parallelism of the pipeline steps will be obvious here.
+
+The pipeline has two steps:
+1. Preprocessing: the step that processes the data simply sleeps.
+2. Model: the model simply multiplies the data by two.
+
+This technique can also be used with scikit-learn and with any other library,
+such as TensorFlow, to transform data in parallel.
+
+Pipelines benchmarked:
+1. We first use a classical pipeline and evaluate the time.
+2. Then we use a minibatched pipeline and evaluate the time.
+3. Then we use a parallel pipeline and evaluate the time.
+
+We expect the parallel pipeline to be faster because it uses more workers
+in parallel, and because the model's transformations start while other
+batches are still being preprocessed, thanks to the queues.
+
 
 ..
-    Copyright 2019, Neuraxio Inc.
+    Copyright 2021, Neuraxio Inc.
 
     Licensed under the Apache License, Version 2.0 (the "License");
     you may not use this file except in compliance with the License.
@@ -25,47 +43,49 @@ import numpy as np
 
 
 from neuraxle.distributed.streaming import SequentialQueuedPipeline
-from neuraxle.pipeline import Pipeline
+from neuraxle.pipeline import BasePipeline, Pipeline, MiniBatchSequentialPipeline
 from neuraxle.steps.loop import ForEach
 from neuraxle.steps.misc import Sleep
 from neuraxle.steps.numpy import MultiplyByN
 
 
-def main():
-    """
-    Process tasks of batch size 10 with 8 queued workers that have a max queue size of 10.
-    Each task doest the following: For each data input, sleep 0.02 seconds, and multiply by 2.
-    """
-    sleep_time = 0.02
-    p = SequentialQueuedPipeline([
-        Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]),
-    ], n_workers_per_step=8, max_queue_size=10, batch_size=10)
-
+def eval_run_time(pipeline: BasePipeline):
+    pipeline.setup()
     a = time.time()
-    outputs_streaming = p.transform(list(range(100)))
+    output = pipeline.transform(list(range(100)))
     b = time.time()
-    time_queued_pipeline = b - a
-    print('SequentialQueuedPipeline')
-    print('execution time: {} seconds'.format(time_queued_pipeline))
+    seconds = b - a
+    return seconds, output
+
 
+def main():
     """
-    Process data inputs sequentially.
-    For each data input, sleep 0.02 seconds, and then multiply by 2.
+    The task is to sleep 0.02 seconds for each data input and then multiply by 2.
""" - p = Pipeline([ - Pipeline([ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)]), - ]) - - a = time.time() - outputs_vanilla = p.transform(list(range(100))) - b = time.time() - time_vanilla_pipeline = b - a - - print('VanillaPipeline') - print('execution time: {} seconds'.format(time_vanilla_pipeline)) - - assert time_queued_pipeline < time_vanilla_pipeline - assert np.array_equal(outputs_streaming, outputs_vanilla) + sleep_time = 0.02 + preprocessing_and_model_steps = [ForEach(Sleep(sleep_time=sleep_time)), MultiplyByN(2)] + + # Classical pipeline - all at once with one big batch: + p = Pipeline(preprocessing_and_model_steps) + time_vanilla_pipeline, output_classical = eval_run_time(p) + print(f"Classical 'Pipeline' execution time: {time_vanilla_pipeline} seconds.") + + # Classical minibatch pipeline - minibatch size 10: + p = MiniBatchSequentialPipeline(preprocessing_and_model_steps, + batch_size=10) + time_minibatch_pipeline, output_minibatch = eval_run_time(p) + print(f"Minibatched 'MiniBatchSequentialPipeline' execution time: {time_minibatch_pipeline} seconds.") + + # Parallel pipeline - minibatch size 10 with 8 workers per step that + # have a max queue size of 5 batches between preprocessing and the model: + p = SequentialQueuedPipeline(preprocessing_and_model_steps, + n_workers_per_step=8, max_queue_size=5, batch_size=10) + time_parallel_pipeline, output_parallel = eval_run_time(p) + print(f"Parallel 'SequentialQueuedPipeline' execution time: {time_parallel_pipeline} seconds.") + + assert time_parallel_pipeline < time_minibatch_pipeline, str((time_parallel_pipeline, time_vanilla_pipeline)) + assert np.array_equal(output_classical, output_minibatch) + assert np.array_equal(output_classical, output_parallel) if __name__ == '__main__': diff --git a/neuraxle/base.py b/neuraxle/base.py index 579cdeb4..239c442c 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -2242,7 +2242,7 @@ def _teardown(self) -> 'BaseTransformer': :return: self """ self.is_initialized = False - return self + return RecursiveDict() def __del__(self): try: diff --git a/neuraxle/distributed/streaming.py b/neuraxle/distributed/streaming.py index 3ca2164a..d011e112 100644 --- a/neuraxle/distributed/streaming.py +++ b/neuraxle/distributed/streaming.py @@ -59,7 +59,7 @@ class ObservableQueueMixin(MixinForBaseTransformer): :class:`SequentialQueuedPipeline` """ - def __init__(self, queue): + def __init__(self, queue: Queue): MixinForBaseTransformer.__init__(self) self.queue = queue self.observers = [] @@ -176,7 +176,7 @@ def __init__( additional_worker_arguments = [[] for _ in range(n_workers)] MetaStep.__init__(self, wrapped) - ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size)) + ObservableQueueMixin.__init__(self, Queue(maxsize=max_queue_size)) # max_queue_size is in batches self.use_processes: bool = use_processes self.workers: List[Process] = [] @@ -345,17 +345,16 @@ class BaseQueuedPipeline(MiniBatchSequentialPipeline): ], batch_size=10, max_queue_size=5) outputs = p.transform(list(range(100))) - :param steps: pipeline steps - :param batch_size: number of elements to combine into a single batch - :param n_workers_per_step: number of workers to spawn per step - :param max_queue_size: max number of elements inside the processing queue - :param data_joiner: transformer step to join streamed batches together at the end of the pipeline + :param steps: pipeline steps. + :param batch_size: number of elements to combine into a single batch. 
+ :param n_workers_per_step: number of workers to spawn per step. + :param max_queue_size: max number of batches inside the processing queue between the workers. + :param data_joiner: transformer step to join streamed batches together at the end of the pipeline. :param use_processes: use processes instead of threads for parallel processing. multiprocessing.context.Process is used by default. :param use_savers: use savers to serialize steps for parallel processing. Recommended if using processes instead of threads. - :param keep_incomplete_batch: (Optional.) A bool representing - whether the last batch should be dropped in the case it has fewer than - `batch_size` elements; the default behavior is to keep the smaller - batch. + :param keep_incomplete_batch: (Optional.) A bool that indicates whether + or not the last batch should be dropped in the case it has fewer than + `batch_size` elements; the default behavior is to keep the smaller batch. :param default_value_data_inputs: expected_outputs default fill value for padding and values outside iteration range, or :class:`~neuraxle.data_container.DataContainer.AbsentValuesNullObject` to trim absent values from the batch diff --git a/neuraxle/metaopt/auto_ml.py b/neuraxle/metaopt/auto_ml.py index 315c91ca..ae9d30e2 100644 --- a/neuraxle/metaopt/auto_ml.py +++ b/neuraxle/metaopt/auto_ml.py @@ -41,8 +41,7 @@ import numpy as np -from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin, \ - LOGGER_FORMAT, DATE_FORMAT +from neuraxle.base import BaseStep, ExecutionContext, ForceHandleMixin, ExecutionPhase, _HasChildrenMixin from neuraxle.data_container import DataContainer from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace from neuraxle.metaopt.callbacks import BaseCallback, CallbackList, ScoringCallback @@ -54,6 +53,7 @@ class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]], ABC): """ Hyperparams repository that saves hyperparams, and scores for every AutoML trial. + Cache folder can be changed to do different round numbers. .. seealso:: :class:`AutoML`, @@ -66,10 +66,15 @@ class HyperparamsRepository(_Observable[Tuple['HyperparamsRepository', Trial]], :class:`~neuraxle.hyperparams.space.HyperparameterSamples` """ - def __init__(self, hyperparameter_selection_strategy=None, cache_folder=None, best_retrained_model_folder=None): + def __init__( + self, + hyperparameter_selection_strategy: 'BaseHyperparameterSelectionStrategy' = None, + cache_folder: str = None, + best_retrained_model_folder: str = None, + ): super().__init__() if cache_folder is None: - cache_folder = 'trials' + cache_folder = os.path.join(f'{self.__class__.__name__}', 'trials') if best_retrained_model_folder is None: best_retrained_model_folder = os.path.join(cache_folder, 'best') self.best_retrained_model_folder = best_retrained_model_folder @@ -155,7 +160,7 @@ def save_best_model(self, step: BaseStep): self._save_best_model(step, trial_hash) return step - def new_trial(self, auto_ml_container: 'AutoMLContainer'): + def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial: """ Create a new trial with the best next hyperparams. 
@@ -164,19 +169,16 @@ def new_trial(self, auto_ml_container: 'AutoMLContainer'): :return: trial """ hyperparams = self.hyperparameter_selection_strategy.find_next_best_hyperparams(auto_ml_container) - logger = self._create_logger_for_trial(auto_ml_container.trial_number) - logger.info('\nnew trial: {}'.format(json.dumps(hyperparams.to_nested_dict(), sort_keys=True, indent=4))) trial = Trial( + trial_number=auto_ml_container.trial_number, hyperparams=hyperparams, save_trial_function=self.save_trial, - logger=logger, cache_folder=self.cache_folder, main_metric_name=auto_ml_container.main_scoring_metric_name ) return trial - def _get_trial_hash(self, hp_dict): """ Hash hyperparams with md5 to create a trial hash. @@ -187,19 +189,6 @@ def _get_trial_hash(self, hp_dict): current_hyperparameters_hash = hashlib.md5(str.encode(str(hp_dict))).hexdigest() return current_hyperparameters_hash - def _create_logger_for_trial(self, trial_number) -> logging.Logger: - - os.makedirs(self.cache_folder, exist_ok=True) - - logfile_path = os.path.join(self.cache_folder, f"trial_{trial_number}.log") - logger_name = f"trial_{trial_number}" - logger = logging.getLogger(logger_name) - formatter = logging.Formatter(fmt=LOGGER_FORMAT, datefmt=DATE_FORMAT) - file_handler = logging.FileHandler(filename=logfile_path) - file_handler.setFormatter(formatter) - logger.addHandler(file_handler) - return logger - class InMemoryHyperparamsRepository(HyperparamsRepository): """ @@ -329,14 +318,14 @@ def _save_trial(self, trial: 'Trial'): # Sleeping to have a valid time difference between files when reloading them to sort them by creation time: time.sleep(0.1) - def new_trial(self, auto_ml_container: 'AutoMLContainer'): + def new_trial(self, auto_ml_container: 'AutoMLContainer') -> Trial: """ Create new hyperperams trial json file. :param auto_ml_container: auto ml container :return: """ - trial = HyperparamsRepository.new_trial(self, auto_ml_container) + trial: Trial = HyperparamsRepository.new_trial(self, auto_ml_container) self._save_trial(trial) return trial @@ -346,6 +335,7 @@ def load_all_trials(self, status: 'TRIAL_STATUS' = None) -> 'Trials': Load all hyperparameter trials with their corresponding score. Reads all the saved trial json files, sorted by creation date. + :param status: (optional) filter to select only trials with this status. :return: (hyperparams, scores) """ trials = Trials() @@ -370,7 +360,8 @@ def getmtimens(filename): if status is None or trial_json['status'] == status.value: trials.append(Trial.from_json( update_trial_function=self.save_trial, - trial_json=trial_json + trial_json=trial_json, + cache_folder=self.cache_folder )) return trials @@ -498,7 +489,14 @@ def __init__( hyperparams_repository = InMemoryHyperparamsRepository() self.hyperparams_repository: HyperparamsRepository = hyperparams_repository - def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context: ExecutionContext = None) -> Trial: + def train( + self, + pipeline: BaseStep, + data_inputs, + expected_outputs=None, + context: ExecutionContext = None, + trial_number=0 + ) -> Trial: """ Train pipeline using the validation splitter. Track training, and validation metrics for each epoch. 
@@ -523,12 +521,12 @@ def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context: logger=context.logger, hyperparams=pipeline.get_hyperparams(), main_metric_name=self.get_main_metric_name(), - save_trial_function=self.hyperparams_repository.save_trial + save_trial_function=self.hyperparams_repository.save_trial, + trial_number=trial_number ) self.execute_trial( pipeline=pipeline, - trial_number=1, repo_trial=repo_trial, context=context, validation_splits=validation_splits, @@ -541,7 +539,6 @@ def train(self, pipeline: BaseStep, data_inputs, expected_outputs=None, context: def execute_trial( self, pipeline: BaseStep, - trial_number: int, repo_trial: Trial, context: ExecutionContext, validation_splits: List[Tuple[DataContainer, DataContainer]], @@ -576,7 +573,7 @@ def execute_trial( repo_trial=repo_trial, repo_trial_split_number=repo_trial_split.split_number, validation_splits=validation_splits, - trial_number=trial_number, + trial_number=repo_trial.trial_number, n_trial=n_trial ) @@ -867,7 +864,6 @@ def _attempt_trial(self, trial_number, validation_splits, context: ExecutionCont repo_trial_split = self.trainer.execute_trial( pipeline=self.pipeline, context=context, - trial_number=trial_number, repo_trial=repo_trial, validation_splits=validation_splits, n_trial=self.n_trial @@ -1153,8 +1149,9 @@ class ValidationSplitter(BaseValidationSplitter): def __init__(self, test_size: float): self.test_size = test_size - def split(self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None) -> Tuple[ - List, List, List, List]: + def split( + self, data_inputs, current_ids=None, expected_outputs=None, context: ExecutionContext = None + ) -> Tuple[List, List, List, List]: train_data_inputs, train_expected_outputs, train_current_ids, validation_data_inputs, validation_expected_outputs, validation_current_ids = validation_split( test_size=self.test_size, data_inputs=data_inputs, diff --git a/neuraxle/metaopt/trial.py b/neuraxle/metaopt/trial.py index 3a4e113c..39bd091e 100644 --- a/neuraxle/metaopt/trial.py +++ b/neuraxle/metaopt/trial.py @@ -26,13 +26,14 @@ import logging import os import traceback +import json from enum import Enum -from logging import Logger +from logging import FileHandler, Logger from typing import Dict, List, Callable, Iterable, Tuple import numpy as np -from neuraxle.base import BaseStep, ExecutionContext +from neuraxle.base import BaseStep, ExecutionContext, LOGGER_FORMAT, DATE_FORMAT from neuraxle.data_container import DataContainer from neuraxle.hyperparams.space import HyperparameterSamples @@ -56,10 +57,10 @@ class Trial: def __init__( self, + trial_number: int, hyperparams: HyperparameterSamples, main_metric_name: str, save_trial_function: Callable, - logger: Logger = None, status: 'TRIAL_STATUS' = None, pipeline: BaseStep = None, validation_splits: List['TrialSplit'] = None, @@ -68,13 +69,11 @@ def __init__( error_traceback: str = None, start_time: datetime.datetime = None, end_time: datetime.datetime = None, + logger: Logger = None ): + self.trial_number = trial_number self.save_trial_function: Callable = save_trial_function - if logger is None: - logger = logging.getLogger() - self.logger: Logger = logger - if status is None: status = TRIAL_STATUS.PLANNED if validation_splits is None: @@ -91,6 +90,13 @@ def __init__( self.start_time: datetime.datetime = start_time self.end_time: datetime.datetime = end_time + if logger is None: + if self.cache_folder is not None: + logger = self._initialize_logger_with_file() + else: + 
logger = logging.getLogger() + self.logger: Logger = logger + def save_trial(self) -> 'Trial': """ Update trial with the hyperparams repository. @@ -269,6 +275,7 @@ def _get_trial_hash(self, hp_dict: Dict): def to_json(self): return { + 'trial_number': self.trial_number, 'status': self.status.value, 'hyperparams': self.hyperparams.to_flat_dict(), 'validation_splits': [v.to_json() for v in self.validation_splits], @@ -280,8 +287,9 @@ def to_json(self): } @staticmethod - def from_json(update_trial_function: Callable, trial_json: Dict) -> 'Trial': + def from_json(update_trial_function: Callable, trial_json: Dict, cache_folder: str = None) -> 'Trial': trial: Trial = Trial( + trial_number=trial_json["trial_number"], main_metric_name=trial_json['main_metric_name'], status=TRIAL_STATUS(trial_json['status']), hyperparams=HyperparameterSamples(trial_json['hyperparams']), @@ -289,7 +297,9 @@ def from_json(update_trial_function: Callable, trial_json: Dict) -> 'Trial': error=trial_json['error'], error_traceback=trial_json['error_traceback'], start_time=datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT), - end_time=datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) + end_time=datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT), + cache_folder=cache_folder, + logger=None ) trial.validation_splits = [ @@ -304,12 +314,36 @@ def from_json(update_trial_function: Callable, trial_json: Dict) -> 'Trial': def __getitem__(self, item) -> 'TrialSplit': return self.validation_splits[item] + def _initialize_logger_with_file(self) -> logging.Logger: + + os.makedirs(self.cache_folder, exist_ok=True) + + logfile_path = os.path.join(self.cache_folder, f"trial_{self.trial_number}.log") + logger_name = f"trial_{self.trial_number}" + logger = logging.getLogger(logger_name) + formatter = logging.Formatter(fmt=LOGGER_FORMAT, datefmt=DATE_FORMAT) + file_handler = logging.FileHandler(filename=logfile_path) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + return logger + + def _free_logger_file(self): + """Remove file handlers from logger to free file lock on Windows.""" + for h in self.logger.handlers: + if isinstance(h, FileHandler): + self.logger.removeHandler(h) + def __enter__(self): """ Start trial, and set the trial status to PLANNED. """ self.start_time = datetime.datetime.now() self.status = TRIAL_STATUS.STARTED + + self.logger.info( + '\nnew trial: {}'.format( + json.dumps(self.hyperparams.to_nested_dict(), sort_keys=True, indent=4))) + self.save_trial() return self @@ -330,6 +364,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): raise exc_val self.save_trial() + self._free_logger_file() return self @@ -428,7 +463,7 @@ def set_main_metric_name(self, name: str) -> 'TrialSplit': return self - def add_metric_results_train(self, name: str, score: float, higher_score_is_better: bool, log_metric: bool=False): + def add_metric_results_train(self, name: str, score: float, higher_score_is_better: bool, log_metric: bool = False): """ Add a train metric result in the metric results dictionary. 
@@ -449,7 +484,7 @@ def add_metric_results_train(self, name: str, score: float, higher_score_is_bett if log_metric: self.trial.logger.info('{} train: {}'.format(name, score)) - def add_metric_results_validation(self, name: str, score: float, higher_score_is_better: bool, log_metric: bool=False): + def add_metric_results_validation(self, name: str, score: float, higher_score_is_better: bool, log_metric: bool = False): """ Add a validation metric result in the metric results dictionary. @@ -700,18 +735,16 @@ def get_best_trial(self) -> Trial: if len(self) == 0: raise Exception('Could not get best trial because there were no successful trial.') - higher_score_is_better = self.trials[-1].is_higher_score_better() - for trial in self.trials: trial_score = trial.get_validation_score() - if best_score is None or higher_score_is_better == (trial_score > best_score): + if best_score is None or self.trials[-1].is_higher_score_better() == (trial_score > best_score): best_score = trial_score best_trial = trial return best_trial def split_good_and_bad_trials(self, quantile_threshold: float, number_of_good_trials_max_cap: int) -> Tuple[ - 'Trials', 'Trials']: + 'Trials', 'Trials']: success_trials: Trials = self.filter(TRIAL_STATUS.SUCCESS) # Split trials into good and bad using quantile threshold. diff --git a/testing/hyperparams/test_scipy_distributions.py b/testing/hyperparams/test_scipy_distributions.py index 2c22b5a5..2349d316 100644 --- a/testing/hyperparams/test_scipy_distributions.py +++ b/testing/hyperparams/test_scipy_distributions.py @@ -272,6 +272,7 @@ def test_choice_and_priority_choice(ctor): assert abs((hd.mean() - np.mean(samples_index)) / hd.mean()) < 1e-1 assert abs((hd.var() - np.var(samples_index)) / hd.var()) < 1e-1 + @pytest.mark.parametrize("hd, test_method", [ (RandInt(min_included=-10, max_included=10, null_default_value=0), _test_randint), (LogNormal(hard_clip_min=-5, hard_clip_max=5, log2_space_mean=0.0, log2_space_std=2.0, null_default_value=-1.0), _test_lognormal), @@ -283,8 +284,8 @@ def test_choice_and_priority_choice(ctor): (Histogram(histogram=np.histogram(norm.rvs(size=10000, loc=0, scale=1.5, random_state=123), bins=100), null_default_value=0.0), _test_histogram) ]) def test_after_serialization(hd, test_method, tmpdir): - joblib.dump(hd, os.path.join(str(tmpdir), '{}.joblib'.format(hd.__class__))) - hd_loaded = joblib.load(os.path.join(str(tmpdir), '{}.joblib'.format(hd.__class__))) + joblib.dump(hd, os.path.join(str(tmpdir), '{}.joblib'.format(hd.__class__.__name__))) + hd_loaded = joblib.load(os.path.join(str(tmpdir), '{}.joblib'.format(hd.__class__.__name__))) assert hd.__class__ == hd_loaded.__class__ test_method(hd_loaded) diff --git a/testing/metaopt/test_automl.py b/testing/metaopt/test_automl.py index 8eb524a2..b4ee5e95 100644 --- a/testing/metaopt/test_automl.py +++ b/testing/metaopt/test_automl.py @@ -24,7 +24,7 @@ def test_automl_early_stopping_callback(tmpdir): # Given hp_repository = InMemoryHyperparamsRepository(cache_folder=str(tmpdir)) n_epochs = 10 - max_epochs_without_improvement=3 + max_epochs_without_improvement = 3 auto_ml = AutoML( pipeline=Pipeline([ MultiplyByN(2).set_hyperparams_space(HyperparameterSpace({ @@ -57,12 +57,11 @@ def test_automl_early_stopping_callback(tmpdir): validation_scores = trial.validation_splits[0].get_validation_scores() nepochs_executed = len(validation_scores) assert nepochs_executed == max_epochs_without_improvement + 1 - -@pytest.mark.skip + def test_automl_savebestmodel_callback(tmpdir): # Given - hp_repository = 
HyperparamsJSONRepository(cache_folder=str('caching')) + hp_repository = HyperparamsJSONRepository(cache_folder=tmpdir) validation_splitter = ValidationSplitter(0.20) auto_ml = AutoML( pipeline=Pipeline([ @@ -78,13 +77,12 @@ def test_automl_savebestmodel_callback(tmpdir): callbacks=[ BestModelCheckpoint() ], - n_trials=1, - epochs=10, + n_trials=3, + epochs=1, refit_trial=False, - print_func=print, hyperparams_repository=hp_repository, continue_loop_on_error=False - ) + ).with_context(ExecutionContext(tmpdir)) data_inputs = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) expected_outputs = data_inputs * 4 @@ -92,23 +90,21 @@ def test_automl_savebestmodel_callback(tmpdir): # When auto_ml.fit(data_inputs=data_inputs, expected_outputs=expected_outputs) - - #Then + # Then trials: Trials = hp_repository.load_all_trials() best_trial = trials.get_best_trial() best_trial_score = best_trial.get_validation_score() - best_trial.cache_folder = hp_repository.cache_folder best_model = best_trial.get_model('best') - _, _, valid_inputs, valid_outputs = ValidationSplitter(0.20).split(data_inputs, expected_outputs) - predicted_output = best_model.predict(valid_inputs) - score = mean_squared_error(valid_outputs, predicted_output) + _, _, _, valid_inputs, valid_outputs, _ = validation_splitter.split(data_inputs=data_inputs, expected_outputs=expected_outputs) + predicted_output = best_model.predict(*valid_inputs) + score = mean_squared_error(*valid_outputs, predicted_output) assert best_trial_score == score - + def test_automl_with_kfold(tmpdir): # Given - hp_repository = HyperparamsJSONRepository(cache_folder=str('caching')) + hp_repository = HyperparamsJSONRepository(cache_folder=tmpdir) auto_ml = AutoML( pipeline=Pipeline([ MultiplyByN(2).set_hyperparams_space(HyperparameterSpace({ @@ -342,7 +338,8 @@ def test_trainer_train(): validation_splitter=ValidationSplitter(test_size=0.20) ) - repo_trial: Trial = trainer.train(pipeline=p, data_inputs=data_inputs, expected_outputs=expected_outputs, context=ExecutionContext()) + repo_trial: Trial = trainer.train(pipeline=p, data_inputs=data_inputs, + expected_outputs=expected_outputs, context=ExecutionContext()) trained_pipeline = repo_trial.get_trained_pipeline(split_number=0) @@ -350,4 +347,3 @@ def test_trainer_train(): mse = mean_squared_error(expected_outputs, outputs) assert mse < 1 - diff --git a/testing/metaopt/test_tpe.py b/testing/metaopt/test_tpe.py index 2632c7f8..9e8fee3c 100644 --- a/testing/metaopt/test_tpe.py +++ b/testing/metaopt/test_tpe.py @@ -110,7 +110,6 @@ def test_tpe(expected_output_mult, pipeline, tmpdir): assert mean_tpe_score < mean_random_score - def _test_trial_scores( expected_output_mult, pipeline, @@ -144,4 +143,3 @@ def _test_trial_scores( trials: Trials = hp_repository.load_all_trials(status=TRIAL_STATUS.SUCCESS) validation_scores = [t.get_validation_score() for t in trials] return validation_scores - diff --git a/testing/metaopt/test_trial.py b/testing/metaopt/test_trial.py index a9e61795..a7731184 100644 --- a/testing/metaopt/test_trial.py +++ b/testing/metaopt/test_trial.py @@ -1,4 +1,5 @@ import datetime +import time from neuraxle.metaopt.auto_ml import InMemoryHyperparamsRepository @@ -19,279 +20,244 @@ MAIN_METRIC_NAME = 'mse' -def test_trial_should_create_new_split(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial( - save_trial_function=repo.save_trial, - hyperparams=hp, - main_metric_name=MAIN_METRIC_NAME - ) +class TestTrials: + def setup(self): + self.hp = 
HyperparameterSamples({'a': 2}) + self.repo = InMemoryHyperparamsRepository() + self.trial = Trial( + trial_number=0, + save_trial_function=self.repo.save_trial, + hyperparams=self.hp, + main_metric_name=MAIN_METRIC_NAME + ) + + def test_trial_should_have_end_time_later_than_start_time(self): + with self.trial.new_validation_split(Identity()) as trial_split: + time.sleep(0.001) # TODO: maybe remove sleep? + trial_split.set_success() + + assert isinstance(trial_split.start_time, datetime.datetime) + assert isinstance(trial_split.end_time, datetime.datetime) + assert trial_split.start_time < trial_split.end_time + + def test_trial_should_create_new_split(self): + with self.trial.new_validation_split(Identity()) as trial_split: + trial_split.set_success() - with trial.new_validation_split(Identity()) as trial_split: - trial_split.set_success() + assert self.trial.validation_splits[0] == trial_split - assert isinstance(trial_split.start_time, datetime.datetime) - assert isinstance(trial_split.end_time, datetime.datetime) - assert trial_split.start_time < trial_split.end_time - assert trial.validation_splits[0] == trial_split + def test_trial_split_is_new_best_score_should_return_true_with_one_score(self): + with self.trial.new_validation_split(Identity()) as trial_split: + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + assert trial_split.is_new_best_score() -def test_trial_split_is_new_best_score_should_return_true_with_one_score(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name=MAIN_METRIC_NAME) + def test_trial_split_is_new_best_score_should_return_false_with_not_a_new_best_score(self): + with self.trial.new_validation_split(Identity()) as trial_split: + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + + assert not trial_split.is_new_best_score() - with trial.new_validation_split(Identity()) as trial_split: - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.5, - higher_score_is_better=False) + def test_trial_split_is_new_best_score_should_return_true_with_a_new_best_score_after_multiple_scores(self): + with self.trial.new_validation_split(Identity()) as trial_split: + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - assert trial_split.is_new_best_score() + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.4, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, 
score=0.4, higher_score_is_better=False) -def test_trial_split_is_new_best_score_should_return_false_with_not_a_new_best_score(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name=MAIN_METRIC_NAME) + assert trial_split.is_new_best_score() - with trial.new_validation_split(Identity()) as trial_split: - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.5, - higher_score_is_better=False) + def test_success_trial_split_to_json(self): + with self.trial: + trial_split = self._given_success_trial_validation_split(self.trial) + trial_json = trial_split.to_json() - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.7, - higher_score_is_better=False) + self._then_success_trial_split_json_is_valid(trial_json) - assert not trial_split.is_new_best_score() + def _then_success_trial_split_json_is_valid(self, trial_json): + assert trial_json['status'] == TRIAL_STATUS.SUCCESS.value + assert trial_json['error'] is None + assert trial_json['error_traceback'] is None + assert trial_json['metric_results'] == EXPECTED_METRIC_RESULTS + assert trial_json['main_metric_name'] == MAIN_METRIC_NAME + start_time = datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) + end_time = datetime.datetime.strptime(trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta( + hours=1) + assert start_time < end_time + return True -def test_trial_split_is_new_best_score_should_return_true_with_a_new_best_score_after_multiple_scores(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name=MAIN_METRIC_NAME) + def test_success_trial_to_json(self): + with self.trial: + self._given_success_trial_validation_split(self.trial) - with trial.new_validation_split(Identity()) as trial_split: - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.5, - higher_score_is_better=False) + trial_json = self.trial.to_json() - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.7, - higher_score_is_better=False) + assert trial_json['status'] == TRIAL_STATUS.SUCCESS.value + assert trial_json['error'] is None + assert trial_json['error_traceback'] is None + assert trial_json['main_metric_name'] == self.trial.main_metric_name + assert self._then_success_trial_split_json_is_valid(trial_json['validation_splits'][0]) - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.4, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.4, - higher_score_is_better=False) + start_time = datetime.datetime.strptime( + trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) + end_time = datetime.datetime.strptime( + trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta(hours=1) - assert trial_split.is_new_best_score() + assert start_time < end_time + def test_success_trial_get_validation_score(self): + with self.trial: + 
self._given_success_trial_validation_split(self.trial, best_score=0.3) -def test_success_trial_split_to_json(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial( - save_trial_function=repo.save_trial, - hyperparams=hp, - main_metric_name=MAIN_METRIC_NAME - ) + validation_score = self.trial.get_validation_score() - with trial: - trial_split = given_success_trial_validation_split(trial) - trial_json = trial_split.to_json() - - then_success_trial_split_json_is_valid(trial_json) - - -def then_success_trial_split_json_is_valid(trial_json): - assert trial_json['status'] == TRIAL_STATUS.SUCCESS.value - assert trial_json['error'] is None - assert trial_json['error_traceback'] is None - assert trial_json['metric_results'] == EXPECTED_METRIC_RESULTS - assert trial_json['main_metric_name'] == MAIN_METRIC_NAME - start_time = datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) - end_time = datetime.datetime.strptime(trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta( - hours=1) - assert start_time < end_time - - return True - - -def test_success_trial_to_json(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial( - save_trial_function=repo.save_trial, - hyperparams=hp, - main_metric_name='mse' - ) - - with trial: - given_success_trial_validation_split(trial) - - trial_json = trial.to_json() - - assert trial_json['status'] == TRIAL_STATUS.SUCCESS.value - assert trial_json['error'] is None - assert trial_json['error_traceback'] is None - assert trial_json['main_metric_name'] == trial.main_metric_name - assert then_success_trial_split_json_is_valid(trial_json['validation_splits'][0]) - - start_time = datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) - end_time = datetime.datetime.strptime(trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta( - hours=1) - - assert start_time < end_time - - -def test_success_trial_get_validation_score(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name='mse') - - with trial: - given_success_trial_validation_split(trial, best_score=0.3) - - validation_score = trial.get_validation_score() - - assert validation_score == 0.3 - - -def test_success_trial_multiple_splits_should_average_the_scores(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name='mse') - - with trial: - given_success_trial_validation_split(trial, best_score=0.3) - given_success_trial_validation_split(trial, best_score=0.1) - - validation_score = trial.get_validation_score() + assert validation_score == 0.3 - assert validation_score == 0.2 + def test_success_trial_multiple_splits_should_average_the_scores(self): + with self.trial: + self._given_success_trial_validation_split(self.trial, best_score=0.3) + self._given_success_trial_validation_split(self.trial, best_score=0.1) + validation_score = self.trial.get_validation_score() -def test_trial_with_failed_split_should_only_average_successful_splits(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name='mse') + assert validation_score == 0.2 - with trial: - given_success_trial_validation_split(trial, best_score=0.3) - 
given_success_trial_validation_split(trial, best_score=0.1) - given_failed_trial_split(trial) + def test_trial_with_failed_split_should_only_average_successful_splits(self): - validation_score = trial.get_validation_score() + with self.trial: + self._given_success_trial_validation_split(self.trial, best_score=0.3) + self._given_success_trial_validation_split(self.trial, best_score=0.1) + self._given_failed_trial_split(self.trial) - assert validation_score == 0.2 + validation_score = self.trial.get_validation_score() + assert validation_score == 0.2 -def given_success_trial_validation_split(trial, best_score=0.4): - with trial.new_validation_split(Identity()) as trial_split: - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.5, - higher_score_is_better=False) + def _given_success_trial_validation_split(self, trial, best_score=0.4): + with trial.new_validation_split(Identity()) as trial_split: + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.5, + higher_score_is_better=False) - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.7, - higher_score_is_better=False) + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=best_score, - higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=best_score, - higher_score_is_better=False) - trial_split.set_success() - trial.set_success() + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=best_score, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=best_score, higher_score_is_better=False) - return trial_split + trial_split.set_success() + trial.set_success() + return trial_split -def test_failure_trial_split_to_json(): - hp = HyperparameterSamples({'a': 2}) - repo = InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name='mse') - with trial: - trial_split = given_failed_trial_split(trial) + def test_failure_trial_split_to_json(self): + with self.trial: + trial_split = self._given_failed_trial_split(self.trial) - trial_json = trial_split.to_json() - - then_failed_validation_split_json_is_valid(trial_json, trial_split) - - -def then_failed_validation_split_json_is_valid(trial_json, trial_split): - assert trial_json['status'] == TRIAL_STATUS.FAILED.value - assert trial_json['error'] == str(trial_split.error) - assert trial_json['error_traceback'] == EXPECTED_ERROR_TRACEBACK - assert trial_json['metric_results'] == EXPECTED_METRIC_RESULTS - assert trial_json['main_metric_name'] == trial_split.main_metric_name - - start_time = datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) - end_time = datetime.datetime.strptime(trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta( - hours=1) - assert start_time < end_time - return True - - -def test_failure_trial_to_json(): - hp = HyperparameterSamples({'a': 2}) - repo = 
InMemoryHyperparamsRepository() - trial = Trial(save_trial_function=repo.save_trial, hyperparams=hp, main_metric_name='mse') - - with trial: - trial_split = given_failed_trial_split(trial) - - trial_json = trial.to_json() - - assert trial_json['status'] == TRIAL_STATUS.FAILED.value - assert trial_json['error'] == str(trial_split.error) - assert trial_json['error_traceback'] == EXPECTED_ERROR_TRACEBACK - assert trial_json['main_metric_name'] == trial.main_metric_name - assert then_failed_validation_split_json_is_valid(trial_json['validation_splits'][0], trial_split=trial_split) - - start_time = datetime.datetime.strptime(trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) - end_time = datetime.datetime.strptime(trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta( - hours=1) - - assert start_time < end_time - - -def given_failed_trial_split(trial): - with trial.new_validation_split(Identity()) as trial_split: - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.5, - higher_score_is_better=False) - - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.7, - higher_score_is_better=False) - - trial_split.add_metric_results_train(name=MAIN_METRIC_NAME, score=0.4, higher_score_is_better=False) - trial_split.add_metric_results_validation(name=MAIN_METRIC_NAME, score=0.4, - higher_score_is_better=False) - error = IndexError('index error') - trial_split.set_failed(error) - trial.set_failed(error) - return trial_split - - -def test_trials_get_best_hyperparams_should_return_hyperparams_of_best_trial(): - # Given - repo = InMemoryHyperparamsRepository() - hp_trial_1 = HyperparameterSamples({'a': 2}) - trial_1 = Trial(save_trial_function=repo.save_trial, hyperparams=hp_trial_1, main_metric_name=MAIN_METRIC_NAME) - with trial_1: - given_success_trial_validation_split(trial_1, best_score=0.2) - - hp_trial_2 = HyperparameterSamples({'b': 3}) - trial_2 = Trial(save_trial_function=repo.save_trial, hyperparams=hp_trial_2, main_metric_name=MAIN_METRIC_NAME) - with trial_2: - given_success_trial_validation_split(trial_2, best_score=0.1) - - trials = Trials(trials=[trial_1, trial_2]) - - # When - best_hyperparams = trials.get_best_hyperparams() + trial_json = trial_split.to_json() - # Then - assert best_hyperparams == hp_trial_2 + self._then_failed_validation_split_json_is_valid(trial_json, trial_split) + + def _then_failed_validation_split_json_is_valid(self, trial_json, trial_split): + assert trial_json['status'] == TRIAL_STATUS.FAILED.value + assert trial_json['error'] == str(trial_split.error) + assert trial_json['error_traceback'] == EXPECTED_ERROR_TRACEBACK + assert trial_json['metric_results'] == EXPECTED_METRIC_RESULTS + assert trial_json['main_metric_name'] == trial_split.main_metric_name + + start_time = datetime.datetime.strptime( + trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) + end_time = datetime.datetime.strptime( + trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta(hours=1) + assert start_time < end_time + return True + + def test_failure_trial_to_json(self): + with self.trial: + trial_split = self._given_failed_trial_split(self.trial) + + trial_json = self.trial.to_json() + + assert trial_json['status'] == TRIAL_STATUS.FAILED.value + assert trial_json['error'] == str(trial_split.error) + assert 
trial_json['error_traceback'] == EXPECTED_ERROR_TRACEBACK + assert trial_json['main_metric_name'] == self.trial.main_metric_name + assert self._then_failed_validation_split_json_is_valid( + trial_json['validation_splits'][0], trial_split=trial_split) + + start_time = datetime.datetime.strptime( + trial_json['start_time'], TRIAL_DATETIME_STR_FORMAT) + end_time = datetime.datetime.strptime( + trial_json['end_time'], TRIAL_DATETIME_STR_FORMAT) + datetime.timedelta(hours=1) + + assert start_time < end_time + + def _given_failed_trial_split(self, trial): + with trial.new_validation_split(Identity()) as trial_split: + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.5, higher_score_is_better=False) + + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.7, higher_score_is_better=False) + + trial_split.add_metric_results_train( + name=MAIN_METRIC_NAME, score=0.4, higher_score_is_better=False) + trial_split.add_metric_results_validation( + name=MAIN_METRIC_NAME, score=0.4, higher_score_is_better=False) + error = IndexError('index error') + trial_split.set_failed(error) + trial.set_failed(error) + return trial_split + + def test_trials_get_best_hyperparams_should_return_hyperparams_of_best_trial(self): + # Given + trial_1 = self.trial + with trial_1: + self._given_success_trial_validation_split(trial_1, best_score=0.2) + + hp_trial_2 = HyperparameterSamples({'b': 3}) + trial_2 = Trial( + trial_number=1, save_trial_function=self.repo.save_trial, + hyperparams=hp_trial_2, main_metric_name=MAIN_METRIC_NAME) + with trial_2: + self._given_success_trial_validation_split(trial_2, best_score=0.1) + + trials = Trials(trials=[trial_1, trial_2]) + + # When + best_hyperparams = trials.get_best_hyperparams() + + # Then + assert best_hyperparams == hp_trial_2 diff --git a/testing/test_context_logger.py b/testing/test_context_logger.py index 8851d70f..f52a1af9 100644 --- a/testing/test_context_logger.py +++ b/testing/test_context_logger.py @@ -71,6 +71,7 @@ def test_logger(): l = f.read() # Teardown + file_handler.close() os.remove(file_path) @@ -118,6 +119,3 @@ def test_logger_automl(self, tmpdir): with open(f, 'r') as f: log = f.readlines() assert len(log) == 36 - - def teardown(self): - shutil.rmtree(self.tmpdir) diff --git a/testing/test_streaming.py b/testing/test_streaming.py index 97ab25b0..fb02b7f8 100644 --- a/testing/test_streaming.py +++ b/testing/test_streaming.py @@ -49,6 +49,7 @@ def test_queued_pipeline_with_included_incomplete_batch(): assert np.array_equal(outputs, np.array(list(range(15))) * 2 * 2 * 2 * 2) + def test_queued_pipeline_with_included_incomplete_batch_that_raises_an_exception(): with pytest.raises(AttributeError): p = SequentialQueuedPipeline( @@ -60,8 +61,8 @@ def test_queued_pipeline_with_included_incomplete_batch_that_raises_an_exception ], batch_size=10, keep_incomplete_batch=True, - default_value_data_inputs=None, # this will raise an exception in the worker - default_value_expected_outputs=None, # this will raise an exception in the worker + default_value_data_inputs=None, # this will raise an exception in the worker + default_value_expected_outputs=None, # this will raise an exception in the worker n_workers_per_step=1, max_queue_size=5 )