Skip to content

Commit dcb3ac8

Browse files
committed
fix(upgrade): use start and end timestamps to filter out irrelevant timestamps
1 parent cb11034 commit dcb3ac8

File tree

7 files changed

+91
-49
lines changed

7 files changed

+91
-49
lines changed

pychunkedgraph/graph/edges/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,10 @@ def get_latest_edges(
237237
Then get supervoxels of those L2 IDs and get parent(s) at `node` level.
238238
These parents would be the new identities for the stale `partner`.
239239
"""
240-
_nodes = np.unique(stale_edges[:, 1])
240+
_nodes = np.unique(stale_edges)
241241
nodes_ts_map = dict(
242242
zip(_nodes, cg.get_node_timestamps(_nodes, return_numpy=False, normalize=True))
243243
)
244-
_nodes = np.unique(stale_edges)
245244
layers, coords = cg.get_chunk_layers_and_coordinates(_nodes)
246245
layers_d = dict(zip(_nodes, layers))
247246
coords_d = dict(zip(_nodes, coords))
@@ -352,7 +351,9 @@ def _filter(node):
352351

353352
_edges = []
354353
edges_d = cg.get_cross_chunk_edges(
355-
node_ids=l2ids_a, time_stamp=nodes_ts_map[node_b], raw_only=True
354+
node_ids=l2ids_a,
355+
time_stamp=max(nodes_ts_map[node_a], nodes_ts_map[node_b]),
356+
raw_only=True,
356357
)
357358
for v in edges_d.values():
358359
_edges.append(v.get(edge_layer, types.empty_2d))
@@ -382,7 +383,8 @@ def _filter(node):
382383

383384
parents_a = np.array([node_a] * parents_b.size, dtype=basetypes.NODE_ID)
384385
_new_edges = np.column_stack((parents_a, parents_b))
385-
assert _new_edges.size, f"No edge found for {node_a}, {node_b} at {parent_ts}"
386+
err = f"No edge found for {node_a}, {node_b} at {edge_layer}; {parent_ts}"
387+
assert _new_edges.size, err
386388
result.append(_new_edges)
387389
return np.concatenate(result)
388390

pychunkedgraph/ingest/upgrade/atomic_layer.py

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
# pylint: disable=invalid-name, missing-docstring, c-extension-no-member
22

3-
from datetime import timedelta
4-
53
import fastremap
64
import numpy as np
75
from pychunkedgraph.graph import ChunkedGraph
8-
from pychunkedgraph.graph.attributes import Connectivity
6+
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
97
from pychunkedgraph.graph.utils import serializers
108

11-
from .utils import exists_as_parent, get_parent_timestamps
9+
from .utils import exists_as_parent, get_end_timestamps, get_parent_timestamps
10+
11+
CHILDREN = {}
1212

1313

1414
def update_cross_edges(
15-
cg: ChunkedGraph, node, cx_edges_d: dict, node_ts, timestamps: set, earliest_ts
15+
cg: ChunkedGraph, node, cx_edges_d: dict, node_ts, node_end_ts, timestamps: set
1616
) -> list:
1717
"""
1818
Helper function to update a single L2 ID.
@@ -27,13 +27,15 @@ def update_cross_edges(
2727
assert not exists_as_parent(cg, node, edges[:, 0])
2828
return rows
2929

30-
partner_parent_ts_d = get_parent_timestamps(cg, edges[:, 1])
30+
partner_parent_ts_d = get_parent_timestamps(cg, np.unique(edges[:, 1]))
3131
for v in partner_parent_ts_d.values():
3232
timestamps.update(v)
3333

3434
for ts in sorted(timestamps):
35-
if ts < earliest_ts:
36-
ts = earliest_ts
35+
if ts < node_ts:
36+
continue
37+
if ts > node_end_ts:
38+
break
3739
val_dict = {}
3840
svs = edges[:, 1]
3941
parents = cg.get_parents(svs, time_stamp=ts)
@@ -51,21 +53,22 @@ def update_cross_edges(
5153
return rows
5254

5355

54-
def update_nodes(cg: ChunkedGraph, nodes) -> list:
55-
nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True)
56-
earliest_ts = cg.get_earliest_timestamp()
56+
def update_nodes(cg: ChunkedGraph, nodes, nodes_ts, children_map=None) -> list:
57+
if children_map is None:
58+
children_map = CHILDREN
59+
end_timestamps = get_end_timestamps(cg, nodes, nodes_ts, children_map)
5760
timestamps_d = get_parent_timestamps(cg, nodes)
5861
cx_edges_d = cg.get_atomic_cross_edges(nodes)
5962
rows = []
60-
for node, node_ts in zip(nodes, nodes_ts):
63+
for node, node_ts, end_ts in zip(nodes, nodes_ts, end_timestamps):
6164
if cg.get_parent(node) is None:
62-
# invalid id caused by failed ingest task
65+
# invalid id caused by failed ingest task / edits
6366
continue
6467
_cx_edges_d = cx_edges_d.get(node, {})
6568
if not _cx_edges_d:
6669
continue
6770
_rows = update_cross_edges(
68-
cg, node, _cx_edges_d, node_ts, timestamps_d[node], earliest_ts
71+
cg, node, _cx_edges_d, node_ts, end_ts, timestamps_d[node]
6972
)
7073
rows.extend(_rows)
7174
return rows
@@ -76,10 +79,26 @@ def update_chunk(cg: ChunkedGraph, chunk_coords: list[int], layer: int = 2):
7679
Iterate over all L2 IDs in a chunk and update their cross chunk edges,
7780
within the periods they were valid/active.
7881
"""
82+
global CHILDREN
83+
7984
x, y, z = chunk_coords
8085
chunk_id = cg.get_chunk_id(layer=layer, x=x, y=y, z=z)
8186
cg.copy_fake_edges(chunk_id)
8287
rr = cg.range_read_chunk(chunk_id)
83-
nodes = list(rr.keys())
84-
rows = update_nodes(cg, nodes)
88+
89+
nodes = []
90+
nodes_ts = []
91+
earliest_ts = cg.get_earliest_timestamp()
92+
for k, v in rr.items():
93+
nodes.append(k)
94+
CHILDREN[k] = v[Hierarchy.Child][0].value
95+
ts = v[Hierarchy.Child][0].timestamp
96+
nodes_ts.append(earliest_ts if ts < earliest_ts else ts)
97+
98+
if len(nodes):
99+
assert len(CHILDREN) > 0, (nodes, CHILDREN)
100+
else:
101+
return
102+
103+
rows = update_nodes(cg, nodes, nodes_ts)
85104
cg.client.write(rows)

pychunkedgraph/ingest/upgrade/parent_layer.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from pychunkedgraph.graph.types import empty_2d
1616
from pychunkedgraph.utils.general import chunked
1717

18-
from .utils import exists_as_parent, get_parent_timestamps
18+
from .utils import exists_as_parent, get_end_timestamps, get_parent_timestamps
1919

2020

2121
CHILDREN = {}
@@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts):
5151

5252

5353
def _populate_cx_edges_with_timestamps(
54-
cg: ChunkedGraph, layer: int, nodes: list, nodes_ts: list, earliest_ts
54+
cg: ChunkedGraph, layer: int, nodes: list, nodes_ts: list
5555
):
5656
"""
5757
Collect timestamps of edits from children, since we use the same timestamp
@@ -63,7 +63,8 @@ def _populate_cx_edges_with_timestamps(
6363
all_children = np.concatenate(list(CHILDREN.values()))
6464
response = cg.client.read_nodes(node_ids=all_children, properties=attrs)
6565
timestamps_d = get_parent_timestamps(cg, nodes)
66-
for node, node_ts in zip(nodes, nodes_ts):
66+
end_timestamps = get_end_timestamps(cg, nodes, nodes_ts, CHILDREN)
67+
for node, node_ts, node_end_ts in zip(nodes, nodes_ts, end_timestamps):
6768
CX_EDGES[node] = {}
6869
timestamps = timestamps_d[node]
6970
cx_edges_d_node_ts = _get_cx_edges_at_timestamp(node, response, node_ts)
@@ -75,8 +76,8 @@ def _populate_cx_edges_with_timestamps(
7576
CX_EDGES[node][node_ts] = cx_edges_d_node_ts
7677

7778
for ts in sorted(timestamps):
78-
if ts < earliest_ts:
79-
ts = earliest_ts
79+
if ts > node_end_ts:
80+
break
8081
CX_EDGES[node][ts] = _get_cx_edges_at_timestamp(node, response, ts)
8182

8283

@@ -107,7 +108,7 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l
107108

108109
row_id = serializers.serialize_uint64(node)
109110
for ts, cx_edges_d in CX_EDGES[node].items():
110-
if node_ts > ts:
111+
if ts < node_ts:
111112
continue
112113
edges = get_latest_edges_wrapper(cg, cx_edges_d, parent_ts=ts)
113114
if edges.size == 0:
@@ -136,7 +137,7 @@ def _update_cross_edges_helper(args):
136137
parents = cg.get_parents(nodes, fail_to_zero=True)
137138
for node, parent, node_ts in zip(nodes, parents, nodes_ts):
138139
if parent == 0:
139-
# invalid id caused by failed ingest task
140+
# invalid id caused by failed ingest task / edits
140141
continue
141142
_rows = update_cross_edges(cg, layer, node, node_ts, earliest_ts)
142143
rows.extend(_rows)
@@ -159,7 +160,7 @@ def update_chunk(
159160
nodes = list(CHILDREN.keys())
160161
random.shuffle(nodes)
161162
nodes_ts = cg.get_node_timestamps(nodes, return_numpy=False, normalize=True)
162-
_populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts, earliest_ts)
163+
_populate_cx_edges_with_timestamps(cg, layer, nodes, nodes_ts)
163164

164165
task_size = int(math.ceil(len(nodes) / mp.cpu_count() / 2))
165166
chunked_nodes = chunked(nodes, task_size)

pychunkedgraph/ingest/upgrade/utils.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# pylint: disable=invalid-name, missing-docstring
22

33
from collections import defaultdict
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta, timezone
55

66
import numpy as np
77
from pychunkedgraph.graph import ChunkedGraph
@@ -33,31 +33,50 @@ def get_edit_timestamps(cg: ChunkedGraph, edges_d, start_ts, end_ts) -> list:
3333
return sorted(timestamps)
3434

3535

36-
def get_end_ts(cg: ChunkedGraph, children, start_ts):
37-
# get end_ts when node becomes invalid (bigtable resolution is in ms)
38-
start = start_ts + timedelta(milliseconds=1)
39-
_timestamps = get_parent_timestamps(cg, children, start_time=start)
40-
try:
41-
end_ts = sorted(_timestamps)[0]
42-
except IndexError:
43-
# start_ts == end_ts means there has been no edit involving this node
44-
# meaning only one timestamp to update cross edges, start_ts
45-
end_ts = start_ts
46-
return end_ts
36+
def get_end_timestamps(cg: ChunkedGraph, nodes, nodes_ts, children_map):
37+
"""
38+
Gets the last timestamp for each node at which to update its cross edges.
39+
For this, we get parent timestamps for all children of a node.
40+
The first timestamp > node_timestamp among these is the last timestamp.
41+
This is the timestamp at which one of node's children
42+
got a new parent that superseded the current node.
43+
"""
44+
result = []
45+
children = np.concatenate([*children_map.values()])
46+
timestamps_d = get_parent_timestamps(cg, children)
47+
for node, node_ts in zip(nodes, nodes_ts):
48+
node_children = children_map[node]
49+
_timestamps = set().union(*[timestamps_d[k] for k in node_children])
50+
try:
51+
_timestamps = sorted(_timestamps)
52+
_index = np.searchsorted(_timestamps, node_ts)
53+
assert _timestamps[_index] == node_ts, (_index, node_ts, _timestamps)
54+
end_ts = _timestamps[_index + 1] - timedelta(milliseconds=1)
55+
except IndexError:
56+
# this node has not been edited, but might have it edges updated
57+
end_ts = datetime.now(timezone.utc)
58+
result.append(end_ts)
59+
return result
4760

4861

49-
def get_parent_timestamps(cg: ChunkedGraph, nodes) -> dict[int, set]:
62+
def get_parent_timestamps(
63+
cg: ChunkedGraph, nodes, start_time=None, end_time=None
64+
) -> dict[int, set]:
5065
"""
5166
Timestamps of when the given nodes were edited.
5267
"""
68+
earliest_ts = cg.get_earliest_timestamp()
5369
response = cg.client.read_nodes(
5470
node_ids=nodes,
5571
properties=[Hierarchy.Parent],
72+
start_time=start_time,
73+
end_time=end_time,
5674
end_time_inclusive=False,
5775
)
5876

5977
result = defaultdict(set)
6078
for k, v in response.items():
6179
for cell in v[Hierarchy.Parent]:
62-
result[k].add(cell.timestamp)
80+
ts = cell.timestamp
81+
result[k].add(earliest_ts if ts < earliest_ts else ts)
6382
return result

pychunkedgraph/utils/redis.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "")
2020
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0"
21+
CONNECTION = redis.Redis.from_url(REDIS_URL, socket_timeout=60)
2122

2223
keys_fields = ("INGESTION_MANAGER", "JOB_TYPE")
2324
keys_defaults = ("pcg:imanager", "pcg:job_type")
@@ -27,9 +28,10 @@
2728

2829

2930
def get_redis_connection(redis_url=REDIS_URL):
31+
if redis_url == REDIS_URL:
32+
return CONNECTION
3033
return redis.Redis.from_url(redis_url, socket_timeout=60)
3134

3235

3336
def get_rq_queue(queue):
34-
connection = redis.Redis.from_url(REDIS_URL, socket_timeout=60)
35-
return Queue(queue, connection=connection)
37+
return Queue(queue, connection=CONNECTION)

requirements.in

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ google-cloud-datastore>=1.8
1010
flask
1111
flask_cors
1212
python-json-logger
13-
redis
14-
rq<2
13+
redis>5
14+
rq>2
1515
pyyaml
1616
cachetools
1717
werkzeug

requirements.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ gevent==23.9.1
9797
# task-queue
9898
google-api-core[grpc]==2.11.1
9999
# via
100-
# google-api-core
101100
# google-cloud-bigtable
102101
# google-cloud-core
103102
# google-cloud-datastore
@@ -295,7 +294,7 @@ pytz==2023.3.post1
295294
# via pandas
296295
pyyaml==6.0.1
297296
# via -r requirements.in
298-
redis==5.0.0
297+
redis==6.4.0
299298
# via
300299
# -r requirements.in
301300
# rq
@@ -316,7 +315,7 @@ rpds-py==0.10.3
316315
# via
317316
# jsonschema
318317
# referencing
319-
rq==1.15.1
318+
rq==2.4.1
320319
# via -r requirements.in
321320
rsa==4.9
322321
# via

0 commit comments

Comments
 (0)