Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed race condition that may result in errors in cleanup and deprecated cleanup #1949

Merged
merged 16 commits into from
Mar 3, 2023
Merged
26 changes: 22 additions & 4 deletions esmvalcore/_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,28 @@ def _run(self, recipe: Path, session) -> None:
def _clean_preproc(session):
    """Remove intermediate preprocessor output of a session.

    Two independent clean-up steps are performed:

    * ``session.fixed_file_dir`` is deleted if it exists and
      ``session['save_intermediary_cubes']`` is false.
    * ``session.preproc_dir`` is deleted if it exists and
      ``session['remove_preproc_dir']`` is true.

    Parameters
    ----------
    session
        Object giving access to the ``save_intermediary_cubes`` and
        ``remove_preproc_dir`` options by key and to the ``fixed_file_dir``
        and ``preproc_dir`` paths as attributes.
    """
    import shutil

    if (not session['save_intermediary_cubes'] and
            session.fixed_file_dir.exists()):
        logger.debug(
            "Removing `preproc/fixed_files` directory containing fixed "
            "data"
        )
        logger.debug(
            "If this data is further needed, then set "
            "`save_intermediary_cubes` to `true` and `remove_preproc_dir` "
            "to `false` in your user configuration file"
        )
        shutil.rmtree(session.fixed_file_dir)

    if session['remove_preproc_dir'] and session.preproc_dir.exists():
        logger.info(
            "Removing `preproc` directory containing preprocessed data"
        )
        logger.info(
            "If this data is further needed, then set "
            "`remove_preproc_dir` to `false` in your user configuration "
            "file"
        )
        shutil.rmtree(session.preproc_dir)

@staticmethod
Expand Down
11 changes: 0 additions & 11 deletions esmvalcore/_recipe/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,6 @@ def _get_default_settings(dataset):
'units': facets['units'],
}

# Clean up fixed files
if not session['save_intermediary_cubes']:
fix_dirs = []
for item in [dataset] + dataset.supplementaries:
output_file = _get_output_file(item.facets, session.preproc_dir)
fix_dir = f"{output_file.with_suffix('')}_fixed"
fix_dirs.append(fix_dir)
settings['cleanup'] = {
'remove': fix_dirs,
}

# Strip supplementary variables before saving
settings['remove_supplementary_variables'] = {}

Expand Down
19 changes: 12 additions & 7 deletions esmvalcore/cmor/_fixes/fix.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Contains the base class for dataset fixes."""
from __future__ import annotations

import importlib
import inspect
import os
from pathlib import Path

from ..table import CMOR_TABLES
Expand Down Expand Up @@ -197,21 +198,25 @@ def get_fixes(project, dataset, mip, short_name, extra_facets=None):
return fixes

@staticmethod
def get_fixed_filepath(output_dir, filepath):
def get_fixed_filepath(
output_dir: str | Path,
filepath: str | Path,
) -> str:
schlunma marked this conversation as resolved.
Show resolved Hide resolved
"""Get the filepath for the fixed file.

Parameters
----------
output_dir: str
output_dir: str or Path
Output directory.
filepath: str
filepath: str or Path
Original path.

Returns
-------
str
Path to the fixed file.
"""
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
return os.path.join(output_dir, os.path.basename(filepath))
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
schlunma marked this conversation as resolved.
Show resolved Hide resolved
filepath = Path(filepath)
return str(output_dir / filepath.name)
6 changes: 6 additions & 0 deletions esmvalcore/config/_config_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class Session(ValidatedConfig):
_deprecated_defaults = _deprecated_options_defaults

relative_preproc_dir = Path('preproc')
relative_fixed_file_dir = Path('preproc', 'fixed_files')
relative_work_dir = Path('work')
relative_plot_dir = Path('plots')
relative_run_dir = Path('run')
Expand Down Expand Up @@ -184,6 +185,11 @@ def preproc_dir(self):
"""Return preproc directory."""
return self.session_dir / self.relative_preproc_dir

@property
def fixed_file_dir(self):
    """Return the fixed file directory located below the session directory."""
    base_dir = self.session_dir
    return base_dir / self.relative_fixed_file_dir

@property
def work_dir(self):
"""Return work directory."""
Expand Down
21 changes: 20 additions & 1 deletion esmvalcore/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import pprint
import re
import tempfile
import textwrap
import uuid
from copy import deepcopy
Expand Down Expand Up @@ -702,7 +703,7 @@ def _load(self, callback) -> Cube:

settings: dict[str, dict[str, Any]] = {}
settings['fix_file'] = {
'output_dir': Path(f"{output_file.with_suffix('')}_fixed"),
'output_dir': self.get_temporary_fixed_file_dir(),
**self.facets,
}
settings['load'] = {'callback': callback}
Expand Down Expand Up @@ -848,3 +849,21 @@ def _update_timerange(self):
check.valid_time_selection(timerange)

self.set_facet('timerange', timerange)

def get_temporary_fixed_file_dir(self) -> Path:
    """Make a fresh, uniquely named directory for storing fixed files.

    The directory is created inside ``self.session.fixed_file_dir``, which
    is itself created first if it is missing. Its name starts with the
    ``project``, ``dataset``, ``mip`` and ``short_name`` facets, each
    followed by an underscore; a facet that is not set contributes an empty
    string.

    Returns
    -------
    Path
        Path to the newly created directory.
    """
    parent_dir = self.session.fixed_file_dir
    parent_dir.mkdir(parents=True, exist_ok=True)

    name_parts = [
        str(self.facets.get(key, ''))
        for key in ('project', 'dataset', 'mip', 'short_name')
    ]
    # Every part, including the last, is terminated by an underscore so the
    # unique suffix appended by `mkdtemp` is separated from the facets
    prefix = ''.join(f"{part}_" for part in name_parts)

    new_dir = tempfile.mkdtemp(prefix=prefix, dir=parent_dir)
    return Path(new_dir)
Show resolved Hide resolved
20 changes: 3 additions & 17 deletions tests/integration/recipe/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@

DEFAULT_PREPROCESSOR_STEPS = (
'load',
'cleanup',
'remove_supplementary_variables',
'save',
)
Expand All @@ -104,16 +103,13 @@ def create_test_file(filename, tracking_id=None):
iris.save(cube, filename)


def _get_default_settings_for_chl(fix_dir, save_filename):
def _get_default_settings_for_chl(save_filename):
"""Get default preprocessor settings for chl."""
defaults = {
'load': {
'callback': 'default'
},
'remove_supplementary_variables': {},
'cleanup': {
'remove': [fix_dir]
},
'save': {
'compress': False,
'filename': save_filename,
Expand Down Expand Up @@ -432,9 +428,7 @@ def test_default_preprocessor(tmp_path, patched_datafinder, session):
preproc_dir = os.path.dirname(product.filename)
assert preproc_dir.startswith(str(tmp_path))

fix_dir = os.path.join(
preproc_dir, 'CMIP5_CanESM2_Oyr_historical_r1i1p1_chl_2000-2005_fixed')
defaults = _get_default_settings_for_chl(fix_dir, product.filename)
defaults = _get_default_settings_for_chl(product.filename)
assert product.settings == defaults


Expand Down Expand Up @@ -472,9 +466,7 @@ def test_default_preprocessor_custom_order(tmp_path, patched_datafinder,
preproc_dir = os.path.dirname(product.filename)
assert preproc_dir.startswith(str(tmp_path))

fix_dir = os.path.join(
preproc_dir, 'CMIP5_CanESM2_Oyr_historical_r1i1p1_chl_2000-2005_fixed')
defaults = _get_default_settings_for_chl(fix_dir, product.filename)
defaults = _get_default_settings_for_chl(product.filename)
assert product.settings == defaults


Expand Down Expand Up @@ -564,17 +556,11 @@ def test_default_fx_preprocessor(tmp_path, patched_datafinder, session):
preproc_dir = os.path.dirname(product.filename)
assert preproc_dir.startswith(str(tmp_path))

fix_dir = os.path.join(preproc_dir,
'CMIP5_CanESM2_fx_historical_r0i0p0_sftlf_fixed')

defaults = {
'load': {
'callback': 'default'
},
'remove_supplementary_variables': {},
'cleanup': {
'remove': [fix_dir]
},
'save': {
'compress': False,
'filename': product.filename,
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/main/test_esmvaltool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def cfg(mocker, tmp_path):
session.session_dir = output_dir / 'recipe_test'
session.run_dir = session.session_dir / 'run_dir'
session.preproc_dir = session.session_dir / 'preproc_dir'
session.fixed_file_dir = session.preproc_dir / 'fixed_files'

cfg = mocker.Mock()
cfg.start_session.return_value = session
Expand Down Expand Up @@ -82,6 +83,7 @@ def test_run(mocker, session, search_esgf):
session['log_level'] = 'default'
session['config_file'] = '/path/to/config-user.yml'
session['remove_preproc_dir'] = True
session['save_intermediary_cubes'] = False

recipe = Path('/recipe_dir/recipe_test.yml')

Expand Down Expand Up @@ -156,10 +158,24 @@ def test_run_session_dir_exists_alternative_fails(mocker, session):

def test_clean_preproc_dir(session):
    """Check that both directories are removed when clean-up is enabled."""
    # `fixed_file_dir` is created after `preproc_dir` on purpose: neither
    # `mkdir` call uses `exist_ok`
    session.preproc_dir.mkdir(parents=True)
    session.fixed_file_dir.mkdir(parents=True)
    session['remove_preproc_dir'] = True
    session['save_intermediary_cubes'] = False

    ESMValTool()._clean_preproc(session)

    for directory in (session.preproc_dir, session.fixed_file_dir):
        assert not directory.exists()


def test_do_not_clean_preproc_dir(session):
    """Check that nothing is removed when both options ask to keep data."""
    session.preproc_dir.mkdir(parents=True)
    session.fixed_file_dir.mkdir(parents=True)
    session['remove_preproc_dir'] = False
    session['save_intermediary_cubes'] = True

    ESMValTool()._clean_preproc(session)

    for directory in (session.preproc_dir, session.fixed_file_dir):
        assert directory.exists()


@mock.patch('esmvalcore._main.iter_entry_points')
Expand Down
1 change: 0 additions & 1 deletion tests/unit/recipe/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,6 @@ def test_get_default_settings(mocker):
'load': {'callback': 'default'},
'remove_supplementary_variables': {},
'save': {'compress': False, 'alias': 'sic'},
'cleanup': {'remove': ['/path/to/file_fixed']},
}


Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,12 @@ def test_load(mocker, session):
create_autospec=True,
return_value=output_file,
)
test_get_temporary_fixed_file_dir = mocker.patch.object(
dataset,
'get_temporary_fixed_file_dir',
create_autospec=True,
return_value=fix_dir,
)
args = {}
order = []

Expand Down Expand Up @@ -1714,6 +1720,7 @@ def mock_preprocess(items, step, input_files, output_file, debug,
assert args == load_args

_get_output_file.assert_called_with(dataset.facets, session.preproc_dir)
test_get_temporary_fixed_file_dir.assert_called_once_with()


def test_load_fail(session):
Expand All @@ -1723,3 +1730,21 @@ def test_load_fail(session):
dataset.files = []
with pytest.raises(InputFilesNotFound):
dataset.load()


@pytest.mark.parametrize(
    'dataset,prefix',
    [
        (Dataset(project='OBS', dataset='X', mip='fx', short_name='sftlf'),
         'OBS_X_fx_sftlf_'),
        (Dataset(dataset='Y'), '_Y___'),
        (Dataset(), '____'),
    ]
)
def test_get_temporary_fixed_file_dir(session, dataset, prefix):
    """Test ``Dataset.get_temporary_fixed_file_dir``.

    The returned directory must exist, be located directly inside
    ``<session_dir>/preproc/fixed_files`` and have a name starting with the
    expected facet prefix (missing facets show up as empty strings).
    """
    dataset.session = session
    temp_dir = dataset.get_temporary_fixed_file_dir()
    assert temp_dir.parent == session.session_dir / 'preproc' / 'fixed_files'
    assert temp_dir.is_dir()
    assert temp_dir.name.startswith(prefix)