Skip to content

Commit cb11034

Browse files
committed
fix(edits): account for fake edges when finding latest edges
1 parent f7bd517 commit cb11034

File tree

4 files changed

+94
-51
lines changed

4 files changed

+94
-51
lines changed

pychunkedgraph/graph/chunks/atomic.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,6 @@ def get_bounding_atomic_chunks(
6262
chunkedgraph_meta: ChunkedGraphMeta, layer: int, chunk_coords: Sequence[int]
6363
) -> List:
6464
"""Atomic chunk coordinates along the boundary of a chunk"""
65-
return get_bounding_children_chunks(chunkedgraph_meta, layer, chunk_coords, 2)
65+
return get_bounding_children_chunks(
66+
chunkedgraph_meta, layer, tuple(chunk_coords), 2
67+
)

pychunkedgraph/graph/chunks/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# pylint: disable=invalid-name, missing-docstring
22

3-
from typing import List
43
from typing import Union
54
from typing import Optional
65
from typing import Sequence
6+
from typing import Tuple
77
from typing import Iterable
88

9+
from functools import lru_cache
10+
911
import numpy as np
1012

1113

@@ -210,8 +212,9 @@ def _get_chunk_coordinates_from_vol_coordinates(
210212
return coords.astype(int)
211213

212214

215+
@lru_cache()
213216
def get_bounding_children_chunks(
214-
cg_meta, layer: int, chunk_coords: Sequence[int], children_layer, return_unique=True
217+
cg_meta, layer: int, chunk_coords: Tuple[int], children_layer, return_unique=True
215218
) -> np.ndarray:
216219
"""Children chunk coordinates at given layer, along the boundary of a chunk"""
217220
chunk_coords = np.array(chunk_coords, dtype=int)

pychunkedgraph/graph/edges/__init__.py

Lines changed: 84 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44

55
from collections import namedtuple
6-
import datetime
6+
import datetime, logging
77
from os import environ
88
from copy import copy
99
from typing import Iterable, Optional
@@ -14,7 +14,10 @@
1414
from graph_tool import Graph
1515

1616
from pychunkedgraph.graph import types
17-
from pychunkedgraph.graph.chunks import utils as chunk_utils
17+
from pychunkedgraph.graph.chunks.utils import (
18+
get_bounding_children_chunks,
19+
get_chunk_ids_from_coords,
20+
)
1821
from pychunkedgraph.graph.utils import basetypes
1922

2023
from ..utils import basetypes
@@ -235,7 +238,9 @@ def get_latest_edges(
235238
These parents would be the new identities for the stale `partner`.
236239
"""
237240
_nodes = np.unique(stale_edges[:, 1])
238-
nodes_ts_map = dict(zip(_nodes, cg.get_node_timestamps(_nodes, return_numpy=False)))
241+
nodes_ts_map = dict(
242+
zip(_nodes, cg.get_node_timestamps(_nodes, return_numpy=False, normalize=True))
243+
)
239244
_nodes = np.unique(stale_edges)
240245
layers, coords = cg.get_chunk_layers_and_coordinates(_nodes)
241246
layers_d = dict(zip(_nodes, layers))
@@ -252,31 +257,55 @@ def _get_normalized_coords(node_a, node_b) -> tuple:
252257
coord_a, coord_b = cg.get_chunk_coordinates_multiple([chunk_a, chunk_b])
253258
return max_layer, coord_a, coord_b
254259

255-
def _get_l2chunkids_along_boundary(max_layer, coord_a, coord_b):
260+
def _get_l2chunkids_along_boundary(mlayer: int, coord_a, coord_b, padding: int = 0):
261+
"""
262+
Gets L2 Chunk IDs along opposing faces for larger chunks.
263+
If padding is enabled, more faces of L2 chunks are padded on both sides.
264+
This is necessary to find fake edges that can span more than 2 L2 chunks.
265+
"""
256266
direction = coord_a - coord_b
257-
axis = np.flatnonzero(direction)
258-
assert len(axis) == 1, f"{direction}, {coord_a}, {coord_b}"
259-
axis = axis[0]
260-
children_a = chunk_utils.get_bounding_children_chunks(
261-
cg.meta, max_layer, coord_a, children_layer=2
262-
)
263-
children_b = chunk_utils.get_bounding_children_chunks(
264-
cg.meta, max_layer, coord_b, children_layer=2
265-
)
266-
if direction[axis] > 0:
267-
mid = coord_a[axis] * 2 ** (max_layer - 2)
268-
l2chunks_a = children_a[children_a[:, axis] == mid]
269-
l2chunks_b = children_b[children_b[:, axis] == mid - 1]
270-
else:
271-
mid = coord_b[axis] * 2 ** (max_layer - 2)
272-
l2chunks_a = children_a[children_a[:, axis] == mid - 1]
273-
l2chunks_b = children_b[children_b[:, axis] == mid]
267+
major_axis = np.argmax(np.abs(direction))
268+
bounds_a = get_bounding_children_chunks(cg.meta, mlayer, tuple(coord_a), 2)
269+
bounds_b = get_bounding_children_chunks(cg.meta, mlayer, tuple(coord_b), 2)
270+
271+
l2chunk_count = 2 ** (mlayer - 2)
272+
max_coord = coord_a if direction[major_axis] > 0 else coord_b
273+
274+
skip = abs(direction[major_axis]) - 1
275+
l2_skip = skip * l2chunk_count
274276

275-
l2chunk_ids_a = chunk_utils.get_chunk_ids_from_coords(cg.meta, 2, l2chunks_a)
276-
l2chunk_ids_b = chunk_utils.get_chunk_ids_from_coords(cg.meta, 2, l2chunks_b)
277+
mid = max_coord[major_axis] * l2chunk_count
278+
face_a = mid if direction[major_axis] > 0 else (mid - l2_skip - 1)
279+
face_b = mid if direction[major_axis] < 0 else (mid - l2_skip - 1)
280+
281+
l2chunks_a = [bounds_a[bounds_a[:, major_axis] == face_a]]
282+
l2chunks_b = [bounds_b[bounds_b[:, major_axis] == face_b]]
283+
284+
step_a, step_b = (1, -1) if direction[major_axis] > 0 else (-1, 1)
285+
for _ in range(padding):
286+
_l2_chunks_a = copy(l2chunks_a[-1])
287+
_l2_chunks_b = copy(l2chunks_b[-1])
288+
_l2_chunks_a[:, major_axis] += step_a
289+
_l2_chunks_b[:, major_axis] += step_b
290+
l2chunks_a.append(_l2_chunks_a)
291+
l2chunks_b.append(_l2_chunks_b)
292+
293+
l2chunks_a = np.concatenate(l2chunks_a)
294+
l2chunks_b = np.concatenate(l2chunks_b)
295+
296+
l2chunk_ids_a = get_chunk_ids_from_coords(cg.meta, 2, l2chunks_a)
297+
l2chunk_ids_b = get_chunk_ids_from_coords(cg.meta, 2, l2chunks_b)
277298
return l2chunk_ids_a, l2chunk_ids_b
278299

279-
def _get_filtered_l2ids(node_a, node_b, chunks_map):
300+
def _get_filtered_l2ids(node_a, node_b, padding: int):
301+
"""
302+
Finds L2 IDs along opposing faces for given nodes.
303+
Filtering is done by first finding L2 chunks along these faces.
304+
Then get their parent chunks iteratively.
305+
Then filter children iteratively using these chunks.
306+
"""
307+
chunks_map = {}
308+
280309
def _filter(node):
281310
result = []
282311
children = np.array([node], dtype=basetypes.NODE_ID)
@@ -294,59 +323,67 @@ def _filter(node):
294323
children = cg.get_children(children[mask], flatten=True)
295324
return np.concatenate(result)
296325

297-
return _filter(node_a), _filter(node_b)
298-
299-
result = [types.empty_2d]
300-
chunks_map = {}
301-
for edge_layer, _edge in zip(edge_layers, stale_edges):
302-
node_a, node_b = _edge
303326
mlayer, coord_a, coord_b = _get_normalized_coords(node_a, node_b)
304-
chunks_a, chunks_b = _get_l2chunkids_along_boundary(mlayer, coord_a, coord_b)
327+
chunks_a, chunks_b = _get_l2chunkids_along_boundary(
328+
mlayer, coord_a, coord_b, padding
329+
)
305330

306-
chunks_map[node_a] = [np.array([cg.get_chunk_id(node_a)])]
307-
chunks_map[node_b] = [np.array([cg.get_chunk_id(node_b)])]
331+
chunks_map[node_a] = [[cg.get_chunk_id(node_a)]]
332+
chunks_map[node_b] = [[cg.get_chunk_id(node_b)]]
308333
_layer = 2
309334
while _layer < mlayer:
310335
chunks_map[node_a].append(chunks_a)
311336
chunks_map[node_b].append(chunks_b)
312337
chunks_a = np.unique(cg.get_parent_chunk_id_multiple(chunks_a))
313338
chunks_b = np.unique(cg.get_parent_chunk_id_multiple(chunks_b))
314339
_layer += 1
315-
chunks_map[node_a] = np.concatenate(chunks_map[node_a]).astype(basetypes.NODE_ID)
316-
chunks_map[node_b] = np.concatenate(chunks_map[node_b]).astype(basetypes.NODE_ID)
340+
chunks_map[node_a] = np.concatenate(chunks_map[node_a])
341+
chunks_map[node_b] = np.concatenate(chunks_map[node_b])
342+
return int(mlayer), _filter(node_a), _filter(node_b)
317343

318-
l2ids_a, l2ids_b = _get_filtered_l2ids(node_a, node_b, chunks_map)
344+
result = [types.empty_2d]
345+
for edge_layer, _edge in zip(edge_layers, stale_edges):
346+
node_a, node_b = _edge
347+
mlayer, l2ids_a, l2ids_b = _get_filtered_l2ids(node_a, node_b, padding=0)
348+
if l2ids_a.size == 0 or l2ids_b.size == 0:
349+
logging.info(f"{node_a}, {node_b}, expanding search with padding.")
350+
mlayer, l2ids_a, l2ids_b = _get_filtered_l2ids(node_a, node_b, padding=2)
351+
logging.info(f"Found {l2ids_a} and {l2ids_b}")
352+
353+
_edges = []
319354
edges_d = cg.get_cross_chunk_edges(
320355
node_ids=l2ids_a, time_stamp=nodes_ts_map[node_b], raw_only=True
321356
)
322-
323-
_edges = []
324357
for v in edges_d.values():
325358
_edges.append(v.get(edge_layer, types.empty_2d))
326-
_edges = np.concatenate(_edges)
327-
mask = np.isin(_edges[:, 1], l2ids_b)
328359

329-
children_b = cg.get_children(_edges[mask][:, 1], flatten=True)
360+
try:
361+
_edges = np.concatenate(_edges)
362+
except ValueError as exc:
363+
logging.warning(f"No edges found for {node_a}, {node_b}")
364+
raise ValueError from exc
330365

366+
mask = np.isin(_edges[:, 1], l2ids_b)
331367
parents_a = _edges[mask][:, 0]
368+
children_b = cg.get_children(_edges[mask][:, 1], flatten=True)
332369
parents_b = np.unique(cg.get_parents(children_b, time_stamp=parent_ts))
333-
_cx_edges_d = cg.get_cross_chunk_edges(parents_b)
370+
_cx_edges_d = cg.get_cross_chunk_edges(parents_b, time_stamp=parent_ts)
334371
parents_b = []
335372
for _node, _edges_d in _cx_edges_d.items():
336373
for _edges in _edges_d.values():
337-
_mask = np.isin(_edges[:,1], parents_a)
374+
_mask = np.isin(_edges[:, 1], parents_a)
338375
if np.any(_mask):
339376
parents_b.append(_node)
340377

341378
parents_b = np.array(parents_b, dtype=basetypes.NODE_ID)
342379
parents_b = np.unique(
343-
cg.get_roots(
344-
parents_b, stop_layer=mlayer, ceil=False, time_stamp=parent_ts
345-
)
380+
cg.get_roots(parents_b, stop_layer=mlayer, ceil=False, time_stamp=parent_ts)
346381
)
347382

348383
parents_a = np.array([node_a] * parents_b.size, dtype=basetypes.NODE_ID)
349-
result.append(np.column_stack((parents_a, parents_b)))
384+
_new_edges = np.column_stack((parents_a, parents_b))
385+
assert _new_edges.size, f"No edge found for {node_a}, {node_b} at {parent_ts}"
386+
result.append(_new_edges)
350387
return np.concatenate(result)
351388

352389

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# pylint: disable=invalid-name, missing-docstring, c-extension-no-member
22

3-
import math, random, time
3+
import logging, math, random, time
44
import multiprocessing as mp
55
from collections import defaultdict
66

@@ -171,6 +171,7 @@ def update_chunk(
171171
args = (cg_info, layer, chunk, ts_chunk, earliest_ts)
172172
tasks.append(args)
173173

174+
logging.info(f"Processing {len(nodes)} nodes.")
174175
with mp.Pool(min(mp.cpu_count(), len(tasks))) as pool:
175176
_ = list(
176177
tqdm(

0 commit comments

Comments
 (0)