diff --git a/executor/app/main.py b/executor/app/main.py index e0dce23..508ee62 100644 --- a/executor/app/main.py +++ b/executor/app/main.py @@ -231,7 +231,7 @@ def process(message: Message, compute: bool): class Executor(metaclass=LoggableMeta): _LOG = logging.getLogger("geokube.Executor") - def __init__(self, broker, store_path): + def __init__(self, broker, store_path, dask_cluster_opts): self._store = store_path broker_conn = pika.BlockingConnection( pika.ConnectionParameters(host=broker, heartbeat=10), @@ -239,19 +239,12 @@ def __init__(self, broker, store_path): self._conn = broker_conn self._channel = broker_conn.channel() self._db = DBManager() + self.dask_cluster_opts = dask_cluster_opts def create_dask_cluster(self, dask_cluster_opts: dict = None): if dask_cluster_opts is None: - dask_cluster_opts = {} - dask_cluster_opts["scheduler_port"] = int( - os.getenv("DASK_SCHEDULER_PORT", 8188) - ) - dask_cluster_opts["processes"] = True - port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) - dask_cluster_opts["dashboard_address"] = f":{port}" - dask_cluster_opts["n_workers"] = 1 - dask_cluster_opts["memory_limit"] = "auto" - dask_cluster_opts['thread_per_worker'] = 8 + dask_cluster_opts = self.dask_cluster_opts + self._worker_id = self._db.create_worker( status="enabled", dask_scheduler_port=dask_cluster_opts["scheduler_port"], @@ -264,13 +257,13 @@ def create_dask_cluster(self, dask_cluster_opts: dict = None): ) dask_cluster = LocalCluster( n_workers=dask_cluster_opts['n_workers'], - #scheduler_port=dask_cluster_opts["scheduler_port"], - #dashboard_address=dask_cluster_opts["dashboard_address"], - #memory_limit=dask_cluster_opts["memory_limit"], + scheduler_port=dask_cluster_opts["scheduler_port"], + dashboard_address=dask_cluster_opts["dashboard_address"], + memory_limit=dask_cluster_opts["memory_limit"], threads_per_worker=dask_cluster_opts['thread_per_worker'], ) self._LOG.info( - "not creating Dask Client...", extra={"track_id": self._worker_id} + "creating Dask Client...", extra={"track_id": self._worker_id} ) self._dask_client = Client(dask_cluster) self._nanny = Nanny(self._dask_client.cluster.scheduler.address) @@ -452,11 +445,22 @@ def get_size(self, location_path): executor_types = os.getenv("EXECUTOR_TYPES", "query").split(",") store_path = os.getenv("STORE_PATH", ".") - executor = Executor(broker=broker, store_path=store_path) + dask_cluster_opts = {} + dask_cluster_opts["scheduler_port"] = int( + os.getenv("DASK_SCHEDULER_PORT", 8188) + ) + dask_cluster_opts["processes"] = True + port = int(os.getenv("DASK_DASHBOARD_PORT", 8787)) + dask_cluster_opts["dashboard_address"] = f":{port}" + dask_cluster_opts["n_workers"] = os.getenv("DASK_N_WORKERS", 1) + dask_cluster_opts["memory_limit"] = os.getenv("DASK_MEMORY_LIMIT", "auto") + dask_cluster_opts['thread_per_worker'] = os.getenv("DASK_THREADS_PER_WORKER", 8) + + + executor = Executor(broker=broker, store_path=store_path, dask_cluster_opts=dask_cluster_opts) print("channel subscribe") for etype in executor_types: if etype == "query": - #TODO: create dask cluster with options executor.create_dask_cluster() executor.subscribe(etype)