function Queue(name, redisPort, redisHost, redisOptions){
if(!(this instanceof Queue)){
return new Queue(name, redisPort, redisHost, redisOptions);
}
if(_.isObject(redisPort)) {
var opts = redisPort;
var redisOpts = opts.redis || {};
redisPort = redisOpts.port;
redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB || redisOpts.DB;
} else if(parseInt(redisPort) == redisPort) {
redisPort = parseInt(redisPort);
redisOptions = redisOptions || {};
} else if(_.isString(redisPort)) {
try {
var redisUrl = url.parse(redisPort);
assert(_.isObject(redisHost) || _.isUndefined(redisHost),
'Expected an object as redis option');
redisOptions = redisHost || {};
redisPort = redisUrl.port;
redisHost = redisUrl.hostname;
if (redisUrl.auth) {
redisOptions.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
}
}
redisOptions = redisOptions || {};
function createClient(type) {
var client;
if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient(type);
}else{
client = new redis(redisPort, redisHost, redisOptions);
}
return client;
}
redisPort = redisPort || 6379;
redisHost = redisHost || '127.0.0.1';
var _this = this;
this.name = name;
this.keyPrefix = redisOptions.keyPrefix || 'bull';
//
// We cannot use ioredis keyPrefix feature until we
// stop creating keys dynamically in lua scripts.
//
delete redisOptions.keyPrefix;
//
// Create queue client (used to add jobs, pause queues, etc);
//
this.client = createClient('client');
getRedisVersion(this.client).then(function(version){
if(semver.lt(version, MINIMUM_REDIS_VERSION)){
throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version);
}
}).catch(function(err){
_this.emit('error', err);
});
//
// Keep track of cluster clients for redlock
//
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});
//
// Create blocking client (used to wait for jobs)
//
this.bclient = createClient('block');
//
// Create event subscriber client (receive messages from other instance of the queue)
//
this.eclient = createClient('subscriber');
this.delayTimer = null;
this.processing = 0;
this.retrieving = 0;
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
this.LOCK_DURATION = LOCK_DURATION;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;
// bubble up Redis error events
[this.client, this.bclient, this.eclient].forEach(function (client) {
client.on('error', _this.emit.bind(_this, 'error'));
});
// keeps track of active timers. used by close() to
// ensure that disconnect() is deferred until all
// scheduled redis commands have been executed
this.timers = new TimerManager();
// emit ready when redis connections ready
var initializers = [this.client, this.bclient, this.eclient].map(function (client) {
return new Promise(function(resolve, reject) {
client.once('ready', resolve);
client.once('error', reject);
});
});
this._initializing = Promise.all(initializers).then(function(){
return Promise.join(
_this.eclient.subscribe(_this.toKey('delayed')),
_this.eclient.subscribe(_this.toKey('paused'))
);
}).then(function(){
debuglog(name + ' queue ready');
_this.emit('ready');
}, function(err){
console.error('Error initializing queue:', err);
});
Disturbed.call(this, _this.client, _this.eclient);
//
// Listen distributed queue events
//
listenDistEvent('stalled'); //
listenDistEvent('active'); //
listenDist ...n/a
job = function (queue, data, opts){
opts = opts || {};
this.queue = queue;
this.data = data;
this.opts = opts;
this._progress = 0;
this.delay = opts.delay || 0;
this.timestamp = opts.timestamp || Date.now();
this.stacktrace = [];
if(this.opts.attempts > 1){
this.attempts = opts.attempts;
}else{
this.attempts = 1;
}
this.returnvalue = null;
this.attemptsMade = 0;
}n/a
priority_queue = function (name, redisPort, redisHost, redisOptions) {
if (!(this instanceof PriorityQueue)) {
return new PriorityQueue(name, redisPort, redisHost, redisOptions);
}
console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority
option instead.");
var _this = this;
this.paused = false;
this.queues = [];
for (var key in PriorityQueue.priorities) {
var queue = Queue(PriorityQueue.getQueueName(name, key), redisPort, redisHost, redisOptions);
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
queue.once(event, resolve);
});
}).then(_this.emit.bind(_this, event))
})
var singleEvents = ['error', 'active', 'stalled', 'progress', 'completed', 'failed', 'cleaned']
singleEvents.forEach(function(event) {
_this.queues.forEach(function(queue) {
queue.on(event, _this.emit.bind(_this, event))
})
})
this.strategy = Strategy.exponential;
}n/a
function Queue(name, redisPort, redisHost, redisOptions){
if(!(this instanceof Queue)){
return new Queue(name, redisPort, redisHost, redisOptions);
}
if(_.isObject(redisPort)) {
var opts = redisPort;
var redisOpts = opts.redis || {};
redisPort = redisOpts.port;
redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB || redisOpts.DB;
} else if(parseInt(redisPort) == redisPort) {
redisPort = parseInt(redisPort);
redisOptions = redisOptions || {};
} else if(_.isString(redisPort)) {
try {
var redisUrl = url.parse(redisPort);
assert(_.isObject(redisHost) || _.isUndefined(redisHost),
'Expected an object as redis option');
redisOptions = redisHost || {};
redisPort = redisUrl.port;
redisHost = redisUrl.hostname;
if (redisUrl.auth) {
redisOptions.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
}
}
redisOptions = redisOptions || {};
function createClient(type) {
var client;
if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient(type);
}else{
client = new redis(redisPort, redisHost, redisOptions);
}
return client;
}
redisPort = redisPort || 6379;
redisHost = redisHost || '127.0.0.1';
var _this = this;
this.name = name;
this.keyPrefix = redisOptions.keyPrefix || 'bull';
//
// We cannot use ioredis keyPrefix feature until we
// stop creating keys dynamically in lua scripts.
//
delete redisOptions.keyPrefix;
//
// Create queue client (used to add jobs, pause queues, etc);
//
this.client = createClient('client');
getRedisVersion(this.client).then(function(version){
if(semver.lt(version, MINIMUM_REDIS_VERSION)){
throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version);
}
}).catch(function(err){
_this.emit('error', err);
});
//
// Keep track of cluster clients for redlock
//
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});
//
// Create blocking client (used to wait for jobs)
//
this.bclient = createClient('block');
//
// Create event subscriber client (receive messages from other instance of the queue)
//
this.eclient = createClient('subscriber');
this.delayTimer = null;
this.processing = 0;
this.retrieving = 0;
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
this.LOCK_DURATION = LOCK_DURATION;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;
// bubble up Redis error events
[this.client, this.bclient, this.eclient].forEach(function (client) {
client.on('error', _this.emit.bind(_this, 'error'));
});
// keeps track of active timers. used by close() to
// ensure that disconnect() is deferred until all
// scheduled redis commands have been executed
this.timers = new TimerManager();
// emit ready when redis connections ready
var initializers = [this.client, this.bclient, this.eclient].map(function (client) {
return new Promise(function(resolve, reject) {
client.once('ready', resolve);
client.once('error', reject);
});
});
this._initializing = Promise.all(initializers).then(function(){
return Promise.join(
_this.eclient.subscribe(_this.toKey('delayed')),
_this.eclient.subscribe(_this.toKey('paused'))
);
}).then(function(){
debuglog(name + ' queue ready');
_this.emit('ready');
}, function(err){
console.error('Error initializing queue:', err);
});
Disturbed.call(this, _this.client, _this.eclient);
//
// Listen distributed queue events
//
listenDistEvent('stalled'); //
listenDistEvent('active'); //
listenDist ...n/a
super_ = function (pubClient, subClient) {
var _this = this;
EventEmitter.call(this);
this.uuid = uuid();
this.pubClient = pubClient;
this.subClient = subClient;
subClient.on('message', function (channel, msg) {
var count = _this.listenerCount(channel);
if (count) {
var args;
try {
args = JSON.parse(msg);
} catch (err) {
console.error('Parsing event message', err);
}
if (args[0] !== _this.uuid) {
args[0] = channel;
_this.emit.apply(_this, args);
}
}
});
}n/a
function TimerManager(){
this.idle = true;
this.listeners = [];
this.timers = {};
}n/a
job = function (queue, data, opts){
opts = opts || {};
this.queue = queue;
this.data = data;
this.opts = opts;
this._progress = 0;
this.delay = opts.delay || 0;
this.timestamp = opts.timestamp || Date.now();
this.stacktrace = [];
if(this.opts.attempts > 1){
this.attempts = opts.attempts;
}else{
this.attempts = 1;
}
this.returnvalue = null;
this.attemptsMade = 0;
}n/a
create = function (queue, data, opts){
var job = new Job(queue, data, opts);
return addJob(queue, job).then(function(jobId){
job.jobId = jobId;
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
return job;
});
}n/a
fromData = function (queue, jobId, data){
var job = new Job(queue, JSON.parse(data.data), JSON.parse(data.opts));
job.jobId = jobId;
job._progress = parseInt(data.progress);
job.delay = parseInt(data.delay);
job.timestamp = parseInt(data.timestamp);
job.failedReason = data.failedReason;
job.attempts = parseInt(data.attempts);
if(isNaN(job.attempts)) {
job.attempts = 1; // Default to 1 try for legacy jobs
}
job.attemptsMade = parseInt(data.attemptsMade);
var _traces;
try{
_traces = JSON.parse(data.stacktrace);
if(!(_traces instanceof Array)){
_traces = [];
}
}catch (err){
_traces = [];
}
job.stacktrace = _traces;
try{
job.returnvalue = JSON.parse(data.returnvalue);
}catch (e){
//swallow exception because the returnvalue got corrupted somehow.
debuglog('corrupted returnvalue: ' + data.returnvalue, e);
}
return job;
}...
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
if(!jobId) {
return Promise.resolve();
}
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return null;
}
});
};
Job.prototype.toData = function(){
...fromId = function (queue, jobId){
// jobId can be undefined if moveJob returns undefined
if(!jobId) {
return Promise.resolve();
}
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return null;
}
});
}...
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(
function(data){
reject(Error(data.failedReason));
return true;
});
}
});
}
resolve();
...fromJSON = function (queue, json){
var job = new Job(queue, json.data, json.opts);
job.jobId = json.opts.jobId;
job._progress = parseInt(json.progress);
job.delay = parseInt(json.delay);
job.timestamp = parseInt(json.timestamp);
job.attempts = parseInt(json.attempts);
if(isNaN(job.attempts)) {
job.attempts = 1; // Default to 1 try for legacy jobs
}
job.attemptsMade = parseInt(json.attemptsMade);
var _traces;
try{
_traces = JSON.parse(json.stacktrace);
if(!(_traces instanceof Array)){
_traces = [];
}
}catch (err){
_traces = [];
}
job.stacktrace = _traces;
try{
job.returnvalue = JSON.parse(json.returnvalue);
}catch (e){
//swallow exception because the returnvalue got corrupted somehow.
debuglog('corrupted returnvalue: ' + json.returnvalue, e);
}
return job;
}n/a
_getBackOff = function () {
var backoff = 0;
var delay;
if(this.opts.backoff){
if(!isNaN(this.opts.backoff)){
backoff = this.opts.backoff;
}else if(this.opts.backoff.type === 'fixed'){
backoff = this.opts.backoff.delay;
}else if(this.opts.backoff.type === 'exponential'){
delay = this.opts.backoff.delay;
backoff = Math.round((Math.pow(2, this.attemptsMade) - 1) * delay);
}
}
return backoff;
}...
Job.prototype.moveToFailed = function(err){
var _this = this;
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts && !_this._discarded){
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
..._isDone = function (list){
return this.queue.client
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;
});
}...
} else if (result === -2) {
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
}
});
};
Job.prototype.isCompleted = function(){
return this._isDone('completed');
};
Job.prototype.isFailed = function(){
return this._isDone('failed');
};
Job.prototype.isDelayed = function() {
..._isInList = function (list) {
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
}...
return this.queue.client
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
return rank !== null;
});
};
Job.prototype.isActive = function() {
return this._isInList('active');
};
Job.prototype.isWaiting = function() {
return this._isInList('wait');
};
Job.prototype.isPaused = function() {
..._moveToSet = function (set, context){
var queue = this.queue;
var jobId = this.jobId;
return scripts.moveToSet(queue, set, jobId, context);
}...
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
Job.prototype.moveToDelayed = function(timestamp){
return this._moveToSet('delayed', timestamp);
};
..._retryAtOnce = function (){
var queue = this.queue;
var jobId = this.jobId;
var script = [
'if redis.call("EXISTS", KEYS[3]) == 1 then',
' redis.call("LREM", KEYS[1], 0, ARGV[2])',
' redis.call(ARGV[1], KEYS[2], ARGV[2])',
' return 0',
'else',
' return -1',
'end'
].join('\n');
var keys = _.map(['active', 'wait', jobId], function(name){
return queue.toKey(name);
});
var pushCmd = (this.opts.lifo ? 'R' : 'L') + 'PUSH';
return queue.client.eval(
script,
keys.length,
keys[0],
keys[1],
keys[2],
pushCmd,
jobId).then(function(result){
if(result === -1){
throw new Error('Missing Job ' + jobId + ' during retry');
}
});
}...
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
..._saveAttempt = function (err){
if(isNaN(this.attemptsMade)){
this.attemptsMade = 1;
}else{
this.attemptsMade++;
}
var params = {
attemptsMade: this.attemptsMade
};
this.stacktrace.push(err.stack);
params.stacktrace = JSON.stringify(this.stacktrace);
params.failedReason = err.message;
return this.queue.client.hmset(this.queue.toKey(this.jobId), params);
}...
Job.prototype.discard = function(){
this._discarded = true;
}
Job.prototype.moveToFailed = function(err){
var _this = this;
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts && !_this._discarded){
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
...delayIfNeeded = function (){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
return this.moveToDelayed(jobDelayedTimestamp).then(function(){
return true;
});
}
}
return Promise.resolve(false);
}n/a
discard = function (){
this._discarded = true;
}n/a
finished = function (){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
}
});
}
resolve();
return true;
});
}
return new Promise(function(resolve, reject){
status(resolve, reject).then(function(finished){
if(!finished){
var interval;
function onCompleted(job){
if(String(job.jobId) === String(_this.jobId)){
resolve();
removeListeners();
clearInterval(interval);
}
}
function onFailed(job, err){
if(String(job.jobId) === String(_this.jobId)){
reject(err);
removeListeners();
clearInterval(interval);
}
}
function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
}
_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);
//
// Watchdog
//
interval = setInterval(function(){
status(resolve, reject).then(function(finished){
if(finished){
removeListeners();
clearInterval(interval );
}
})
}, 5000);
};
});
});
}n/a
getState = function () {
var _this = this;
var fns = [
{ fn: 'isCompleted', state: 'completed' },
{ fn: 'isFailed', state: 'failed' },
{ fn: 'isDelayed', state: 'delayed' },
{ fn: 'isActive', state: 'active' },
{ fn: 'isWaiting', state: 'waiting' },
{ fn: 'isPaused', state: 'paused' }
];
return Promise.reduce(fns, function(state, fn) {
if(state){
return state;
}
return _this[fn.fn]().then(function(result) {
return result ? fn.state : null;
});
}, null).then(function(result) {
return result ? result : 'stuck';
});
}...
};
Job.prototype.isPaused = function() {
return this._isInList('paused');
};
Job.prototype.isStuck = function() {
return this.getState().then(function(state) {
return state === 'stuck';
});
};
Job.prototype.getState = function() {
var _this = this;
var fns = [
...isActive = function () {
return this._isInList('active');
}n/a
isCompleted = function (){
return this._isDone('completed');
}...
* TODO: Add a watchdog to check if the job has finished periodically.
* since pubsub does not give any guarantees.
*/
Job.prototype.finished = function(){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
...isDelayed = function () {
return this.queue.client
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
return rank !== null;
});
}n/a
isFailed = function (){
return this._isDone('failed');
}...
*/
Job.prototype.finished = function(){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
}
});
...isPaused = function () {
return this._isInList('paused');
}n/a
isStuck = function () {
return this.getState().then(function(state) {
return state === 'stuck';
});
}n/a
isWaiting = function () {
return this._isInList('wait');
}n/a
lockKey = function (){
return this.queue.toKey(this.jobId) + ':lock';
}...
extraLockArgs: [job.jobId]
};
redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock));
} else {
redlock = new Redlock(queue.clients, queue.redlock);
}
return redlock.lock(job.lockKey(), queue.LOCK_DURATION).catch(function(err){
//
// Failing to lock due to already locked is not an error.
//
if(err.name != 'LockError'){
throw err;
}
});
...move = function (src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, src, target);
}...
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, src, target);
}
Job.prototype.discard = function(){
this._discarded = true;
}
Job.prototype.moveToFailed = function(err){
...moveToCompleted = function (returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, this.opts.removeOnComplete);
}...
}
}
return Promise.resolve(false);
};
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, this.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
...moveToDelayed = function (timestamp){
return this._moveToSet('delayed', timestamp);
}...
.then(function() { _this.lock = null; });
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
return this.moveToDelayed(jobDelayedTimestamp).then(function(){
return true;
});
}
}
return Promise.resolve(false);
};
...moveToFailed = function (err){
var _this = this;
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts && !_this._discarded){
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
}n/a
progress = function (progress){
if(progress){
var _this = this;
this._progress = progress;
return this.queue.client.hset(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
_this.queue.distEmit('progress', _this.toJSON(), progress);
});
}else{
return this._progress;
}
}...
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give a error if error
done(Error('error transcoding'));
...promote = function (){
var queue = this.queue;
var jobId = this.jobId;
var script = [
'if redis.call("ZREM", KEYS[1], ARGV[1]) == 1 then',
' redis.call("LPUSH", KEYS[2], ARGV[1])',
' return 0',
'else',
' return -1',
'end'
].join('\n');
var keys = _.map(['delayed', 'wait'], function(name){
return queue.toKey(name);
});
return queue.client.eval(
script,
keys.length,
keys[0],
keys[1],
jobId).then(function(result){
if(result === -1){
throw new Error('Job ' + jobId + ' is not in a delayed state');
}
});
}n/a
releaseLock = function (){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
}...
};
/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
...remove = function (){
var queue = this.queue;
var job = this;
return job.takeLock().then(function(lock) {
if (!lock) {
throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
}
return scripts.remove(queue, job.jobId)
.then(function() {
queue.emit('removed', job.toJSON());
})
.finally(function () {
return job.releaseLock();
});
});
}...
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
...renewLock = function (){
return this.takeLock(true /* Renew */);
}n/a
retry = function (){
var queue = this.queue;
var _this = this;
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result) {
if (result === 1) {
queue.emit('waiting', _this);
} else if (result === 0) {
throw new Error('Couldn\'t retry job: The job doesn\'t exist');
} else if (result === -1) {
throw new Error('Couldn\'t retry job: The job is locked');
} else if (result === -2) {
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
}
});
}n/a
takeLock = function (renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive).then(function(lock) {
if (lock){
_this.lock = lock;
}
return lock || false;
});
}...
/**
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive).then(function(lock) {
if (lock){
_this.lock = lock;
}
return lock || false;
});
};
...toData = function (){
return {
data: JSON.stringify(this.data || {}),
opts: JSON.stringify(this.opts || {}),
progress: this._progress,
delay: this.delay,
timestamp: this.timestamp,
attempts: this.attempts,
attemptsMade: this.attemptsMade,
failedReason: this.failedReason,
stacktrace: JSON.stringify(this.stacktrace || null),
returnvalue: JSON.stringify(this.returnvalue || null)
};
}...
}
this.returnvalue = null;
this.attemptsMade = 0;
};
function addJob(queue, job){
var opts = job.opts;
var jobData = job.toData();
var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
});
}
...toJSON = function (){
var opts = _.extend({}, this.opts || {});
opts.jobId = this.jobId;
return {
id: this.jobId,
data: this.data || {},
opts: opts,
progress: this._progress,
delay: this.delay,
timestamp: this.timestamp,
attempts: this.attempts,
attemptsMade: this.attemptsMade,
failedReason: this.failedReason,
stacktrace: this.stacktrace || null,
returnvalue: this.returnvalue || null
};
}...
}
Job.create = function(queue, data, opts){
var job = new Job(queue, data, opts);
return addJob(queue, job).then(function(jobId){
job.jobId = jobId;
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
return job;
});
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...priority_queue = function (name, redisPort, redisHost, redisOptions) {
if (!(this instanceof PriorityQueue)) {
return new PriorityQueue(name, redisPort, redisHost, redisOptions);
}
console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority
option instead.");
var _this = this;
this.paused = false;
this.queues = [];
for (var key in PriorityQueue.priorities) {
var queue = Queue(PriorityQueue.getQueueName(name, key), redisPort, redisHost, redisOptions);
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
queue.once(event, resolve);
});
}).then(_this.emit.bind(_this, event))
})
var singleEvents = ['error', 'active', 'stalled', 'progress', 'completed', 'failed', 'cleaned']
singleEvents.forEach(function(event) {
_this.queues.forEach(function(queue) {
queue.on(event, _this.emit.bind(_this, event))
})
})
this.strategy = Strategy.exponential;
}n/a
genericGetter = function (fnName) {
return function() {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}
}...
jobs = jobs.concat(val);
});
return jobs;
})
}
}
PriorityQueue.prototype.getWaiting = PriorityQueue.genericGetter("getWaiting"
;);
PriorityQueue.prototype.getActive = PriorityQueue.genericGetter("getActive");
PriorityQueue.prototype.getDelayed = PriorityQueue.genericGetter("getDelayed");
PriorityQueue.prototype.getCompleted = PriorityQueue.genericGetter("getCompleted");
PriorityQueue.prototype.getFailed = PriorityQueue.genericGetter("getFailed");
// ---------------------------------------------------------------------
...getQueueName = function (name, priority) {
return name + ':prio:' + priority;
}...
console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority
option instead.");
var _this = this;
this.paused = false;
this.queues = [];
for (var key in PriorityQueue.priorities) {
var queue = Queue(PriorityQueue.getQueueName(name, key), redisPort, redisHost, redisOptions
);
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
...function EventEmitter() {
EventEmitter.init.call(this);
}n/a
add = function (data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}...
});
pdfQueue.process(function(job){
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
```
Alternatively, you can use return promises instead of using the `done` callback:
```javascript
...clean = function (grace, type) {
var _this = this;
return Promise.map(this.queues, function(queue) {
return queue.clean(grace, type);
}).then(function (results) {
var jobs = [].concat.apply([], results);
var tp = type || 'completed';
_this.emit('cleaned', jobs, tp);
return Promise.resolve(jobs);
});
}...
Tells the queue remove jobs of a specific type created outside of a grace period.
__Example__
```javascript
//cleans all jobs that completed over 5 seconds ago.
queue.clean(5000);
//clean all jobs that failed over 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
console.log('Cleaned %s %s jobs', job.length, type);
});
```
...close = function ( doNotWaitJobs ) {
return this.closing = Promise.map(this.queues, function(queue) {
return queue.close( doNotWaitJobs );
});
}...
shutdown.
```javascript
var Queue = require('bull');
var queue = Queue('example');
var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
});
queue.on('completed', after100);
```
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until *after*
...count = function () {
return Promise.map(this.queues, function(queue) {
return queue.count();
}).then(function(results) {
var sum = 0;
results.forEach(function(val) {
sum += val;
});
return sum;
})
}...
return Promise.resolve(jobs);
});
}
PriorityQueue.prototype.count = function() {
return Promise.map(this.queues, function(queue) {
return queue.count();
}).then(function(results) {
var sum = 0;
results.forEach(function(val) {
sum += val;
});
return sum;
})
...disconnect = function () {
return Promise.map(this.queues, function(queue) {
return queue.disconnect();
})
}...
*
* @type {number}
*/
PriorityQueue.prototype.waitAfterEmptyLoop = 200;
PriorityQueue.prototype.disconnect = function() {
return Promise.map(this.queues, function(queue) {
return queue.disconnect();
})
}
PriorityQueue.prototype.close = function( doNotWaitJobs ) {
return this.closing = Promise.map(this.queues, function(queue) {
return queue.close( doNotWaitJobs );
});
...empty = function () {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}...
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
PriorityQueue.prototype.pause = function(localOnly) {
var _this = this;
_this.paused = Promise.map(this.queues, function(queue) {
...getActive = function () {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}n/a
getCompleted = function () {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}n/a
getDelayed = function () {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}n/a
getFailed = function () {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}n/a
getQueue = function (priority) {
if (!(priority in PriorityQueue.priorities)) {
//in case of unknown priority, we use normal
priority = "normal";
}
var queue = this.queues[PriorityQueue.priorities[priority]];
return queue;
}...
PriorityQueue.prototype.setLockRenewTime = function(lockRenewTime) {
this.queues.forEach(function(queue) {
queue.LOCK_RENEW_TIME = lockRenewTime;
})
}
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
...getWaiting = function () {
var args = arguments;
return Promise.map(this.queues, function(queue) {
return queue[fnName].apply(queue, args);
}).then(function(results) {
var jobs = [];
results.forEach(function(val) {
jobs = jobs.concat(val);
});
return jobs;
})
}n/a
pause = function (localOnly) {
var _this = this;
_this.paused = Promise.map(this.queues, function(queue) {
return queue.pause(localOnly || false);
}).then(_this.emit.bind(_this, 'paused'));
return _this.paused;
}...
return Promise.reject(new Error('some unexpected error'));
});
```
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
...process = function (handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}...
var Queue = require('bull');
var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
...resume = function (localOnly) {
var _this = this;
_this.paused = false;
return Promise.map(this.queues, function(queue) {
return queue.resume(localOnly || false);
}).then(_this.emit.bind(_this, 'resumed')).then(function() {
if (_this.handler) {
_this.run();
}
});
}...
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
A queue emits also some useful events:
```javascript
.on('ready', function() {
...run = function () {
var _this = this;
// .reverse() is done in place and therefore mutating the queues array
// so a copy is needed to prevent harmful side effects and general voodoo
var reversedQueues = _this.queues.slice().reverse();
var loop = function() {
var emptyLoop = true;
return Promise.each(reversedQueues, function(queue, index) {
if(_this.closing){
return _this.closing;
}
// the index is reversed to the actual priority number (0 is 'critical')
// so flip it to get the correct "priority index"
var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
var i = 0;
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
return fn();
}
})
} else {
//nothing It will release loop and call next priority queue even if we have no reach nbJobsToProcess
}
})
}
return fn();
}).then(function() {
if (!_this.paused) {
return Promise.delay((emptyLoop) ? _this.waitAfterEmptyLoop : 0).then(loop);
}
});
}
return loop();
}...
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
PriorityQueue.prototype.run = function() {
var _this = this;
...setLockRenewTime = function (lockRenewTime) {
this.queues.forEach(function(queue) {
queue.LOCK_RENEW_TIME = lockRenewTime;
})
}n/a
function Queue(name, redisPort, redisHost, redisOptions){
if(!(this instanceof Queue)){
return new Queue(name, redisPort, redisHost, redisOptions);
}
if(_.isObject(redisPort)) {
var opts = redisPort;
var redisOpts = opts.redis || {};
redisPort = redisOpts.port;
redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB || redisOpts.DB;
} else if(parseInt(redisPort) == redisPort) {
redisPort = parseInt(redisPort);
redisOptions = redisOptions || {};
} else if(_.isString(redisPort)) {
try {
var redisUrl = url.parse(redisPort);
assert(_.isObject(redisHost) || _.isUndefined(redisHost),
'Expected an object as redis option');
redisOptions = redisHost || {};
redisPort = redisUrl.port;
redisHost = redisUrl.hostname;
if (redisUrl.auth) {
redisOptions.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
}
}
redisOptions = redisOptions || {};
function createClient(type) {
var client;
if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient(type);
}else{
client = new redis(redisPort, redisHost, redisOptions);
}
return client;
}
redisPort = redisPort || 6379;
redisHost = redisHost || '127.0.0.1';
var _this = this;
this.name = name;
this.keyPrefix = redisOptions.keyPrefix || 'bull';
//
// We cannot use ioredis keyPrefix feature until we
// stop creating keys dynamically in lua scripts.
//
delete redisOptions.keyPrefix;
//
// Create queue client (used to add jobs, pause queues, etc);
//
this.client = createClient('client');
getRedisVersion(this.client).then(function(version){
if(semver.lt(version, MINIMUM_REDIS_VERSION)){
throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. Current: ' + version);
}
}).catch(function(err){
_this.emit('error', err);
});
//
// Keep track of cluster clients for redlock
//
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});
//
// Create blocking client (used to wait for jobs)
//
this.bclient = createClient('block');
//
// Create event subscriber client (receive messages from other instance of the queue)
//
this.eclient = createClient('subscriber');
this.delayTimer = null;
this.processing = 0;
this.retrieving = 0;
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
this.LOCK_DURATION = LOCK_DURATION;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;
// bubble up Redis error events
[this.client, this.bclient, this.eclient].forEach(function (client) {
client.on('error', _this.emit.bind(_this, 'error'));
});
// keeps track of active timers. used by close() to
// ensure that disconnect() is deferred until all
// scheduled redis commands have been executed
this.timers = new TimerManager();
// emit ready when redis connections ready
var initializers = [this.client, this.bclient, this.eclient].map(function (client) {
return new Promise(function(resolve, reject) {
client.once('ready', resolve);
client.once('error', reject);
});
});
this._initializing = Promise.all(initializers).then(function(){
return Promise.join(
_this.eclient.subscribe(_this.toKey('delayed')),
_this.eclient.subscribe(_this.toKey('paused'))
);
}).then(function(){
debuglog(name + ' queue ready');
_this.emit('ready');
}, function(err){
console.error('Error initializing queue:', err);
});
Disturbed.call(this, _this.client, _this.eclient);
//
// Listen distributed queue events
//
listenDistEvent('stalled'); //
listenDistEvent('active'); //
listenDist ...n/a
super_ = function (pubClient, subClient) {
var _this = this;
EventEmitter.call(this);
this.uuid = uuid();
this.pubClient = pubClient;
this.subClient = subClient;
subClient.on('message', function (channel, msg) {
var count = _this.listenerCount(channel);
if (count) {
var args;
try {
args = JSON.parse(msg);
} catch (err) {
console.error('Parsing event message', err);
}
if (args[0] !== _this.uuid) {
args[0] = channel;
_this.emit.apply(_this, args);
}
}
});
}n/a
add = function (data, opts){
return Job.create(this, data, opts);
}...
});
pdfQueue.process(function(job){
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
```
Alternatively, you can use return promises instead of using the `done` callback:
```javascript
...clean = function (grace, type, limit) {
var _this = this;
return new Promise(function (resolve, reject) {
if(grace === undefined || grace === null) {
return reject(new Error('You must define a grace period.'));
}
if(!type) {
type = 'completed';
}
if(_.indexOf([
'completed',
'wait',
'active',
'delayed',
'failed'], type) === -1){
return reject(new Error('Cannot clean unkown queue type'));
}
return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) {
_this.distEmit('cleaned', jobs, type);
resolve(jobs);
return null;
}).catch(function (err) {
_this.emit('error', err);
reject(err);
});
});
}...
Tells the queue remove jobs of a specific type created outside of a grace period.
__Example__
```javascript
//cleans all jobs that completed over 5 seconds ago.
queue.clean(5000);
//clean all jobs that failed over 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
console.log('Cleaned %s %s jobs', job.length, type);
});
```
...close = function ( doNotWaitJobs ){
var _this = this;
if(this.closing){
return this.closing;
}
return this.closing = this._initializing.then(function(){
clearTimeout(_this.delayTimer);
clearInterval(_this.guardianTimer);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
_this.timers.clearAll();
return _this.timers.whenIdle().then(function(){
return _this.pause(true, doNotWaitJobs);
}).then(function(){
return _this.disconnect();
}).then(function(){
_this.closed = true;
});
});
}...
shutdown.
```javascript
var Queue = require('bull');
var queue = Queue('example');
var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
});
queue.on('completed', after100);
```
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until *after*
...count = function (){
var multi = this.multi();
multi.llen(this.toKey('wait'));
multi.llen(this.toKey('paused'));
multi.zcard(this.toKey('delayed'));
return multi.exec().then(function(res){
return Math.max(res[0][1], res[1][1]) + res[2][1];
});
}...
return Promise.resolve(jobs);
});
}
PriorityQueue.prototype.count = function() {
return Promise.map(this.queues, function(queue) {
return queue.count();
}).then(function(results) {
var sum = 0;
results.forEach(function(val) {
sum += val;
});
return sum;
})
...disconnect = function (){
var _this = this;
function endClients(){
var timeoutMsg = 'Timed out while waiting for redis clients to close';
return new Promise(function(resolve) {
_this.bclient.end(true);
_this.bclient.stream.once('close', resolve);
}).timeout(CLIENT_CLOSE_TIMEOUT_MS, timeoutMsg)
.catch(function(err){
if(!(err instanceof Promise.TimeoutError)){
throw err;
}
});
}
return Promise.join(
_this.client.quit(),
_this.eclient.quit()
).then(endClients, endClients);
}...
*
* @type {number}
*/
PriorityQueue.prototype.waitAfterEmptyLoop = 200;
PriorityQueue.prototype.disconnect = function() {
return Promise.map(this.queues, function(queue) {
return queue.disconnect();
})
}
PriorityQueue.prototype.close = function( doNotWaitJobs ) {
return this.closing = Promise.map(this.queues, function(queue) {
return queue.close( doNotWaitJobs );
});
...distEmit = function (){
var args = Array.prototype.slice.call(arguments);
// Emit local event
this.emit.apply(this, args);
// Emit global event
args[0] = args[0] + '@' + this.name;
return Disturbed.prototype.distEmit.apply(this, args);
}...
}
Job.create = function(queue, data, opts){
var job = new Job(queue, data, opts);
return addJob(queue, job).then(function(jobId){
job.jobId = jobId;
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
return job;
});
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...empty = function (){
var _this = this;
// Get all jobids and empty all lists atomically.
var multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('delayed'));
return multi.exec().spread(function(waiting, paused){
waiting = waiting[1];
paused = paused[1];
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);
if(jobKeys.length){
multi = _this.multi();
multi.del.apply(multi, jobKeys);
return multi.exec();
}
});
}...
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
PriorityQueue.prototype.pause = function(localOnly) {
var _this = this;
_this.paused = Promise.map(this.queues, function(queue) {
...getActive = function (){
return this.getJobs('active', 'LIST');
}n/a
getActiveCount = function () {
return this.client.llen(this.toKey('active'));
}n/a
getCompleted = function (){
return this.getJobs('completed', 'SET');
}n/a
getCompletedCount = function () {
return this.client.scard(this.toKey('completed'));
}n/a
getDelayed = function (){
return this.getJobs('delayed', 'ZSET');
}n/a
getDelayedCount = function () {
return this.client.zcard(this.toKey('delayed'));
}n/a
getFailed = function (){
return this.getJobs('failed', 'SET');
}n/a
getFailedCount = function () {
return this.client.scard(this.toKey('failed'));
}n/a
getJob = function (jobId){
return Job.fromId(this, jobId);
}n/a
getJobCountByTypes = function () {
var _this = this;
var args = _.compact(Array.prototype.slice.call(arguments));
var types = _.compact(args.join(',').replace(/ /g, '').split(','));
var multi = this.multi();
_.each(types, function(type) {
var key = _this.toKey(type);
switch(type) {
case 'completed':
case 'failed':
multi.scard(key);
break;
case 'delayed':
multi.zcard(key);
break;
case 'active':
case 'wait':
case 'paused':
multi.llen(key);
break;
}
});
return multi.exec().then(function(res){
return res.map(function(v) {
return v[1]
}).reduce(function(a, b) {
return a + b
});
}) || 0;
}n/a
getJobCounts = function (){
var types = ['wait', 'active', 'completed', 'failed', 'delayed'];
var counts = {};
return this.client.multi()
.llen(this.toKey('wait'))
.llen(this.toKey('active'))
.scard(this.toKey('completed'))
.scard(this.toKey('failed'))
.zcard(this.toKey('delayed'))
.exec().then(function(result){
result.forEach(function(res, index){
counts[types[index]] = res[1] || 0;
});
return counts;
});
}n/a
getJobMoveCount = function (){
return this.bclient.commandQueue.length;
}n/a
getJobs = function (queueType, type, start, end){
var _this = this;
var key = this.toKey(queueType);
var jobs;
start = _.isUndefined(start) ? 0 : start;
end = _.isUndefined(end) ? -1 : end;
switch(type){
case 'LIST':
jobs = this.client.lrange(key, start, end);
break;
case 'SET':
jobs = this.client.smembers(key).then(function(jobIds) {
// Can't set a range for smembers. So do the slice programatically instead.
// Note that redis ranges are inclusive, so handling for javascript accordingly
if (end === -1) {
return jobIds.slice(start);
}
return jobIds.slice(start, end + 1);
});
break;
case 'ZSET':
jobs = this.client.zrange(key, start, end);
break;
}
return jobs.then(function(jobIds){
var jobsFromId = jobIds.map(_this.getJobFromId);
return Promise.all(jobsFromId);
});
}n/a
getNextJob = function (opts){
var _this = this;
if(!this.closing){
this.retrieving++;
return this.moveJob('wait', 'active', opts)
.then(this.getJobFromId)
.tap(function(job) {
_this.retrieving--;
if (job) {
_this.processing++;
} else {
_this.emit('no-job-retrieved');
}
})
.catch(function(err) {
_this.retrieving--;
_this.emit('no-job-retrieved');
throw err;
});
}else{
return Promise.reject();
}
}n/a
getPausedCount = function () {
return this.client.llen(this.toKey('paused'));
}n/a
getWaiting = function (){
return Promise.join(
this.getJobs('wait', 'LIST'),
this.getJobs('paused', 'LIST')).spread(function(waiting, paused){
return _.concat(waiting, paused);
});
}n/a
getWaitingCount = function () {
return this.client.llen(this.toKey('wait'));
}n/a
isReady = function (){
var _this = this;
return this._initializing.then(function(){
return _this;
});
}n/a
moveJob = function (src, dst, opts) {
var args = arguments;
var _this = this;
var move;
if(opts && opts.block === false){
if(!this.closing){
move = this.bclient.rpoplpush(this.toKey(src), this.toKey(dst));
}else{
move = Promise.reject();
}
} else if (this.closing || this.paused) {
move = Promise.resolve();
} else if (this.getJobMoveCount()) {
move = this.whenCurrentMoveFinished().then(function() {
return _this.moveJob.apply(_this, args);
});
}else{
move = this.bclient.brpoplpush(
this.toKey(src),
this.toKey(dst),
Math.floor(this.LOCK_RENEW_TIME / 1000));
}
return move.then(function(jobId){
//
// Unfortunatelly this cannot be performed atomically, which will lead to a
// slight hazard for priority queues (will only affect its order).
//
if(jobId){
return _this.client.zrem(_this.toKey('priority'), jobId).then(function(){
return jobId;
});
}
}, function(err){
if(!_this.closing){
throw err;
}
});
}n/a
moveUnlockedJobsToWait = function (){
var _this = this;
return scripts.moveUnlockedJobsToWait(this).then(function(responses){
var handleFailedJobs = responses[0].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('failed', job, new Error('job stalled more than allowable limit'));
return null;
});
});
var handleStalledJobs = responses[1].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('stalled', job);
return null;
});
});
return Promise.all(handleFailedJobs.concat(handleStalledJobs));
}).catch(function(err){
console.error('Failed to handle unlocked job in active:', err);
});
}...
// the index is reversed to the actual priority number (0 is 'critical')
// so flip it to get the correct "priority index"
var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
var i = 0;
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
...multi = function (){
return this.client.multi();
}...
};
/*
Queue.prototype.empty = function(){
var _this = this;
// Get all jobids and empty all lists atomically.
var multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('delayed'));
...on = function (){
var args = Array.prototype.slice.call(arguments);
var promise = Disturbed.prototype.on.apply(this, args);
var _this = this;
promise.catch(function(err){ _this.emit('error', err); });
return this;
}...
queue.resume().then(function(){
// queue is resumed now
})
```
A queue emits also some useful events:
```javascript
.on('ready', function() {
// Queue ready for job
// All Redis connections are done
})
.on('error', function(error) {
// Error
})
.on('active', function(job, jobPromise){
...once = function (){
var args = Array.prototype.slice.call(arguments);
Disturbed.prototype.once.apply(this, args);
return this;
}...
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
queue.once(event, resolve);
});
}).then(_this.emit.bind(_this, event))
})
var singleEvents = ['error', 'active', 'stalled', 'progress', 'completed', '
;failed', 'cleaned']
singleEvents.forEach(function(event) {
_this.queues.forEach(function(queue) {
...pause = function (isLocal, doNotWaitActive){
if(isLocal){
var _this = this;
if(!this.paused){
this.paused = new Promise(function(resolve) {
_this.resumeLocal = function() {
resolve();
_this.paused = null; // Allow pause to be checked externally for paused state.
};
});
}
return !doNotWaitActive && this.whenCurrentJobsFinished();
}else{
return pauseResumeGlobal(this, true);
}
}...
return Promise.reject(new Error('some unexpected error'));
});
```
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
...process = function (concurrency, handler){
var _this = this;
if(typeof concurrency === 'function'){
handler = concurrency;
concurrency = 1;
}
this.setHandler(handler);
var runQueueWhenReady = function(){
_this.bclient.once('ready', function(){
_this.run(concurrency).catch(function(err){
console.error(err);
});
});
};
// attempt to restart the queue when the client throws
// an error or the connection is dropped by redis
this.bclient.on('error', runQueueWhenReady);
this.bclient.on('end', runQueueWhenReady);
return this.run(concurrency).catch(function(err){
console.error(err);
throw err;
});
}...
var Queue = require('bull');
var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
...processJob = function (job){
var _this = this;
var lockRenewId;
var timmerStopped = false;
if(!job){
return Promise.resolve();
}
//
// TODO:
// There are two cases to take into consideration regarding locks.
// 1) The lock renewer fails to renew a lock, this should make this job
// unable to complete, since some other worker is also working on it.
// 2) The lock renewer is called more seldom than the check for stalled
// jobs, so we can assume the job has been stalled and is already being processed
// by another worker. See #308
//
var renew = false;
var lockRenewer = function(){
return job.takeLock(renew, true).then(function(lock){
if(lock && !timmerStopped){
renew = true;
lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME, lockRenewer);
}
// TODO: if we failed to re-acquire the lock while trying to renew, should we let the job
// handler know and cancel the timer?
return lock;
}, function(err){
console.error('Error renewing lock ' + err);
});
};
var timeoutMs = job.opts.timeout;
function stopTimer(){
timmerStopped = true;
_this.timers.clear(lockRenewId);
return Promise.resolve();
}
function handleCompleted(data){
try{
JSON.stringify(data);
}catch(err){
return handleFailed(err);
}
// This substraction is duplicate in handleCompleted and handleFailed because it have to be made before throwing any
// event completed or failed in order to allow pause() to work correctly without getting stuck.
_this.processing--;
stopTimer();
if(_this.closed){
return;
}
return job.moveToCompleted(data).then(function(){
return _this.distEmit('completed', job, data);
});
}
function handleFailed(err){
var error = err.cause || err; //Handle explicit rejection
// TODO: Should moveToFailed ensure the lock atomically in one of its Lua scripts?
// See https://github.com/OptimalBits/bull/pull/415#issuecomment-269744735
job.takeLock(true /* renwew */, false /* ensureActive */).then( function(lock) {
return stopTimer()
.then(job.moveToFailed(err))
.then(job.releaseLock.bind(job))
.then(function(){
return _this.distEmit('failed', job, error);
});
}, function(err){
console.error('failed to re-obtain lock before moving to failed, bailing: ', err);
stopTimer();
});
_this.processing--;
}
return lockRenewer().then(function(locked){
if(locked){
var jobPromise = _this.handler(job);
if(timeoutMs){
jobPromise = jobPromise.timeout(timeoutMs);
}
_this.distEmit('active', job, jobPromise);
return jobPromise
.then(handleCompleted, handleFailed);
}
});
}...
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
return fn();
}
})
} else {
//nothing It will release loop and call next priority queue even if we have no reach nbJobsToProcess
}
...processJobs = function (resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);
if(!this.closing){
process.nextTick(function(){
(_this.paused || Promise.resolve())
.then(_this.getNextJob)
.then(_this.processJob)
.then(processJobs, function(err){
console.error('Error processing job:', err);
processJobs();
}).catch(reject);
});
}else{
resolve(this.closing);
}
}n/a
resume = function (isLocal){
if(isLocal){
if(this.resumeLocal){
this.resumeLocal();
}
return Promise.resolve();
}else{
return pauseResumeGlobal(this, false);
}
}...
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
A queue emits also some useful events:
```javascript
.on('ready', function() {
...retryJob = function (job) {
return job.retry();
}n/a
run = function (concurrency){
var promises = [];
var _this = this;
return this.moveUnlockedJobsToWait().then(function(){
while(concurrency--){
promises.push(new Promise(_this.processJobs));
}
_this.startMoveUnlockedJobsToWait();
return Promise.all(promises);
});
}...
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
PriorityQueue.prototype.run = function() {
var _this = this;
...setHandler = function (handler){
if(this.handler) {
throw new Error('Cannot define a handler more than once per Queue instance');
}
handler = handler.bind(this);
if(handler.length > 1){
this.handler = Promise.promisify(handler);
}else{
this.handler = Promise.method(handler);
}
}...
return queue.close( doNotWaitJobs );
});
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
...startMoveUnlockedJobsToWait = function () {
if (this.STALLED_JOB_CHECK_INTERVAL > 0){
clearInterval(this.moveUnlockedJobsToWaitInterval);
this.moveUnlockedJobsToWaitInterval =
setInterval(this.moveUnlockedJobsToWait, this.STALLED_JOB_CHECK_INTERVAL);
}
}n/a
toKey = function (queueType){
return [this.keyPrefix, this.name, queueType].join(':');
}...
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
if(!jobId) {
return Promise.resolve();
}
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return null;
}
});
};
...updateDelayTimer = function (newDelayedTimestamp){
var _this = this;
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){
clearTimeout(this.delayTimer);
this.delayedTimestamp = newDelayedTimestamp;
var nextDelayedJob = newDelayedTimestamp - Date.now();
nextDelayedJob = nextDelayedJob < 0 ? 0 : nextDelayedJob;
this.delayTimer = setTimeout(function(){
scripts.updateDelaySet(_this, _this.delayedTimestamp).then(function(nextTimestamp){
if(nextTimestamp){
nextTimestamp = nextTimestamp < Date.now() ? Date.now() : nextTimestamp;
}else{
nextTimestamp = Number.MAX_VALUE;
}
_this.updateDelayTimer(nextTimestamp);
}).catch(function(err){
console.error('Error updating the delay timer', err);
});
_this.delayedTimestamp = Number.MAX_VALUE;
}, nextDelayedJob);
}
}n/a
whenCurrentJobsFinished = function (){
var _this = this;
var resolver;
var count = this.processing + this.retrieving;
return new Promise(function(resolve) {
if(count === 0){
resolve();
}else{
resolver = _.after(count, function(){
_this.removeListener('stalled', resolver);
_this.removeListener('completed', resolver);
_this.removeListener('failed', resolver);
_this.removeListener('no-job-retrieved', resolver);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
resolve();
});
_this.on('stalled', resolver);
_this.on('completed', resolver);
_this.on('failed', resolver);
_this.on('no-job-retrieved', resolver);
_this.startMoveUnlockedJobsToWait();
}
});
}n/a
whenCurrentMoveFinished = function (){
var currentMove = this.bclient.commandQueue.peekFront()
return currentMove && currentMove.command.promise || Promise.resolve();
}n/a
_isJobInList = function (keyVar, argVar, operator) {
keyVar = keyVar || 'KEYS[1]';
argVar = argVar || 'ARGV[1]';
operator = operator || 'return';
return [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''),
[operator, ' item_in_list(items, ', argVar, ')'].join('')
].join('\n');
}...
' return nil',
'end',
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''),
[operator, ' item_in_list(items, ', argVar, ')'].join('')
].join('\n');
},
isJobInList: function(client, listKey, jobId){
return execScript(client, 'isJobInList', this._isJobInList(), 1, listKey,
jobId).then(function(result){
return result === 1;
});
},
addJob: function(client, toKey, job, opts){
var delayed;
var scriptName;
...addJob = function (client, toKey, job, opts){
var delayed;
var scriptName;
opts = opts || {};
opts.lifo = !!(opts.lifo);
var delayTimestamp = job.timestamp + job.delay;
if(job.delay && delayTimestamp > Date.now()){
delayed = true;
scriptName = 'addJob:delayed';
} else {
scriptName = 'addJob'+(opts.lifo?':lifo':'') + (opts.priority?':priority':'');
}
/*
if(isCommandDefined(client, scriptName)){
return client[scriptName].apply(client, args);
};
*/
var jobArgs = _.flatten(_.toPairs(job));
var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed', 'priority'], function(name){
return toKey(name);
});
var baseKey = toKey('');
var argvs = _.map(jobArgs, function(arg, index){
return ', ARGV['+(index+4)+']';
})
var script = [
'local jobCounter = redis.call("INCR", KEYS[5])',
'local jobId',
'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end',
'local jobIdKey = ARGV[1] .. jobId',
'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end',
'redis.call("HMSET", jobIdKey' + argvs.join('') + ')',
];
var delayTimestamp = job.timestamp + job.delay;
if(delayed){
script.push.apply(script, [
' local timestamp = tonumber(ARGV[' + (argvs.length + 4) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
' redis.call("ZADD", KEYS[6], timestamp, jobId)',
' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))',
' return jobId',
]);
}else{
var push, pushPaused;
var add = _.template('redis.call("<%= direction %>", <%= waitQueue %>, jobId)');
if(opts.lifo){
push = add({direction: 'RPUSH', waitQueue: 'KEYS[1]'});
pushPaused = add({direction: 'RPUSH', waitQueue: 'KEYS[2]'});
}else if(opts.priority){
script.push.apply(script, [
' redis.call("ZADD", KEYS[7], ARGV[3], jobId)',
' local count = redis.call("ZCOUNT", KEYS[7], 0, ARGV[3])',
]);
var priorityAdd = _.template([
' local len = redis.call("LLEN", <%= waitQueue %>)',
' local id = redis.call("LINDEX", <%= waitQueue %>, len - (count-1))',
' if id then',
' redis.call("LINSERT", <%= waitQueue %>, "BEFORE", id, jobId)',
' else',
' redis.call("RPUSH", <%= waitQueue %>, jobId)',
' end',
].join('\n'));
push = priorityAdd({waitQueue: 'KEYS[1]'});
pushPaused = priorityAdd({waitQueue: 'KEYS[2]'});
}else{
push = add({direction: 'LPUSH', waitQueue: 'KEYS[1]'});
pushPaused = add({direction: 'LPUSH', waitQueue: 'KEYS[2]'});
}
//
// Whe check for the meta-paused key to decide if we are paused or not
// (since an empty list and !EXISTS are not really the same)
script.push.apply(script, [
'if redis.call("EXISTS", KEYS[3]) ~= 1 then',
push,
'else',
pushPaused,
'end',
'redis.call("PUBLISH", KEYS[4], jobId)',
'return jobId .. ""'
]);
}
var args = [
client,
scriptName,
script.join('\n'),
keys.length
];
args.push.apply(args, keys);
args.push(baseKey);
args.push(opts.customJobId || '');
args.push(opts.priority);
args.push.apply(args, jobArgs);
args.push(delayTimestamp);
return execScript.apply(scripts, args);
}...
this.attemptsMade = 0;
};
function addJob(queue, job){
var opts = job.opts;
var jobData = job.toData();
var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
});
}
Job.create = function(queue, data, opts){
...cleanJobsInSet = function (queue, set, ts, limit) {
var command;
var removeCommand;
var breakEarlyCommand = '';
var hash;
limit = limit || 0;
switch(set) {
case 'completed':
case 'failed':
command = 'local jobs = redis.call("SMEMBERS", KEYS[1])';
removeCommand = 'redis.call("SREM", KEYS[1], job)';
hash = 'cleanSet';
break;
case 'wait':
case 'active':
case 'paused':
command = 'local jobs = redis.call("LRANGE", KEYS[1], 0, -1)';
removeCommand = 'redis.call("LREM", KEYS[1], 0, job)';
hash = 'cleanList';
break;
case 'delayed':
command = 'local jobs = redis.call("ZRANGE", KEYS[1], 0, -1)';
removeCommand = 'redis.call("ZREM", KEYS[1], job)';
hash = 'cleanOSet';
break;
}
if(limit > 0) {
breakEarlyCommand = [
'if deletedCount >= limit then',
' break',
'end',
].join('\n');
hash = hash + 'WithLimit';
}
var script = [
command,
'local deleted = {}',
'local deletedCount = 0',
'local limit = tonumber(ARGV[3])',
'local jobTS',
'for _, job in ipairs(jobs) do',
breakEarlyCommand,
' local jobKey = ARGV[1] .. job',
' if (redis.call("EXISTS", jobKey .. ":lock") == 0) then',
' jobTS = redis.call("HGET", jobKey, "timestamp")',
' if(not jobTS or jobTS < ARGV[2]) then',
removeCommand,
' redis.call("DEL", jobKey)',
' deletedCount = deletedCount + 1',
' table.insert(deleted, job)',
' end',
' end',
'end',
'return deleted'
].join('\n');
var args = [
queue.client,
hash,
script,
1,
queue.toKey(set),
queue.toKey(''),
ts,
limit
];
return execScript.apply(scripts, args);
}n/a
isJobInList = function (client, listKey, jobId){
return execScript(client, 'isJobInList', this._isJobInList(), 1, listKey, jobId).then(function(result){
return result === 1;
});
}...
return this.queue.client
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;
});
};
Job.prototype._isInList = function(list) {
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
};
Job.prototype._moveToSet = function(set, context){
var queue = this.queue;
var jobId = this.jobId;
return scripts.moveToSet(queue, set, jobId, context);
...move = function (job, src, target){
// TODO: Depending on the source we should use LREM, SREM or ZREM.
// TODO: Depending on the target we should use LPUSH, SADD, etc.
var keys = _.map([
src,
target,
job.jobId
], function(name){
return job.queue.toKey(name);
}
);
var deleteJob = 'redis.call("DEL", KEYS[3])';
var moveJob = [
'redis.call("SADD", KEYS[2], ARGV[1])',
'redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])',
].join('\n');
var script = [
'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists
' redis.call("LREM", KEYS[1], -1, ARGV[1])',
target ? moveJob : deleteJob,
' return 0',
'else',
' return -1',
'end'
].join('\n');
var args = [
job.queue.client,
'move' + src + (target ? target : ''),
script,
keys.length,
keys[0],
keys[1],
keys[2],
job.jobId,
job.returnvalue ? JSON.stringify(job.returnvalue) : ''
];
var returnLockOrErrorCode = function(lock) {
return lock ? execScript.apply(scripts, args) : -2;
};
var throwUnexpectedErrors = function(err) {
if (!(err instanceof Redlock.LockError)) {
throw err;
}
};
return job.takeLock(!!job.lock)
.then(returnLockOrErrorCode, throwUnexpectedErrors)
.then(function(result){
switch (result){
case -1:
if(src){
throw new Error('Missing Job ' + job.jobId + ' when trying to move from ' + src + ' to ' + target);
} else {
throw new Error('Missing Job ' + job.jobId + ' when trying to remove it from ' + src);
}
case -2:
throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src);
default:
return job.releaseLock()
}
});
}...
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, src, target);
}
Job.prototype.discard = function(){
this._discarded = true;
}
Job.prototype.moveToFailed = function(err){
...moveToCompleted = function (job, removeOnComplete){
return scripts.move(job, 'active', removeOnComplete ? void 0 : 'completed');
}...
}
}
return Promise.resolve(false);
};
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, this.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
...moveToSet = function (queue, set, jobId, context){
//
// Bake in the job id first 12 bits into the timestamp
// to guarantee correct execution order of delayed jobs
// (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
//
// WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
//
context = _.isUndefined(context) ? 0 : context;
if(set === 'delayed') {
context = +context || 0;
context = context < 0 ? 0 : context;
if(context > 0){
context = context * 0x1000 + (jobId & 0xfff);
}
}
var keys = _.map([
'active',
set,
jobId
], function(name){
return queue.toKey(name);
}
);
var args = [
queue.client,
'moveToSet',
moveToSetScript,
keys.length,
keys[0],
keys[1],
keys[2],
JSON.stringify(context),
jobId
];
return execScript.apply(scripts, args).then(function(result){
if(result === -1){
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + set);
}
});
}...
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
};
Job.prototype._moveToSet = function(set, context){
var queue = this.queue;
var jobId = this.jobId;
return scripts.moveToSet(queue, set, jobId, context);
};
Job.prototype._getBackOff = function() {
var backoff = 0;
var delay;
if(this.opts.backoff){
if(!isNaN(this.opts.backoff)){
...moveUnlockedJobsToWait = function (queue){
var script = [
'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
'local failed = {}',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
// Remove from the active queue.
' redis.call("LREM", KEYS[1], 1, job)',
' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")',
' if(lockAcquired) then',
// If it was previously locked then we consider it 'stalled' (Case A above). If this job
// has been stalled too many times, such as if it crashes the worker, then fail it.
' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)',
' if(stalledCount > MAX_STALLED_JOB_COUNT) then',
' redis.call("SADD", KEYS[3], job)',
' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
' table.insert(failed, job)',
' else',
// Move the job back to the wait queue, to immediately be picked up by a waiting worker.
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
' else',
// Move the job back to the wait queue, to immediately be picked up by a waiting worker.
' redis.call("RPUSH", KEYS[2], job)',
' end',
' end',
'end',
'return {failed, stalled}'
].join('\n');
var args = [
queue.client,
'moveUnlockedJobsToWait',
script,
3,
queue.toKey('active'),
queue.toKey('wait'),
queue.toKey('failed'),
queue.MAX_STALLED_JOB_COUNT,
queue.toKey('')
];
return execScript.apply(scripts, args);
}...
// the index is reversed to the actual priority number (0 is 'critical')
// so flip it to get the correct "priority index"
var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
var i = 0;
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
...releaseLock = function (job){
var lock = job.lock;
if (!lock) {
throw new Error('Unable to release nonexisting lock');
}
return lock.unlock()
}...
};
/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
...remove = function (queue, jobId){
var script = [
'redis.call("LREM", KEYS[1], 0, ARGV[1])',
'redis.call("LREM", KEYS[2], 0, ARGV[1])',
'redis.call("ZREM", KEYS[3], ARGV[1])',
'redis.call("LREM", KEYS[4], 0, ARGV[1])',
'redis.call("SREM", KEYS[5], ARGV[1])',
'redis.call("SREM", KEYS[6], ARGV[1])',
'redis.call("DEL", KEYS[7])'].join('\n');
var keys = _.map([
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
jobId], function(name){
return queue.toKey(name);
}
);
var args = [
queue.client,
'remove',
script,
keys.length,
keys[0],
keys[1],
keys[2],
keys[3],
keys[4],
keys[5],
keys[6],
jobId
];
return execScript.apply(scripts, args);
}...
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
...reprocessJob = function (job, options) {
var push = (job.opts.lifo ? 'R' : 'L') + 'PUSH';
var script = [
'if (redis.call("EXISTS", KEYS[1]) == 1) then',
' if (redis.call("EXISTS", KEYS[2]) == 0) then',
' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then',
' redis.call("' + push + '", KEYS[4], ARGV[1])',
' redis.call("PUBLISH", KEYS[5], ARGV[1])',
' return 1',
' else',
' return -2',
' end',
' else',
' return -1',
' end',
'else',
' return 0',
'end'
].join('\n');
var queue = job.queue;
var keys = [
queue.toKey(job.jobId),
queue.toKey(job.jobId) + ':lock',
queue.toKey(options.state),
queue.toKey('wait'),
queue.toKey('jobs')
];
var args = [
queue.client,
'retryJob',
script,
5,
keys[0],
keys[1],
keys[2],
keys[3],
keys[4],
job.jobId
];
return execScript.apply(scripts, args);
}...
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
Job.prototype.retry = function(){
var queue = this.queue;
var _this = this;
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result
) {
if (result === 1) {
queue.emit('waiting', _this);
} else if (result === 0) {
throw new Error('Couldn\'t retry job: The job doesn\'t exist');
} else if (result === -1) {
throw new Error('Couldn\'t retry job: The job is locked');
} else if (result === -2) {
...takeLock = function (queue, job, renew, ensureActive){
var lock = job.lock;
if (renew && !lock) {
throw new Error('Unable to renew nonexisting lock');
}
if (renew) {
return lock.extend(queue.LOCK_DURATION);
}
if (lock) {
return Promise.resolve(lock);
}
var redlock;
if (ensureActive) {
var isJobInList = this._isJobInList('KEYS[2]', 'ARGV[3]', 'if');
var lockAcquired = ['and redis.call("HSET", KEYS[3], "lockAcquired", "1")'].join('');
var success = 'then return 1 else return 0 end';
var opts = {
lockScript: function(lockScript) {
return [
isJobInList,
lockScript.replace('return', 'and'),
lockAcquired,
success
].join('\n');
},
extraLockKeys: [job.queue.toKey('active'), queue.toKey(job.jobId)],
extraLockArgs: [job.jobId]
};
redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock));
} else {
redlock = new Redlock(queue.clients, queue.redlock);
}
return redlock.lock(job.lockKey(), queue.LOCK_DURATION).catch(function(err){
//
// Failing to lock due to already locked is not an error.
//
if(err.name != 'LockError'){
throw err;
}
});
}...
/**
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive).then(function(lock) {
if (lock){
_this.lock = lock;
}
return lock || false;
});
};
...updateDelaySet = function (queue, delayedTimestamp){
var script = [
'local RESULT = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")',
'local jobId = RESULT[1]',
'local score = RESULT[2]',
'if (score ~= nil) then',
' score = score / 0x1000 ',
' if (score <= tonumber(ARGV[2])) then',
' redis.call("ZREM", KEYS[1], jobId)',
' redis.call("LREM", KEYS[2], 0, jobId)',
' redis.call("LPUSH", KEYS[3], jobId)',
' redis.call("PUBLISH", KEYS[4], jobId)',
' redis.call("HSET", ARGV[1] .. jobId, "delay", 0)',
' local nextTimestamp = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]',
' if(nextTimestamp ~= nil) then',
' nextTimestamp = nextTimestamp / 0x1000',
' redis.call("PUBLISH", KEYS[1], nextTimestamp)',
' end',
' return nextTimestamp',
' end',
' return score',
'end'].join('\n');
var keys = _.map([
'delayed',
'active',
'wait',
'jobs'], function(name){
return queue.toKey(name);
});
var args = [
queue.client,
'updateDelaySet',
script,
keys.length,
keys[0],
keys[1],
keys[2],
keys[3],
queue.toKey(''),
delayedTimestamp
];
return execScript.apply(scripts, args);
}n/a
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
distEmit = function (evt) {
var _this = this;
var args = Array.prototype.slice.call(arguments);
this.emit.apply(this, args);
args[0] = this.uuid;
// Emit to other nodes
return new Promise(function (resolve, reject) {
_this.pubClient.publish(evt, JSON.stringify(args), function (err) {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}...
}
Job.create = function(queue, data, opts){
var job = new Job(queue, data, opts);
return addJob(queue, job).then(function(jobId){
job.jobId = jobId;
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
return job;
});
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...off = function (evt, listener) {
var _this = this;
var args = Array.prototype.slice.call(arguments);
EventEmitter.prototype.removeListener.apply(this, args);
// TODO: we should take into consideration isGlobal.
if (!_this.listenerCount(evt)) {
_this.subClient.unsubscribe(evt);
}
}n/a
on = function (evt, listener, isGlobal) {
var _this = this;
var args = Array.prototype.slice.call(arguments);
EventEmitter.prototype.on.apply(this, args);
if (isGlobal) {
return new Promise(function (resolve, reject) {
_this.subClient.subscribe(args[0], function (err) {
if (err) {
reject(err);
} else {
resolve();
}
});
})
}
return Promise.resolve(void 0);
}...
queue.resume().then(function(){
// queue is resumed now
})
```
A queue emits also some useful events:
```javascript
.on('ready', function() {
// Queue ready for job
// All Redis connections are done
})
.on('error', function(error) {
// Error
})
.on('active', function(job, jobPromise){
...removeListener = function (evt, listener) {
var _this = this;
var args = Array.prototype.slice.call(arguments);
EventEmitter.prototype.removeListener.apply(this, args);
// TODO: we should take into consideration isGlobal.
if (!_this.listenerCount(evt)) {
_this.subClient.unsubscribe(evt);
}
}...
reject(err);
removeListeners();
clearInterval(interval);
}
}
function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
}
_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);
//
...function TimerManager(){
this.idle = true;
this.listeners = [];
this.timers = {};
}n/a
clear = function (id){
var timers = this.timers;
var timer = timers[id];
if(!timer) {
return;
}
clearTimeout(timer.timer);
delete timers[id];
if(!this.idle && (_.size(timers) === 0)){
while(this.listeners.length){
this.listeners.pop()();
}
this.idle = true;
}
}...
@param {Function} fn - Function to execute after delay
@returns {Number} id - The timer id. Used to clear the timer
*/
TimerManager.prototype.set = function(name, delay, fn){
var id = uuid.v4();
var now = Date.now();
var timer = setTimeout(function (timerInstance, timeoutId) {
timerInstance.clear(timeoutId);
try{
fn();
}catch(err){
console.error(err);
}
}, delay, this, id);
...clearAll = function (){
var _this = this;
_.each(this.timers, function(timer, id){
_this.clear(id);
});
}n/a
set = function (name, delay, fn){
var id = uuid.v4();
var now = Date.now();
var timer = setTimeout(function (timerInstance, timeoutId) {
timerInstance.clear(timeoutId);
try{
fn();
}catch(err){
console.error(err);
}
}, delay, this, id);
// XXX only the timer is used, but the
// other fields are useful for
// troubleshooting/debugging
this.timers[id] = {
name: name,
created: now,
timer: timer
};
this.idle = false;
return id;
}n/a
whenIdle = function () {
var _this = this;
return new Promise(function(resolve){
if(_this.idle) {
resolve();
} else{
_this.listeners.unshift(resolve);
}
});
}n/a