Commit 20b4cf2
fix(edits): more precise filter for latest edges; error on chunk_id mismatch (ingest); bump version
1 parent aee77f4 commit 20b4cf2

8 files changed (+71, -74 lines)
.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 3.0.13
+current_version = 3.1.0
 commit = True
 tag = True

pychunkedgraph/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "3.0.13"
+__version__ = "3.1.0"

pychunkedgraph/graph/chunkedgraph.py

Lines changed: 2 additions & 1 deletion
@@ -216,6 +216,7 @@ def get_parents(
                     if fail_to_zero:
                         parents.append(0)
                     else:
+                        exc.add_note(f"timestamp: {time_stamp}")
                         raise KeyError from exc
             parents = np.array(parents, dtype=basetypes.NODE_ID)
         else:
@@ -1016,7 +1017,7 @@ def get_chunk_layers_and_coordinates(self, node_or_chunk_ids: typing.Sequence):
         """
         node_or_chunk_ids = np.array(node_or_chunk_ids, dtype=basetypes.NODE_ID)
         layers = self.get_chunk_layers(node_or_chunk_ids)
-        chunk_coords = np.zeros(shape=(len(node_or_chunk_ids), 3))
+        chunk_coords = np.zeros(shape=(len(node_or_chunk_ids), 3), dtype=int)
         for _layer in np.unique(layers):
             mask = layers == _layer
             _nodes = node_or_chunk_ids[mask]
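For context, a minimal sketch of what the new annotation does, assuming Python 3.11+ (where BaseException.add_note is available); lookup_parent and parents_d are hypothetical names, not part of the codebase:

import datetime


def lookup_parent(parents_d: dict, node_id: int, time_stamp: datetime.datetime) -> int:
    # Hypothetical lookup mirroring the get_parents change: attach the query
    # timestamp to the exception before re-raising, so failed lookups are easier
    # to debug from logs (add_note requires Python 3.11+).
    try:
        return parents_d[node_id]
    except KeyError as exc:
        exc.add_note(f"timestamp: {time_stamp}")
        raise KeyError from exc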

pychunkedgraph/graph/edges/__init__.py

Lines changed: 51 additions & 26 deletions
@@ -220,7 +220,7 @@ def get_stale_nodes(
         )
         stale_mask = layer_nodes != _nodes
         stale_nodes.append(layer_nodes[stale_mask])
-    return np.concatenate(stale_nodes), edge_supervoxels
+    return np.concatenate(stale_nodes)


 def get_latest_edges(
@@ -279,7 +279,7 @@ def _get_l2chunkids_along_boundary(max_layer, coord_a, coord_b):
     def _get_filtered_l2ids(node_a, node_b, chunks_map):
         def _filter(node):
             result = []
-            children = cg.get_children(node)
+            children = np.array([node], dtype=basetypes.NODE_ID)
             while True:
                 chunk_ids = cg.get_chunk_ids_from_node_ids(children)
                 mask = np.isin(chunk_ids, chunks_map[node])
@@ -296,24 +296,24 @@ def _filter(node):

         return _filter(node_a), _filter(node_b)

-    result = []
+    result = [types.empty_2d]
     chunks_map = {}
     for edge_layer, _edge in zip(edge_layers, stale_edges):
         node_a, node_b = _edge
         mlayer, coord_a, coord_b = _get_normalized_coords(node_a, node_b)
         chunks_a, chunks_b = _get_l2chunkids_along_boundary(mlayer, coord_a, coord_b)

-        chunks_map[node_a] = []
-        chunks_map[node_b] = []
+        chunks_map[node_a] = [np.array([cg.get_chunk_id(node_a)])]
+        chunks_map[node_b] = [np.array([cg.get_chunk_id(node_b)])]
         _layer = 2
         while _layer < mlayer:
             chunks_map[node_a].append(chunks_a)
             chunks_map[node_b].append(chunks_b)
             chunks_a = np.unique(cg.get_parent_chunk_id_multiple(chunks_a))
             chunks_b = np.unique(cg.get_parent_chunk_id_multiple(chunks_b))
             _layer += 1
-        chunks_map[node_a] = np.concatenate(chunks_map[node_a])
-        chunks_map[node_b] = np.concatenate(chunks_map[node_b])
+        chunks_map[node_a] = np.concatenate(chunks_map[node_a]).astype(basetypes.NODE_ID)
+        chunks_map[node_b] = np.concatenate(chunks_map[node_b]).astype(basetypes.NODE_ID)

         l2ids_a, l2ids_b = _get_filtered_l2ids(node_a, node_b, chunks_map)
         edges_d = cg.get_cross_chunk_edges(
@@ -326,32 +326,57 @@ def _filter(node):
         _edges = np.concatenate(_edges)
         mask = np.isin(_edges[:, 1], l2ids_b)

-        children_a = cg.get_children(_edges[mask][:, 0], flatten=True)
         children_b = cg.get_children(_edges[mask][:, 1], flatten=True)
-        if 85431849467249595 in children_a and 85502218144317440 in children_b:
-            print("woohoo0")
-            continue
-
-        if 85502218144317440 in children_a and 85431849467249595 in children_b:
-            print("woohoo1")
-            continue
-        parents_a = np.unique(
-            cg.get_roots(
-                children_a, stop_layer=mlayer, ceil=False, time_stamp=parent_ts
-            )
-        )
-        assert parents_a.size == 1 and parents_a[0] == node_a, (
-            node_a,
-            parents_a,
-            children_a,
-        )

+        parents_a = _edges[mask][:, 0]
+        parents_b = np.unique(cg.get_parents(children_b, time_stamp=parent_ts))
+        _cx_edges_d = cg.get_cross_chunk_edges(parents_b)
+        parents_b = []
+        for _node, _edges_d in _cx_edges_d.items():
+            for _edges in _edges_d.values():
+                _mask = np.isin(_edges[:,1], parents_a)
+                if np.any(_mask):
+                    parents_b.append(_node)
+
+        parents_b = np.array(parents_b, dtype=basetypes.NODE_ID)
         parents_b = np.unique(
             cg.get_roots(
-                children_b, stop_layer=mlayer, ceil=False, time_stamp=parent_ts
+                parents_b, stop_layer=mlayer, ceil=False, time_stamp=parent_ts
             )
         )

         parents_a = np.array([node_a] * parents_b.size, dtype=basetypes.NODE_ID)
         result.append(np.column_stack((parents_a, parents_b)))
     return np.concatenate(result)
+
+
+def get_latest_edges_wrapper(
+    cg,
+    cx_edges_d: dict,
+    parent_ts: datetime.datetime = None,
+) -> np.ndarray:
+    """Helper function to filter stale edges and replace with latest edges."""
+    _cx_edges = [types.empty_2d]
+    _edge_layers = [types.empty_1d]
+    for k, v in cx_edges_d.items():
+        _cx_edges.append(v)
+        _edge_layers.append([k] * len(v))
+    _cx_edges = np.concatenate(_cx_edges)
+    _edge_layers = np.concatenate(_edge_layers, dtype=int)
+
+    edge_nodes = np.unique(_cx_edges)
+    stale_nodes = get_stale_nodes(cg, edge_nodes, parent_ts=parent_ts)
+    stale_nodes_mask = np.isin(edge_nodes, stale_nodes)
+
+    latest_edges = types.empty_2d.copy()
+    if np.any(stale_nodes_mask):
+        stalte_edges_mask = np.isin(_cx_edges[:, 1], stale_nodes)
+        stale_edges = _cx_edges[stalte_edges_mask]
+        stale_edge_layers = _edge_layers[stalte_edges_mask]
+        latest_edges = get_latest_edges(
+            cg,
+            stale_edges,
+            stale_edge_layers,
+            parent_ts=parent_ts,
+        )
+    return np.concatenate([_cx_edges, latest_edges])
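A minimal usage sketch of the new helper, assuming an existing ChunkedGraph instance cg and a cross-edge dict keyed by layer whose values are Nx2 [node, partner] ID arrays; refresh_cross_edges and the commented example IDs are hypothetical, mirroring the call site in edits.py below:

import datetime

import numpy as np

from pychunkedgraph.graph.edges import get_latest_edges_wrapper


def refresh_cross_edges(cg, cx_edges_d: dict, ts: datetime.datetime) -> np.ndarray:
    # Hypothetical convenience function: partners with a newer parent at the
    # same layer are treated as stale; the result contains the original edges
    # plus their latest replacements.
    return get_latest_edges_wrapper(cg, cx_edges_d, parent_ts=ts)


# Example input shape (IDs are made up for illustration):
# cx_edges_d = {
#     2: np.array([[101, 202]], dtype=np.uint64),
#     3: np.array([[101, 303]], dtype=np.uint64),
# }
# edges = refresh_cross_edges(cg, cx_edges_d, datetime.datetime.now(datetime.timezone.utc))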

pychunkedgraph/graph/edits.py

Lines changed: 4 additions & 30 deletions
@@ -15,7 +15,7 @@
 from . import types
 from . import attributes
 from . import cache as cache_utils
-from .edges import get_latest_edges, get_stale_nodes
+from .edges import get_latest_edges, get_latest_edges_wrapper, get_stale_nodes
 from .edges.utils import concatenate_cross_edge_dicts
 from .edges.utils import merge_cross_edge_dicts
 from .utils import basetypes
@@ -587,36 +587,10 @@ def _update_cross_edge_cache(self, parent, children):
             children, time_stamp=self._last_successful_ts
         )
         cx_edges_d = concatenate_cross_edge_dicts(cx_edges_d.values())
-
-        _cx_edges = [types.empty_2d]
-        _edge_layers = [types.empty_1d]
-        for k, v in cx_edges_d.items():
-            _cx_edges.append(v)
-            _edge_layers.append([k] * len(v))
-        _cx_edges = np.concatenate(_cx_edges)
-        _edge_layers = np.concatenate(_edge_layers, dtype=int)
-
-        edge_nodes = np.unique(_cx_edges)
-        stale_nodes, edge_supervoxels = get_stale_nodes(
-            self.cg, edge_nodes, parent_ts=self._last_successful_ts
+        _cx_edges = get_latest_edges_wrapper(
+            self.cg, cx_edges_d, parent_ts=self._last_successful_ts
         )
-        stale_nodes_mask = np.isin(edge_nodes, stale_nodes)
-
-        latest_edges = types.empty_2d.copy()
-        if np.any(stale_nodes_mask):
-            stalte_edges_mask = _cx_edges[:, 1] == stale_nodes
-            stale_edges = _cx_edges[stalte_edges_mask]
-            stale_edge_layers = _edge_layers[stalte_edges_mask]
-            latest_edges = get_latest_edges(
-                self.cg,
-                stale_edges,
-                stale_edge_layers,
-                parent_ts=self._last_successful_ts,
-            )
-
-        _cx_edges = np.concatenate([_cx_edges, latest_edges])
-        edge_nodes = np.unique(_cx_edges)
-
+        edge_nodes = np.unique(_cx_edges)
         edge_parents = self.cg.get_roots(
             edge_nodes,
             stop_layer=parent_layer,
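The deleted block also carried the imprecise filter named in the commit message: _cx_edges[:, 1] == stale_nodes compares elementwise, whereas the wrapper now uses np.isin as a membership test. A small standalone illustration (array values are made up):

import numpy as np

col = np.array([10, 20, 30, 20], dtype=np.uint64)  # second column of the edge array
stale = np.array([20, 30], dtype=np.uint64)        # stale partner IDs

# Membership test: flags every occurrence of a stale partner, regardless of position.
print(np.isin(col, stale))  # [False  True  True  True]

# An elementwise == only compares position by position (and here the shapes do
# not even broadcast), so it cannot express "is this partner stale?".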

pychunkedgraph/ingest/cluster.py

Lines changed: 2 additions & 9 deletions
@@ -108,15 +108,8 @@ def _check_edges_direction(
     chunk_id = cg.get_chunk_id(layer=1, x=x, y=y, z=z)
     for edge_type in [EDGE_TYPES.between_chunk, EDGE_TYPES.cross_chunk]:
         edges = chunk_edges[edge_type]
-        e1 = edges.node_ids1
-        e2 = edges.node_ids2
-
-        e2_chunk_ids = cg.get_chunk_ids_from_node_ids(e2)
-        mask = e2_chunk_ids == chunk_id
-        e1[mask], e2[mask] = e2[mask], e1[mask]
-
-        e1_chunk_ids = cg.get_chunk_ids_from_node_ids(e1)
-        mask = e1_chunk_ids == chunk_id
+        chunk_ids = cg.get_chunk_ids_from_node_ids(edges.node_ids1)
+        mask = chunk_ids == chunk_id
         assert np.all(mask), "all IDs must belong to same chunk"

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 9 additions & 6 deletions
@@ -10,6 +10,7 @@

 from pychunkedgraph.graph import ChunkedGraph
 from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
+from pychunkedgraph.graph.edges import get_latest_edges_wrapper
 from pychunkedgraph.graph.utils import serializers
 from pychunkedgraph.graph.types import empty_2d
 from pychunkedgraph.utils.general import chunked
@@ -104,14 +105,17 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l
         assert not exists_as_parent(cg, node, edges[:, 0]), f"{node}, {node_ts}"
         return rows

+    row_id = serializers.serialize_uint64(node)
     for ts, cx_edges_d in CX_EDGES[node].items():
-        edges = np.concatenate([empty_2d] + list(cx_edges_d.values()))
+        if node_ts > ts:
+            continue
+        edges = get_latest_edges_wrapper(cg, cx_edges_d, parent_ts=ts)
         if edges.size == 0:
             continue
-        nodes = np.unique(edges[:, 1])
-        svs = cg.get_single_leaf_multiple(nodes)
-        parents = cg.get_roots(svs, time_stamp=ts, stop_layer=layer, ceil=False)
-        edge_parents_d = dict(zip(nodes, parents))
+
+        edge_nodes = np.unique(edges)
+        parents = cg.get_roots(edge_nodes, time_stamp=ts, stop_layer=layer, ceil=False)
+        edge_parents_d = dict(zip(edge_nodes, parents))
         val_dict = {}
         for _layer, layer_edges in cx_edges_d.items():
             layer_edges = fastremap.remap(
@@ -121,7 +125,6 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l
             layer_edges = np.unique(layer_edges, axis=0)
             col = Connectivity.CrossChunkEdge[_layer]
             val_dict[col] = layer_edges
-            row_id = serializers.serialize_uint64(node)
         rows.append(cg.client.mutate_row(row_id, val_dict, time_stamp=ts))
     return rows
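One of the behavioural changes above is the new guard that skips cross-edge snapshots recorded before the node itself existed. A standalone sketch of that guard (timestamps and labels are made up):

import datetime

node_ts = datetime.datetime(2024, 1, 2, tzinfo=datetime.timezone.utc)
snapshots = {
    datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc): "recorded before the node existed",
    datetime.datetime(2024, 1, 3, tzinfo=datetime.timezone.utc): "valid snapshot",
}

for ts, label in snapshots.items():
    if node_ts > ts:
        continue  # mirrors the new guard: ignore pre-creation cross-edge data
    print(ts, label)  # only the 2024-01-03 entry is processed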

pychunkedgraph/ingest/utils.py

Lines changed: 1 addition & 0 deletions
@@ -202,6 +202,7 @@ def queue_layer_helper(parent_layer: int, imanager: IngestionManager, fn):
             )
         )
     q.enqueue_many(job_datas)
+    logging.info(f"Queued {len(job_datas)} chunks.")


 def job_type_guard(job_type: str):
def job_type_guard(job_type: str):
