Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature / refresh History dtype upon encountering a new field #1203

Merged
merged 6 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
pip install -r install/testing_requirements.txt
pip install -r install/misc_feature_requirements.txt

git clone --recurse-submodules -b refactor/pounders_API https://github.com/POptUS/IBCDFO.git
git clone --recurse-submodules -b develop https://github.com/POptUS/IBCDFO.git
pushd IBCDFO/minq/py/minq5/
export PYTHONPATH="$PYTHONPATH:$(pwd)"
echo "PYTHONPATH=$PYTHONPATH" >> $GITHUB_ENV
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/extra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ jobs:
sed -i -e "s/pyzmq>=22.1.0,<23.0.0/pyzmq>=23.0.0,<24.0.0/" ./balsam/setup.cfg
cd balsam; pip install -e .; cd ..

git clone --recurse-submodules -b refactor/pounders_API https://github.com/POptUS/IBCDFO.git
git clone --recurse-submodules -b develop https://github.com/POptUS/IBCDFO.git
pushd IBCDFO/minq/py/minq5/
export PYTHONPATH="$PYTHONPATH:$(pwd)"
echo "PYTHONPATH=$PYTHONPATH" >> $GITHUB_ENV
Expand Down
2 changes: 1 addition & 1 deletion libensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
if old_spec.get("in") and old_spec.get("inputs"):
old_spec.pop("inputs") # avoid clashes
elif old_spec.get("out") and old_spec.get("outputs"):
old_spec.pop("inputs") # avoid clashes
old_spec.pop("outputs") # avoid clashes

Check warning on line 483 in libensemble/ensemble.py

View check run for this annotation

Codecov / codecov/patch

libensemble/ensemble.py#L483

Added line #L483 was not covered by tests
setattr(self, f, ClassType(**old_spec))
else: # None. attribute not set yet
setattr(self, f, ClassType(**loaded_spec))
Expand Down
24 changes: 20 additions & 4 deletions libensemble/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
# - then convert that type into a python type in the best way known so far...
# - we need to make sure the size of string types is preserved
# - if sub-array shape, save as 3-tuple

H0_fields = []
for i in range(len(H0.dtype.names)):
dtype = H0.dtype[i]
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(
self.using_H0 = len(H0) > 0
self.index = len(H0)
self.grow_count = 0
self.safe_mode = False

self.sim_started_count = np.sum(H["sim_started"])
self.sim_ended_count = np.sum(H["sim_ended"])
Expand All @@ -123,7 +125,15 @@ def __init__(
self.last_started = -1
self.last_ended = -1

def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool = False) -> None:
def _append_new_fields(self, H_f: npt.NDArray) -> None:
dtype_new = np.dtype(list(set(self.H.dtype.descr + H_f.dtype.descr)))
H_new = np.zeros(len(self.H), dtype=dtype_new)
old_fields = self.H.dtype.names
for field in old_fields:
H_new[field][: len(self.H)] = self.H[field]
self.H = H_new

def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
"""
Updates the history after points have been evaluated
"""
Expand All @@ -132,9 +142,12 @@ def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool =
returned_H = D["calc_out"]
fields = returned_H.dtype.names if returned_H is not None else []

if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]):
self._append_new_fields(returned_H)

for j, ind in enumerate(new_inds):
for field in fields:
if safe_mode:
if self.safe_mode:
assert field not in protected_libE_fields, "The field '" + field + "' is protected"
if np.isscalar(returned_H[field][j]) or returned_H.dtype[field].hasobject:
self.H[field][ind] = returned_H[field][j]
Expand Down Expand Up @@ -206,7 +219,7 @@ def update_history_to_gen(self, q_inds: npt.NDArray):
self.H["gen_informed_time"][q_inds] = t
self.gen_informed_count += len(q_inds)

def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool, gen_started_time: int) -> None:
def update_history_x_in(self, gen_worker: int, D: npt.NDArray, gen_started_time: int) -> None:
"""
Updates the history (in place) when new points have been returned from a gen

Expand All @@ -221,6 +234,9 @@ def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool,
if len(D) == 0:
return

if any([field not in self.H.dtype.names for field in D.dtype.names]):
self._append_new_fields(D)

t = time.time()
rows_remaining = len(self.H) - self.index

Expand Down Expand Up @@ -251,7 +267,7 @@ def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool,
update_inds = D["sim_id"]

for field in D.dtype.names:
if safe_mode:
if self.safe_mode:
assert field not in protected_libE_fields, "The field '" + field + "' is protected"
self.H[field][update_inds] = D[field]

Expand Down
2 changes: 1 addition & 1 deletion libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def manager(
persis_info,
alloc_specs,
libE_specs,
hist: np.ndarray,
hist: History,
on_abort: Callable = None,
on_cleanup: Callable = None,
):
Expand Down
13 changes: 7 additions & 6 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def report_worker_exc(wrk_exc: Exception = None) -> None:


def manager_main(
hist: npt.NDArray,
hist,
libE_specs: dict,
alloc_specs: dict,
sim_specs: dict,
Expand Down Expand Up @@ -168,7 +168,7 @@ class Manager:

def __init__(
self,
hist: npt.NDArray,
hist,
libE_specs: dict,
alloc_specs: dict,
sim_specs: dict,
Expand All @@ -183,6 +183,7 @@ def __init__(
self.safe_mode = libE_specs.get("safe_mode")
self.kill_canceled_sims = libE_specs.get("kill_canceled_sims")
self.hist = hist
self.hist.safe_mode = self.safe_mode
self.libE_specs = libE_specs
self.alloc_specs = alloc_specs
self.sim_specs = sim_specs
Expand Down Expand Up @@ -452,9 +453,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
final_data = D_recv.get("calc_out", None)
if isinstance(final_data, np.ndarray):
if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False):
self.hist.update_history_x_in(w, final_data, self.safe_mode, self.W[w - 1]["gen_started_time"])
self.hist.update_history_x_in(w, final_data, self.W[w - 1]["gen_started_time"])
elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False):
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
self.hist.update_history_f(D_recv, self.kill_canceled_sims)
else:
logger.info(_PERSIS_RETURN_WARNING)
self.W[w - 1]["persis_state"] = 0
Expand All @@ -467,9 +468,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
self._freeup_resources(w)
else:
if calc_type == EVAL_SIM_TAG:
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
self.hist.update_history_f(D_recv, self.kill_canceled_sims)
if calc_type == EVAL_GEN_TAG:
self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.W[w - 1]["gen_started_time"])
self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w - 1]["gen_started_time"])
assert (
len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w - 1]["persis_state"]
), "Gen must return work when is is the only thing active and not persistent."
Expand Down
61 changes: 61 additions & 0 deletions libensemble/tests/functionality_tests/test_new_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Runs libEnsemble with Latin hypercube sampling on a simple 1D problem

Execute via one of the following commands (e.g. 3 workers):
mpiexec -np 4 python test_1d_sampling.py
python test_1d_sampling.py --nworkers 3 --comms local
python test_1d_sampling.py --nworkers 3 --comms tcp

The number of concurrent evaluations of the objective function will be 4-1=3.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local
# TESTSUITE_NPROCS: 2 4

import numpy as np

from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output


def sim_f(In):
Out = np.zeros(1, dtype=[("f", float), ("N", int)])
Out["f"] = np.linalg.norm(In)
Out["N"] = 123
return Out


if __name__ == "__main__":
nworkers, is_manager, libE_specs, _ = parse_args()

sim_specs = {
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
}

gen_specs = {
"gen_f": gen_f,
"out": [("x", float, (1,))],
"user": {
"gen_batch_size": 500,
"lb": np.array([-3]),
"ub": np.array([3]),
},
}

persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)

exit_criteria = {"gen_max": 501}

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

if is_manager:
assert len(H) >= 501
assert "N" in H.dtype.names, "New datatype not added to history"
print("\nlibEnsemble with random sampling has generated enough points")
save_libE_output(H, persis_info, __file__, nworkers)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
gen_specs = {
"gen_f": gen_f,
"persis_in": ["f", "x", "corner_id", "sim_id"],
"out": [("sim_id", int), ("corner_id", int), ("x", float, (n,)), ("f_est", float)],
"out": [
("sim_id", int),
("corner_id", int),
("x", float, (n,)),
], # expect ("f_est", float) from gen - test ability for gen to send back "unexpected" field.
"user": {
"initial_batch_size": 20,
"lb": np.array([-3, -2, -1]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ def combine_component(x):
# assert np.all(grad[l <= 1e-7] <= 0)

# if not np.all(grad[np.logical_and(u >= 1e-7, l >= 1e-7)] <= 1e-5):
# import ipdb

# ipdb.set_trace()
# else:
# d = np.linalg.solve(np.dot(J.T, J), np.dot(J.T, F))
# assert np.linalg.norm(d) <= 1e-5
1 change: 1 addition & 0 deletions libensemble/tests/unit_tests/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def hist_setup2(sim_max=10, H0_in=[]):
alloc_specs = specs_dump(AllocSpecs())
H0 = H0_in
hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)
hist.safe_mode = True
return hist, sim_specs, gen_specs, exit_criteria, alloc_specs


Expand Down
32 changes: 15 additions & 17 deletions libensemble/tests/unit_tests/test_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@

exp_x_in_setup2["x"] = x

safe_mode = True


def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
Expand Down Expand Up @@ -149,7 +147,7 @@ def test_update_history_x_in_Oempty():
hist, sim_specs, gen_specs, _, _ = setup.hist_setup2()
H_o = np.zeros(0, dtype=gen_specs["out"])
gen_worker = 1
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)

compare_hists(hist.H, wrs2)

Expand All @@ -173,7 +171,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = single_rand

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
assert isclose(single_rand, hist.H["x"][0])
assert hist.sim_started_count == 0
assert hist.index == 1
Expand All @@ -185,7 +183,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2[: size + 1]

Expand All @@ -202,7 +200,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2

Expand All @@ -215,7 +213,7 @@ def test_update_history_x_in():
# Test libE errors when a protected field appears in output from a gen_worker
H_o = np.zeros(size, dtype=gen_specs["out"] + [("sim_started", bool)])
try:
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
except AssertionError:
assert 1, "Failed like it should have"
else:
Expand All @@ -224,7 +222,7 @@ def test_update_history_x_in():
# Test libE errors when a protected field appears in output from a gen_worker
H_o = np.zeros(size, dtype=gen_specs["out"] + [("sim_started", bool)])
try:
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
except AssertionError:
assert 1, "Failed like it should have"
else:
Expand All @@ -246,7 +244,7 @@ def test_update_history_x_in_sim_ids():
H_o["x"] = single_rand
H_o["sim_id"] = 0

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
assert isclose(single_rand, hist.H["x"][0])
assert hist.sim_started_count == 0
assert hist.index == 1
Expand All @@ -258,7 +256,7 @@ def test_update_history_x_in_sim_ids():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)
H_o["sim_id"] = range(1, 7)
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)

# Compare by column
exp_x = exp_x_in_setup2[: size + 1]
Expand All @@ -277,7 +275,7 @@ def test_update_history_x_in_sim_ids():
H_o["x"] = gen_specs["gen_f"](size=size)
H_o["sim_id"] = range(7, 10)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2

Expand Down Expand Up @@ -357,7 +355,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert isclose(exp_vals[0], hist.H["g"][0])
assert np.all(hist.H["sim_ended"][0:1])
assert np.all(~hist.H["sim_ended"][1:10]) # Check the rest
Expand All @@ -383,7 +381,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert np.allclose(exp_vals, hist.H["g"])
assert np.all(hist.H["sim_ended"][0:3])
assert np.all(~hist.H["sim_ended"][3:10]) # Check the rest
Expand All @@ -405,7 +403,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert np.all(hist.H["sim_ended"][0:1])
assert np.all(~hist.H["sim_ended"][3:10]) # Check the rest
assert hist.sim_ended_count == 2
Expand Down Expand Up @@ -435,7 +433,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert isclose(exp_fs[0], hist.H["f"][0])
assert np.allclose(exp_fvecs[0], hist.H["fvec"][0])
Expand Down Expand Up @@ -470,7 +468,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert np.allclose(exp_fs, hist.H["f"])
assert np.allclose(exp_fvecs, hist.H["fvec"])
Expand Down Expand Up @@ -507,7 +505,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert np.allclose(exp_fs, hist.H["f"])
assert np.allclose(exp_fvecs, hist.H["fvec"])
Expand Down
Loading