Skip to content

Commit

Permalink
WORKFLOW-199: polling every X seconds for job completion
Browse files Browse the repository at this point in the history
  • Loading branch information
bahamas10 committed Jun 17, 2015
1 parent 8b370c0 commit 5c169d2
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 109 deletions.
65 changes: 60 additions & 5 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var API = module.exports = function (opts) {

var log;

var waiting = {};

if (opts.log) {
log = opts.log({
component: 'workflow-api'
Expand Down Expand Up @@ -72,6 +74,12 @@ var API = module.exports = function (opts) {
version: '0.1.0'
};

var JOB_DONE_PATH = '/jobdone';
var JOB_DONE_ROUTE = {
path: JOB_DONE_PATH,
version: '0.1.0'
};

var JOBS_PATH = '/jobs';
var JOB_PATH = JOBS_PATH + '/:uuid';
var JOBS_ROUTE = {
Expand Down Expand Up @@ -279,7 +287,10 @@ var API = module.exports = function (opts) {

return factory.workflow(workflow, meta, function (err, result) {
if (err) {
return next(err.toRestError);
if (typeof (err) === 'string')
return next(new Error(err));
else
return next(err.toRestError || err);
}
res.header('Location', req.path() + '/' + result.uuid);
// If Request-Id hasn't been set, we'll set it to workflow UUID:
Expand Down Expand Up @@ -497,8 +508,8 @@ var API = module.exports = function (opts) {
params: {}
};
var meta = {};
var members = ['exec_after', 'workflow', 'target', 'num_attempts',
'uuid', 'locks'];
var members = ['callback_urls', 'exec_after', 'workflow', 'target',
'num_attempts', 'uuid', 'locks', 'wait'];

var job_members = [];
if (typeof (opts.api.job_extra_params) !== 'undefined') {
Expand All @@ -517,6 +528,11 @@ var API = module.exports = function (opts) {
}
});

if (job.wait &&
! (Array.isArray(job.callback_urls) && job.callback_urls.length))
return next(new restify.ConflictError(
'"callback_urls" is required when "wait" is true'));

if (req.headers['request-id']) {
meta.req_id = req.headers['request-id'];
}
Expand All @@ -534,9 +550,23 @@ var API = module.exports = function (opts) {
res.header('request-id', result.uuid);
}
res.header('Location', req.path() + '/' + result.uuid);

res.status(201);
res.send(result);
return next();
if (req.params.wait) {
log.info({req: req, job: result.uuid},
'holding onto request for job %s', result.uuid);
waiting[result.uuid] = {
req: req,
res: res,
next: next
};
// flush headers (so they have the UUID as the location header)
res.write('\n');
return undefined;
} else {
res.send(result);
return next();
}
});
}

Expand Down Expand Up @@ -677,6 +707,29 @@ var API = module.exports = function (opts) {
}
});
}

function jobDone(req, res, next) {
var job = req.params;
// end the incoming request
res.send(job.uuid ? 200 : 400);
next();

// end any waiting requests
if (job.uuid && waiting[job.uuid]) {
log.info('ending waiting request for %s', job.uuid);
var o = waiting[job.uuid];

// res.send and res.format won't work here because headers were
// flushed by writing a newline to the socket. instead, we format
// the data ourselves and ship it off by assuming the user wanted
// JSON. this is really lame. XXX
o.res.end(JSON.stringify(job));
o.next();
delete waiting[job.uuid];
}

}

// --- Routes
// Workflows:
server.get(WORKFLOWS_ROUTE, listWorkflows);
Expand Down Expand Up @@ -706,6 +759,8 @@ var API = module.exports = function (opts) {
server.get(JOB_INFO_ROUTE, getInfo);
server.head(JOB_INFO_ROUTE, getInfo);
server.post(JOB_INFO_ROUTE, postInfo);
// Job done (callback):
server.post(JOB_DONE_ROUTE, jobDone);
// Ping:
server.get(PING_ROUTE, function (req, res, next) {
var data = {
Expand Down
7 changes: 5 additions & 2 deletions lib/job-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ var WorkflowJobRunner = module.exports = function (opts) {
// pointer to child process forked by runTask
var child = null;
// Properties of job object which a task should not be allowed to modify:
// Properties of job object which a task should not be allowed to modify:
var frozen_props = [
'chain', 'chain_results', 'onerror', 'onerror_results',
'callback_urls', 'chain', 'chain_results', 'onerror', 'onerror_results',
'exec_after', 'timeout', 'elapsed', 'uuid', 'workflow_uuid',
'name', 'execution', 'num_attempts', 'max_attempts', 'initial_delay',
'max_delay', 'prev_attempt', 'oncancel', 'oncancel_results',
Expand All @@ -90,6 +89,10 @@ var WorkflowJobRunner = module.exports = function (opts) {
job.chain_results = [];
}

if (!job.callback_urls) {
job.callback_urls = [];
}

if (job.onerror && !job.onerror_results) {
job.onerror_results = [];
}
Expand Down
Loading

0 comments on commit 5c169d2

Please sign in to comment.