Skip to content

Commit

Permalink
Feature / refresh History dtype upon encountering a new field (#1203)
Browse files Browse the repository at this point in the history
* initial commit, first approach at adding new fields to history array if an unexpected one is returned from a user function

* adjust ibcdfo branch

* slight refactoring of passing safe_mode attribute around - its an immmutable attribute usable by both the manager and history throughout runtime

* can only check if its appropiate to append new fields if we actually got a returned_H back

* gen can send back "unexpected" fields too - test this with adjust to test_persistent_uniform_sampling_running_mean.py

* from-file bugfix
  • Loading branch information
jlnav authored Jan 3, 2024
1 parent ae0c35a commit 0190c0d
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 33 deletions.
2 changes: 1 addition & 1 deletion libensemble/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ def _parameterize(self, loaded):
if old_spec.get("in") and old_spec.get("inputs"):
old_spec.pop("inputs") # avoid clashes
elif old_spec.get("out") and old_spec.get("outputs"):
old_spec.pop("inputs") # avoid clashes
old_spec.pop("outputs") # avoid clashes
setattr(self, f, ClassType(**old_spec))
else: # None. attribute not set yet
setattr(self, f, ClassType(**loaded_spec))
Expand Down
24 changes: 20 additions & 4 deletions libensemble/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
# - then convert that type into a python type in the best way known so far...
# - we need to make sure the size of string types is preserved
# - if sub-array shape, save as 3-tuple

H0_fields = []
for i in range(len(H0.dtype.names)):
dtype = H0.dtype[i]
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(
self.using_H0 = len(H0) > 0
self.index = len(H0)
self.grow_count = 0
self.safe_mode = False

self.sim_started_count = np.sum(H["sim_started"])
self.sim_ended_count = np.sum(H["sim_ended"])
Expand All @@ -123,7 +125,15 @@ def __init__(
self.last_started = -1
self.last_ended = -1

def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool = False) -> None:
def _append_new_fields(self, H_f: npt.NDArray) -> None:
dtype_new = np.dtype(list(set(self.H.dtype.descr + H_f.dtype.descr)))
H_new = np.zeros(len(self.H), dtype=dtype_new)
old_fields = self.H.dtype.names
for field in old_fields:
H_new[field][: len(self.H)] = self.H[field]
self.H = H_new

def update_history_f(self, D: dict, kill_canceled_sims: bool = False) -> None:
"""
Updates the history after points have been evaluated
"""
Expand All @@ -132,9 +142,12 @@ def update_history_f(self, D: dict, safe_mode: bool, kill_canceled_sims: bool =
returned_H = D["calc_out"]
fields = returned_H.dtype.names if returned_H is not None else []

if returned_H is not None and any([field not in self.H.dtype.names for field in returned_H.dtype.names]):
self._append_new_fields(returned_H)

for j, ind in enumerate(new_inds):
for field in fields:
if safe_mode:
if self.safe_mode:
assert field not in protected_libE_fields, "The field '" + field + "' is protected"
if np.isscalar(returned_H[field][j]) or returned_H.dtype[field].hasobject:
self.H[field][ind] = returned_H[field][j]
Expand Down Expand Up @@ -206,7 +219,7 @@ def update_history_to_gen(self, q_inds: npt.NDArray):
self.H["gen_informed_time"][q_inds] = t
self.gen_informed_count += len(q_inds)

def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool, gen_started_time: int) -> None:
def update_history_x_in(self, gen_worker: int, D: npt.NDArray, gen_started_time: int) -> None:
"""
Updates the history (in place) when new points have been returned from a gen
Expand All @@ -221,6 +234,9 @@ def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool,
if len(D) == 0:
return

if any([field not in self.H.dtype.names for field in D.dtype.names]):
self._append_new_fields(D)

t = time.time()
rows_remaining = len(self.H) - self.index

Expand Down Expand Up @@ -251,7 +267,7 @@ def update_history_x_in(self, gen_worker: int, D: npt.NDArray, safe_mode: bool,
update_inds = D["sim_id"]

for field in D.dtype.names:
if safe_mode:
if self.safe_mode:
assert field not in protected_libE_fields, "The field '" + field + "' is protected"
self.H[field][update_inds] = D[field]

Expand Down
2 changes: 1 addition & 1 deletion libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def manager(
persis_info,
alloc_specs,
libE_specs,
hist: np.ndarray,
hist: History,
on_abort: Callable = None,
on_cleanup: Callable = None,
):
Expand Down
13 changes: 7 additions & 6 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def report_worker_exc(wrk_exc: Exception = None) -> None:


def manager_main(
hist: npt.NDArray,
hist,
libE_specs: dict,
alloc_specs: dict,
sim_specs: dict,
Expand Down Expand Up @@ -168,7 +168,7 @@ class Manager:

def __init__(
self,
hist: npt.NDArray,
hist,
libE_specs: dict,
alloc_specs: dict,
sim_specs: dict,
Expand All @@ -183,6 +183,7 @@ def __init__(
self.safe_mode = libE_specs.get("safe_mode")
self.kill_canceled_sims = libE_specs.get("kill_canceled_sims")
self.hist = hist
self.hist.safe_mode = self.safe_mode
self.libE_specs = libE_specs
self.alloc_specs = alloc_specs
self.sim_specs = sim_specs
Expand Down Expand Up @@ -452,9 +453,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
final_data = D_recv.get("calc_out", None)
if isinstance(final_data, np.ndarray):
if calc_status is FINISHED_PERSISTENT_GEN_TAG and self.libE_specs.get("use_persis_return_gen", False):
self.hist.update_history_x_in(w, final_data, self.safe_mode, self.W[w - 1]["gen_started_time"])
self.hist.update_history_x_in(w, final_data, self.W[w - 1]["gen_started_time"])
elif calc_status is FINISHED_PERSISTENT_SIM_TAG and self.libE_specs.get("use_persis_return_sim", False):
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
self.hist.update_history_f(D_recv, self.kill_canceled_sims)
else:
logger.info(_PERSIS_RETURN_WARNING)
self.W[w - 1]["persis_state"] = 0
Expand All @@ -467,9 +468,9 @@ def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -
self._freeup_resources(w)
else:
if calc_type == EVAL_SIM_TAG:
self.hist.update_history_f(D_recv, self.safe_mode, self.kill_canceled_sims)
self.hist.update_history_f(D_recv, self.kill_canceled_sims)
if calc_type == EVAL_GEN_TAG:
self.hist.update_history_x_in(w, D_recv["calc_out"], self.safe_mode, self.W[w - 1]["gen_started_time"])
self.hist.update_history_x_in(w, D_recv["calc_out"], self.W[w - 1]["gen_started_time"])
assert (
len(D_recv["calc_out"]) or np.any(self.W["active"]) or self.W[w - 1]["persis_state"]
), "Gen must return work when is is the only thing active and not persistent."
Expand Down
61 changes: 61 additions & 0 deletions libensemble/tests/functionality_tests/test_new_field.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Runs libEnsemble with Latin hypercube sampling on a simple 1D problem
Execute via one of the following commands (e.g. 3 workers):
mpiexec -np 4 python test_1d_sampling.py
python test_1d_sampling.py --nworkers 3 --comms local
python test_1d_sampling.py --nworkers 3 --comms tcp
The number of concurrent evaluations of the objective function will be 4-1=3.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local
# TESTSUITE_NPROCS: 2 4

import numpy as np

from libensemble.gen_funcs.sampling import latin_hypercube_sample as gen_f

# Import libEnsemble items for this test
from libensemble.libE import libE
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output


def sim_f(In):
Out = np.zeros(1, dtype=[("f", float), ("N", int)])
Out["f"] = np.linalg.norm(In)
Out["N"] = 123
return Out


if __name__ == "__main__":
nworkers, is_manager, libE_specs, _ = parse_args()

sim_specs = {
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
}

gen_specs = {
"gen_f": gen_f,
"out": [("x", float, (1,))],
"user": {
"gen_batch_size": 500,
"lb": np.array([-3]),
"ub": np.array([3]),
},
}

persis_info = add_unique_random_streams({}, nworkers + 1, seed=1234)

exit_criteria = {"gen_max": 501}

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

if is_manager:
assert len(H) >= 501
assert "N" in H.dtype.names, "New datatype not added to history"
print("\nlibEnsemble with random sampling has generated enough points")
save_libE_output(H, persis_info, __file__, nworkers)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
gen_specs = {
"gen_f": gen_f,
"persis_in": ["f", "x", "corner_id", "sim_id"],
"out": [("sim_id", int), ("corner_id", int), ("x", float, (n,)), ("f_est", float)],
"out": [
("sim_id", int),
("corner_id", int),
("x", float, (n,)),
], # expect ("f_est", float) from gen - test ability for gen to send back "unexpected" field.
"user": {
"initial_batch_size": 20,
"lb": np.array([-3, -2, -1]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ def combine_component(x):
# assert np.all(grad[l <= 1e-7] <= 0)

# if not np.all(grad[np.logical_and(u >= 1e-7, l >= 1e-7)] <= 1e-5):
# import ipdb

# ipdb.set_trace()
# else:
# d = np.linalg.solve(np.dot(J.T, J), np.dot(J.T, F))
# assert np.linalg.norm(d) <= 1e-5
1 change: 1 addition & 0 deletions libensemble/tests/unit_tests/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def hist_setup2(sim_max=10, H0_in=[]):
alloc_specs = specs_dump(AllocSpecs())
H0 = H0_in
hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)
hist.safe_mode = True
return hist, sim_specs, gen_specs, exit_criteria, alloc_specs


Expand Down
32 changes: 15 additions & 17 deletions libensemble/tests/unit_tests/test_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@

exp_x_in_setup2["x"] = x

safe_mode = True


def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
Expand Down Expand Up @@ -149,7 +147,7 @@ def test_update_history_x_in_Oempty():
hist, sim_specs, gen_specs, _, _ = setup.hist_setup2()
H_o = np.zeros(0, dtype=gen_specs["out"])
gen_worker = 1
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)

compare_hists(hist.H, wrs2)

Expand All @@ -173,7 +171,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = single_rand

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
assert isclose(single_rand, hist.H["x"][0])
assert hist.sim_started_count == 0
assert hist.index == 1
Expand All @@ -185,7 +183,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2[: size + 1]

Expand All @@ -202,7 +200,7 @@ def test_update_history_x_in():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2

Expand All @@ -215,7 +213,7 @@ def test_update_history_x_in():
# Test libE errors when a protected field appears in output from a gen_worker
H_o = np.zeros(size, dtype=gen_specs["out"] + [("sim_started", bool)])
try:
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
except AssertionError:
assert 1, "Failed like it should have"
else:
Expand All @@ -224,7 +222,7 @@ def test_update_history_x_in():
# Test libE errors when a protected field appears in output from a gen_worker
H_o = np.zeros(size, dtype=gen_specs["out"] + [("sim_started", bool)])
try:
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
except AssertionError:
assert 1, "Failed like it should have"
else:
Expand All @@ -246,7 +244,7 @@ def test_update_history_x_in_sim_ids():
H_o["x"] = single_rand
H_o["sim_id"] = 0

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
assert isclose(single_rand, hist.H["x"][0])
assert hist.sim_started_count == 0
assert hist.index == 1
Expand All @@ -258,7 +256,7 @@ def test_update_history_x_in_sim_ids():
H_o = np.zeros(size, dtype=gen_specs["out"])
H_o["x"] = gen_specs["gen_f"](size=size)
H_o["sim_id"] = range(1, 7)
hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)

# Compare by column
exp_x = exp_x_in_setup2[: size + 1]
Expand All @@ -277,7 +275,7 @@ def test_update_history_x_in_sim_ids():
H_o["x"] = gen_specs["gen_f"](size=size)
H_o["sim_id"] = range(7, 10)

hist.update_history_x_in(gen_worker, H_o, safe_mode, np.inf)
hist.update_history_x_in(gen_worker, H_o, np.inf)
# Compare by column
exp_x = exp_x_in_setup2

Expand Down Expand Up @@ -357,7 +355,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert isclose(exp_vals[0], hist.H["g"][0])
assert np.all(hist.H["sim_ended"][0:1])
assert np.all(~hist.H["sim_ended"][1:10]) # Check the rest
Expand All @@ -383,7 +381,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert np.allclose(exp_vals, hist.H["g"])
assert np.all(hist.H["sim_ended"][0:3])
assert np.all(~hist.H["sim_ended"][3:10]) # Check the rest
Expand All @@ -405,7 +403,7 @@ def test_update_history_f():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)
assert np.all(hist.H["sim_ended"][0:1])
assert np.all(~hist.H["sim_ended"][3:10]) # Check the rest
assert hist.sim_ended_count == 2
Expand Down Expand Up @@ -435,7 +433,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert isclose(exp_fs[0], hist.H["f"][0])
assert np.allclose(exp_fvecs[0], hist.H["fvec"][0])
Expand Down Expand Up @@ -470,7 +468,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert np.allclose(exp_fs, hist.H["f"])
assert np.allclose(exp_fvecs, hist.H["fvec"])
Expand Down Expand Up @@ -507,7 +505,7 @@ def test_update_history_f_vec():
"calc_type": 2,
}

hist.update_history_f(D_recv, safe_mode)
hist.update_history_f(D_recv)

assert np.allclose(exp_fs, hist.H["f"])
assert np.allclose(exp_fvecs, hist.H["fvec"])
Expand Down

0 comments on commit 0190c0d

Please sign in to comment.