diff --git a/README.md b/README.md
index 3db01ba..e6b04a6 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,9 @@ Python Framework for Generating Diagnostics from CESM
 
 ## Project Vision
 
-CUPiD is a collaborative effort that unifies all CESM component diagnostics and provides
+CUPiD is a “one stop shop” that enables and integrates timeseries file generation, data standardization, diagnostics, and metrics from all CESM components.
 
-- Python code that
-  1. runs in an easy-to-generate conda environment, and
-  1. can be launched via CIME workflow or independently
-- Diagnostics for single/multiple runs and single/multiple components
-- Ability to call post-processing tools that other groups are working on
-- An API that makes it easy to include outside code
-- Ongoing support and software maintenance
+This collaborative effort aims to simplify the user experience of running diagnostics: post-processing tools are called directly from CUPiD, all component diagnostics run from the same tool (either as part of the CIME workflow or independently), and components share Python code and a standard conda environment.
 
 ## Installing
 
@@ -94,4 +88,7 @@ if not serial:
     client = Client(cluster)
 
 client
-```
\ No newline at end of file
+```
+
+### Timeseries File Generation
+CUPiD can also generate single-variable time series files from history files for all components. To do so, edit the `timeseries` section of `config.yml` to fit your preferences, and then run `cupid-run config.yml -ts`.
diff --git a/cupid/run.py b/cupid/run.py
index 7aa2e5b..79798f3 100755
--- a/cupid/run.py
+++ b/cupid/run.py
@@ -2,42 +2,82 @@
 
 import click
 import os
-import sys
 from glob import glob
 import papermill as pm
 import intake
 import cupid.util
+import cupid.timeseries
 from dask.distributed import Client
 import dask
 import time
 import ploomber
+import yaml
+
+CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
+
+
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 @click.command(context_settings=CONTEXT_SETTINGS)
 @click.option("--serial", "-s", is_flag=True, help="Do not use LocalCluster objects")
-@click.option("--time-series", "-ts", is_flag=True,
-              help="Run time series generation scripts prior to diagnostics")
+@click.option(
+    "--time-series",
+    "-ts",
+    is_flag=True,
+    help="Run time series generation scripts prior to diagnostics",
+)
 @click.argument("config_path")
 def run(config_path, serial=False, time_series=False):
     """
     Main engine to set up running all the notebooks.
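+
+    Example invocation (illustrative; the config file name is a placeholder):
+
+        $ cupid-run config.yml -ts   # generate time series before running diagnostics
+        $ cupid-run -s config.yml    # run without dask LocalCluster objects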
""" - # Abort if run with --time-series (until feature is added) - if time_series: - sys.tracebacklimit = 0 - raise NotImplementedError("--time-series option not implemented yet") - # Get control structure control = cupid.util.get_control_dict(config_path) cupid.util.setup_book(config_path) + ##################################################################### + # Managing global parameters + + global_params = dict() + + if "global_params" in control: + global_params = control["global_params"] + #################################################################### + + if time_series: + timeseries_params = control["timeseries"] + + # general timeseries arguments for all components + num_procs = timeseries_params["num_procs"] + + + + for component in ['atm', 'ocn', 'lnd', 'ice', 'glc']: + cupid.timeseries.create_time_series( + component, + timeseries_params[component]["vars"], + timeseries_params[component]["derive_vars"], + [timeseries_params["case_name"]], # could also grab from compute_notebooks section of config file + timeseries_params[component]["hist_str"], + [global_params["CESM_output_dir"] + "/" + timeseries_params["case_name"] + f"/{component}/hist/"], # could also grab from compute_notebooks section of config file + [global_params["CESM_output_dir"]+'/'+timeseries_params['case_name']+f'/{component}/proc/tseries/'], + # Note that timeseries output will eventually go in /glade/derecho/scratch/${USER}/archive/${CASE}/${component}/proc/tseries/ + timeseries_params["ts_done"], + timeseries_params["overwrite_ts"], + timeseries_params[component]["start_years"], # could get from yaml file in adf_quick_run.parameter_groups.none.config_fil_str, or for other notebooks config files, eg ocean_surface.parameter_gropus.none.mom6_tools_config.start_date + timeseries_params[component]["end_years"], # could get from yaml file in adf_quick_run.parameter_groups.none.config_fil_str, or for other notebooks config files, eg ocean_surface.parameter_gropus.none.mom6_tools_config.end_date + timeseries_params[component]["level"], + num_procs, + serial, + ) + # Grab paths - run_dir = os.path.realpath(os.path.expanduser(control['data_sources']['run_dir'])) - output_dir = run_dir + "/computed_notebooks/" + control['data_sources']['sname'] - temp_data_path = run_dir + "/temp_data" - nb_path_root = os.path.realpath(os.path.expanduser(control['data_sources']['nb_path_root'])) + run_dir = os.path.realpath(os.path.expanduser(control["data_sources"]["run_dir"])) + output_dir = run_dir + "/computed_notebooks/" + control["data_sources"]["sname"] + temp_data_path = run_dir + "/temp_data" + nb_path_root = os.path.realpath( + os.path.expanduser(control["data_sources"]["nb_path_root"]) + ) ##################################################################### # Managing catalog-related stuff @@ -46,45 +86,38 @@ def run(config_path, serial=False, time_series=False): cat_path = None - if 'path_to_cat_json' in control['data_sources']: + if "path_to_cat_json" in control["data_sources"]: use_catalog = True - full_cat_path = os.path.realpath(os.path.expanduser(control['data_sources']['path_to_cat_json'])) + full_cat_path = os.path.realpath( + os.path.expanduser(control["data_sources"]["path_to_cat_json"]) + ) full_cat = intake.open_esm_datastore(full_cat_path) - # Doing initial subsetting on full catalog, e.g. to only use certain cases + # Doing initial subsetting on full catalog, e.g. 
to only use certain cases - if 'subset' in control['data_sources']: - first_subset_kwargs = control['data_sources']['subset'] + if "subset" in control["data_sources"]: + first_subset_kwargs = control["data_sources"]["subset"] cat_subset = full_cat.search(**first_subset_kwargs) # This pulls out the name of the catalog from the path - cat_subset_name = full_cat_path.split("/")[-1].split('.')[0] + "_subset" - cat_subset.serialize(directory=temp_data_path, name=cat_subset_name, catalog_type="file") + cat_subset_name = full_cat_path.split("/")[-1].split(".")[0] + "_subset" + cat_subset.serialize( + directory=temp_data_path, name=cat_subset_name, catalog_type="file" + ) cat_path = temp_data_path + "/" + cat_subset_name + ".json" else: cat_path = full_cat_path - - ##################################################################### - # Managing global parameters - - global_params = dict() - - if 'global_params' in control: - global_params = control['global_params'] - - ##################################################################### # Ploomber - making a DAG dag = ploomber.DAG(executor=ploomber.executors.Serial()) - ##################################################################### # Organizing notebooks - holdover from manually managing dependencies before all_nbs = dict() - for nb, info in control['compute_notebooks'].items(): + for nb, info in control["compute_notebooks"].items(): all_nbs[nb] = info @@ -92,21 +125,32 @@ def run(config_path, serial=False, time_series=False): for nb, info in all_nbs.items(): - global_params['serial'] = serial + global_params["serial"] = serial if "dependency" in info: - cupid.util.create_ploomber_nb_task(nb, info, cat_path, nb_path_root, output_dir, global_params, dag, dependency = info["dependency"]) + cupid.util.create_ploomber_nb_task( + nb, + info, + cat_path, + nb_path_root, + output_dir, + global_params, + dag, + dependency=info["dependency"], + ) else: - cupid.util.create_ploomber_nb_task(nb, info, cat_path, nb_path_root, output_dir, global_params, dag) + cupid.util.create_ploomber_nb_task( + nb, info, cat_path, nb_path_root, output_dir, global_params, dag + ) ##################################################################### # Organizing scripts - if 'compute_scripts' in control: + if "compute_scripts" in control: all_scripts = dict() - for script, info in control['compute_scripts'].items(): + for script, info in control["compute_scripts"].items(): all_scripts[script] = info @@ -115,14 +159,23 @@ def run(config_path, serial=False, time_series=False): for script, info in all_scripts.items(): if "dependency" in info: - cupid.util.create_ploomber_script_task(script, info, cat_path, nb_path_root, global_params, dag, dependency = info["dependency"]) + cupid.util.create_ploomber_script_task( + script, + info, + cat_path, + nb_path_root, + global_params, + dag, + dependency=info["dependency"], + ) else: - cupid.util.create_ploomber_script_task(script, info, cat_path, nb_path_root, global_params, dag) + cupid.util.create_ploomber_script_task( + script, info, cat_path, nb_path_root, global_params, dag + ) # Run the full DAG dag.build() return None - diff --git a/cupid/timeseries.py b/cupid/timeseries.py new file mode 100644 index 0000000..86bec80 --- /dev/null +++ b/cupid/timeseries.py @@ -0,0 +1,420 @@ +""" +Timeseries generation tool adapted from ADF for general CUPiD use. 
+""" + +# ++++++++++++++++++++++++++++++ +# Import standard python modules +# ++++++++++++++++++++++++++++++ + +import sys +import glob +import multiprocessing as mp +import os +import subprocess +import xarray as xr + +import importlib + +from pathlib import Path + + +def call_ncrcat(cmd): + """This is an internal function to `create_time_series` + It just wraps the subprocess.call() function, so it can be + used with the multiprocessing Pool that is constructed below. + It is declared as global to avoid AttributeError. + """ + return subprocess.run(cmd, shell=False) + + +def create_time_series( + component, + diag_var_list, + derive_vars, + case_names, + hist_str, + hist_locs, + ts_dir, + ts_done, + overwrite_ts, + start_years, + end_years, + height_dim, + num_procs, + serial, +): + """ + Generate time series versions of the history file data. + + Args + ---- + - component: str + name of component, eg 'cam' + # This could alternatively be made into a dictionary and encorporate values such as height_dim + - derive_vars: dict + information on derivable variables + eg, {'PRECT': ['PRECL','PRECC'], + 'RESTOM': ['FLNT','FSNT']} + - case_names: list, str + name of simulaton case + - hist_str: str + CESM history number, ie h0, h1, etc. + - hist_locs: list, str + location of CESM history files + - ts_dir: list, str + location where time series files will be saved, or where pre-made time series files exist + - ts_done: list, boolean + check if time series files already exist + - overwrite_ts: list, boolean + check if existing time series files will bew overwritten + - start_years: list, str or int + first year for desired range of years + - end_years: list, str or int + last year for desired range of years + - height_dim: str + name of height dimension for given component, eg 'lev' + - num_procs: int + number of processors + - diag_var_list: list + list of variables to create diagnostics (or timeseries) from + - serial: bool + if True, run in serial; if False, run in parallel + + """ + + # Notify user that script has started: + print(f"\n Generating {component} time series files...") + + # Loop over cases: + for case_idx, case_name in enumerate(case_names): + # Check if particular case should be processed: + if ts_done[case_idx]: + emsg = ( + " Configuration file indicates time series files have been pre-computed" + ) + emsg += f" for case '{case_name}'. Will rely on those files directly." + print(emsg) + continue + # End if + + print(f"\t Processing time series for case '{case_name}' :") + + # Extract start and end year values: + start_year = start_years[case_idx] + end_year = end_years[case_idx] + + # Create path object for the history file(s) location: + starting_location = Path(hist_locs[case_idx]) + + # Check that path actually exists: + if not starting_location.is_dir(): + emsg = "Provided 'cam_hist_loc' directory '{starting_location}' not found." + emsg += " Script is ending here." + + raise FileNotFoundError(emsg) + # End if + + # Check if history files actually exist. If not then kill script: + if not list(starting_location.glob("*" + hist_str + ".*.nc")): + emsg = f"No history *{hist_str}.*.nc files found in '{starting_location}'." + emsg += " Script is ending here." 
+ raise FileNotFoundError(emsg) + # End if + + # Create empty list: + files_list = [] + + # Loop over start and end years: + for year in range(start_year, end_year + 1): + # Add files to main file list: + for fname in starting_location.glob( + f"*{hist_str}.*{str(year).zfill(4)}*.nc" + ): + files_list.append(fname) + # End for + # End for + + # Create ordered list of CAM history files: + hist_files = sorted(files_list) + + # Open an xarray dataset from the first model history file: + hist_file_ds = xr.open_dataset( + hist_files[0], decode_cf=False, decode_times=False + ) + + # Get a list of data variables in the 1st hist file: + hist_file_var_list = list(hist_file_ds.data_vars) + # Note: could use `open_mfdataset`, but that can become very slow; + # This approach effectively assumes that all files contain the same variables. + + # Check what kind of vertical coordinate (if any) is being used for this model run: + # ------------------------ + if height_dim in hist_file_ds: + # Extract vertical level attributes: + lev_attrs = hist_file_ds[height_dim].attrs + + # First check if there is a "vert_coord" attribute: + if "vert_coord" in lev_attrs: + vert_coord_type = lev_attrs["vert_coord"] + else: + # Next check that the "long_name" attribute exists: + if "long_name" in lev_attrs: + # Extract long name: + lev_long_name = lev_attrs["long_name"] + + # Check for "keywords" in the long name: + if "hybrid level" in lev_long_name: + # Set model to hybrid vertical levels: + vert_coord_type = "hybrid" + elif "zeta level" in lev_long_name: + # Set model to height (z) vertical levels: + vert_coord_type = "height" + else: + # Print a warning, and assume that no vertical + # level information is needed. + wmsg = "WARNING! Unable to determine the vertical coordinate" + wmsg += f" type from the {height_dim} long name, which is:\n'{lev_long_name}'." + wmsg += ( + "\nNo additional vertical coordinate information will be" + ) + wmsg += ( + f" transferred beyond the {height_dim} dimension itself." + ) + print(wmsg) + + vert_coord_type = None + # End if + else: + # Print a warning, and assume hybrid levels (for now): + wmsg = ( + f"WARNING! No long name found for the {height_dim} dimension," + ) + wmsg += " so no additional vertical coordinate information will be" + wmsg += f" transferred beyond the {height_dim} dimension itself." + print(wmsg) + + vert_coord_type = None + # End if (long name) + # End if (vert_coord) + else: + # No level dimension found, so assume there is no vertical coordinate: + vert_coord_type = None + # End if (lev, or height_dim, existence) + # ------------------------ + + # Check if time series directory exists, and if not, then create it: + # Use pathlib to create parent directories, if necessary. 
+ Path(ts_dir[case_idx]).mkdir(parents=True, exist_ok=True) + + # INPUT NAME TEMPLATE: $CASE.$scomp.[$type.][$string.]$date[$ending] + first_file_split = str(hist_files[0]).split(".") + if first_file_split[-1] == "nc": + time_string_start = first_file_split[-2].replace("-", "") + else: + time_string_start = first_file_split[-1].replace("-", "") + last_file_split = str(hist_files[-1]).split(".") + if last_file_split[-1] == "nc": + time_string_finish = last_file_split[-2].replace("-", "") + else: + time_string_finish = last_file_split[-1].replace("-", "") + time_string = "-".join([time_string_start, time_string_finish]) + + # Loop over history variables: + list_of_commands = [] + vars_to_derive = [] + # create copy of var list that can be modified for derivable variables + if diag_var_list == ["process_all"]: + print("generating time series for all variables") + # TODO: this doesn't seem to be working for ocn... + diag_var_list = hist_file_var_list + for var in diag_var_list: + if var not in hist_file_var_list: + if component == 'ocn': + print('ocean vars seem to not be present in all files and thus cause errors') + continue + if ( + var in derive_vars.keys() + ): # TODO: dictionary implementation needs to be fixed with yaml file + constit_list = derive_vars[var] + for constit in constit_list: + if constit not in diag_var_list: + diag_var_list.append(constit) + vars_to_derive.append(var) + continue + else: + msg = f"WARNING: {var} is not in the file {hist_files[0]}." + msg += " No time series will be generated." + print(msg) + continue + + # Check if variable has a height_dim (eg, 'lev') dimension according to first file: + has_lev = bool(height_dim in hist_file_ds[var].dims) + + # Create full path name, file name template: + # $cam_case_name.$hist_str.$variable.YYYYMM-YYYYMM.nc + + ts_outfil_str = ( + ts_dir[case_idx] + + os.sep + + ".".join([case_name, hist_str, var, time_string, "nc"]) + ) + + # Check if files already exist in time series directory: + ts_file_list = glob.glob(ts_outfil_str) + + # If files exist, then check if over-writing is allowed: + if ts_file_list: + if not overwrite_ts[case_idx]: + # If not, then simply skip this variable: + continue + + # Notify user of new time series file: + print(f"\t - time series for {var}") + + # Variable list starts with just the variable + ncrcat_var_list = f"{var}" + + # Determine "ncrcat" command to generate time series file: + if "date" in hist_file_ds[var].dims: + ncrcat_var_list = ncrcat_var_list + ",date" + if "datesec" in hist_file_ds[var].dims: + ncrcat_var_list = ncrcat_var_list + ",datesec" + + if has_lev and vert_coord_type: + # For now, only add these variables if using CAM: + if "cam" in hist_str: # Could also use if "cam" in component + # PS might be in a different history file. If so, continue without error. + ncrcat_var_list = ncrcat_var_list + ",hyam,hybm,hyai,hybi" + + if "PS" in hist_file_var_list: + ncrcat_var_list = ncrcat_var_list + ",PS" + print("Adding PS to file") + else: + wmsg = "WARNING: PS not found in history file." + wmsg += " It might be needed at some point." + print(wmsg) + # End if + + if vert_coord_type == "height": + # Adding PMID here works, but significantly increases + # the storage (disk usage) requirements of the ADF. + # This can be alleviated in the future by figuring out + # a way to determine all of the regridding targets at + # the start of the ADF run, and then regridding a single + # PMID file to each one of those targets separately. 
-JN
+                        if "PMID" in hist_file_var_list:
+                            ncrcat_var_list = ncrcat_var_list + ",PMID"
+                            print("Adding PMID to file")
+                        else:
+                            wmsg = "WARNING: PMID not found in history file."
+                            wmsg += " It might be needed at some point."
+                            print(wmsg)
+                        # End if PMID
+                    # End if height
+                # End if cam
+            # End if has_lev
+
+            cmd = (
+                ["ncrcat", "-O", "-4", "-h", "--no_cll_mth", "-v", ncrcat_var_list]
+                + hist_files
+                + ["-o", ts_outfil_str]
+            )
+
+            # Add to command list for use in multi-processing pool:
+            list_of_commands.append(cmd)
+
+        # End variable loop
+
+        if vars_to_derive != []:
+            if component == "atm":
+                derive_cam_variables(
+                    vars_to_derive=vars_to_derive, ts_dir=ts_dir[case_idx]
+                )
+
+        if serial:
+            # Run the "ncrcat" commands one at a time:
+            for cmd in list_of_commands:
+                call_ncrcat(cmd)
+        else:  # if not serial
+            # Now run the "ncrcat" subprocesses in parallel:
+            with mp.Pool(processes=num_procs) as mpool:
+                _ = mpool.map(call_ncrcat, list_of_commands)
+            # End with
+    # End cases loop
+
+    # Notify user that script has ended:
+    print(f" ... {component} time series file generation has finished successfully.")
+
+
+def derive_cam_variables(vars_to_derive=None, ts_dir=None, overwrite=None):
+    """
+    Derive variables according to the steps given below. Since derivations will depend on the
+    variable, each variable to derive will need its own set of steps below.
+
+    Caution: this method assumes that there will be one time series file per variable
+
+    If the file for the derived variable exists, the kwarg `overwrite` determines
+    whether to overwrite the file (True) or to keep the existing file and print a warning.
+    """
+
+    for var in vars_to_derive:
+        if var == "PRECT":
+            # PRECT can be found by simply adding PRECL and PRECC
+            # grab file names for the PRECL and PRECC files from the case ts directory
+            if glob.glob(os.path.join(ts_dir, "*PRECC*")) and glob.glob(
+                os.path.join(ts_dir, "*PRECL*")
+            ):
+                constit_files = sorted(glob.glob(os.path.join(ts_dir, "*PREC*")))
+            else:
+                ermsg = (
+                    "PRECC and PRECL were not both present; PRECT cannot be calculated."
+                )
+                ermsg += " Please remove PRECT from diag_var_list or find the relevant CAM files."
+                raise FileNotFoundError(ermsg)
+            # create new file name for PRECT
+            prect_file = constit_files[0].replace("PRECC", "PRECT")
+            if Path(prect_file).is_file():
+                if overwrite:
+                    Path(prect_file).unlink()
+                else:
+                    print(
+                        f"[{__name__}] Warning: PRECT file was found and overwrite is False. Will use existing file."
+                    )
+                    continue
+            # append PRECC to the file containing PRECL
+            os.system(f"ncks -A -v PRECC {constit_files[0]} {constit_files[1]}")
+            # create new file with the sum of PRECC and PRECL
+            os.system(f"ncap2 -s 'PRECT=(PRECC+PRECL)' {constit_files[1]} {prect_file}")
+        if var == "RESTOM":
+            # RESTOM = FSNT-FLNT
+            # Have to be more precise than with PRECT because FSNTOA, FSNTC, etc are valid variables
+            if glob.glob(os.path.join(ts_dir, "*.FSNT.*")) and glob.glob(
+                os.path.join(ts_dir, "*.FLNT.*")
+            ):
+                input_files = [
+                    sorted(glob.glob(os.path.join(ts_dir, f"*.{v}.*")))
+                    for v in ["FLNT", "FSNT"]
+                ]
+                constit_files = []
+                for elem in input_files:
+                    constit_files += elem
+            else:
+                ermsg = (
+                    "FSNT and FLNT were not both present; RESTOM cannot be calculated."
+                )
+                ermsg += " Please remove RESTOM from diag_var_list or find the relevant CAM files."
+ raise FileNotFoundError(ermsg) + # create new file name for RESTOM + derived_file = constit_files[0].replace("FLNT", "RESTOM") + if Path(derived_file).is_file(): + if overwrite: + Path(derived_file).unlink() + else: + print( + f"[{__name__}] Warning: RESTOM file was found and overwrite is False. Will use existing file." + ) + continue + # append FSNT to the file containing FLNT + os.system(f"ncks -A -v FLNT {constit_files[0]} {constit_files[1]}") + # create new file with the difference of FLNT and FSNT + os.system( + f"ncap2 -s 'RESTOM=(FSNT-FLNT)' {constit_files[1]} {derived_file}" + ) diff --git a/environments/dev-environment.yml b/environments/dev-environment.yml index 5abc2e8..632ba79 100644 --- a/environments/dev-environment.yml +++ b/environments/dev-environment.yml @@ -9,6 +9,7 @@ dependencies: - intake-esm - jinja2 - jupyter-book + - nco - pandas - papermill - pip diff --git a/examples/coupled_model/config.yml b/examples/coupled_model/config.yml index 4215ee1..d27438e 100644 --- a/examples/coupled_model/config.yml +++ b/examples/coupled_model/config.yml @@ -48,6 +48,52 @@ global_params: lc_kwargs: threads_per_worker: 1 +timeseries: + num_procs: 8 + ts_done: [False] + overwrite_ts: [False] + case_name: 'b.e23_alpha16b.BLT1850.ne30_t232.054' + + atm: + vars: ['ACTNI', 'ACTNL', 'ACTREI','ACTREL','AODDUST'] + derive_vars: [] # {'PRECT':['PRECL','PRECC'], 'RESTOM':['FLNT','FSNT']} + hist_str: 'h0' + start_years: [2] + end_years: [102] + level: 'lev' + + lnd: + vars: ['ALTMAX', 'COST_NACTIVE', 'DENIT', 'EFLX_DYNBAL'] #['process_all'] + derive_vars: [] + hist_str: 'h0' + start_years: [2] + end_years: [102] + level: 'lev' + + ocn: + vars: ['taux', 'tauy'] # ['process_all'] + derive_vars: [] + hist_str: 'h.frc' + start_years: [2] + end_years: [102] + level: 'lev' + + ice: + vars: ['hi', 'hs', 'snowfrac', 'Tsfc'] #['process_all'] + derive_vars: [] + hist_str: 'h' + start_years: [2] + end_years: [102] + level: 'lev' + + glc: + vars: ['usurf', 'topg'] #['process_all'] + derive_vars: [] + hist_str: 'initial_hist' + start_years: [2] + end_years: [102] + level: 'lev' + compute_notebooks: # This is where all the notebooks you want run and their @@ -80,8 +126,6 @@ compute_notebooks: static: 'mom6.h.static.nc' oce_cat: /glade/u/home/gmarques/libs/oce-catalogs/reference-datasets.yml - - land_comparison: parameter_groups: none:
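For reference, the `timeseries` section added to the example `config.yml` above feeds the `cupid.timeseries.create_time_series()` call that `cupid/run.py` now makes for each component. A minimal sketch of the equivalent direct call is shown below; the case name, variables, years, `level`, and `num_procs` are taken from the example config, while the two directory paths are hypothetical placeholders.

```python
# Minimal sketch (illustrative, not part of the diff): calling the new timeseries API
# directly for the 'atm' component. The two paths below are hypothetical placeholders.
import cupid.timeseries

cupid.timeseries.create_time_series(
    "atm",                                        # component
    ["ACTNI", "ACTNL", "ACTREI", "ACTREL"],       # diag_var_list
    {},                                           # derive_vars, e.g. {"PRECT": ["PRECL", "PRECC"]}
    ["b.e23_alpha16b.BLT1850.ne30_t232.054"],     # case_names
    "h0",                                         # hist_str
    ["/path/to/archive/CASE/atm/hist/"],          # hist_locs (hypothetical)
    ["/path/to/archive/CASE/atm/proc/tseries/"],  # ts_dir (hypothetical)
    [False],                                      # ts_done
    [False],                                      # overwrite_ts
    [2],                                          # start_years
    [102],                                        # end_years
    "lev",                                        # height_dim
    8,                                            # num_procs
    False,                                        # serial
)
```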