Skip to content
This repository was archived by the owner on Jan 25, 2018. It is now read-only.

Commit f45bc8c

Browse files
author
Lee Kamentsky
committed
Really nice BatchProfiler and it seems to work
1 parent 9c630bd commit f45bc8c

8 files changed

+249
-97
lines changed

Diff for: KillJobs.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@
2929
batch_id = BATCHPROFILER_VARIABLES[BATCH_ID]
3030
if job_id is not None:
3131
job = RunBatch.BPJob.select(job_id)
32-
if job is not None:
33-
run = RunBatch.BPRun.select(job.run_id)
34-
RunBatch.kill_one(run)
35-
else:
32+
if job is None:
3633
bputilities.kill_job(job_id)
34+
else:
35+
RunBatch.kill_job(job)
3736
print"""
3837
<html><head><title>Job %(job_id)d killed</title></head>
3938
<body>Job %(job_id)d killed

Diff for: RunBatch.py

+106-22
Original file line numberDiff line numberDiff line change
@@ -199,16 +199,30 @@ def select_runs(self):
199199
result.append(BPRun(self.batch_id, run_id, bstart, bend, command))
200200
return result
201201

202-
def select_jobs(self, by_status=None, by_run=None):
202+
def select_job_count(self):
    '''Count this batch's runs that have at least one job status record

    The count is taken through the run table: a run of this batch is
    counted once if any job_status row refers to its run_id.

    returns the first column of the single row produced by the query
    '''
    query = """select count('x') from run r where r.batch_id = %d
               and exists (select 'x' from job_status js
                           where js.run_id = r.run_id)"""
    with bpcursor() as cursor:
        cursor.execute(query % self.batch_id)
        row = cursor.fetchone()
        return row[0]
210+
211+
def select_jobs(self, by_status=None, by_run=None, page_size = None,
212+
first_item = None):
203213
'''Get jobs with one of the given statuses
204214
205-
args - the statuses to fetch
215+
by_status - a sequence of status names to search for (default = all)
216+
by_run - a run ID to search for (default = all runs)
217+
page_size - return at most this many items (default all)
218+
first_item - the one-based index of the first item on the page
219+
(default first)
206220
207221
returns a sequence of run, job, status tuples
208222
'''
209223
cmd = """
210224
select rjs.run_id, rjs.bstart, rjs.bend, rjs.command, rjs.job_id,
211-
js.status
225+
js.status, @rownum:=@rownum+1 as rank
212226
from (select r.run_id as run_id, r.bstart as bstart, r.bend as bend,
213227
r.command as command, js.job_id as job_id,
214228
max(js.created) as js_created, j.created as j_created
@@ -222,6 +236,7 @@ def select_jobs(self, by_status=None, by_run=None):
222236
join job_status js
223237
on rjs.run_id = js.run_id and rjs.job_id = js.job_id
224238
and rjs.js_created = js.created
239+
join (select @rownum:=0) as ranktbl
225240
""" % self.batch_id
226241
clauses = []
227242
if by_status is not None:
@@ -230,10 +245,13 @@ def select_jobs(self, by_status=None, by_run=None):
230245
clauses.append("rjs.run_id = %d" % by_run)
231246
if len(clauses) > 0:
232247
cmd += " where " + " and ".join(clauses)
248+
if first_item is not None and page_size is not None:
249+
cmd = "select * from (%s) cmd where cmd.rank between %d and %d" % (
250+
cmd, first_item, first_item + page_size - 1)
233251
with bpcursor() as cursor:
234252
cursor.execute(cmd)
235253
result = []
236-
for run_id, bstart, bend, command, job_id, status in cursor:
254+
for run_id, bstart, bend, command, job_id, status, rank in cursor:
237255
run = BPRun(self.batch_id, run_id, bstart, bend, command)
238256
job = BPJob(run_id, job_id)
239257
result.append((run, job, status))
@@ -263,26 +281,42 @@ def get_job_name(self):
263281
def get_file_name(self):
    '''Abstract hook: subclasses return the name used for the run's files

    raises NotImplementedError always; use BPRun or BPSQLRun instead
    '''
    # NotImplemented is a comparison sentinel and is not callable;
    # NotImplementedError is the exception meant for abstract methods.
    raise NotImplementedError("Use BPRun or BPSQLRun")
265283

284+
@staticmethod
def select(run_id):
    '''Select a BPRun or BPSQLRun given a run_id

    run_id - the integer ID of the run to look up in run_base

    returns a BPSQLRun if the run's type is RT_SQL, a BPRun for any other
    run type, or None if run_base has no row for run_id
    '''
    with bpcursor() as cursor:
        cmd = """select rb.run_type
                   from run_base rb where run_id=%d""" % run_id
        cursor.execute(cmd)
        row = cursor.fetchone()
    if row is None:
        # Unknown run_id: report "not found" rather than failing on None[0]
        return None
    if row[0] == RT_SQL:
        return BPSQLRun.select_by_run_id(run_id)
    return BPRun.select(run_id)
297+
266298
def select_jobs(self, by_status = None):
    '''Get this run's jobs along with their status

    The query joins job_status to job for this run and keeps the rows
    whose job_status.created and job.created match the maxima selected
    by the subqueries.

    by_status - a sequence of status names to search for (default = all)

    returns a sequence of (job, status) tuples
    '''
    cmd = """
    select js.job_id, js.status
      from job_status js
      join job j on js.job_id = j.job_id and js.run_id = j.run_id
     where js.created in
           (select max(js2.created) from job_status js2
             where js2.run_id = %d
             group by js2.job_id)
       and j.created in
           (select max(j2.created) from job j2 where j2.run_id = %d)
       and j.run_id = %d
    """ % (self.run_id, self.run_id, self.run_id)
    if by_status is not None:
        # The filter values are the caller's by_status names. The query
        # already has a where clause, so the filter is appended with "and";
        # the column is qualified because two tables are joined.
        cmd += " and js.status in ( '%s' )" % ("','".join(by_status))
    with bpcursor() as cursor:
        cursor.execute(cmd)
        return [(BPJob(self.run_id, job_id), status)
                for job_id, status in cursor]
287321

288322
class BPRun(BPRunBase):
@@ -343,6 +377,8 @@ def select_by_sql_filename(batch, sql_filename):
343377
where rs.sql_filename = %s
344378
and rb.run_type = 'SQL'
345379
and rb.batch_id = %s""", [sql_filename, batch.batch_id])
380+
if cursor.rowcount == 0:
381+
return None
346382
run_id, command = cursor.fetchone()
347383
return BPSQLRun(batch.batch_id, int(run_id), sql_filename, command)
348384

@@ -422,7 +458,7 @@ def run_one(my_batch, run, cwd = None):
422458
cwd - the working directory for the command. Defaults to my_batch.cpcluster
423459
'''
424460
assert isinstance(my_batch, BPBatch)
425-
assert isinstance(run, BPRun)
461+
assert isinstance(run, BPRunBase)
426462
txt_output = text_file_directory(my_batch)
427463
if not os.path.exists(txt_output):
428464
os.mkdir(txt_output)
@@ -438,6 +474,15 @@ def run_one(my_batch, run, cwd = None):
438474
script = """#!/bin/sh
439475
export RUN_ID=%d
440476
""" % run.run_id
477+
#
478+
# A work-around if HOME has been defined differently on the host
479+
#
480+
script += """
481+
if [ ! -z "$SGE_O_HOME" ]; then
482+
export HOME="$SGE_O_HOME"
483+
echo "Set home to $HOME"
484+
fi
485+
"""
441486
#
442487
# This is a REST PUT to JobStatus.py to create the job record
443488
#
@@ -456,9 +501,14 @@ def run_one(my_batch, run, cwd = None):
456501
if run.source_cpenv:
457502
script += '. %s\n' % os.path.join(PREFIX, "bin", "cpenv.sh")
458503
#
504+
# set +e allows the command to error-out without ending this script.
505+
# This lets us capture the error status.
506+
#
507+
script += "set +e\n"
508+
#
459509
# Run CellProfiler
460510
#
461-
script += run.command
511+
script += run.command +"\n"
462512
#
463513
# Figure out the status from the error code
464514
#
@@ -467,6 +517,10 @@ def run_one(my_batch, run, cwd = None):
467517
script += "else\n JOB_STATUS=%s\n " % JS_ERROR
468518
script += "fi\n"
469519
#
520+
# Go back to erroring-out
521+
#
522+
script += "set -e\n"
523+
#
470524
# Set the status based on the result from CellProfiler
471525
# Use CURL again
472526
#
@@ -506,13 +560,20 @@ def cellprofiler_command(my_batch, bstart, bend):
506560
def kill_one(run):
    '''Kill the submitted and running jobs of one run and mark them aborted

    run - the run whose jobs should be killed; its batch_id selects the
          batch and its run_id restricts the job search
    '''
    batch = BPBatch()
    batch.select(run.batch_id)
    jobs = batch.select_jobs(by_status = [JS_SUBMITTED, JS_RUNNING],
                             by_run=run.run_id)
    # select_jobs yields (run, job, status) tuples. Unpack them in both
    # places, using throwaway names so the "run" argument is never rebound.
    bputilities.kill_jobs([job.job_id for _, job, _ in jobs])
    for _, job, _ in jobs:
        job.update_status(JS_ABORTED)
568+
569+
def kill_job(job):
    '''Kill a single job and record it as aborted

    job - the job to kill; its job_id is passed to bputilities.kill_jobs
          and its status is then updated to JS_ABORTED
    '''
    doomed_ids = [job.job_id]
    bputilities.kill_jobs(doomed_ids)
    job.update_status(JS_ABORTED)
511572

512573
def kill_batch(batch_id):
513574
batch = BPBatch()
514575
batch.select(batch_id)
515-
jobs = batch.select_jobs(by_status = [JS_RUNNING])
576+
jobs = batch.select_jobs(by_status = [JS_SUBMITTED, JS_RUNNING])
516577
bputilities.kill_jobs([job.job_id for run, job, status in jobs])
517578
for run, job, status in jobs:
518579
job.update_status(JS_ABORTED)
@@ -541,6 +602,29 @@ def text_file_directory(batch):
541602
def script_file_directory(batch):
    '''Return the "job_scripts" directory under the batch's data_dir

    batch - batch in question
    '''
    parent = batch.data_dir
    return os.path.join(parent, "job_scripts")
543604

605+
def batch_script_file(script_file):
    '''Return the name of the modified SQL script that loads the .CSV files

    script_file - the name of the original file

    returns the original name prefixed with "batch_"
    '''
    modded_name = "batch_%s" % script_file
    return modded_name
611+
612+
def batch_script_directory(batch):
    '''Return the directory that houses the modified SQL files

    batch - batch in question

    Note: the files live in a "sql_scripts" subdirectory rather than
          directly in batch.data_dir because files placed there
          would be automatically scanned and picked up by sql_jobs
    '''
    parent = batch.data_dir
    return os.path.join(parent, "sql_scripts")
623+
624+
def batch_script_path(batch, script_file):
    '''Return the full path of the modified SQL script for a batch

    batch - batch in question
    script_file - the name of the original file
    '''
    directory = batch_script_directory(batch)
    file_name = batch_script_file(script_file)
    return os.path.join(directory, file_name)
627+
544628
def script_file_path(batch, run):
    '''Return the path of the shell script for a run

    batch - batch whose script directory holds the file
    run - the run; its get_file_name() result is embedded in the file name
    '''
    directory = script_file_directory(batch)
    file_name = "run_%s.sh" % run.get_file_name()
    return os.path.join(directory, file_name)
@@ -577,7 +661,7 @@ def GetCPUTime(batch, run):
577661
run - the job's last run
578662
'''
579663
assert isinstance(batch, BPBatch)
580-
assert isinstance(run, BPRun)
664+
assert isinstance(run, BPRunBase)
581665
with bpcursor() as cursor:
582666
cmd = """
583667
select unix_timestamp(js2.created)-unix_timestamp(js1.created) as cputime

Diff for: UploadToDatabase.py

100644100755
+24-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#!/usr/bin/env /imaging/analysis/People/imageweb/batchprofiler/cgi-bin/python-2.6.sh
1+
#!/usr/bin/env ./batchprofiler.sh
22
"""
33
CellProfiler is distributed under the GNU General Public License.
44
See the accompanying file LICENSE for details.
@@ -17,6 +17,7 @@
1717
import cgitb
1818
cgitb.enable()
1919
import RunBatch
20+
from bpformdata import *
2021
import cgi
2122
import re
2223
import os
@@ -25,11 +26,16 @@
2526

2627
import sql_jobs
2728

28-
form = cgi.FieldStorage()
29-
batch_id = int(form["batch_id"].value)
30-
sql_script = form["sql_script"].value
31-
output_file = form["output_file"].value
32-
queue = (form.has_key("queue") and form["queue"].value) or None
29+
#
30+
# TODO: Move the logic that collects the load statements into sql_jobs
31+
#
32+
# TODO: use yattag to build the HTML
33+
#
34+
35+
batch_id = BATCHPROFILER_VARIABLES[BATCH_ID]
36+
sql_script = BATCHPROFILER_VARIABLES[SQL_SCRIPT]
37+
output_file = BATCHPROFILER_VARIABLES[OUTPUT_FILE]
38+
queue = BATCHPROFILER_VARIABLES[QUEUE]
3339
my_batch = RunBatch.BPBatch()
3440
my_batch.select(batch_id)
3541

@@ -70,19 +76,24 @@
7076
match.groups(1)[1] == 'Object'):
7177
object_files.append(file_name)
7278

73-
batch_script = my_batch.batch_id+os.sep+"batch_"+sql_script
74-
batch_script_path = os.path.join(my_batch.data_dir, batch_script)
75-
sql_script_file = open(batch_script_path,"w")
79+
batch_script_file = RunBatch.batch_script_file(sql_script)
80+
batch_script_dir = RunBatch.batch_script_directory(my_batch)
81+
if not os.path.isdir(batch_script_dir):
82+
os.makedirs(batch_script_dir)
83+
batch_script_path = RunBatch.batch_script_path(my_batch, sql_script)
84+
sql_script_file = open(batch_script_path, "w")
7685
try:
7786
sql_script_file.writelines(table_lines)
7887
for file_name in image_files:
88+
path_name = os.path.join(my_batch.data_dir, file_name)
7989
sql_script_file.write("""SELECT 'Loading %(file_name)s into %(image_table)s';"""%(globals()))
80-
sql_script_file.write("""LOAD DATA LOCAL INFILE '%(file_name)s' REPLACE INTO TABLE %(image_table)s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"';
90+
sql_script_file.write("""LOAD DATA LOCAL INFILE '%(path_name)s' REPLACE INTO TABLE %(image_table)s FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"';
8191
SHOW WARNINGS;
8292
"""%(globals()))
8393
for file_name in object_files:
94+
path_name = os.path.join(my_batch.data_dir, file_name)
8495
sql_script_file.write("""SELECT 'Loading %(file_name)s into %(object_table)s';"""%(globals()))
85-
sql_script_file.write("""LOAD DATA LOCAL INFILE '%(file_name)s' REPLACE INTO TABLE %(object_table)s FIELDS TERMINATED BY ',';
96+
sql_script_file.write("""LOAD DATA LOCAL INFILE '%(path_name)s' REPLACE INTO TABLE %(object_table)s FIELDS TERMINATED BY ',';
8697
SHOW WARNINGS;
8798
"""%(globals()))
8899
finally:
@@ -120,8 +131,8 @@
120131
if line_count > 10 and index == line_count-4:
121132
print "</div>"
122133
print "</tt>"
123-
job_id = sql_jobs.run_sql_file(batch_id, batch_script)
134+
job = sql_jobs.run_sql_file(batch_id, batch_script_file)
124135

125-
print "<h2>SQL script submitted to cluster as job # %s"%(job_id)
136+
print "<h2>SQL script submitted to cluster as job # %s"%(job.job_id)
126137
print "</body>"
127138
print "</html>"

0 commit comments

Comments
 (0)