Skip to content

Commit f30eec0

Browse files
committed
fix(migration): handle cases where children IDs have already had their edges migrated
1 parent 55ad3d5 commit f30eec0

File tree

1 file changed

+26
-16
lines changed

1 file changed

+26
-16
lines changed

pychunkedgraph/graph/edges/__init__.py

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,9 @@ def _get_new_edge(edge, parent_ts, padding):
354354
return types.empty_2d.copy()
355355

356356
_edges = []
357+
max_node_ts = max(nodes_ts_map[node_a], nodes_ts_map[node_b])
357358
_edges_d = cg.get_cross_chunk_edges(
358-
node_ids=l2ids_a,
359-
time_stamp=max(nodes_ts_map[node_a], nodes_ts_map[node_b]),
360-
raw_only=True,
359+
node_ids=l2ids_a, time_stamp=max_node_ts, raw_only=True
361360
)
362361
for v in _edges_d.values():
363362
if edge_layer in v:
@@ -369,18 +368,29 @@ def _get_new_edge(edge, parent_ts, padding):
369368
return types.empty_2d.copy()
370369

371370
mask = np.isin(_edges[:, 1], l2ids_b)
372-
parents_a = _edges[mask][:, 0]
373-
children_b = cg.get_children(_edges[mask][:, 1], flatten=True)
374-
parents_b = np.unique(cg.get_parents(children_b, time_stamp=parent_ts))
375-
_cx_edges_d = cg.get_cross_chunk_edges(parents_b, time_stamp=parent_ts)
376-
parents_b = []
377-
for _node, _edges_d in _cx_edges_d.items():
378-
for _edges in _edges_d.values():
379-
_mask = np.isin(_edges[:, 1], parents_a)
380-
if np.any(_mask):
381-
parents_b.append(_node)
382-
383-
parents_b = np.array(parents_b, dtype=basetypes.NODE_ID)
371+
if np.any(mask):
372+
parents_a = _edges[mask][:, 0]
373+
children_b = cg.get_children(_edges[mask][:, 1], flatten=True)
374+
parents_b = np.unique(cg.get_parents(children_b, time_stamp=parent_ts))
375+
_cx_edges_d = cg.get_cross_chunk_edges(parents_b, time_stamp=parent_ts)
376+
parents_b = []
377+
for _node, _edges_d in _cx_edges_d.items():
378+
for _edges in _edges_d.values():
379+
_mask = np.isin(_edges[:, 1], parents_a)
380+
if np.any(_mask):
381+
parents_b.append(_node)
382+
parents_b = np.array(parents_b, dtype=basetypes.NODE_ID)
383+
else:
384+
# if none of `l2ids_b` were found in edges, `l2ids_a` already have new edges
385+
# so get the new identities of `l2ids_b` by using chunk mask
386+
parents_b = _edges[:, 1]
387+
chunks_old = cg.get_chunk_ids_from_node_ids(l2ids_b)
388+
chunks_new = cg.get_chunk_ids_from_node_ids(parents_b)
389+
chunk_mask = np.isin(chunks_new, chunks_old)
390+
parents_b = parents_b[chunk_mask]
391+
_stale_nodes = get_stale_nodes(cg, parents_b, parent_ts=max_node_ts)
392+
assert _stale_nodes.size == 0, f"{edge}, {_stale_nodes}, {parent_ts}"
393+
384394
parents_b = np.unique(
385395
cg.get_roots(parents_b, stop_layer=mlayer, ceil=False, time_stamp=parent_ts)
386396
)
@@ -436,5 +446,5 @@ def get_latest_edges_wrapper(
436446
stale_edge_layers,
437447
parent_ts=parent_ts,
438448
)
439-
logging.info(f"{stale_edges} -> {latest_edges}; {parent_ts}")
449+
logging.debug(f"{stale_edges} -> {latest_edges}; {parent_ts}")
440450
return np.concatenate([_cx_edges, latest_edges])

0 commit comments

Comments
 (0)