diff --git a/lib/api.js b/lib/api.js index 22c955d..063c0e6 100644 --- a/lib/api.js +++ b/lib/api.js @@ -26,6 +26,8 @@ var API = module.exports = function (opts) { var log; + var waiting = {}; + if (opts.log) { log = opts.log({ component: 'workflow-api' @@ -72,6 +74,12 @@ var API = module.exports = function (opts) { version: '0.1.0' }; + var JOB_DONE_PATH = '/jobdone'; + var JOB_DONE_ROUTE = { + path: JOB_DONE_PATH, + version: '0.1.0' + }; + var JOBS_PATH = '/jobs'; var JOB_PATH = JOBS_PATH + '/:uuid'; var JOBS_ROUTE = { @@ -279,7 +287,10 @@ var API = module.exports = function (opts) { return factory.workflow(workflow, meta, function (err, result) { if (err) { - return next(err.toRestError); + if (typeof (err) === 'string') + return next(new Error(err)); + else + return next(err.toRestError || err); } res.header('Location', req.path() + '/' + result.uuid); // If Request-Id hasn't been set, we'll set it to workflow UUID: @@ -497,8 +508,8 @@ var API = module.exports = function (opts) { params: {} }; var meta = {}; - var members = ['exec_after', 'workflow', 'target', 'num_attempts', - 'uuid', 'locks']; + var members = ['callback_urls', 'exec_after', 'workflow', 'target', + 'num_attempts', 'uuid', 'locks', 'wait']; var job_members = []; if (typeof (opts.api.job_extra_params) !== 'undefined') { @@ -517,6 +528,11 @@ var API = module.exports = function (opts) { } }); + if (job.wait && + ! 
(Array.isArray(job.callback_urls) && job.callback_urls.length)) + return next(new restify.ConflictError( + '"callback_urls" is required when "wait" is true')); + if (req.headers['request-id']) { meta.req_id = req.headers['request-id']; } @@ -534,9 +550,23 @@ var API = module.exports = function (opts) { res.header('request-id', result.uuid); } res.header('Location', req.path() + '/' + result.uuid); + res.status(201); - res.send(result); - return next(); + if (req.params.wait) { + log.info({req: req, job: result.uuid}, + 'holding onto request for job %s', result.uuid); + waiting[result.uuid] = { + req: req, + res: res, + next: next + }; + // flush headers (so they have the UUID as the location header) + res.write('\n'); + return undefined; + } else { + res.send(result); + return next(); + } }); } @@ -677,6 +707,29 @@ var API = module.exports = function (opts) { } }); } + + function jobDone(req, res, next) { + var job = req.params; + // end the incoming request + res.send(job.uuid ? 200 : 400); + next(); + + // end any waiting requests + if (job.uuid && waiting[job.uuid]) { + log.info('ending waiting request for %s', job.uuid); + var o = waiting[job.uuid]; + + // res.send and res.format won't work here because headers were + // flushed by writing a newline to the socket. instead, we format + // the data ourselves and ship it off by assuming the user wanted + // JSON. this is really lame. 
XXX + o.res.end(JSON.stringify(job)); + o.next(); + delete waiting[job.uuid]; + } + + } + // --- Routes // Workflows: server.get(WORKFLOWS_ROUTE, listWorkflows); @@ -706,6 +759,8 @@ var API = module.exports = function (opts) { server.get(JOB_INFO_ROUTE, getInfo); server.head(JOB_INFO_ROUTE, getInfo); server.post(JOB_INFO_ROUTE, postInfo); + // Job done (callback): + server.post(JOB_DONE_ROUTE, jobDone); // Ping: server.get(PING_ROUTE, function (req, res, next) { var data = { diff --git a/lib/job-runner.js b/lib/job-runner.js index 8baacf5..66dff5f 100644 --- a/lib/job-runner.js +++ b/lib/job-runner.js @@ -62,9 +62,8 @@ var WorkflowJobRunner = module.exports = function (opts) { // pointer to child process forked by runTask var child = null; // Properties of job object which a task should not be allowed to modify: - // Properties of job object which a task should not be allowed to modify: var frozen_props = [ - 'chain', 'chain_results', 'onerror', 'onerror_results', + 'callback_urls', 'chain', 'chain_results', 'onerror', 'onerror_results', 'exec_after', 'timeout', 'elapsed', 'uuid', 'workflow_uuid', 'name', 'execution', 'num_attempts', 'max_attempts', 'initial_delay', 'max_delay', 'prev_attempt', 'oncancel', 'oncancel_results', @@ -90,6 +89,10 @@ var WorkflowJobRunner = module.exports = function (opts) { job.chain_results = []; } + if (!job.callback_urls) { + job.callback_urls = []; + } + if (job.onerror && !job.onerror_results) { job.onerror_results = []; } diff --git a/lib/runner.js b/lib/runner.js index 5f149d2..3a21f95 100644 --- a/lib/runner.js +++ b/lib/runner.js @@ -1,10 +1,11 @@ // Copyright 2013 Pedro P. Candel All rights reserved. 
var assert = require('assert-plus'); -var util = require('util'); +var http = require('http'); var uuid = require('node-uuid'); var fs = require('fs'); var path = require('path'); +var url = require('url'); var vasync = require('vasync'); var bunyan = require('bunyan'); var WorkflowJobRunner = require('./job-runner'); @@ -442,7 +443,7 @@ var WorkflowRunner = module.exports = function (opts) { function retryJob(oldJob, callback) { if (oldJob.num_attempts + 1 >= oldJob.max_attempts) { return callback( - 'Max attempts reached. Last attempt: ' + oldJob.uuid); + 'Max attempts reached. Last attempt: ' + oldJob.uuid, oldJob); } var retryParams = { @@ -456,7 +457,7 @@ var WorkflowRunner = module.exports = function (opts) { var factory = Factory(backend); return factory.job(retryParams, function (err, newJob) { if (err) { - return callback(err); + return callback(err, oldJob); } // Update the old job with the next job's uuid and update the // new job with the old job's uuid. @@ -474,9 +475,9 @@ var WorkflowRunner = module.exports = function (opts) { oldJob.uuid, function (err) { if (err) { - return callback(err); + return callback(err, newJob); } - return callback(); + return callback(null, newJob); }); }); }); @@ -491,7 +492,7 @@ var WorkflowRunner = module.exports = function (opts) { } if (!job) { - return callback(); + return callback(null, job); } if (runNow(job)) { @@ -512,11 +513,11 @@ var WorkflowRunner = module.exports = function (opts) { function (err, job) { job_trace.end(job_trace.fields.name + '.lock'); if (err) { - return callback(err); + return callback(err, job); } if (!job) { - return callback(); + return callback(null, job); } var t = Date.now(); @@ -568,9 +569,9 @@ var WorkflowRunner = module.exports = function (opts) { if (err && err === 'retry') { return retryJob(job, callback); } else if (err) { - return callback(err); + return callback(err, job); } - return callback(); + return callback(null, job); }); }); } else { @@ -599,111 +600,127 @@ var 
WorkflowRunner = module.exports = function (opts) { function doPoll() { backend.isRunnerIdle(identifier, function (idle) { - if (idle === false) { - vasync.parallel({ - // Fetch stale jobs from runners which stopped - // reporting activity and cancel them: - funcs: [function cancelStaleJobs(cb) { - staleJobs(function (err, jobs) { - if (err) { - log.error({err: err}, - 'Error fetching stale jobs'); - // We will not stop even on error: - return cb(null, null); - } - function cancelJobs(uuid, fe_cb) { - backend.updateJobProperty( - uuid, 'execution', 'canceled', - function (err) { + if (idle) { + log.info('Runner idle.'); + if (!shutting_down) { + interval = setTimeout(doPoll, run_interval); + } + return; + } + vasync.parallel({ + // Fetch stale jobs from runners which stopped + // reporting activity and cancel them: + funcs: [function cancelStaleJobs(cb) { + staleJobs(function (err, jobs) { + if (err) { + log.error({err: err}, + 'Error fetching stale jobs'); + // We will not stop even on error: + return cb(null, null); + } + function cancelJobs(uuid, fe_cb) { + backend.updateJobProperty(uuid, 'execution', + 'canceled', function (err) { + if (err) { + return fe_cb(err); + } + return backend.getJob(uuid, + function (err, job) { + if (err) { + return fe_cb(err); + } + return backend.finishJob(job, + function (err, job) { if (err) { return fe_cb(err); } - return backend.getJob(uuid, - function (err, job) { - if (err) { - return fe_cb(err); - } - return backend.finishJob( - job, function (err, job) { - if (err) { - return fe_cb(err); - } - log.info( - 'Stale Job ' + - job.uuid + - ' canceled'); - return fe_cb(null); - }); - }); - }); - } - return vasync.forEachParallel({ - inputs: jobs, - func: cancelJobs - }, function (err, results) { - return cb(null, null); + log.info('Stale Job %s canceled', + job.uuid); + return fe_cb(null); + }); + }); }); + } + return vasync.forEachParallel({ + inputs: jobs, + func: cancelJobs + }, function (err, results) { + return cb(null, 
null); }); - }, - // Fetch jobs to process. - function fetchJobsToProcess(cb) { - var fetch = slots - 1; - if (isNaN(fetch) || fetch <= 0) { - log.info('No available slots. ' + + }); + }, + // Fetch jobs to process. + function fetchJobsToProcess(cb) { + var fetch = slots - 1; + if (isNaN(fetch) || fetch <= 0) { + log.info('No available slots. ' + 'Waiting next iteration'); + return cb(null, null); + } + return backend.nextJobs(0, fetch, function (err, jobs) { + // Error fetching jobs + if (err) { + log.error({err: err}, 'Error fetching jobs'); + // We will not stop even on error: return cb(null, null); } - return backend.nextJobs(0, fetch, - function (err, jobs) { - // Error fetching jobs - if (err) { - log.error({err: err}, - 'Error fetching jobs'); - // We will not stop even on error: - return cb(null, null); - } - // No queued jobs - if (!jobs) { - return cb(null, null); - } - // Got jobs, let's see if we can run them: - jobs.forEach(function (job) { - if (rn_jobs.indexOf(job) === -1) { - rn_jobs.push(job); - queue.push(job, function (err) { - // Called once queue worker - // finished processing the job - if (err) { - log.error({err: err}, - 'Error running job'); - } - log.info('Job with uuid ' + - job + - ' ran successfully'); - if (rn_jobs.indexOf(job) !== - -1) { - rn_jobs.splice( - rn_jobs.indexOf(job), - 1); - } - }); - } - }); + // No queued jobs + if (!jobs) { return cb(null, null); + } + // Got jobs, let's see if we can run them: + jobs.forEach(function (job) { + if (rn_jobs.indexOf(job) === -1) { + rn_jobs.push(job); + queue.push(job, function (err, j) { + // Called once queue worker + // finished processing the job + if (err) { + log.error({err: err}, + 'Error running job'); + } else { + log.info('Job with uuid %s ' + + 'ran successfully', job); + } + if (rn_jobs.indexOf(job) !== -1) { + rn_jobs.splice(rn_jobs.indexOf(job), + 1); + } + + // fire callback urls + if (j && j.callback_urls) { + j.callback_urls.forEach( + function (uri) { + var u = 
url.parse(uri); + u.method = 'POST'; + u.headers = { + 'content-type': + 'application/json; ' + + 'charset=utf-8' + }; + var req = http.request(u, function (res) { + var logLevel = (res.statusCode >= 200 && res.statusCode < 300) ? 'info' : 'warn'; + log[logLevel]({job: job, uri: uri, code: res.statusCode}, 'POST to callback URL %s', uri); + }); + req.on('error', function (err) { + log.warn({uri: uri, job: job, err: err}, 'failed to POST to callback URL %s', uri); + }); + req.write(JSON.stringify(j)); + req.end(); + }); + } + }); + } }); - }] - }, function (err, results) { - if (!shutting_down) { - interval = setTimeout(doPoll, run_interval); - } - return; - }); - } else { - log.info('Runner idle.'); + return cb(null, null); + }); + }] + }, function (err, results) { if (!shutting_down) { interval = setTimeout(doPoll, run_interval); } - } + return; + }); }); }