Skip to content

Commit

Permalink
run fix-style
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed May 8, 2024
1 parent 9fd8c89 commit 4d24195
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 91 deletions.
25 changes: 15 additions & 10 deletions merlin/study/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,9 @@ def format_status_for_csv(self) -> Dict:

# Loop through information for each step
for step_info_key, step_info_value in overall_step_info.items():
# Skip the workers entry at the top level; this will be added in the else statement below on a task-by-task basis
if step_info_key == "workers" or step_info_key == "worker_name":
# Skip the workers entry at the top level;
# this will be added in the else statement below on a task-by-task basis
if step_info_key in ("workers", "worker_name"):
continue
# Format task queue entry
if step_info_key == "task_queue":
Expand Down Expand Up @@ -834,13 +835,15 @@ def apply_filters(self):
filtered_statuses = {}
for step_name, overall_step_info in self.requested_statuses.items():
filtered_statuses[step_name] = {}
# Add the non-workspace keys to the filtered_status dict so we don't accidentally miss any of this information while filtering
# Add the non-workspace keys to the filtered_status dict so we
# don't accidentally miss any of this information while filtering
for non_ws_key in NON_WORKSPACE_KEYS:
try:
filtered_statuses[step_name][non_ws_key] = overall_step_info[non_ws_key]
except KeyError:
LOG.debug(
f"Tried to add {non_ws_key} to filtered_statuses dict but it was not found in requested_statuses[{step_name}]"
f"Tried to add {non_ws_key} to filtered_statuses dict "
f"but it was not found in requested_statuses[{step_name}]"
)

# Go through the actual statuses and filter them as necessary
Expand Down Expand Up @@ -1100,7 +1103,8 @@ def display(self, test_mode: Optional[bool] = False):
LOG.warning("No statuses to display.")


def status_conflict_handler(*args, **kwargs) -> Any:
# Pylint complains that args is unused but we can ignore that
def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
"""
The conflict handler function to apply to any status entries that have conflicting
values while merging two status files together.
Expand Down Expand Up @@ -1144,7 +1148,6 @@ def status_conflict_handler(*args, **kwargs) -> Any:
# - once it's more modular, move the below code and the above merge_rules dict to a property in
# one of the new status classes (the one that has condensing maybe? or upstream from that?)


# params = self.spec.get_parameters()
# for token in params.parameters:
# merge_rules[token] = "use-initial-and-log-warning"
Expand All @@ -1167,8 +1170,8 @@ def status_conflict_handler(*args, **kwargs) -> Any:
merge_val = f"{dict_a_val}, {dict_b_val}"
elif merge_rule == "use-initial-and-log-warning":
LOG.warning(
f"Conflict at key '{key}' while merging status files. Defaulting to initial value. " \
"This could lead to incorrect status information, you may want to re-run in debug mode and " \
f"Conflict at key '{key}' while merging status files. Defaulting to initial value. "
"This could lead to incorrect status information, you may want to re-run in debug mode and "
"check the files in the output directory for this task."
)
merge_val = dict_a_val
Expand Down Expand Up @@ -1230,7 +1233,8 @@ def read_status(
# Catch all exceptions so that we don't crash the workers
except Exception as exc: # pylint: disable=broad-except
LOG.warning(
f"An exception was raised while trying to read status from '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}"
f"An exception was raised while trying to read status from '{status_filepath}'!\n"
f"{print_exception(type(exc), exc, exc.__traceback__)}"
)
if raise_errors:
raise exc
Expand All @@ -1257,5 +1261,6 @@ def write_status(status_to_write: Dict, status_filepath: str, lock_file: str, ti
# Catch all exceptions so that we don't crash the workers
except Exception as exc: # pylint: disable=broad-except
LOG.warning(
f"An exception was raised while trying to write status to '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}"
f"An exception was raised while trying to write status to '{status_filepath}'!\n"
f"{print_exception(type(exc), exc, exc.__traceback__)}"
)
57 changes: 43 additions & 14 deletions merlin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import psutil
import yaml


try:
import cPickle as pickle
except ImportError:
Expand Down Expand Up @@ -556,23 +557,13 @@ def needs_merlin_expansion(
return False


def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handler: Callable = None):
def _both_inputs_are_dicts(dict_a: dict, dict_b: dict):
"""
This function recursively merges dict_b into dict_a. The built-in
merge of dictionaries in python (dict(dict_a) | dict(dict_b)) does not do a
deep merge so this function is necessary. This will only merge in new keys,
it will NOT update existing ones, unless you specify a conflict handler function.
Credit to this stack overflow post: https://stackoverflow.com/a/7205107.
Helper function for `dict_deep_merge` to ensure both entries are dictionaries.
:param `dict_a`: A dict that we'll merge dict_b into
:param `dict_b`: A dict that we want to merge into dict_a
:param `path`: The path down the dictionary tree that we're currently at
:param `conflict_handler`: An optional function to handle conflicts between values at the same key.
The function should return the value to be used in the merged dictionary.
The default behavior without this argument is to log a warning.
"""

# Check to make sure we have valid dict_a and dict_b input
dict_a_is_dict = isinstance(dict_a, dict)
dict_b_is_dict = isinstance(dict_b, dict)
if not dict_a_is_dict or not dict_b_is_dict:
Expand All @@ -584,21 +575,59 @@ def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handl
problem_str = f"dict_b '{dict_b}' is not a dictionary"

LOG.warning(f"Problem with dict_deep_merge: {problem_str}. Ignoring this merge call.")
return False
return True


def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handler: Callable = None):
"""
This function recursively merges dict_b into dict_a. The built-in
merge of dictionaries in python (dict(dict_a) | dict(dict_b)) does not do a
deep merge so this function is necessary. This will only merge in new keys,
it will NOT update existing ones, unless you specify a conflict handler function.
Credit to this stack overflow post: https://stackoverflow.com/a/7205107.
:param `dict_a`: A dict that we'll merge dict_b into
:param `dict_b`: A dict that we want to merge into dict_a
:param `path`: The path down the dictionary tree that we're currently at
:param `conflict_handler`: An optional function to handle conflicts between values at the same key.
The function should return the value to be used in the merged dictionary.
The default behavior without this argument is to log a warning.
"""

# Check to make sure we have valid dict_a and dict_b input
if not _both_inputs_are_dicts(dict_a, dict_b):
return

# # Check to make sure we have valid dict_a and dict_b input
# dict_a_is_dict = isinstance(dict_a, dict)
# dict_b_is_dict = isinstance(dict_b, dict)
# if not dict_a_is_dict or not dict_b_is_dict:
# if not dict_a_is_dict and not dict_b_is_dict:
# problem_str = f"both dict_a '{dict_a}' and dict_b '{dict_b}' are not dictionaries"
# elif not dict_a_is_dict:
# problem_str = f"dict_a '{dict_a}' is not a dictionary"
# elif not dict_b_is_dict:
# problem_str = f"dict_b '{dict_b}' is not a dictionary"

# LOG.warning(f"Problem with dict_deep_merge: {problem_str}. Ignoring this merge call.")
# return

if path is None:
path = []
for key in dict_b:
if key in dict_a:
if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict):
dict_deep_merge(dict_a[key], dict_b[key], path=path+[str(key)], conflict_handler=conflict_handler)
dict_deep_merge(dict_a[key], dict_b[key], path=path + [str(key)], conflict_handler=conflict_handler)
elif isinstance(dict_a[key], list) and isinstance(dict_a[key], list):
dict_a[key] += dict_b[key]
elif dict_a[key] == dict_b[key]:
pass # same leaf value
else:
if conflict_handler is not None:
merged_val = conflict_handler(dict_a_val=dict_a[key], dict_b_val=dict_b[key], key=key, path=path+[str(key)])
merged_val = conflict_handler(
dict_a_val=dict_a[key], dict_b_val=dict_b[key], key=key, path=path + [str(key)]
)
dict_a[key] = merged_val
else:
# Want to just output a warning instead of raising an exception so that the workflow doesn't crash
Expand Down
6 changes: 1 addition & 5 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@
# Loading in Module Specific Fixtures #
#######################################
pytest_plugins = [
fixture_file.replace("/", ".").replace(".py", "")
for fixture_file in glob(
"tests/fixtures/[!__]*.py",
recursive=True
)
fixture_file.replace("/", ".").replace(".py", "") for fixture_file in glob("tests/fixtures/[!__]*.py", recursive=True)
]


Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
def example_test_data():
return {"key": "val"}
```
"""
"""
18 changes: 11 additions & 7 deletions tests/fixtures/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@
Fixtures specifically for help testing the functionality related to
status/detailed-status.
"""

import os
import pytest
from pathlib import Path

import pytest


@pytest.fixture(scope="class")
def status_testing_dir(temp_output_dir: str) -> str:
"""
A pytest fixture to set up a temporary directory to write files to for testing status.
:param temp_output_dir: The path to the temporary output directory we'll be using for this test run
"""
status_testing_dir = f"{temp_output_dir}/status_testing/"
if not os.path.exists(status_testing_dir):
os.mkdir(status_testing_dir)
testing_dir = f"{temp_output_dir}/status_testing/"
if not os.path.exists(testing_dir):
os.mkdir(testing_dir)

return testing_dir

return status_testing_dir

@pytest.fixture(scope="class")
def status_empty_file(status_testing_dir: str) -> str:
def status_empty_file(status_testing_dir: str) -> str: # pylint: disable=W0621
"""
A pytest fixture to create an empty status file.
Expand All @@ -30,4 +34,4 @@ def status_empty_file(status_testing_dir: str) -> str:
if not empty_file.exists():
empty_file.touch()

return empty_file
return empty_file
Loading

0 comments on commit 4d24195

Please sign in to comment.