Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WORKFLOW-199: polling every X seconds for job completion #138

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 60 additions & 5 deletions lib/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ var API = module.exports = function (opts) {

var log;

var waiting = {};

if (opts.log) {
log = opts.log({
component: 'workflow-api'
Expand Down Expand Up @@ -72,6 +74,12 @@ var API = module.exports = function (opts) {
version: '0.1.0'
};

var JOB_DONE_PATH = '/jobdone';
var JOB_DONE_ROUTE = {
path: JOB_DONE_PATH,
version: '0.1.0'
};

var JOBS_PATH = '/jobs';
var JOB_PATH = JOBS_PATH + '/:uuid';
var JOBS_ROUTE = {
Expand Down Expand Up @@ -279,7 +287,10 @@ var API = module.exports = function (opts) {

return factory.workflow(workflow, meta, function (err, result) {
if (err) {
return next(err.toRestError);
if (typeof (err) === 'string')
return next(new Error(err));
else
return next(err.toRestError || err);
}
res.header('Location', req.path() + '/' + result.uuid);
// If Request-Id hasn't been set, we'll set it to workflow UUID:
Expand Down Expand Up @@ -497,8 +508,8 @@ var API = module.exports = function (opts) {
params: {}
};
var meta = {};
var members = ['exec_after', 'workflow', 'target', 'num_attempts',
'uuid', 'locks'];
var members = ['callback_urls', 'exec_after', 'workflow', 'target',
'num_attempts', 'uuid', 'locks', 'wait'];

var job_members = [];
if (typeof (opts.api.job_extra_params) !== 'undefined') {
Expand All @@ -517,6 +528,11 @@ var API = module.exports = function (opts) {
}
});

if (job.wait &&
! (Array.isArray(job.callback_urls) && job.callback_urls.length))
return next(new restify.ConflictError(
'"callback_urls" is required when "wait" is true'));

if (req.headers['request-id']) {
meta.req_id = req.headers['request-id'];
}
Expand All @@ -534,9 +550,23 @@ var API = module.exports = function (opts) {
res.header('request-id', result.uuid);
}
res.header('Location', req.path() + '/' + result.uuid);

res.status(201);
res.send(result);
return next();
if (req.params.wait) {
log.info({req: req, job: result.uuid},
'holding onto request for job %s', result.uuid);
waiting[result.uuid] = {
req: req,
res: res,
next: next
};
// flush headers (so they have the UUID as the location header)
res.write('\n');
return undefined;
} else {
res.send(result);
return next();
}
});
}

Expand Down Expand Up @@ -677,6 +707,29 @@ var API = module.exports = function (opts) {
}
});
}

function jobDone(req, res, next) {
var job = req.params;
// end the incoming request
res.send(job.uuid ? 200 : 400);
next();

// end any waiting requests
if (job.uuid && waiting[job.uuid]) {
log.info('ending waiting request for %s', job.uuid);
var o = waiting[job.uuid];

// res.send and res.format won't work here because headers were
// flushed by writing a newline to the socket. instead, we format
// the data ourselves and ship it off by assuming the user wanted
// JSON. this is really lame. XXX
o.res.end(JSON.stringify(job));
o.next();
delete waiting[job.uuid];
}

}

// --- Routes
// Workflows:
server.get(WORKFLOWS_ROUTE, listWorkflows);
Expand Down Expand Up @@ -706,6 +759,8 @@ var API = module.exports = function (opts) {
server.get(JOB_INFO_ROUTE, getInfo);
server.head(JOB_INFO_ROUTE, getInfo);
server.post(JOB_INFO_ROUTE, postInfo);
// Job done (callback):
server.post(JOB_DONE_ROUTE, jobDone);
// Ping:
server.get(PING_ROUTE, function (req, res, next) {
var data = {
Expand Down
7 changes: 5 additions & 2 deletions lib/job-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ var WorkflowJobRunner = module.exports = function (opts) {
// pointer to child process forked by runTask
var child = null;
// Properties of job object which a task should not be allowed to modify:
// Properties of job object which a task should not be allowed to modify:
var frozen_props = [
'chain', 'chain_results', 'onerror', 'onerror_results',
'callback_urls', 'chain', 'chain_results', 'onerror', 'onerror_results',
'exec_after', 'timeout', 'elapsed', 'uuid', 'workflow_uuid',
'name', 'execution', 'num_attempts', 'max_attempts', 'initial_delay',
'max_delay', 'prev_attempt', 'oncancel', 'oncancel_results',
Expand All @@ -90,6 +89,10 @@ var WorkflowJobRunner = module.exports = function (opts) {
job.chain_results = [];
}

if (!job.callback_urls) {
job.callback_urls = [];
}

if (job.onerror && !job.onerror_results) {
job.onerror_results = [];
}
Expand Down
Loading