
Commit f7c565a

Joe Hamman authored and mrocklin committed
Track pending, running, and failed jobs (#63)
This tracks the pending, running, and failed jobs on the cluster. It does this by using a naming convention for the dask worker name and by installing a SchedulerPlugin that watches for those names as workers arrive and leave. This helps improve the adaptive experience with dask-jobqueue.
1 parent 4816a25 commit f7c565a
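
As a quick illustration of what this enables (a minimal sketch, assuming a PBS deployment; the queue, project, and sizing values are placeholders, not taken from this commit):

    from dask_jobqueue import PBSCluster

    cluster = PBSCluster(queue='regular', project='ABC123',
                         cores=36, processes=6, memory='6GB')
    cluster.start_workers(12)     # submits ceil(12 / 6) = 2 jobs

    # Jobs now move through three dicts as workers come and go:
    print(cluster.pending_jobs)   # submitted, no workers seen yet
    print(cluster.running_jobs)   # at least one live worker
    print(cluster.finished_jobs)  # all workers have exited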

File tree: 17 files changed, +271 -126 lines


ci/none.sh

Lines changed: 0 additions & 2 deletions
@@ -1,7 +1,5 @@
 #!/usr/bin/env bash
 
-set -x
-
 function jobqueue_before_install {
     # Install miniconda
     ./ci/conda_setup.sh

ci/pbs.sh

Lines changed: 1 addition & 3 deletions
@@ -1,7 +1,5 @@
 #!/usr/bin/env bash
 
-set -x
-
 function jobqueue_before_install {
     docker version
     docker-compose version
@@ -17,7 +15,7 @@ function jobqueue_before_install {
 }
 
 function jobqueue_install {
-    docker exec -it pbs_master /bin/bash -c "cd /dask-jobqueue; python setup.py install"
+    docker exec -it pbs_master /bin/bash -c "cd /dask-jobqueue; pip install -e ."
 }
 
 function jobqueue_script {

ci/sge.sh

Lines changed: 1 addition & 3 deletions
@@ -1,7 +1,5 @@
 #!/usr/bin/env bash
 
-set -x
-
 function jobqueue_before_install {
     docker version
     docker-compose version
@@ -16,7 +14,7 @@ function jobqueue_before_install {
 }
 
 function jobqueue_install {
-    docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; python setup.py install"
+    docker exec -it sge_master /bin/bash -c "cd /dask-jobqueue; pip install -e ."
 }
 
 function jobqueue_script {

ci/slurm.sh

Lines changed: 1 addition & 3 deletions
@@ -1,7 +1,5 @@
 #!/usr/bin/env bash
 
-set -x
-
 function jobqueue_before_install {
     docker version
     docker-compose version
@@ -16,7 +14,7 @@ function jobqueue_before_install {
 }
 
 function jobqueue_install {
-    docker exec -it slurmctld /bin/bash -c "cd /dask-jobqueue; python setup.py install"
+    docker exec -it slurmctld /bin/bash -c "cd /dask-jobqueue; pip install -e ."
 }
 
 function jobqueue_script {

dask_jobqueue/config.py

Lines changed: 1 addition & 2 deletions
@@ -1,11 +1,10 @@
-from __future__ import print_function, division, absolute_import
+from __future__ import absolute_import, division, print_function
 
 import os
 
 import dask
 import yaml
 
-
 fn = os.path.join(os.path.dirname(__file__), 'jobqueue.yaml')
 dask.config.ensure_file(source=fn)
 

dask_jobqueue/core.py

Lines changed: 133 additions & 36 deletions
@@ -1,17 +1,22 @@
-from contextlib import contextmanager
+from __future__ import absolute_import, division, print_function
+
 import logging
+import math
 import shlex
 import socket
 import subprocess
 import sys
 import warnings
+from collections import OrderedDict
+from contextlib import contextmanager
 
 import dask
 import docrep
 from distributed import LocalCluster
 from distributed.deploy import Cluster
-from distributed.utils import (get_ip_interface, ignoring, parse_bytes, tmpfile,
-                               format_bytes)
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.utils import (
+    format_bytes, get_ip_interface, parse_bytes, tmpfile)
 
 logger = logging.getLogger(__name__)
 docstrings = docrep.DocstringProcessor()
@@ -28,6 +33,54 @@
 """.strip()
 
 
+def _job_id_from_worker_name(name):
+    ''' utility to parse the job ID from the worker name
+
+    template: 'prefix--jobid--suffix'
+    '''
+    _, job_id, _ = name.split('--')
+    return job_id
+
+
+class JobQueuePlugin(SchedulerPlugin):
+    def __init__(self):
+        self.pending_jobs = OrderedDict()
+        self.running_jobs = OrderedDict()
+        self.finished_jobs = OrderedDict()
+        self.all_workers = {}
+
+    def add_worker(self, scheduler, worker=None, name=None, **kwargs):
+        ''' Run when a new worker enters the cluster'''
+        logger.debug("adding worker %s" % worker)
+        w = scheduler.workers[worker]
+        job_id = _job_id_from_worker_name(w.name)
+        logger.debug("job id for new worker: %s" % job_id)
+        self.all_workers[worker] = (w.name, job_id)
+
+        # if this is the first worker for this job, move job to running
+        if job_id not in self.running_jobs:
+            logger.debug("this is a new job")
+            self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
+
+        # add worker to dict of workers in this job
+        self.running_jobs[job_id][w.name] = w
+
+    def remove_worker(self, scheduler=None, worker=None, **kwargs):
+        ''' Run when a worker leaves the cluster'''
+        logger.debug("removing worker %s" % worker)
+        name, job_id = self.all_workers[worker]
+        logger.debug("removing worker name (%s) and "
+                     "job_id (%s)" % (name, job_id))
+
+        # remove worker from this job
+        del self.running_jobs[job_id][name]
+
+        # once there are no more workers, move this job to finished_jobs
+        if not self.running_jobs[job_id]:
+            logger.debug("that was the last worker for job %s" % job_id)
+            self.finished_jobs[job_id] = self.running_jobs.pop(job_id)
+
+
 @docstrings.get_sectionsf('JobQueueCluster')
 class JobQueueCluster(Cluster):
     """ Base class to launch Dask Clusters for Job queues
@@ -87,6 +140,8 @@ class JobQueueCluster(Cluster):
     submit_command = None
     cancel_command = None
     scheduler_name = ''
+    _adaptive_options = {
+        'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}
 
     def __init__(self,
                  name=None,
@@ -155,15 +210,17 @@ def __init__(self,
 
         self.local_cluster = LocalCluster(n_workers=0, ip=host, **kwargs)
 
-        # Keep information on process, cores, and memory, for use in subclasses
-        self.worker_memory = parse_bytes(memory)
-
+        # Keep information on process, threads and memory, for use in
+        # subclasses
+        self.worker_memory = parse_bytes(memory) if memory is not None else None
         self.worker_processes = processes
        self.worker_cores = cores
        self.name = name
 
-        self.jobs = dict()
-        self.n = 0
+        # plugin for tracking job status
+        self._scheduler_plugin = JobQueuePlugin()
+        self.local_cluster.scheduler.add_plugin(self._scheduler_plugin)
+
         self._adaptive = None
 
         self._env_header = '\n'.join(env_extra)
@@ -179,47 +236,60 @@ def __init__(self,
             mem = format_bytes(self.worker_memory / self.worker_processes)
             mem = mem.replace(' ', '')
             self._command_template += " --memory-limit %s" % mem
+        self._command_template += " --name %s--${JOB_ID}--" % name
 
-        if name is not None:
-            self._command_template += " --name %s" % name
-            self._command_template += "-%(n)d"  # Keep %(n) to be replaced later
         if death_timeout is not None:
             self._command_template += " --death-timeout %s" % death_timeout
         if local_directory is not None:
             self._command_template += " --local-directory %s" % local_directory
         if extra is not None:
             self._command_template += extra
 
+    @property
+    def pending_jobs(self):
+        """ Jobs pending in the queue """
+        return self._scheduler_plugin.pending_jobs
+
+    @property
+    def running_jobs(self):
+        """ Jobs with currently active workers """
+        return self._scheduler_plugin.running_jobs
+
+    @property
+    def finished_jobs(self):
+        """ Jobs that have finished """
+        return self._scheduler_plugin.finished_jobs
+
     @property
     def worker_threads(self):
         return int(self.worker_cores / self.worker_processes)
 
     def job_script(self):
         """ Construct a job submission script """
-        self.n += 1
-        template = self._command_template % {'n': self.n}
-        return self._script_template % {'job_header': self.job_header,
-                                        'env_header': self._env_header,
-                                        'worker_command': template}
+        pieces = {'job_header': self.job_header,
+                  'env_header': self._env_header,
+                  'worker_command': self._command_template}
+        return self._script_template % pieces
 
     @contextmanager
     def job_file(self):
         """ Write job submission script to temporary file """
         with tmpfile(extension='sh') as fn:
             with open(fn, 'w') as f:
+                logger.debug("writing job script: \n%s" % self.job_script())
                 f.write(self.job_script())
             yield fn
 
     def start_workers(self, n=1):
         """ Start workers and point them to our local scheduler """
-        workers = []
-        for _ in range(n):
+        logger.debug('starting %s workers' % n)
+        num_jobs = math.ceil(n / self.worker_processes)
+        for _ in range(num_jobs):
             with self.job_file() as fn:
                 out = self._call(shlex.split(self.submit_command) + [fn])
                 job = self._job_id_from_submit_output(out.decode())
-                self.jobs[self.n] = job
-                workers.append(self.n)
-        return workers
+                logger.debug("started job: %s" % job)
+                self.pending_jobs[job] = {}
 
     @property
     def scheduler(self):
@@ -248,12 +318,12 @@ def _calls(self, cmds):
         Also logs any stderr information
         """
         logger.debug("Submitting the following calls to command line")
+        procs = []
         for cmd in cmds:
             logger.debug(' '.join(cmd))
-        procs = [subprocess.Popen(cmd,
-                                  stdout=subprocess.PIPE,
-                                  stderr=subprocess.PIPE)
-                 for cmd in cmds]
+            procs.append(subprocess.Popen(cmd,
+                                          stdout=subprocess.PIPE,
+                                          stderr=subprocess.PIPE))
 
         result = []
         for proc in procs:
@@ -269,33 +339,60 @@ def _call(self, cmd):
 
     def stop_workers(self, workers):
         """ Stop a list of workers"""
+        logger.debug("Stopping workers: %s" % workers)
         if not workers:
             return
-        workers = list(map(int, workers))
-        jobs = [self.jobs[w] for w in workers]
-        self._call([self.cancel_command] + list(jobs))
+        jobs = self._stop_pending_jobs()  # stop pending jobs too
         for w in workers:
-            with ignoring(KeyError):
-                del self.jobs[w]
+            if isinstance(w, dict):
+                jobs.append(_job_id_from_worker_name(w['name']))
+            else:
+                jobs.append(_job_id_from_worker_name(w.name))
+        self.stop_jobs(set(jobs))
+
+    def stop_jobs(self, jobs):
+        """ Stop a list of jobs"""
+        logger.debug("Stopping jobs: %s" % jobs)
+        if jobs:
+            jobs = list(jobs)
+            self._call([self.cancel_command] + list(set(jobs)))
 
     def scale_up(self, n, **kwargs):
         """ Brings total worker count up to ``n`` """
-        return self.start_workers(n - len(self.jobs))
+        logger.debug("Scaling up to %d workers." % n)
+        active_and_pending = sum([len(j) for j in self.running_jobs.values()])
+        active_and_pending += self.worker_processes * len(self.pending_jobs)
+        logger.debug("Found %d active/pending workers." % active_and_pending)
+        self.start_workers(n - active_and_pending)
 
     def scale_down(self, workers):
         ''' Close the workers with the given addresses '''
-        if isinstance(workers, dict):
-            names = {v['name'] for v in workers.values()}
-            job_ids = {name.split('-')[-2] for name in names}
-            self.stop_workers(job_ids)
+        logger.debug("Scaling down. Workers: %s" % workers)
+        worker_states = []
+        for w in workers:
+            try:
+                # Get the actual WorkerState
+                worker_states.append(self.scheduler.workers[w])
+            except KeyError:
+                logger.debug('worker %s is already gone' % w)
+        self.stop_workers(worker_states)
 
     def __enter__(self):
         return self
 
     def __exit__(self, type, value, traceback):
-        self.stop_workers(self.jobs)
+        jobs = self._stop_pending_jobs()
+        jobs += list(self.running_jobs.keys())
+        self.stop_jobs(set(jobs))
         self.local_cluster.__exit__(type, value, traceback)
 
+    def _stop_pending_jobs(self):
+        jobs = list(self.pending_jobs.keys())
+        logger.debug("Stopping pending jobs %s" % jobs)
+        for job_id in jobs:
+            del self.pending_jobs[job_id]
+        return jobs
+
     def _job_id_from_submit_output(self, out):
         raise NotImplementedError('_job_id_from_submit_output must be '
                                   'implemented when JobQueueCluster is '
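
The plugin and the new _adaptive_options hook both hinge on this worker-name template. A minimal sketch of the round trip, with a hypothetical job ID:

    # The job script names workers 'prefix--jobid--suffix', e.g.:
    name = 'dask-worker--2134--'
    # _job_id_from_worker_name recovers the job ID by splitting on '--':
    _, job_id, _ = name.split('--')
    assert job_id == '2134'

Because worker_key maps each WorkerState to its job ID, adaptive scaling groups all processes of a job together and retires whole jobs rather than individual workers.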

dask_jobqueue/moab.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ class MoabCluster(PBSCluster):
            memory='16G', resource_spec='96G',
            job_extra=['-d /home/First.Last', '-M none'],
            local_directory=os.getenv('TMPDIR', '/tmp'))
-    >>> cluster.start_workers(10)  # this may take a few seconds to launch
+    >>> cluster.start_workers(10)  # submit enough jobs to deploy 10 workers
 
     >>> from dask.distributed import Client
     >>> client = Client(cluster)
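
The reworded doctest comment reflects the new start_workers semantics in core.py: n counts workers, and the cluster submits however many jobs are needed to reach it. A sketch of the arithmetic, with hypothetical values:

    import math

    processes = 6                        # worker processes per job
    n = 10                               # workers requested
    num_jobs = math.ceil(n / processes)  # 2 jobs, deploying up to 12 workers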

dask_jobqueue/pbs.py

Lines changed: 5 additions & 1 deletion
@@ -75,7 +75,10 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
         # Instantiate args and parameters from parent abstract class
         super(PBSCluster, self).__init__(**kwargs)
 
-        header_lines = []
+        # Try to find a project name from environment variable
+        project = project or os.environ.get('PBS_ACCOUNT')
+
+        header_lines = ['#!/usr/bin/env bash']
         # PBS header build
         if self.name is not None:
             header_lines.append('#PBS -N %s' % self.name)
@@ -95,6 +98,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
         if walltime is not None:
             header_lines.append('#PBS -l walltime=%s' % walltime)
         header_lines.extend(['#PBS %s' % arg for arg in job_extra])
+        header_lines.append('JOB_ID=${PBS_JOBID%.*}')
 
         # Declare class attribute that shall be overridden
         self.job_header = '\n'.join(header_lines)
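
The appended JOB_ID line gives the worker command a bare job ID to embed in the worker name: bash's ${PBS_JOBID%.*} strips the shortest trailing '.suffix' (typically the PBS server name). A Python sketch of the same expansion, with a hypothetical job ID:

    pbs_jobid = '2134.pbs-server'         # what PBS might export
    job_id = pbs_jobid.rsplit('.', 1)[0]  # same result as ${PBS_JOBID%.*}
    assert job_id == '2134'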

dask_jobqueue/sge.py

Lines changed: 3 additions & 2 deletions
@@ -1,3 +1,5 @@
+from __future__ import absolute_import, division, print_function
+
 import logging
 
 import dask
@@ -56,8 +58,7 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
 
         super(SGECluster, self).__init__(**kwargs)
 
-        header_lines = ['#!/bin/bash']
-
+        header_lines = ['#!/usr/bin/env bash']
         if self.name is not None:
             header_lines.append('#$ -N %(name)s')
         if queue is not None:
