diff --git a/example.js b/example.js index d3e05c5..185d062 100644 --- a/example.js +++ b/example.js @@ -8,7 +8,7 @@ var util = require('util'); var wf = require('./lib/index'); // With modules, it would be require('workflow'); var Factory = wf.Factory; -var Backend = wf.Backend; +var Backend = wf.MemoryBackend; var backend, factory; diff --git a/lib/base-backend.js b/lib/base-backend.js new file mode 100644 index 0000000..0973997 --- /dev/null +++ b/lib/base-backend.js @@ -0,0 +1,768 @@ +var e = require('./errors'); +var util = require("util"); +var makeEmitter = require("./make-emitter"), + _ = require('lodash'), + sprintf = util.format; + + +/* + Using Filter with Query Language (ORM) + http://sailsjs.org/#!/documentation/concepts/ORM/Querylanguage.html + */ + +var baseBackend = { + TYPES: { + WORKFLOWS: 'workflows', + JOBS: 'jobs', + RUNNERS: 'runners' + }, + + EXECUTION: { + RUNNING: 'running', + QUEUED: 'queued', + CANCELED: 'canceled', + SUCCEEDED: 'succeeded', + WAITING: 'waiting', + FAILED: 'failed', + RETRIED: 'retried' + }, + + init: function (pCallback) { + if (pCallback) + return pCallback(); + }, + // usage: - (test only) + quit: function (pCallback) { + if (pCallback) + return pCallback(); + }, + // usage: internal + // should save obj to persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, obj) + save: function (pType, pObj, pCallback) { + throw new e.BackendInternal('Backend.save() not implemented yet'); + }, + // usage: internal + // should find object from persistence + // pType - type, TYPES + // pFilterObj - Filter for search, e.g. { 'where': { 'attr': 'value' }} + // pCallback - f(err, objs), objs is an array even if empty + find: function (pType, pFilterObj, pCallback) { + throw new e.BackendInternal('Backend.find() not implemented yet'); + }, + // usage: internal + // should remove object from persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, boolean) + remove: function (pType, pObj, pCallback) { + throw new e.BackendInternal('Backend.remove() not implemented yet'); + }, + // usage: Factory + // workflow - Workflow object + // meta (optional) - Any additional information to pass to the backend which is not + // workflow properties + // pCallback - f(err, workflow) + createWorkflow: function (pWorkflow, pMeta, pCallback) { + if (typeof (pMeta) === 'function') { + pCallback = pMeta; + pMeta = {}; + } + var _self = this; + this.find(this.TYPES.WORKFLOWS, {'name': pWorkflow.name}, function (pError, pWorkflows) { + if (pWorkflows.length > 0) + return pCallback(new e.BackendInvalidArgumentError( + 'Workflow.name must be unique. A workflow with name "' + + pWorkflow.name + '" already exists')); + _self.save(_self.TYPES.WORKFLOWS, pWorkflow, pCallback); + }); + }, + // usage: API & Factory + // uuid - Workflow.uuid + // meta (optional) - Any additional information to pass to the backend which is not + // workflow properties + // pCallback - f(err, workflow) + getWorkflow: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.find(this.TYPES.WORKFLOWS, uuid, function (pError, pWorkflows) { + if (pWorkflows.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Workflow with uuid \'%s\' does not exist', uuid))); + return pCallback(null, pWorkflows[0]); + }); + }, + // usage: API + // workflow - the workflow object + // meta (optional) - Any additional information to pass to the backend which is not + // workflow properties + // pCallback - f(err, boolean) + deleteWorkflow: function (workflow, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.remove(this.TYPES.WORKFLOWS, workflow, pCallback); + }, + // usage: API + // workflow - update workflow object. + // meta (optional) - Any additional information to pass to the backend which is not + // workflow properties + // pCallback - f(err, workflow) + updateWorkflow: function (workflow, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + this.getWorkflow(workflow.uuid, meta, function (pError, pWorkflow) { + if (!pWorkflow) + return pCallback(new e.BackendResourceNotFoundError( + 'Workflow does not exist. Cannot Update.')); + _.assign(pWorkflow, workflow); + _self.find(_self.TYPES.WORKFLOWS, {'name': pWorkflow.name}, function (pError, pWorkflows) { + if (pWorkflows.length > 0) + for (var i = 0; i < pWorkflows.length; i++) { + if (pWorkflows[i].uuid !== pWorkflow.uuid) + return pCallback(new e.BackendInvalidArgumentError( + 'Workflow.name must be unique. A workflow with name "' + + pWorkflow.name + '" already exists')); + } + _self.save(_self.TYPES.WORKFLOWS, pWorkflow, pCallback); + }); + }); + }, + + // usage: Factory + // job - Job object + // meta (optional) - Any additional information to pass to the backend which is not + // job properties + // pCallback - f(err, job) + createJob: function (job, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + job.created_at = job.created_at || new Date().toISOString(); + this.save(this.TYPES.JOBS, job, pCallback); + }, + + // usage: Runner + // uuid - Job.uuid + // meta (optional) - Any additional information to pass to the backend which is not + // job properties + // pCallback - f(err, job) + getJob: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + return pCallback(pError, pJobs[0]); + }); + }, + + // DEPRECATED + // usage: Internal & Test + // Get a single job property + // uuid - Job uuid. + // prop - (String) property name + // cb - pCallback f(err, value) + getJobProperty: function (uuid, prop, pCallback) { + this.getJob(uuid, function (pError, pJob) { + if (pError) + return pCallback(pError); + if (!pJob) + return cb(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + return pCallback(null, pJob[prop]); + }); + }, + + // usage: Factory + // job - the job object + // pCallback - f(err) called with error in case there is a duplicated + // job with the same target and same params + validateJobTarget: function (pJob, pCallback) { + if (typeof (pJob) === 'undefined') + return pCallback(new e.BackendInternalError('WorkflowRedisBackend.validateJobTarget job(Object) required')); + // If no target is given, we don't care: + if (!pJob.target) + return pCallback(null); + var _self = this; + _self.find(_self.TYPES.JOBS, null, function (pError, pJobs) { + if (pError || !pJobs) + return pCallback(null); + if (_.some(pJobs, function (job) { + if ([_self.EXECUTION.FAILED, _self.EXECUTION.SUCCEEDED].indexOf(job.execution) !== -1 || !job.locks) + return false; + var re = new RegExp(job.locks); + return (re.test(pJob.target)); + })) { + return pCallback(new e.BackendInvalidArgumentError('Job target is currently locked by another job')); + } + var sameTargets = _.where(pJobs, {'target': pJob.target}); + if (sameTargets.length === 0) + return pCallback(null); + // TODO async.each + for (var i = 0; i < sameTargets.length; i++) { + var job = sameTargets[i]; + if (job.uuid !== pJob.uuid && + job.workflow_uuid === pJob.workflow_uuid && + JSON.stringify(job.params) === JSON.stringify(pJob.params) && + [_self.EXECUTION.QUEUED, _self.EXECUTION.RUNNING, _self.EXECUTION.WAITING].indexOf(job.execution) !== -1) + return pCallback(new e.BackendInvalidArgumentError('Another job with the same target and params is already queued')); + } + return pCallback(null); + }); + }, + + // usage: - (test-only) + // Get the next queued job. + // index (optional) - Integer, optional. When given, it'll get the job at index + // position (when not given, it'll return the job at position + // zero). + // pCallback - f(err, job) + nextJob: function (index, pCallback) { + if (typeof (index) === 'function') { + pCallback = index; + index = 0; + } + this.find(this.TYPES.JOBS, { + 'where': {'execution': this.EXECUTION.QUEUED}, + 'sort': 'created_at' + }, function (pError, pJobs) { + return pCallback(null, (pJobs.length === 0 || index >= pJobs.length) ? null : + pJobs.slice(index, index + 1)[0]); + }); + }, + + // usage: Runner + // Lock a job, mark it as running by the given runner, update job + // status. + // uuid - the job uuid (String) + // runner_id - the runner identifier (String) + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + runJob: function (uuid, runner_id, pCallback) { + var _self = this; + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0 || pJobs[0].execution != _self.EXECUTION.QUEUED) + return pCallback(new e.BackendPreconditionFailedError('Only queued jobs can be run')); + var job = pJobs[0]; + job.runner_id = runner_id; + job.execution = _self.EXECUTION.RUNNING; + return _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: Runner + // Unlock the job, mark it as finished, update the status, add the + // results for every job's task. + // job - the job object. It'll be saved to the backend with the provided + // properties. + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + finishJob: function (pJob, pCallback) { + var _self = this; + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.RUNNING && + job.execution !== _self.EXECUTION.CANCELED) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be finished')); + _.assign(job, pJob); + if (job.execution === _self.EXECUTION.RUNNING) + job.execution = _self.EXECUTION.SUCCEEDED; + delete job.runner_id; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: API + // Update the job while it is running with information regarding + // progress + // job - the job object. It'll be saved to the backend with the + // provided properties. + // meta (optional) - Any additional information to pass to the backend which is + // not job properties + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + updateJob: function (pJob, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + _.assign(job, pJob); + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: Runner + // Unlock the job, mark it as canceled, and remove the runner_id + // uuid - string, the job uuid. + // cb - f(err, job) pCallback will be called with error if something + // fails, otherwise it'll return the updated job using getJob. + cancelJob: function (uuid, pCallback) { + if (typeof (uuid) === 'undefined') { + return pCallback(new e.BackendInternalError( + 'cancelJob uuid(String) required')); + } + var _self = this; + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', job.uuid))); + var job = pJobs[0]; + job.execution = _self.EXECUTION.CANCELED; + delete job.runner_id; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // DEPRICATED -> use getJob & saveJob + // usage: API & Runner + // Update only the given Job property. Intendeed to prevent conflicts + // with two sources updating the same job at the same time, but + // different properties: + // - uuid - the job's uuid + // - prop - the name of the property to update + // - val - value to assign to such property + // - meta (optional) - Any additional information to pass to the backend which is + // not job properties + // - pCallback - f(err) called with error if something fails, otherwise + // with null. + updateJobProperty: function (uuid, prop, val, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', uuid))); + var job = pJobs[0]; + job[prop] = val; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: Runner + // Queue a job which has been running; i.e, due to whatever the reason, + // re-queue the job. It'll unlock the job, update the status, add the + // results for every finished task so far ... + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + queueJob: function (pJob, pCallback) { + var _self = this; + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.RUNNING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be queued again')); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: Runner + // Pause a job which has been running; i.e, tell the job to wait for + // something external to happen. It'll unlock the job, update the + // status, add the results for every finished task so far ... + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + pauseJob: function (pJob, pCallback) { + var _self = this; + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.RUNNING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only running jobs can be paused')); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.WAITING; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: - (test only) + // Tell a waiting job that whatever it has been waiting for has + // happened and it can run again. + // job - the job Object. It'll be saved to the backend with the provided + // properties to ensure job status persistence. + // pCallback - f(err, job) pCallback will be called with error if + // something fails, otherwise it'll return the updated job + // using getJob. + resumeJob: function (pJob, pCallback) { + var _self = this; + this.find(this.TYPES.JOBS, pJob.uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Job with uuid \'%s\' does not exist', pJob.uuid))); + var job = pJobs[0]; + if (job.execution !== _self.EXECUTION.WAITING) + return pCallback(new e.BackendPreconditionFailedError( + 'Only waiting jobs can be resumed')); + _.assign(job, pJob); + delete job.runner_id; + job.execution = _self.EXECUTION.QUEUED; + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: Runner + // Get the given number of queued jobs uuids. + // - start - Integer - Position of the first job to retrieve + // - stop - Integer - Position of the last job to retrieve, _included_ + // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. + // Note `jobs` will be an array, even when empty. + nextJobs: function (start, stop, pCallback) { + this.find(this.TYPES.JOBS, { + 'where': {'execution': this.EXECUTION.QUEUED}, + 'sort': 'created_at' + }, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(pError, null); + var slice = pJobs.slice(start, stop + 1); + if (slice.length === 0) { + return pCallback(pError, null); + } else { + return pCallback(pError, _.map(slice, 'uuid')); + } + }); + }, + + // usage: Runner + // Register a runner on the backend and report it's active: + // - runner_id - String, unique identifier for runner. + // - active_at (optional) - ISO String timestamp. Optional. If none is given, + // current time + // - pCallback - f(err) + registerRunner: function (runner_id, active_at, pCallback) { + if (typeof (active_at) === 'function') { + pCallback = active_at; + active_at = new Date(); + } else if (typeof (active_at) === 'string') { + active_at = new Date(active_at); + } + this.save(this.TYPES.RUNNERS, { + uuid: runner_id, + active_at: active_at, + idle: false + }, function (pError, pRunner) { + return pCallback(pError); + }); + }, + + // DEPRECATED, use registerRunner + // usage: Runner + // Report a runner remains active: + // - runner_id - String, unique identifier for runner. Required. + // - active_at (optional) - ISO String timestamp. Optional. If none is given, + // current time + // - pCallback - f(err) + runnerActive: function (runner_id, active_at, pCallback) { + return this.registerRunner(runner_id, active_at, pCallback); + }, + + // usage: - (test only) + // Get the given runner id details + // - runner_id - String, unique identifier for runner. Required. + // - pCallback - f(err, runner) + getRunner: function (runner_id, pCallback) { + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + return pCallback(pError, pRunners[0].active_at); + }); + }, + + // usage: API & Runner + // Get all the registered runners: + // - pCallback - f(err, runners) + getRunners: function (pCallback) { + this.find(this.TYPES.RUNNERS, null, function (pError, pRunners) { + var theRunners = {}; + for (var i = 0; i < pRunners.length; i++) { + var runner = pRunners[i]; + theRunners[runner.uuid] = runner.active_at; + } + return pCallback(pError, theRunners); + }); + }, + + // usage: - (test only) + // Set a runner as idle: + // - runner_id - String, unique identifier for runner + // - pCallback - f(err, runner-active_at) + idleRunner: function (runner_id, pCallback) { + var _self = this; + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + var runner = pRunners[0]; + runner.idle = true; + _self.save(_self.TYPES.RUNNERS, runner, function (pError, pRunner) { + return pCallback(pError, pRunner.active_at); + }); + }); + }, + + // usage: Runner + // Check if the given runner is idle + // - runner_id - String, unique identifier for runner + // - pCallback - f(boolean) + isRunnerIdle: function (runner_id, pCallback) { + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { + return pCallback((pRunners.length === 1 && pRunners[0].idle === true)); + }); + }, + + // usage: - (test only) + // Remove idleness of the given runner + // - runner_id - String, unique identifier for runner + // - pCallback - f(err) + wakeUpRunner: function (runner_id, pCallback) { + var _self = this; + this.find(this.TYPES.RUNNERS, runner_id, function (pError, pRunners) { + if (pRunners.length === 0) + return pCallback(new e.BackendResourceNotFoundError(sprintf( + 'Runner with uuid \'%s\' does not exist', runner_id))); + var runner = pRunners[0]; + runner.idle = false; + _self.save(_self.TYPES.RUNNERS, runner, function (pError, pRunner) { + return pCallback(pError, pRunner.active_at); + }); + }); + }, + + // usage: Runner + // Get all jobs associated with the given runner_id + // - runner_id - String, unique identifier for runner + // - pCallback - f(err, jobs). `jobs` is an array of job's UUIDs. + // Note `jobs` will be an array, even when empty. + getRunnerJobs: function (runner_id, pCallback) { + this.find(this.TYPES.JOBS, {'runner_id': runner_id}, function (pError, pJobs) { + return pCallback(pError, _.map(pJobs, 'uuid')); + }); + }, + + // usage: API + // Get all the workflows: + // - params (optional) - JSON Object (Optional). Can include the value of the + // workflow's "name", and any other key/value pair to search for + // into workflow's definition. + // - pCallback - f(err, workflows) + getWorkflows: function (params, pCallback) { + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + this.find(this.TYPES.WORKFLOWS, params, pCallback); + }, + + // usage: API + // Get all the jobs: + // - params (optional) - JSON Object. Can include the value of the job's + // "execution" status, and any other key/value pair to search for + // into job'sparams. + // - execution - String, the execution status for the jobs to return. + // Return all jobs if no execution status is given. + // - pCallback - f(err, jobs, count) + getJobs: function (params, pCallback) { + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + // TODO usage: Waterline Query Language! + // TODO -> ORM should handle offset & limit + var offset; + var limit; + if (typeof (params) === 'object') { + offset = params.offset; + delete params.offset; + limit = params.limit; + delete params.limit; + } + if (typeof (params) === 'function') { + pCallback = params; + params = {}; + } + var _self = this; + var executions = Object.keys(this.EXECUTION).map(function (k) { + return _self.EXECUTION[k] + }); + if ((typeof (params.execution) !== 'undefined') && + (executions.indexOf(params.execution) === -1)) { + return pCallback(new e.BackendInvalidArgumentError( + 'execution is required and must be one of "' + + executions.join('", "') + '"')); + } + var whereObj = null; + if (params.execution) { + whereObj = {}; + whereObj.execution = params.execution; + delete params.execution; + } + this.find(this.TYPES.JOBS, whereObj, function (pError, pJobs) { + if (pError) + return pCallback(pError, pJobs); + if (_.keys(params).length > 0) + pJobs = _.where(pJobs, {'params': params}); + if (typeof (offset) !== 'undefined' && + typeof (limit) !== 'undefined') { + return pCallback(pError, pJobs.slice(offset, limit)); + } else { + return pCallback(pError, pJobs); + } + }); + }, + + // usage: API + // Get count of jobs: + // - pCallback - f(err, stats) + countJobs: function (pCallback) { + var _self = this; + var executions = Object.keys(this.EXECUTION).map(function (k) { + return _self.EXECUTION[k] + }); + var stats = { + all_time: {}, + past_24h: {}, + past_hour: {}, + current: {} + }; + executions.forEach(function (e) { + stats.all_time[e] = 0; + stats.past_24h[e] = 0; + stats.past_hour[e] = 0; + stats.current[e] = 0; + }); + this.find(this.TYPES.JOBS, null, function (pError, pJobs) { + var yesterday = (function (d) { + d.setDate(d.getDate() - 1); + return d; + })(new Date()).getTime(); + var _1hr = (function (d) { + d.setHours(d.getHours() - 1); + return d; + })(new Date()).getTime(); + var _2hr = (function (d) { + d.setHours(d.getHours() - 2); + return d; + })(new Date()).getTime(); + pJobs = pJobs.map(function (job) { + return ({ + execution: job.execution, + created_at: new Date(job.created_at).getTime() + }); + }); + pJobs.forEach(function (j) { + if (j.created_at < yesterday) { + stats.all_time[j.execution] += 1; + } else if (j.created_at > yesterday && j.created_at < _2hr) { + stats.past_24h[j.execution] += 1; + } else if (j.created_at > _2hr && j.created_at < _1hr) { + stats.past_hour[j.execution] += 1; + } else { + stats.current[j.execution] += 1; + } + }); + return pCallback(null, stats); + }); + }, + + // usage: API & Runner + // Add progress information to an existing job: + // - uuid - String, the Job's UUID. + // - info - Object, {'key' => 'Value'} + // - meta (optional) - Any additional information to pass to the backend which is + // not job info + // - pCallback - f(err) + addInfo: function (uuid, info, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + var _self = this; + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError( + 'Job does not exist. Cannot Update.')); + var job = pJobs[0]; + if (!util.isArray(job.info)) { + job.info = []; + } + job.info.push(info); + _self.save(_self.TYPES.JOBS, job, pCallback); + }); + }, + + // usage: API + // Get progress information from an existing job: + // - uuid - String, the Job's UUID. + // - meta (optional) + // - pCallback - f(err, info) + getInfo: function (uuid, meta, pCallback) { + if (typeof (meta) === 'function') { + pCallback = meta; + meta = {}; + } + this.find(this.TYPES.JOBS, uuid, function (pError, pJobs) { + if (pJobs.length === 0) + return pCallback(new e.BackendResourceNotFoundError( + 'Job does not exist. Cannot get info.')); + return pCallback(null, !util.isArray(pJobs[0].info) ? [] : pJobs[0].info); + }); + } +}; + +makeEmitter(baseBackend); + +module.exports = function (pBackend) { + var newBackend = {}; + var a; + for (a in baseBackend) { + newBackend[a] = baseBackend[a]; + } + for (a in pBackend) { + newBackend[a] = pBackend[a]; + } + return newBackend; +}; \ No newline at end of file diff --git a/lib/index.js b/lib/index.js index fe365c4..7174866 100644 --- a/lib/index.js +++ b/lib/index.js @@ -31,15 +31,17 @@ module.exports = { return WorkflowFactory(backend); }, - Backend: function (config) { - var Backend = require('./workflow-in-memory-backend'); - return Backend(config); + MemoryBackend: function (config) { + var MemoryBackend = require('./workflow-in-memory-backend'); + return MemoryBackend(config); + }, + BaseBackend: function (config) { + return require('./base-backend')(config); }, API: function (config) { if (typeof (config) !== 'object') { throw new Error('config must be an object'); } - var API = require('./api'); return API(config); }, diff --git a/lib/workflow-in-memory-backend.js b/lib/workflow-in-memory-backend.js index 9419769..7965507 100644 --- a/lib/workflow-in-memory-backend.js +++ b/lib/workflow-in-memory-backend.js @@ -1,29 +1,13 @@ // Copyright 2012 Pedro P. Candel . All rights reserved. -var util = require('util'); -var makeEmitter = require('./make-emitter'); -var Logger = require('bunyan'); -var e = require('./errors'); -var clone = require('clone'); -var sprintf = util.format; +var util = require('util'), + Logger = require('bunyan'), + e = require('./errors'), + _ = require('lodash'), + clone = require('clone'), + baseBackend = require('./base-backend'); -// Returns true when "obj" (Object) has all the properties "kv" (Object) has, -// and with exactly the same values, otherwise, false -function hasPropsAndVals(obj, kv) { - if (typeof (obj) !== 'object' || typeof (kv) !== 'object') { - return (false); - } - - if (Object.keys(kv).length === 0) { - return (true); - } - - return (Object.keys(kv).every(function (k) { - return (obj[k] && obj[k] === kv[k]); - })); -} - -var Backend = module.exports = function (config) { +function MemoryBackend(config) { var log; @@ -33,801 +17,91 @@ var Backend = module.exports = function (config) { if (!config.logger) { config.logger = {}; } - config.logger.name = 'wf-in-memory-backend'; config.logger.serializers = { err: Logger.stdSerializers.err }; - - config.logger.streams = config.logger.streams || [ { + config.logger.streams = config.logger.streams || [{ level: 'info', stream: process.stdout }]; - log = new Logger(config.logger); } - var workflows = null; - var jobs = null; - var runners = null; - var queued_jobs = null; - var waiting_jobs = null; - var locked_targets = {}; - - function _lockedTargets() { - var targets = Object.keys(locked_targets).map(function (t) { - return locked_targets[t]; - }); - return targets; - } - - function _wfNames() { - var wf_names = Object.keys(workflows).map(function (uuid) { - return workflows[uuid].name; - }); - return wf_names; - } - - function _jobTargets() { - var wf_job_targets = Object.keys(jobs).map(function (uuid) { - return jobs[uuid].target; - }); - return wf_job_targets; - } - - function getJob(uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (jobs[uuid]) { - return callback(null, clone(jobs[uuid])); - } else { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } - } - - // Register a runner on the backend and report it's active: - // - runner_id - String, unique identifier for runner. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - function registerRunner(runner_id, active_at, callback) { - if (typeof (active_at) === 'function') { - callback = active_at; - active_at = new Date(); - } - if (typeof (active_at) === 'string') { - active_at = new Date(active_at); - } - runners[runner_id] = { - runner_id: runner_id, - active_at: active_at, - idle: false - }; - return callback(null); - } + var _store = null; - var backend = { + var backend = require('./base-backend'); + backend = { log: log, - init: function init(callback) { - workflows = {}; - jobs = {}; - runners = {}; - queued_jobs = []; - waiting_jobs = []; + init: function (callback) { + _store = {}; return callback(); }, - quit: function quit(callback) { + // Never get called, except in test + quit: function (callback) { return callback(); }, - // workflow - Workflow object - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - createWorkflow: function createWorkflow(workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - if (_wfNames().indexOf(workflow.name) !== -1) { - return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); - } else { - workflows[workflow.uuid] = clone(workflow); - return callback(null, workflow); - } - }, - - // uuid - Workflow.uuid - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - getWorkflow: function getWorkflow(uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (workflows[uuid]) { - return callback(null, clone(workflows[uuid])); - } else { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Workflow with uuid \'%s\' does not exist', uuid))); - } - }, - - // workflow - the workflow object - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, boolean) - deleteWorkflow: function deleteWorkflow(workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (workflows[workflow.uuid]) { - return callback(null, (delete workflows[workflow.uuid])); - } else { - return callback(null, false); - } - }, - - // workflow - update workflow object. - // meta - Any additional information to pass to the backend which is not - // workflow properties - // callback - f(err, workflow) - updateWorkflow: function updateWorkflow(workflow, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (workflows[workflow.uuid]) { - if (_wfNames().indexOf(workflow.name) !== -1 && - workflows[workflow.uuid].name !== workflow.name) { - return callback(new e.BackendInvalidArgumentError( - 'Workflow.name must be unique. A workflow with name "' + - workflow.name + '" already exists')); - } else { - workflows[workflow.uuid] = clone(workflow); - return callback(null, workflow); - } - } else { - return callback(new e.BackendResourceNotFoundError( - 'Workflow does not exist. Cannot Update.')); - } - }, - - // job - Job object - // meta - Any additional information to pass to the backend which is not - // job properties - // callback - f(err, job) - createJob: function createJob(job, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - job.created_at = job.created_at || new Date().toISOString(); - jobs[job.uuid] = clone(job); - queued_jobs.push(job.uuid); - if (typeof (job.locks) !== 'undefined') { - locked_targets[job.uuid] = job.locks; - } - return callback(null, job); - }, - - // uuid - Job.uuid - // meta - Any additional information to pass to the backend which is not - // job properties - // callback - f(err, job) - getJob: getJob, - - // Get a single job property - // uuid - Job uuid. - // prop - (String) property name - // cb - callback f(err, value) - getJobProperty: function (uuid, prop, cb) { - if (jobs[uuid]) { - return cb(null, jobs[uuid][prop]); - } else { - return cb(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } - }, - - // job - the job object - // callback - f(err) called with error in case there is a duplicated - // job with the same target and same params - validateJobTarget: function validateJobTarget(job, callback) { - // If no target is given, we don't care: - if (!job.target) { - return callback(null); - } - - var locked = _lockedTargets().some(function (r) { - var re = new RegExp(r); - return (re.test(job.target)); - }); - - if (locked) { - return callback(new e.BackendInvalidArgumentError( - 'Job target is currently locked by another job')); - } - - if (_jobTargets().indexOf(job.target) === -1) { - return callback(null); - } - - var filtered = Object.keys(jobs).filter(function (uuid) { - return ( - uuid !== job.uuid && - jobs[uuid].target === job.target && - Object.keys(job.params).every(function (p) { - return (jobs[uuid].params[p] && - jobs[uuid].params[p] === job.params[p]); - }) && - (jobs[uuid].execution === 'queued' || - jobs[uuid].execution === 'running')); - }); - - if (filtered.length !== 0) { - return callback(new e.BackendInvalidArgumentError( - 'Another job with the same target' + - ' and params is already queued')); - } else { - return callback(null); - } - }, - - // Get the next queued job. - // index - Integer, optional. When given, it'll get the job at index - // position (when not given, it'll return the job at position - // zero). - // callback - f(err, job) - nextJob: function nextJob(index, callback) { - if (typeof (index) === 'function') { - callback = index; - index = 0; - } - - if (queued_jobs.length === 0) { - return callback(null, null); - } - - var slice = queued_jobs.slice(index, index + 1); - - if (slice.length === 0) { - return callback(null, null); - } else { - return getJob(slice[0], callback); - } - }, - - // Lock a job, mark it as running by the given runner, update job - // status. - // uuid - the job uuid (String) - // runner_id - the runner identifier (String) - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - runJob: function runJob(uuid, runner_id, callback) { - var idx = queued_jobs.indexOf(uuid); - if (idx === -1) { - return callback(new e.BackendPreconditionFailedError( - 'Only queued jobs can be run')); - } else { - queued_jobs.splice(idx, 1); - jobs[uuid].runner_id = runner_id; - jobs[uuid].execution = 'running'; - return callback(null, clone(jobs[uuid])); - } - }, - - // Unlock the job, mark it as finished, update the status, add the - // results for every job's task. - // job - the job object. It'll be saved to the backend with the provided - // properties. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - finishJob: function finishJob(job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running' && - jobs[job.uuid].execution !== 'canceled') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be finished')); - } else { - if (job.execution === 'running') { - job.execution = 'succeeded'; - } - var info = jobs[job.uuid].info; - job.runner_id = null; - jobs[job.uuid] = clone(job); - if (info) { - jobs[job.uuid].info = info; - } - if (typeof (job.locks) !== 'undefined') { - delete locked_targets[job.uuid]; - } - return callback(null, job); - } - }, - - // Update the job while it is running with information regarding - // progress - // job - the job object. It'll be saved to the backend with the - // provided properties. - // meta - Any additional information to pass to the backend which is - // not job properties - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - updateJob: function updateJob(job, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else { - jobs[job.uuid] = clone(job); - return callback(null, job); - } - }, - - // Unlock the job, mark it as canceled, and remove the runner_id - // uuid - string, the job uuid. - // cb - f(err, job) callback will be called with error if something - // fails, otherwise it'll return the updated job using getJob. - cancelJob: function cancelJob(uuid, cb) { - if (typeof (uuid) === 'undefined') { - return cb(new e.BackendInternalError( - 'cancelJob uuid(String) required')); - } - - jobs[uuid].execution = 'canceled'; - delete jobs[uuid].runner_id; - if (typeof (jobs[uuid].locks) !== 'undefined') { - delete locked_targets[uuid]; - } - return cb(null, jobs[uuid]); - }, - // Update only the given Job property. Intendeed to prevent conflicts - // with two sources updating the same job at the same time, but - // different properties: - // - uuid - the job's uuid - // - prop - the name of the property to update - // - val - value to assign to such property - // - meta - Any additional information to pass to the backend which is - // not job properties - // - callback - f(err) called with error if something fails, otherwise - // with null. - updateJobProperty: function updateJobProperty( - uuid, - prop, - val, - meta, - callback) - { - - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', uuid))); - } else { - jobs[uuid][prop] = val; - return callback(null); - } - }, - - // Queue a job which has been running; i.e, due to whatever the reason, - // re-queue the job. It'll unlock the job, update the status, add the - // results for every finished task so far ... - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - queueJob: function queueJob(job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be queued again')); - } else { - job.runner_id = null; - job.execution = 'queued'; - jobs[job.uuid] = clone(job); - queued_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // Pause a job which has been running; i.e, tell the job to wait for - // something external to happen. It'll unlock the job, update the - // status, add the results for every finished task so far ... - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - pauseJob: function pauseJob(job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'running') { - return callback(new e.BackendPreconditionFailedError( - 'Only running jobs can be paused')); - } else { - job.runner_id = null; - job.execution = 'waiting'; - jobs[job.uuid] = clone(job); - waiting_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // Tell a waiting job that whatever it has been waiting for has - // happened and it can run again. - // job - the job Object. It'll be saved to the backend with the provided - // properties to ensure job status persistence. - // callback - f(err, job) callback will be called with error if - // something fails, otherwise it'll return the updated job - // using getJob. - resumeJob: function resumeJob(job, callback) { - if (!jobs[job.uuid]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Job with uuid \'%s\' does not exist', job.uuid))); - } else if (jobs[job.uuid].execution !== 'waiting') { - return callback(new e.BackendPreconditionFailedError( - 'Only waiting jobs can be resumed')); - } else { - job.runner_id = null; - job.execution = 'queued'; - jobs[job.uuid] = clone(job); - waiting_jobs = waiting_jobs.filter(function (j) { - return (j !== job.uuid); - }); - queued_jobs.push(job.uuid); - return callback(null, job); - } - }, - - // Get the given number of queued jobs uuids. - // - start - Integer - Position of the first job to retrieve - // - stop - Integer - Position of the last job to retrieve, _included_ - // - callback - f(err, jobs) - nextJobs: function nextJobs(start, stop, callback) { - if (queued_jobs.length === 0) { - return callback(null, null); - } - - var slice = queued_jobs.slice(start, stop + 1); - - if (slice.length === 0) { - return callback(null, null); - } else { - return callback(null, slice); - } - }, - - // Register a runner on the backend and report it's active: - // - runner_id - String, unique identifier for runner. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - registerRunner: registerRunner, - - // Report a runner remains active: - // - runner_id - String, unique identifier for runner. Required. - // - active_at - ISO String timestamp. Optional. If none is given, - // current time - // - callback - f(err) - runnerActive: function runnerActive(runner_id, active_at, callback) { - return registerRunner(runner_id, active_at, callback); - }, - - // Get the given runner id details - // - runner_id - String, unique identifier for runner. Required. - // - callback - f(err, runner) - getRunner: function getRunner(runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - return callback(null, runners[runner_id].active_at); - } - }, - - // Get all the registered runners: - // - callback - f(err, runners) - getRunners: function getRunners(callback) { - var theRunners = {}; - - Object.keys(runners).forEach(function (uuid) { - theRunners[uuid] = runners[uuid].active_at; - }); - - return callback(null, theRunners); - }, - - // Set a runner as idle: - // - runner_id - String, unique identifier for runner - // - callback - f(err) - idleRunner: function idleRunner(runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - runners[runner_id].idle = true; - return callback(null); - } - }, - - // Check if the given runner is idle - // - runner_id - String, unique identifier for runner - // - callback - f(boolean) - isRunnerIdle: function isRunnerIdle(runner_id, callback) { - if (!runners[runner_id] || (runners[runner_id].idle === true)) { - return callback(true); - } else { - return callback(false); - } - }, - - // Remove idleness of the given runner - // - runner_id - String, unique identifier for runner - // - callback - f(err) - wakeUpRunner: function wakeUpRunner(runner_id, callback) { - if (!runners[runner_id]) { - return callback(new e.BackendResourceNotFoundError(sprintf( - 'Runner with uuid \'%s\' does not exist', runner_id))); - } else { - runners[runner_id].idle = false; - return callback(null); - } - }, - - // Get all jobs associated with the given runner_id - // - runner_id - String, unique identifier for runner - // - callback - f(err, jobs). `jobs` is an array of job's UUIDs. - // Note `jobs` will be an array, even when empty. - getRunnerJobs: function getRunnerJobs(runner_id, callback) { - var wf_runner_jobs = Object.keys(jobs).filter(function (uuid) { - return jobs[uuid].runner_id === runner_id; - }); - - return callback(null, wf_runner_jobs); - }, - - // Get all the workflows: - // - params - JSON Object (Optional). Can include the value of the - // workflow's "name", and any other key/value pair to search for - // into workflow's definition. - // - callback - f(err, workflows) - getWorkflows: function getWorkflows(params, callback) { - - if (typeof (params) === 'function') { - callback = params; - params = {}; - } - - var wfs = []; - var rWorkflows = Object.keys(workflows).map(function (uuid) { - return clone(workflows[uuid]); - }); - - rWorkflows.forEach(function (wf) { - if (hasPropsAndVals(wf, params)) { - wfs.push(wf); - } - }); - return callback(null, wfs); - }, - - // Get all the jobs: - // - params - JSON Object. Can include the value of the job's - // "execution" status, and any other key/value pair to search for - // into job'sparams. - // - execution - String, the execution status for the jobs to return. - // Return all jobs if no execution status is given. - // - callback - f(err, jobs) - getJobs: function getJobs(params, callback) { - var executions = [ - 'queued', - 'failed', - 'succeeded', - 'canceled', - 'running', - 'retried', - 'waiting' - ]; - var execution; - var offset; - var limit; - var rJobs = []; - var theJobs = []; - - if (typeof (params) === 'object') { - execution = params.execution; - delete params.execution; - offset = params.offset; - delete params.offset; - limit = params.limit; - delete params.limit; - } - - if (typeof (params) === 'function') { - callback = params; - params = {}; - } - - - if ((typeof (execution) !== 'undefined') && - (executions.indexOf(execution) === -1)) { - return callback(new e.BackendInvalidArgumentError( - 'excution is required and must be one of "' + - executions.join('", "') + '"')); - } - - if (typeof (execution) !== 'undefined') { - rJobs = Object.keys(jobs).filter(function (uuid) { - return (jobs[uuid].execution === execution); - }).map(function (uuid) { - return clone(jobs[uuid]); - }); - } else { - rJobs = Object.keys(jobs).map(function (uuid) { - return clone(jobs[uuid]); - }); - } - - rJobs.forEach(function (job) { - if (hasPropsAndVals(job.params, params)) { - theJobs.push(job); - } - }); - - if (typeof (offset) !== 'undefined' && - typeof (limit) !== 'undefined') { - return callback(null, theJobs.slice(offset, limit)); - } else { - return callback(null, theJobs); - } - }, - - - countJobs: function countJobs(callback) { - var executions = [ - 'queued', - 'failed', - 'succeeded', - 'canceled', - 'running', - 'retried', - 'waiting' - ]; - - var rJobs = []; - var stats = { - all_time: {}, - past_24h: {}, - past_hour: {}, - current: {} - }; - - executions.forEach(function (e) { - stats.all_time[e] = 0; - stats.past_24h[e] = 0; - stats.past_hour[e] = 0; - stats.current[e] = 0; - }); - - rJobs = Object.keys(jobs).map(function (uuid) { - return clone(jobs[uuid]); - }); - - var yesterday = (function (d) { - d.setDate(d.getDate() - 1); - return d; - })(new Date()).getTime(); - - var _1hr = (function (d) { - d.setHours(d.getHours() - 1); - return d; - })(new Date()).getTime(); - - var _2hr = (function (d) { - d.setHours(d.getHours() - 2); - return d; - })(new Date()).getTime(); - - rJobs = rJobs.map(function (job) { - return ({ - execution: job.execution, - created_at: new Date(job.created_at).getTime() - }); - }); - - rJobs.forEach(function (j) { - if (j.created_at < yesterday) { - stats.all_time[j.execution] += 1; - } else if (j.created_at > yesterday && j.created_at < _2hr) { - stats.past_24h[j.execution] += 1; - } else if (j.created_at > _2hr && j.created_at < _1hr) { - stats.past_hour[j.execution] += 1; - } else { - stats.current[j.execution] += 1; - } - }); - return callback(null, stats); - }, - - - // Add progress information to an existing job: - // - uuid - String, the Job's UUID. - // - info - Object, {'key' => 'Value'} - // - meta - Any additional information to pass to the backend which is - // not job info - // - callback - f(err) - addInfo: function addInfo(uuid, info, meta, callback) { - - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot Update.')); - } else { - if (!util.isArray(jobs[uuid].info)) { - jobs[uuid].info = []; - } - jobs[uuid].info.push(info); - return callback(null); - } - }, - - // Get progress information from an existing job: - // - uuid - String, the Job's UUID. - // - callback - f(err, info) - getInfo: function getInfo(uuid, meta, callback) { - if (typeof (meta) === 'function') { - callback = meta; - meta = {}; - } - - if (!jobs[uuid]) { - return callback(new e.BackendResourceNotFoundError( - 'Job does not exist. Cannot get info.')); - } else { - if (!util.isArray(jobs[uuid].info)) { - jobs[uuid].info = []; - } - return callback(null, clone(jobs[uuid].info)); - } + // usage: internal + // should save obj to persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, obj) + save: function (pType, pObj, pCallback) { + if (!_.has(_store, pType)) + _store[pType] = []; + // remove old elements + var item = _.find(_store[pType], {'uuid': pObj.uuid}); + if (item) + _store[pType] = _.without(_store[pType], item); + // store new element in array + _store[pType].push(clone(pObj)); + // call back + //console.error("save(): %j %j %j", pType, pObj.uuid, _store[pType].length); + if (pCallback) + pCallback(null, pObj); + }, + // usage: internal + // should find object from persistence + // pType - type, TYPES + // pFilterObj - Filter for search, e.g. { 'where': { 'attr': 'value' }} + // pCallback - f(err, objs), objs is an array even if empty + find: function (pType, pFilterObj, pCallback) { + if (!_.has(_store, pType)) + return pCallback(null, []); + if (!pFilterObj) + return pCallback(null, clone(_store[pType])); + var objs = clone(_.where(_store[pType], pFilterObj.where || pFilterObj)); + if (pFilterObj.sort) { + var sortByArray = pFilterObj.sort.split(' '); + objs = _.sortBy(objs, sortByArray[0]); + if (sortByArray.length > 1 && sortByArray[1] == 'DESC') + objs.reverse(); + } + // find & return elements + //console.error("find(): %j %j %j", pType, pFilterObj, _store[pType].length); + pCallback(null, objs); + }, + // usage: internal + // should remove object from persistence + // pType - type, TYPES + // pObj - Object + // pCallback - f(err, boolean) + remove: function (pType, pObj, pCallback) { + if (!_.has(_store, pType)) + return pCallback(null, false); + var before = _store[pType].length; + var item = _.find(_store[pType], {'uuid': pObj.uuid}); + if (item) { + _store[pType] = _.without(_store[pType], item); + //console.error("remove(): item found & removed: %j", (item !== null)); + } + //console.error("remove(): %j %j %j", pType, pObj.uuid, _store[pType].length); + return pCallback(null, before !== _store[pType].length); } - }; + return baseBackend(backend); +} - makeEmitter(backend); - return backend; -}; +module.exports = MemoryBackend; \ No newline at end of file diff --git a/package.json b/package.json index 2a806d9..fe3670c 100644 --- a/package.json +++ b/package.json @@ -1,45 +1,46 @@ { - "name": "wf", - "description": "Tasks Workflows orchestration API and runners", - "version": "0.10.0", - "repository": { - "type": "git", - "url": "git://github.com/kusor/node-workflow.git" - }, - "author": "Pedro Palazón Candel (http://www.joyent.com)", - "contributors": [ - "Mark Cavage", - "Trent Mick", - "Josh Wilsdon", - "Bryan Cantrill", - "Andrés Rodríquez", - "Rob Gulewich", - "Fred Kuo" - ], - "bin": { - "workflow-api": "./bin/workflow-api", - "workflow-runner": "./bin/workflow-runner" - }, - "main": "lib/index.js", - "dependencies": { - "node-uuid": "1.4.0", - "bunyan": "0.23.1", - "vasync": "1.6.1", - "backoff": "1.2.0", - "clone": "0.1.6", - "restify": "2.6.1", - "sigyan": "0.2.0" - }, - "scripts": { - "test": "./node_modules/.bin/tap ./test/*.test.js" - }, - "devDependencies": { - "tap": "~0.3" - }, - "optionalDependencies": { - "dtrace-provider": "0.2.8" - }, - "engines": { - "node": ">=0.8" - } + "name": "wf", + "description": "Tasks Workflows orchestration API and runners", + "version": "0.10.0", + "repository": { + "type": "git", + "url": "git://github.com/kusor/node-workflow.git" + }, + "author": "Pedro Palazón Candel (http://www.joyent.com)", + "contributors": [ + "Mark Cavage", + "Trent Mick", + "Josh Wilsdon", + "Bryan Cantrill", + "Andrés Rodríquez", + "Rob Gulewich", + "Fred Kuo" + ], + "bin": { + "workflow-api": "./bin/workflow-api", + "workflow-runner": "./bin/workflow-runner" + }, + "main": "lib/index.js", + "dependencies": { + "backoff": "1.2.0", + "bunyan": "0.23.1", + "clone": "0.1.6", + "lodash": "^3.5.0", + "node-uuid": "1.4.0", + "restify": "2.6.1", + "sigyan": "0.2.0", + "vasync": "1.6.1" + }, + "scripts": { + "test": "./node_modules/.bin/tap ./test/*.test.js" + }, + "devDependencies": { + "tap": "~0.3" + }, + "optionalDependencies": { + "dtrace-provider": "0.2.8" + }, + "engines": { + "node": ">=0.8" + } }