cached cv info for distributed workers #506

Merged 4 commits on Sep 3, 2024
pychunkedgraph/graph/meta.py (19 changes: 18 additions & 1 deletion)
@@ -1,3 +1,4 @@
+import json
 from datetime import timedelta
 from typing import Dict
 from typing import List
@@ -10,6 +11,9 @@
 from .utils.generic import compute_bitmasks
 from .chunks.utils import get_chunks_boundary
+from ..utils.redis import keys as r_keys
+from ..utils.redis import get_rq_queue
+from ..utils.redis import get_redis_connection


 _datasource_fields = ("EDGES", "COMPONENTS", "WATERSHED", "DATA_VERSION", "CV_MIP")
@@ -80,7 +84,20 @@ def custom_data(self):
     def ws_cv(self):
         if self._ws_cv:
             return self._ws_cv
-        self._ws_cv = CloudVolume(self._data_source.WATERSHED)
+
+        try:
+            # try reading a cached info file for distributed workers
+            # useful to avoid md5 errors on high gcs load
+            cache_key = f"{self.graph_config.ID}:ws_cv_info_cached"
+            redis = get_redis_connection()
+            cached_info = json.loads(redis.get(cache_key))
+            self._ws_cv = CloudVolume(self._data_source.WATERSHED, info=cached_info)
+        except Exception:
+            self._ws_cv = CloudVolume(self._data_source.WATERSHED)
+            try:
+                redis.set(cache_key, json.dumps(self._ws_cv.info))
+            except Exception:
+                ...
         return self._ws_cv

     @property
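The pattern in this hunk amounts to: one process pays for the CloudVolume info fetch, stores the info dict in Redis, and every later worker builds its volume from the cached copy instead of hitting GCS. Below is a minimal standalone sketch of that idea, assuming a reachable Redis and the redis-py and cloud-volume packages; the bucket path and graph ID are placeholders, not values from this PR:

```python
import json

import redis
from cloudvolume import CloudVolume

r = redis.Redis()  # assumes Redis on localhost:6379

ws_path = "gs://example-bucket/ws"  # hypothetical watershed path
cache_key = "example_graph:ws_cv_info_cached"  # stands in for f"{graph_config.ID}:ws_cv_info_cached"

cached = r.get(cache_key)
if cached is None:
    # the first process pays the cost of fetching the info file from the object store...
    cv = CloudVolume(ws_path)
    r.set(cache_key, json.dumps(cv.info))
else:
    # ...and subsequent workers construct their volume without touching GCS
    cv = CloudVolume(ws_path, info=json.loads(cached))
```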
pychunkedgraph/ingest/cli.py (52 changes: 40 additions & 12 deletions)
@@ -2,15 +2,21 @@
 cli for running ingest
 """

+from os import environ
+from time import sleep
+
 import click
 import yaml
 from flask.cli import AppGroup
+from rq import Queue

 from .manager import IngestionManager
 from .utils import bootstrap
+from .cluster import randomize_grid_points
 from ..graph.chunkedgraph import ChunkedGraph
 from ..utils.redis import get_redis_connection
 from ..utils.redis import keys as r_keys
+from ..utils.general import chunked

 ingest_cli = AppGroup("ingest")

@@ -96,18 +102,40 @@ def queue_layer(parent_layer):
         chunk_coords = [(0, 0, 0)]
     else:
         bounds = imanager.cg_meta.layer_chunk_bounds[parent_layer]
-        chunk_coords = list(product(*[range(r) for r in bounds]))
-        np.random.shuffle(chunk_coords)
-
-    for coords in chunk_coords:
-        task_q = imanager.get_task_queue(f"l{parent_layer}")
-        task_q.enqueue(
-            create_parent_chunk,
-            job_id=chunk_id_str(parent_layer, coords),
-            job_timeout=f"{int(parent_layer * parent_layer)}m",
-            result_ttl=0,
-            args=(parent_layer, coords),
-        )
+        chunk_coords = randomize_grid_points(*bounds)
+
+    def get_chunks_not_done(coords: list) -> list:
+        """check for set membership in redis in batches"""
+        coords_strs = ["_".join(map(str, coord)) for coord in coords]
+        try:
+            completed = imanager.redis.smismember(f"{parent_layer}c", coords_strs)
+        except Exception:
+            return coords
+        return [coord for coord, c in zip(coords, completed) if not c]
+
+    batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
+    batches = chunked(chunk_coords, batch_size)
+    q = imanager.get_task_queue(f"l{parent_layer}")
+
+    for batch in batches:
+        _coords = get_chunks_not_done(batch)
+        # buffer for optimal use of redis memory
+        if len(q) > int(environ.get("QUEUE_SIZE", 100000)):
+            interval = int(environ.get("QUEUE_INTERVAL", 300))
+            sleep(interval)
+
+        job_datas = []
+        for chunk_coord in _coords:
+            job_datas.append(
+                Queue.prepare_data(
+                    create_parent_chunk,
+                    args=(parent_layer, chunk_coord),
+                    result_ttl=0,
+                    job_id=chunk_id_str(parent_layer, chunk_coord),
+                    timeout=f"{int(parent_layer * parent_layer)}m",
+                )
+            )
+        q.enqueue_many(job_datas)


 @ingest_cli.command("status")
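The get_chunks_not_done helper leans on Redis SMISMEMBER (available in Redis >= 6.2) to answer set membership for a whole batch in one round trip, rather than one SISMEMBER call per chunk. A small sketch of that idea in isolation; the set name and coordinates below are made up for illustration:

```python
from redis import Redis

r = Redis()  # assumes a local Redis >= 6.2

r.sadd("2c", "0_0_0", "0_0_1")  # "2c" plays the role of the f"{parent_layer}c" completed set

coords = [(0, 0, 0), (0, 0, 1), (0, 1, 0)]
coords_strs = ["_".join(map(str, c)) for c in coords]

# one SMISMEMBER call checks the whole batch
completed = r.smismember("2c", coords_strs)
not_done = [c for c, done in zip(coords, completed) if not done]
print(not_done)  # [(0, 1, 0)]
```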
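Similarly, Queue.prepare_data plus enqueue_many (added in rq 1.9) replaces one network round trip per job with a single pipelined submission per batch. A minimal sketch assuming rq >= 1.9; the queue name and worker function are placeholders, not part of this PR:

```python
from redis import Redis
from rq import Queue


def do_work(x):  # placeholder job function
    return x * x


q = Queue("example", connection=Redis())

# build job payloads locally, then submit them all in one batch
job_datas = [
    Queue.prepare_data(do_work, args=(i,), job_id=f"job_{i}", result_ttl=0)
    for i in range(100)
]
q.enqueue_many(job_datas)
```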