Skip to content

Commit

Permalink
fix: check for completed chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Sep 1, 2024
1 parent c48dde3 commit f0c1496
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,21 @@
cli for running ingest
"""

from os import environ
from time import sleep

import click
import yaml
from flask.cli import AppGroup
from rq import Queue

from .manager import IngestionManager
from .utils import bootstrap
from .cluster import randomize_grid_points
from ..graph.chunkedgraph import ChunkedGraph
from ..utils.redis import get_redis_connection
from ..utils.redis import keys as r_keys
from ..utils.general import chunked

ingest_cli = AppGroup("ingest")

Expand Down Expand Up @@ -96,18 +102,40 @@ def queue_layer(parent_layer):
chunk_coords = [(0, 0, 0)]
else:
bounds = imanager.cg_meta.layer_chunk_bounds[parent_layer]
chunk_coords = list(product(*[range(r) for r in bounds]))
np.random.shuffle(chunk_coords)

for coords in chunk_coords:
task_q = imanager.get_task_queue(f"l{parent_layer}")
task_q.enqueue(
create_parent_chunk,
job_id=chunk_id_str(parent_layer, coords),
job_timeout=f"{int(parent_layer * parent_layer)}m",
result_ttl=0,
args=(parent_layer, coords),
)
chunk_coords = randomize_grid_points(*bounds)

def get_chunks_not_done(coords: list) -> list:
"""check for set membership in redis in batches"""
coords_strs = ["_".join(map(str, coord)) for coord in coords]
try:
completed = imanager.redis.smismember(f"{parent_layer}c", coords_strs)
except Exception:
return coords
return [coord for coord, c in zip(coords, completed) if not c]

batch_size = int(environ.get("JOB_BATCH_SIZE", 10000))
batches = chunked(chunk_coords, batch_size)
q = imanager.get_task_queue(f"l{parent_layer}")

for batch in batches:
_coords = get_chunks_not_done(batch)
# buffer for optimal use of redis memory
if len(q) > int(environ.get("QUEUE_SIZE", 100000)):
interval = int(environ.get("QUEUE_INTERVAL", 300))
sleep(interval)

job_datas = []
for chunk_coord in _coords:
job_datas.append(
Queue.prepare_data(
create_parent_chunk,
args=(parent_layer, chunk_coord),
result_ttl=0,
job_id=chunk_id_str(parent_layer, chunk_coord),
timeout=f"{int(parent_layer * parent_layer)}m",
)
)
q.enqueue_many(job_datas)


@ingest_cli.command("status")
Expand Down

0 comments on commit f0c1496

Please sign in to comment.