TCM / FileDB improvements #120

Merged · 7 commits · Mar 16, 2025

2 changes: 2 additions & 0 deletions dataflow-config.yaml
@@ -1,5 +1,7 @@
# legend_metadata_version: main
allow_none_par: false
build_file_dbs: true
check_log_files: true

paths:
sandbox_path: $_/sandbox
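The two new flags gate the final FileDB-building and log-checking steps of a production cycle (both default to true downstream, see complete_run.py below). A production wanting to skip those steps could set, as a minimal sketch in the same config file:

    # dataflow-config.yaml (sketch): opt out of the new post-processing steps
    build_file_dbs: false    # do not build FileDBs at the end of the run
    check_log_files: false   # do not scan log files for warnings
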
11 changes: 7 additions & 4 deletions workflow/Snakefile
@@ -151,13 +151,16 @@ onsuccess:
auto_report(workflow.persistence.dag, report_plugin, report_settings)
)

with (rep_dir / "dag.txt").open("w") as f:
f.writelines(str(workflow.persistence.dag))
# FIXME: broken with Snakemake>=9
# with (rep_dir / "dag.txt").open("w") as f:
# if workflow.persistence.dag is not None:
# f.writelines(str(workflow.persistence.dag))

with (rep_dir / "rulegraph.txt").open("w") as f:
f.writelines(str(workflow.persistence.dag.rule_dot()))
if workflow.persistence.dag is not None:
f.writelines(str(workflow.persistence.dag.rule_dot()))

# remove .gen files
files = glob.glob("*.gen")
for file in files:
if os.path.isfile(file):
27 changes: 20 additions & 7 deletions workflow/rules/filelist_gen.smk
@@ -36,25 +36,30 @@ def get_analysis_runs(
"""
ignore_keys = []
analysis_runs = {}

if ignore_keys_file is not None:
if Path(ignore_keys_file).is_file():
if Path(ignore_keys_file).suffix == ".json":
with Path(ignore_keys_file).open() as f:
ignore_keys = json.load(f)

elif Path(ignore_keys_file).suffix == ".keylist":
with Path(ignore_keys_file).open() as f:
ignore_keys = f.read().splitlines()
ignore_keys = [ # remove any comments in the keylist
key.split("#")[0].strip() if "#" in key else key.strip()
for key in ignore_keys
]

elif Path(ignore_keys_file).suffix in (".yaml", ".yml"):
with Path(ignore_keys_file).open() as f:
ignore_keys = yaml.safe_load(f)

else:
raise ValueError(
"ignore_keys_file file not in json, yaml or keylist format"
)
ignore_keys = [ # remove any comments in the keylist
key.split("#")[0].strip() if "#" in key else key.strip()
for key in ignore_keys
]

else:
msg = f"no ignore_keys file found: {ignore_keys_file}"
raise ValueError(msg)
@@ -80,6 +85,7 @@ def get_analysis_runs(
else:
msg = f"no analysis_runs file found: {analysis_runs_file}"
raise ValueError(msg)

return analysis_runs, ignore_keys


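As an illustration of the comment handling above, a .keylist file is read line by line and anything after a # is dropped (a minimal sketch; the key names are hypothetical):

    # sketch of the .keylist comment stripping in get_analysis_runs
    lines = [
        "l200-p03-r000-phy-20230312T043356Z  # bad baseline",
        "l200-p03-r001-phy-20230312T120000Z",
    ]
    ignore_keys = [
        key.split("#")[0].strip() if "#" in key else key.strip() for key in lines
    ]
    # ignore_keys == ["l200-p03-r000-phy-20230312T043356Z",
    #                 "l200-p03-r001-phy-20230312T120000Z"]
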
@@ -157,15 +163,22 @@ def build_filelist(
and tier. It will ignore any keys in the ignore_keys list and only include
the keys specified in the analysis_runs dict.
"""
fn_pattern = get_pattern(config, tier)

if ignore_keys is None:
# the ignore_keys dictionary organizes keys in sections, gather all the
# section contents in a single list
if ignore_keys is not None:
_ignore_keys = []
for item in ignore_keys.values():
_ignore_keys += item
ignore_keys = _ignore_keys
else:
ignore_keys = []

if analysis_runs is None:
analysis_runs = {}

phy_filenames = []
other_filenames = []
fn_pattern = get_pattern(config, tier)

for key in filekeys:
if Path(search_pattern).suffix == ".*":
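The flattening above means callers can now pass the sectioned ignore-keys dictionary returned by get_analysis_runs straight to build_filelist. A minimal sketch of the transformation (section names and keys are hypothetical):

    # sketch: sections of ignore keys are merged into a single flat list
    ignore_keys = {
        "noisy-channels": ["l200-p03-r000-cal-20230311T120000Z"],
        "daq-crashes": ["l200-p03-r001-phy-20230312T043356Z"],
    }
    _ignore_keys = []
    for item in ignore_keys.values():
        _ignore_keys += item
    # _ignore_keys holds both keys, with the section structure discarded
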
92 changes: 48 additions & 44 deletions workflow/src/legenddataflow/scripts/complete_run.py
@@ -244,70 +244,74 @@ def build_file_dbs(gen_tier_path, outdir):
print(f"INFO: ...took {dt}")


file_db_config = {}

if (
os.getenv("PRODENV") is not None
and os.getenv("PRODENV") in snakemake.params.filedb_path
):
prodenv = as_ro(os.getenv("PRODENV"))
def fformat(tier):
abs_path = patterns.get_pattern_tier(
snakemake.params.setup, tier, check_in_cycle=False
)
return str(abs_path).replace(ut.get_tier_path(snakemake.params.setup, tier), "")

def tdirs(tier):
return as_ro(ut.get_tier_path(snakemake.params.setup, tier)).replace(
prodenv, ""
)

file_db_config["data_dir"] = "$PRODENV"
if snakemake.params.setup.get("build_file_dbs", True):
file_db_config = {}

else:
print("WARNING: $PRODENV not set, the FileDB will not be relocatable")
if (
os.getenv("PRODENV") is not None
and os.getenv("PRODENV") in snakemake.params.filedb_path
):
prodenv = as_ro(os.getenv("PRODENV"))

def tdirs(tier):
return as_ro(ut.get_tier_path(snakemake.params.setup, tier))
def tdirs(tier):
return as_ro(ut.get_tier_path(snakemake.params.setup, tier)).replace(
prodenv, ""
)

file_db_config["data_dir"] = "/"
file_db_config["data_dir"] = "$PRODENV"

else:
print("WARNING: $PRODENV not set, the FileDB will not be relocatable")

file_db_config["tier_dirs"] = {
k: tdirs(k) for k in snakemake.params.setup["table_format"]
}
def tdirs(tier):
return as_ro(ut.get_tier_path(snakemake.params.setup, tier))

file_db_config["data_dir"] = "/"

def fformat(tier):
abs_path = patterns.get_pattern_tier(
snakemake.params.setup, tier, check_in_cycle=False
)
return str(abs_path).replace(ut.get_tier_path(snakemake.params.setup, tier), "")

file_db_config["tier_dirs"] = {
k: tdirs(k) for k in snakemake.params.setup["table_format"]
}

file_db_config |= {
"file_format": {k: fformat(k) for k in snakemake.params.setup["table_format"]},
"table_format": snakemake.params.setup["table_format"],
}
file_db_config |= {
"file_format": {k: fformat(k) for k in snakemake.params.setup["table_format"]},
"table_format": snakemake.params.setup["table_format"],
}

if snakemake.wildcards.tier != "daq":
print(f"INFO: ...building FileDBs with {snakemake.threads} threads")
if snakemake.params.setup.get("build_file_dbs", True):
print(f"INFO: ...building FileDBs with {snakemake.threads} threads")

Path(snakemake.params.filedb_path).mkdir(parents=True, exist_ok=True)
Path(snakemake.params.filedb_path).mkdir(parents=True, exist_ok=True)

with (Path(snakemake.params.filedb_path) / "file_db_config.json").open("w") as f:
json.dump(file_db_config, f, indent=2)
with (Path(snakemake.params.filedb_path) / "file_db_config.json").open(
"w"
) as f:
json.dump(file_db_config, f, indent=2)

build_file_dbs(ut.tier_path(snakemake.params.setup), snakemake.params.filedb_path)
(Path(snakemake.params.filedb_path) / "file_db_config.json").unlink()
build_file_dbs(
ut.tier_path(snakemake.params.setup), snakemake.params.filedb_path
)
(Path(snakemake.params.filedb_path) / "file_db_config.json").unlink()

build_valid_keys(
Path(ut.tmp_par_path(snakemake.params.setup)) / "*_db.json",
snakemake.params.valid_keys_path,
)

print("INFO: ...checking log files")

check_log_files(
ut.tmp_log_path(snakemake.params.setup),
snakemake.output.summary_log,
snakemake.output.gen_output,
warning_file=snakemake.output.warning_log,
)
if snakemake.params.setup.get("check_log_files", True):
print("INFO: ...checking log files")
check_log_files(
ut.tmp_log_path(snakemake.params.setup),
snakemake.output.summary_log,
snakemake.output.gen_output,
warning_file=snakemake.output.warning_log,
)

Path(snakemake.output.gen_output).touch()
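The $PRODENV branch decides whether the FileDB is relocatable: when the variable is set and contains the FileDB path, data_dir is stored as the literal string "$PRODENV" and that prefix is stripped from the tier directories; otherwise absolute paths are written. A sketch of the two resulting configs (tier names and paths are illustrative):

    # sketch of the two shapes file_db_config can take
    relocatable = {  # $PRODENV set: paths relative to the production env
        "data_dir": "$PRODENV",
        "tier_dirs": {"raw": "/generated/tier/raw"},
    }
    absolute = {  # $PRODENV unset: absolute paths, FileDB not relocatable
        "data_dir": "/",
        "tier_dirs": {"raw": "/data/prod/ref/generated/tier/raw"},
    }
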
4 changes: 2 additions & 2 deletions workflow/src/legenddataflow/scripts/filedb.py
@@ -71,7 +71,7 @@ def build_filedb() -> None:
break

if (loc_timestamps == default).all() or not found:
msg = "something went wrong! no valid first timestamp found"
msg = "something went wrong! no valid first timestamp found. Likely: the file is empty"
raise RuntimeError(msg)

timestamps[i] = np.min(loc_timestamps)
@@ -80,7 +80,7 @@ def build_filedb() -> None:
log.info(msg)

if timestamps[i] < 0 or timestamps[i] > 4102444800:
msg = "something went wrong! timestamp does not make sense"
msg = f"something went wrong! timestamp {timestamps[i]} does not make sense"
raise RuntimeError(msg)

fdb.df["first_timestamp"] = timestamps
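The upper bound in the sanity check, 4102444800, is the Unix timestamp of 2100-01-01 00:00:00 UTC, so negative timestamps and anything past the year 2100 are rejected. This can be checked directly:

    from datetime import datetime, timezone

    # 4102444800 seconds after the epoch is midnight, Jan 1, 2100 (UTC)
    assert datetime.fromtimestamp(4102444800, tz=timezone.utc) == datetime(
        2100, 1, 1, tzinfo=timezone.utc
    )
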
9 changes: 8 additions & 1 deletion workflow/src/legenddataflow/scripts/tier/tcm.py
@@ -22,12 +22,19 @@ def build_tier_tcm() -> None:
configs = TextDB(args.configs, lazy=True).on(args.timestamp, system=args.datatype)
config_dict = configs["snakemake_rules"]["tier_tcm"]

build_log(config_dict, args.log)
log = build_log(config_dict, args.log)

settings = Props.read_from(config_dict["inputs"]["config"])

# get the list of channels by fcid
ch_list = lh5.ls(args.input, "/ch*")

if len(ch_list) == 0:
msg = "no tables matching /ch* found in input file"
raise RuntimeError(msg)

log.debug(ch_list)

fcid_channels = {}
for ch in ch_list:
key = int(ch[2:])
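In isolation, the new guard fails fast when the raw file contains no channel tables instead of proceeding with an empty list. A minimal sketch, assuming legend-pydataobj's lh5 module and a hypothetical input path:

    from lgdo import lh5

    # list all /ch* tables in the raw file; an empty list means a bad input
    ch_list = lh5.ls("input_raw.lh5", "/ch*")  # hypothetical file path
    if len(ch_list) == 0:
        msg = "no tables matching /ch* found in input file"
        raise RuntimeError(msg)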