Skip to content

Commit

Permalink
Fixed race condition that may result in errors in cleanup and deprecate `cleanup` (#1949)
Browse files Browse the repository at this point in the history

Co-authored-by: Bouwe Andela <[email protected]>
  • Loading branch information
schlunma and bouweandela authored Mar 3, 2023
1 parent b1e5fec commit 02128bf
Show file tree
Hide file tree
Showing 22 changed files with 429 additions and 195 deletions.
26 changes: 22 additions & 4 deletions esmvalcore/_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,28 @@ def _run(self, recipe: Path, session) -> None:
def _clean_preproc(session):
import shutil

if session["remove_preproc_dir"] and session.preproc_dir.exists():
logger.info("Removing preproc containing preprocessed data")
logger.info("If this data is further needed, then")
logger.info("set remove_preproc_dir to false in config-user.yml")
if (not session['save_intermediary_cubes'] and
session._fixed_file_dir.exists()):
logger.debug(
"Removing `preproc/fixed_files` directory containing fixed "
"data"
)
logger.debug(
"If this data is further needed, then set "
"`save_intermediary_cubes` to `true` and `remove_preproc_dir` "
"to `false` in your user configuration file"
)
shutil.rmtree(session._fixed_file_dir)

if session['remove_preproc_dir'] and session.preproc_dir.exists():
logger.info(
"Removing `preproc` directory containing preprocessed data"
)
logger.info(
"If this data is further needed, then set "
"`remove_preproc_dir` to `false` in your user configuration "
"file"
)
shutil.rmtree(session.preproc_dir)

@staticmethod
Expand Down
11 changes: 0 additions & 11 deletions esmvalcore/_recipe/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,6 @@ def _get_default_settings(dataset):
'units': facets['units'],
}

# Clean up fixed files
if not session['save_intermediary_cubes']:
fix_dirs = []
for item in [dataset] + dataset.supplementaries:
output_file = _get_output_file(item.facets, session.preproc_dir)
fix_dir = f"{output_file.with_suffix('')}_fixed"
fix_dirs.append(fix_dir)
settings['cleanup'] = {
'remove': fix_dirs,
}

# Strip supplementary variables before saving
settings['remove_supplementary_variables'] = {}

Expand Down
23 changes: 17 additions & 6 deletions esmvalcore/cmor/_fixes/cmip6/cesm2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
class Cl(Fix):
"""Fixes for ``cl``."""

def _fix_formula_terms(self, filepath, output_dir):
def _fix_formula_terms(
self,
filepath,
output_dir,
add_unique_suffix=False,
):
"""Fix ``formula_terms`` attribute."""
new_path = self.get_fixed_filepath(output_dir, filepath)
new_path = self.get_fixed_filepath(
output_dir, filepath, add_unique_suffix=add_unique_suffix
)
copyfile(filepath, new_path)
dataset = Dataset(new_path, mode='a')
dataset.variables['lev'].formula_terms = 'p0: p0 a: a b: b ps: ps'
Expand All @@ -29,7 +36,7 @@ def _fix_formula_terms(self, filepath, output_dir):
dataset.close()
return new_path

def fix_file(self, filepath, output_dir):
def fix_file(self, filepath, output_dir, add_unique_suffix=False):
"""Fix hybrid pressure coordinate.
Adds missing ``formula_terms`` attribute to file.
Expand All @@ -45,16 +52,20 @@ def fix_file(self, filepath, output_dir):
----------
filepath : str
Path to the original file.
output_dir : str
Path of the directory where the fixed file is saved to.
output_dir: Path
Output directory for fixed files.
add_unique_suffix: bool, optional (default: False)
Adds a unique suffix to `output_dir` for thread safety.
Returns
-------
str
Path to the fixed file.
"""
new_path = self._fix_formula_terms(filepath, output_dir)
new_path = self._fix_formula_terms(
filepath, output_dir, add_unique_suffix=add_unique_suffix
)
dataset = Dataset(new_path, mode='a')
dataset.variables['a_bnds'][:] = dataset.variables['a_bnds'][::-1, :]
dataset.variables['b_bnds'][:] = dataset.variables['b_bnds'][::-1, :]
Expand Down
14 changes: 9 additions & 5 deletions esmvalcore/cmor/_fixes/cmip6/cesm2_waccm.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
"""Fixes for CESM2-WACCM model."""
from netCDF4 import Dataset

from ..common import SiconcFixScalarCoord
from .cesm2 import Cl as BaseCl
from .cesm2 import Fgco2 as BaseFgco2
from .cesm2 import Omon as BaseOmon
from .cesm2 import Tas as BaseTas
from ..common import SiconcFixScalarCoord


class Cl(BaseCl):
"""Fixes for cl."""

def fix_file(self, filepath, output_dir):
def fix_file(self, filepath, output_dir, add_unique_suffix=False):
"""Fix hybrid pressure coordinate.
Adds missing ``formula_terms`` attribute to file.
Expand All @@ -27,16 +27,20 @@ def fix_file(self, filepath, output_dir):
----------
filepath : str
Path to the original file.
output_dir : str
Path of the directory where the fixed file is saved to.
output_dir: Path
Output directory for fixed files.
add_unique_suffix: bool, optional (default: False)
Adds a unique suffix to `output_dir` for thread safety.
Returns
-------
str
Path to the fixed file.
"""
new_path = self._fix_formula_terms(filepath, output_dir)
new_path = self._fix_formula_terms(
filepath, output_dir, add_unique_suffix=add_unique_suffix
)
dataset = Dataset(new_path, mode='a')
dataset.variables['a_bnds'][:] = dataset.variables['a_bnds'][:, ::-1]
dataset.variables['b_bnds'][:] = dataset.variables['b_bnds'][:, ::-1]
Expand Down
6 changes: 4 additions & 2 deletions esmvalcore/cmor/_fixes/emac/emac.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class AllVars(EmacFix):
'kg/m**2s': 'kg m-2 s-1',
}

def fix_file(self, filepath, output_dir):
def fix_file(self, filepath, output_dir, add_unique_suffix=False):
"""Fix file.
Fixes hybrid pressure level coordinate.
Expand All @@ -51,7 +51,9 @@ def fix_file(self, filepath, output_dir):
"""
if 'alevel' not in self.vardef.dimensions:
return filepath
new_path = self.get_fixed_filepath(output_dir, filepath)
new_path = self.get_fixed_filepath(
output_dir, filepath, add_unique_suffix=add_unique_suffix
)
copyfile(filepath, new_path)
with Dataset(new_path, mode='a') as dataset:
if 'formula_terms' in dataset.variables['lev'].ncattrs():
Expand Down
78 changes: 54 additions & 24 deletions esmvalcore/cmor/_fixes/fix.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Contains the base class for dataset fixes."""
from __future__ import annotations

import importlib
import inspect
import os
import tempfile
from pathlib import Path

from ..table import CMOR_TABLES
Expand All @@ -16,36 +18,45 @@ def __init__(self, vardef, extra_facets=None):
Parameters
----------
vardef: str
CMOR table entry
CMOR table entry.
extra_facets: dict, optional
Extra facets are mainly used for data outside of the big projects
like CMIP, CORDEX, obs4MIPs. For details, see :ref:`extra_facets`.
"""
self.vardef = vardef
if extra_facets is None:
extra_facets = {}
self.extra_facets = extra_facets

def fix_file(self, filepath: Path, output_dir: Path) -> Path:
def fix_file(
self,
filepath: Path,
output_dir: Path,
add_unique_suffix: bool = False,
) -> Path:
"""Apply fixes to the files prior to creating the cube.
Should be used only to fix errors that prevent loading or can
not be fixed in the cube (i.e. those related with missing_value
and _FillValue)
Should be used only to fix errors that prevent loading or cannot be
fixed in the cube (e.g., those related to `missing_value` or
`_FillValue`).
Parameters
----------
filepath: Path
file to fix
File to fix.
output_dir: Path
path to the folder to store the fixed files, if required
Output directory for fixed files.
add_unique_suffix: bool, optional (default: False)
Adds a unique suffix to `output_dir` for thread safety.
Returns
-------
Path
Path to the corrected file. It can be different from the original
filepath if a fix has been applied, but if not it should be the
original filepath
original filepath.
"""
return filepath

Expand All @@ -59,12 +70,13 @@ def fix_metadata(self, cubes):
Parameters
----------
cubes: iris.cube.CubeList
Cubes to fix
Cubes to fix.
Returns
-------
iris.cube.CubeList
Fixed cubes. They can be different instances.
"""
return cubes

Expand All @@ -74,19 +86,21 @@ def get_cube_from_list(self, cubes, short_name=None):
Parameters
----------
cubes : iris.cube.CubeList
List of cubes to search
short_name : str
Cube's variable short name. If None, short name is the class name
List of cubes to search.
short_name : str or None
Cube's variable short name. If `None`, the short name of the fix's
variable definition (`self.vardef.short_name`) is used.
Raises
------
Exception
If no cube is found
If no cube is found.
Returns
-------
iris.Cube
Variable's cube
"""
if short_name is None:
short_name = self.vardef.short_name
Expand All @@ -103,12 +117,13 @@ def fix_data(self, cube):
Parameters
----------
cube: iris.cube.Cube
Cube to fix
Cube to fix.
Returns
-------
iris.cube.Cube
Fixed cube. It can be a different instance.
"""
return cube

Expand Down Expand Up @@ -151,8 +166,9 @@ def get_fixes(project, dataset, mip, short_name, extra_facets=None):
Returns
-------
list(Fix)
list[Fix]
Fixes to apply for the given data.
"""
cmor_table = CMOR_TABLES[project]
vardef = cmor_table.get_variable(mip, short_name)
Expand Down Expand Up @@ -197,21 +213,35 @@ def get_fixes(project, dataset, mip, short_name, extra_facets=None):
return fixes

@staticmethod
def get_fixed_filepath(output_dir, filepath):
def get_fixed_filepath(
output_dir: str | Path,
filepath: str | Path,
add_unique_suffix: bool = False,
) -> Path:
"""Get the filepath for the fixed file.
Parameters
----------
output_dir: str
Output directory.
filepath: str
output_dir: Path
Output directory for fixed files. Will be created if it does not
exist yet.
filepath: str or Path
Original path.
add_unique_suffix: bool, optional (default: False)
Adds a unique suffix to `output_dir` for thread safety.
Returns
-------
str
Path
Path to the fixed file.
"""
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
return os.path.join(output_dir, os.path.basename(filepath))
output_dir = Path(output_dir)
if add_unique_suffix:
parent_dir = output_dir.parent
parent_dir.mkdir(parents=True, exist_ok=True)
prefix = output_dir.name
output_dir = Path(tempfile.mkdtemp(prefix=prefix, dir=parent_dir))
else:
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir / Path(filepath).name
8 changes: 5 additions & 3 deletions esmvalcore/cmor/_fixes/ipslcm/ipsl_cm6.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class AllVars(Fix):
"""Fixes for all IPSLCM variables."""

def fix_file(self, filepath, output_dir):
def fix_file(self, filepath, output_dir, add_unique_suffix=False):
"""Select IPSLCM variable in filepath.
This is done only if input file is a multi-variable one. This
Expand Down Expand Up @@ -48,10 +48,12 @@ def fix_file(self, filepath, output_dir):
# Proceed with CDO selvar
varname = self.extra_facets.get(VARNAME_KEY, self.vardef.short_name)
alt_filepath = str(filepath).replace(".nc", "_cdo_selected.nc")
outfile = self.get_fixed_filepath(output_dir, alt_filepath)
outfile = self.get_fixed_filepath(
output_dir, alt_filepath, add_unique_suffix=add_unique_suffix
)
tim1 = time.time()
logger.debug("Using CDO for selecting %s in %s", varname, filepath)
command = ["cdo", "-selvar,%s" % varname, str(filepath), outfile]
command = ["cdo", "-selvar,%s" % varname, str(filepath), str(outfile)]
subprocess.run(command, check=True)
logger.debug("CDO selection done in %.2f seconds", time.time() - tim1)
return outfile
Expand Down
Loading

0 comments on commit 02128bf

Please sign in to comment.