Skip to content

Commit

Permalink
cached cv info for distributed workers (#506)
Browse files Browse the repository at this point in the history
* feat: cache info for improved distributed performance

* fix: check for completed chunks

* make key specific to watershed cv

* move definition outside try
  • Loading branch information
akhileshh authored Sep 3, 2024
1 parent 432ca3c commit cfc8505
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 13 deletions.
19 changes: 18 additions & 1 deletion pychunkedgraph/graph/meta.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from datetime import timedelta
from typing import Dict
from typing import List
Expand All @@ -10,6 +11,9 @@

from .utils.generic import compute_bitmasks
from .chunks.utils import get_chunks_boundary
from ..utils.redis import keys as r_keys
from ..utils.redis import get_rq_queue
from ..utils.redis import get_redis_connection


_datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP")
Expand Down Expand Up @@ -80,7 +84,20 @@ def custom_data(self):
def ws_cv(self):
    """Lazily construct the CloudVolume for the watershed layer.

    The parsed CloudVolume ``info`` is cached in redis under a key
    specific to this graph's watershed volume, so that many distributed
    workers do not all fetch the same info file from cloud storage
    (which is useful to avoid md5 errors under high GCS load).

    All redis access is best-effort: any failure — no connection, cache
    miss, unparseable payload — falls back to a plain CloudVolume load.

    Returns:
        The (memoized) CloudVolume instance for ``self._data_source.WATERSHED``.
    """
    if self._ws_cv:
        return self._ws_cv

    # key is specific to the watershed cv of this graph
    cache_key = f"{self.graph_config.ID}:ws_cv_info_cached"
    redis = None
    try:
        # try reading a cached info file for distributed workers
        redis = get_redis_connection()
        cached_info = redis.get(cache_key)
        if cached_info is not None:
            self._ws_cv = CloudVolume(
                self._data_source.WATERSHED, info=json.loads(cached_info)
            )
            return self._ws_cv
    except Exception:
        # redis unavailable or cached info unusable; fall through to a
        # normal load below
        ...

    self._ws_cv = CloudVolume(self._data_source.WATERSHED)
    if redis is not None:
        try:
            # populate the cache for other workers; best-effort only
            redis.set(cache_key, json.dumps(self._ws_cv.info))
        except Exception:
            ...
    return self._ws_cv

@property
Expand Down
52 changes: 40 additions & 12 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@
cli for running ingest
"""

from os import environ
from time import sleep

import click
import yaml
from flask.cli import AppGroup
from rq import Queue

from .manager import IngestionManager
from .utils import bootstrap
from .cluster import randomize_grid_points
from ..graph.chunkedgraph import ChunkedGraph
from ..utils.redis import get_redis_connection
from ..utils.redis import keys as r_keys
from ..utils.general import chunked

ingest_cli = AppGroup("ingest")

Expand Down Expand Up @@ -96,18 +102,40 @@ def queue_layer(parent_layer):
chunk_coords = [(0, 0, 0)]
else:
bounds = imanager.cg_meta.layer_chunk_bounds[parent_layer]
chunk_coords = list(product(*[range(r) for r in bounds]))
np.random.shuffle(chunk_coords)

for coords in chunk_coords:
task_q = imanager.get_task_queue(f"l{parent_layer}")
task_q.enqueue(
create_parent_chunk,
job_id=chunk_id_str(parent_layer, coords),
job_timeout=f"{int(parent_layer * parent_layer)}m",
result_ttl=0,
args=(parent_layer, coords),
)
chunk_coords = randomize_grid_points(*bounds)

def get_chunks_not_done(coords: list) -> list:
    """Filter out chunk coordinates already marked complete in redis.

    Each coordinate is tested for membership in the layer's completed-set
    with a single batched ``SMISMEMBER`` call. If redis cannot be reached,
    every coordinate is returned so no work is silently dropped.
    """
    keys = ["_".join(str(c) for c in coord) for coord in coords]
    try:
        done_flags = imanager.redis.smismember(f"{parent_layer}c", keys)
    except Exception:
        return coords
    return [coord for coord, done in zip(coords, done_flags) if not done]

batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
batches = chunked(chunk_coords, batch_size)
q = imanager.get_task_queue(f"l{parent_layer}")

for batch in batches:
_coords = get_chunks_not_done(batch)
# buffer for optimal use of redis memory
if len(q) > int(environ.get("QUEUE_SIZE", 100000)):
interval = int(environ.get("QUEUE_INTERVAL", 300))
sleep(interval)

job_datas = []
for chunk_coord in _coords:
job_datas.append(
Queue.prepare_data(
create_parent_chunk,
args=(parent_layer, chunk_coord),
result_ttl=0,
job_id=chunk_id_str(parent_layer, chunk_coord),
timeout=f"{int(parent_layer * parent_layer)}m",
)
)
q.enqueue_many(job_datas)


@ingest_cli.command("status")
Expand Down

0 comments on commit cfc8505

Please sign in to comment.