function Queue(name, redisPort, redisHost, redisOptions){ if(!(this instanceof Queue)){ return new Queue(name, redisPort, redisHost, redisOptions); } if(_.isObject(redisPort)) { var opts = redisPort; var redisOpts = opts.redis || {}; redisPort = redisOpts.port; redisHost = redisOpts.host; redisOptions = redisOpts.opts || {}; redisOptions.db = redisOpts.DB || redisOpts.DB; } else if(parseInt(redisPort) == redisPort) { redisPort = parseInt(redisPort); redisOptions = redisOptions || {}; } else if(_.isString(redisPort)) { try { var redisUrl = url.parse(redisPort); assert(_.isObject(redisHost) || _.isUndefined(redisHost), 'Expected an object as redis option'); redisOptions = redisHost || {}; redisPort = redisUrl.port; redisHost = redisUrl.hostname; if (redisUrl.auth) { redisOptions.password = redisUrl.auth.split(':')[1]; } } catch (e) { throw new Error(e.message); } } redisOptions = redisOptions || {}; function createClient(type) { var client; if(_.isFunction(redisOptions.createClient)){ client = redisOptions.createClient(type); }else{ client = new redis(redisPort, redisHost, redisOptions); } return client; } redisPort = redisPort || 6379; redisHost = redisHost || '127.0.0.1'; var _this = this; this.name = name; this.keyPrefix = redisOptions.keyPrefix || 'bull'; // // We cannot use ioredis keyPrefix feature until we // stop creating keys dynamically in lua scripts. // delete redisOptions.keyPrefix; // // Create queue client (used to add jobs, pause queues, etc); // this.client = createClient('client'); getRedisVersion(this.client).then(function(version){ if(semver.lt(version, MINIMUM_REDIS_VERSION)){ throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. 
Current: ' + version); } }).catch(function(err){ _this.emit('error', err); }); // // Keep track of cluster clients for redlock // this.clients = [this.client]; if (redisOptions.clients) { this.clients.push.apply(this.clients, redisOptions.clients); } this.redlock = { driftFactor: REDLOCK_DRIFT_FACTOR, retryCount: REDLOCK_RETRY_COUNT, retryDelay: REDLOCK_RETRY_DELAY }; _.extend(this.redlock, redisOptions.redlock || {}); // // Create blocking client (used to wait for jobs) // this.bclient = createClient('block'); // // Create event subscriber client (receive messages from other instance of the queue) // this.eclient = createClient('subscriber'); this.delayTimer = null; this.processing = 0; this.retrieving = 0; this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; this.LOCK_DURATION = LOCK_DURATION; this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; // bubble up Redis error events [this.client, this.bclient, this.eclient].forEach(function (client) { client.on('error', _this.emit.bind(_this, 'error')); }); // keeps track of active timers. used by close() to // ensure that disconnect() is deferred until all // scheduled redis commands have been executed this.timers = new TimerManager(); // emit ready when redis connections ready var initializers = [this.client, this.bclient, this.eclient].map(function (client) { return new Promise(function(resolve, reject) { client.once('ready', resolve); client.once('error', reject); }); }); this._initializing = Promise.all(initializers).then(function(){ return Promise.join( _this.eclient.subscribe(_this.toKey('delayed')), _this.eclient.subscribe(_this.toKey('paused')) ); }).then(function(){ debuglog(name + ' queue ready'); _this.emit('ready'); }, function(err){ console.error('Error initializing queue:', err); }); Disturbed.call(this, _this.client, _this.eclient); // // Listen distributed queue events // listenDistEvent('stalled'); // listenDistEvent('active'); // listenDist ...
n/a
/**
 * Job constructor: records the owning queue, the payload and the options, and
 * initialises the bookkeeping fields of a fresh job.
 *
 * @param {Object} queue - queue the job belongs to.
 * @param {*} data - payload stored on the job.
 * @param {Object} [opts] - job options; `delay`, `timestamp` and `attempts` are read here.
 */
job = function (queue, data, opts){
  var options = opts || {};
  var requestedAttempts = options.attempts;

  this.queue = queue;
  this.data = data;
  this.opts = options;
  this._progress = 0;
  this.delay = options.delay || 0;
  this.timestamp = options.timestamp || Date.now();
  this.stacktrace = [];
  // Anything that does not compare greater than one means a single attempt.
  this.attempts = requestedAttempts > 1 ? requestedAttempts : 1;
  this.returnvalue = null;
  this.attemptsMade = 0;
};
n/a
/**
 * Deprecated PriorityQueue constructor. Creates one underlying Queue per entry
 * of `PriorityQueue.priorities` and forwards their events to this instance.
 *
 * May be called without `new`; the arguments are passed through unchanged to
 * every underlying Queue.
 */
priority_queue = function (name, redisPort, redisHost, redisOptions) {
  if (!(this instanceof PriorityQueue)) {
    return new PriorityQueue(name, redisPort, redisHost, redisOptions);
  }

  console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority option instead.");

  var self = this;
  self.paused = false;
  self.queues = [];

  // One queue per priority, stored at the index given by the priorities table.
  for (var key in PriorityQueue.priorities) {
    var queueName = PriorityQueue.getQueueName(name, key);
    self.queues[PriorityQueue.priorities[key]] = Queue(queueName, redisPort, redisHost, redisOptions);
  }

  // Group events are re-emitted once, after every underlying queue emitted them.
  ['ready', 'paused', 'resumed'].forEach(function(event) {
    Promise.map(self.queues, function(queue) {
      return new Promise(function(resolve) {
        queue.once(event, resolve);
      });
    }).then(self.emit.bind(self, event));
  });

  // Single events are re-emitted every time any underlying queue emits them.
  ['error', 'active', 'stalled', 'progress', 'completed', 'failed', 'cleaned'].forEach(function(event) {
    self.queues.forEach(function(queue) {
      queue.on(event, self.emit.bind(self, event));
    });
  });

  self.strategy = Strategy.exponential;
};
n/a
function Queue(name, redisPort, redisHost, redisOptions){ if(!(this instanceof Queue)){ return new Queue(name, redisPort, redisHost, redisOptions); } if(_.isObject(redisPort)) { var opts = redisPort; var redisOpts = opts.redis || {}; redisPort = redisOpts.port; redisHost = redisOpts.host; redisOptions = redisOpts.opts || {}; redisOptions.db = redisOpts.DB || redisOpts.DB; } else if(parseInt(redisPort) == redisPort) { redisPort = parseInt(redisPort); redisOptions = redisOptions || {}; } else if(_.isString(redisPort)) { try { var redisUrl = url.parse(redisPort); assert(_.isObject(redisHost) || _.isUndefined(redisHost), 'Expected an object as redis option'); redisOptions = redisHost || {}; redisPort = redisUrl.port; redisHost = redisUrl.hostname; if (redisUrl.auth) { redisOptions.password = redisUrl.auth.split(':')[1]; } } catch (e) { throw new Error(e.message); } } redisOptions = redisOptions || {}; function createClient(type) { var client; if(_.isFunction(redisOptions.createClient)){ client = redisOptions.createClient(type); }else{ client = new redis(redisPort, redisHost, redisOptions); } return client; } redisPort = redisPort || 6379; redisHost = redisHost || '127.0.0.1'; var _this = this; this.name = name; this.keyPrefix = redisOptions.keyPrefix || 'bull'; // // We cannot use ioredis keyPrefix feature until we // stop creating keys dynamically in lua scripts. // delete redisOptions.keyPrefix; // // Create queue client (used to add jobs, pause queues, etc); // this.client = createClient('client'); getRedisVersion(this.client).then(function(version){ if(semver.lt(version, MINIMUM_REDIS_VERSION)){ throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. 
Current: ' + version); } }).catch(function(err){ _this.emit('error', err); }); // // Keep track of cluster clients for redlock // this.clients = [this.client]; if (redisOptions.clients) { this.clients.push.apply(this.clients, redisOptions.clients); } this.redlock = { driftFactor: REDLOCK_DRIFT_FACTOR, retryCount: REDLOCK_RETRY_COUNT, retryDelay: REDLOCK_RETRY_DELAY }; _.extend(this.redlock, redisOptions.redlock || {}); // // Create blocking client (used to wait for jobs) // this.bclient = createClient('block'); // // Create event subscriber client (receive messages from other instance of the queue) // this.eclient = createClient('subscriber'); this.delayTimer = null; this.processing = 0; this.retrieving = 0; this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; this.LOCK_DURATION = LOCK_DURATION; this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; // bubble up Redis error events [this.client, this.bclient, this.eclient].forEach(function (client) { client.on('error', _this.emit.bind(_this, 'error')); }); // keeps track of active timers. used by close() to // ensure that disconnect() is deferred until all // scheduled redis commands have been executed this.timers = new TimerManager(); // emit ready when redis connections ready var initializers = [this.client, this.bclient, this.eclient].map(function (client) { return new Promise(function(resolve, reject) { client.once('ready', resolve); client.once('error', reject); }); }); this._initializing = Promise.all(initializers).then(function(){ return Promise.join( _this.eclient.subscribe(_this.toKey('delayed')), _this.eclient.subscribe(_this.toKey('paused')) ); }).then(function(){ debuglog(name + ' queue ready'); _this.emit('ready'); }, function(err){ console.error('Error initializing queue:', err); }); Disturbed.call(this, _this.client, _this.eclient); // // Listen distributed queue events // listenDistEvent('stalled'); // listenDistEvent('active'); // listenDist ...
n/a
/**
 * Distributed emitter constructor. Stores the publisher/subscriber clients and
 * re-emits messages received on `subClient` as local events named after the
 * channel, skipping messages whose first element is this instance's own uuid.
 *
 * @param {Object} pubClient - client stored as `this.pubClient`.
 * @param {Object} subClient - client whose 'message' events are listened to.
 */
super_ = function (pubClient, subClient) {
  var _this = this;
  EventEmitter.call(this);

  this.uuid = uuid();
  this.pubClient = pubClient;
  this.subClient = subClient;

  subClient.on('message', function (channel, msg) {
    // Nothing to do when nobody listens to this channel locally.
    if (!_this.listenerCount(channel)) {
      return;
    }

    var args;
    try {
      args = JSON.parse(msg);
    } catch (err) {
      console.error('Parsing event message', err);
      // A malformed message must not crash the subscriber's handler.
      return;
    }

    // Only array payloads ([senderUuid, ...eventArgs]) can be re-emitted.
    if (!Array.isArray(args)) {
      return;
    }

    if (args[0] !== _this.uuid) {
      // Replace the sender uuid with the event name expected by emit().
      args[0] = channel;
      _this.emit.apply(_this, args);
    }
  });
};
n/a
/**
 * TimerManager constructor: starts idle, with no listeners and no timers.
 */
function TimerManager(){
  var manager = this;

  manager.idle = true;
  manager.listeners = [];
  manager.timers = {};
}
n/a
/**
 * Job constructor: records the owning queue, the payload and the options, and
 * initialises the bookkeeping fields of a fresh job.
 *
 * @param {Object} queue - queue the job belongs to.
 * @param {*} data - payload stored on the job.
 * @param {Object} [opts] - job options; `delay`, `timestamp` and `attempts` are read here.
 */
job = function (queue, data, opts){
  var options = opts || {};
  var requestedAttempts = options.attempts;

  this.queue = queue;
  this.data = data;
  this.opts = options;
  this._progress = 0;
  this.delay = options.delay || 0;
  this.timestamp = options.timestamp || Date.now();
  this.stacktrace = [];
  // Anything that does not compare greater than one means a single attempt.
  this.attempts = requestedAttempts > 1 ? requestedAttempts : 1;
  this.returnvalue = null;
  this.attemptsMade = 0;
};
n/a
/**
 * Builds a Job, adds it to the queue via `addJob` and resolves with the job
 * once its id is known.
 *
 * @returns {Promise} resolves with the created job (its `jobId` already set).
 */
create = function (queue, data, opts){
  var created = new Job(queue, data, opts);

  function onAdded(jobId){
    created.jobId = jobId;
    // Announce the new job through the queue's distributed emitter.
    queue.distEmit('waiting', created.toJSON());
    debuglog('Job added', jobId);
    return created;
  }

  return addJob(queue, created).then(onAdded);
};
n/a
/**
 * Rebuilds a Job from a hash of stored fields.
 *
 * `data.data` and `data.opts` are JSON-decoded; the numeric fields are parsed
 * as base-10 integers. A missing or invalid `attempts` defaults to 1, an
 * unparsable or non-array `stacktrace` becomes `[]`, and an unparsable
 * `returnvalue` is logged and left at the constructor's default.
 *
 * @param {Object} queue - queue the job belongs to.
 * @param {*} jobId - id assigned to the rebuilt job.
 * @param {Object} data - stored fields of the job.
 * @returns {Job} the rebuilt job.
 */
fromData = function (queue, jobId, data){
  var job = new Job(queue, JSON.parse(data.data), JSON.parse(data.opts));

  job.jobId = jobId;
  job._progress = parseInt(data.progress, 10);
  job.delay = parseInt(data.delay, 10);
  job.timestamp = parseInt(data.timestamp, 10);
  job.failedReason = data.failedReason;

  job.attempts = parseInt(data.attempts, 10);
  if(isNaN(job.attempts)) {
    // Default to 1 try for legacy jobs
    job.attempts = 1;
  }

  job.attemptsMade = parseInt(data.attemptsMade, 10);

  var traces;
  try{
    traces = JSON.parse(data.stacktrace);
  }catch (err){
    traces = [];
  }
  job.stacktrace = Array.isArray(traces) ? traces : [];

  try{
    job.returnvalue = JSON.parse(data.returnvalue);
  }catch (e){
    // Swallow the exception because the returnvalue got corrupted somehow.
    debuglog('corrupted returnvalue: ' + data.returnvalue, e);
  }

  return job;
};
...
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
if(!jobId) {
return Promise.resolve();
}
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return null;
}
});
};
Job.prototype.toData = function(){
...
/**
 * Loads a job by id from the queue's client.
 *
 * @param {Object} queue - queue providing `client.hgetall` and `toKey`.
 * @param {*} jobId - id of the job to load.
 * @returns {Promise} resolves with `undefined` when `jobId` is falsy, with
 *   `null` when the stored hash is empty, otherwise with `Job.fromData(...)`.
 */
fromId = function (queue, jobId){
  // jobId can be undefined if moveJob returns undefined
  if(!jobId) {
    return Promise.resolve();
  }

  return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
    if(_.isEmpty(jobData)){
      return null;
    }
    return Job.fromData(queue, jobId, jobData);
  });
};
...
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(
function(data){
reject(Error(data.failedReason));
return true;
});
}
});
}
resolve();
...
/**
 * Rebuilds a Job from its JSON representation.
 *
 * `stacktrace` and `returnvalue` are accepted either already decoded (an array
 * / a plain value) or as JSON-encoded strings. Strings are decoded with
 * JSON.parse; a string that fails to decode yields `[]` for the stacktrace and
 * leaves `returnvalue` at the constructor's default (the failure is logged).
 *
 * @param {Object} queue - queue the job belongs to.
 * @param {Object} json - JSON form of the job; `json.opts.jobId` supplies the id.
 * @returns {Job} the rebuilt job.
 */
fromJSON = function (queue, json){
  var job = new Job(queue, json.data, json.opts);

  job.jobId = json.opts.jobId;
  job._progress = parseInt(json.progress, 10);
  job.delay = parseInt(json.delay, 10);
  job.timestamp = parseInt(json.timestamp, 10);
  job.failedReason = json.failedReason;

  job.attempts = parseInt(json.attempts, 10);
  if(isNaN(job.attempts)) {
    // Default to 1 try for legacy jobs
    job.attempts = 1;
  }

  job.attemptsMade = parseInt(json.attemptsMade, 10);

  var traces = json.stacktrace;
  if(typeof traces === 'string'){
    try{
      traces = JSON.parse(traces);
    }catch (err){
      traces = [];
    }
  }
  job.stacktrace = Array.isArray(traces) ? traces : [];

  var value = json.returnvalue;
  if(typeof value === 'string'){
    try{
      job.returnvalue = JSON.parse(value);
    }catch (e){
      // Swallow the exception because the returnvalue got corrupted somehow.
      debuglog('corrupted returnvalue: ' + value, e);
    }
  }else if(value !== undefined){
    // Already decoded: use it as is instead of running it through JSON.parse.
    job.returnvalue = value;
  }

  return job;
};
n/a
/**
 * Computes the backoff for this job from `this.opts.backoff`.
 *
 * A value that `isNaN` accepts as numeric is returned as is; an object with
 * type 'fixed' yields its `delay`; type 'exponential' yields
 * round((2^attemptsMade - 1) * delay). Anything else yields 0.
 */
_getBackOff = function () {
  var setting = this.opts.backoff;

  if(!setting){
    return 0;
  }
  if(!isNaN(setting)){
    return setting;
  }

  switch(setting.type){
    case 'fixed':
      return setting.delay;
    case 'exponential':
      return Math.round((Math.pow(2, this.attemptsMade) - 1) * setting.delay);
    default:
      return 0;
  }
};
...
Job.prototype.moveToFailed = function(err){
var _this = this;
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts && !_this._discarded){
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
...
/**
 * Checks membership of this job's id in the set stored under `toKey(list)`.
 *
 * @param {string} list - name passed to `queue.toKey`.
 * @returns {Promise<boolean>} true only when `sismember` replies exactly 1.
 */
_isDone = function (list){
  var client = this.queue.client;
  var setKey = this.queue.toKey(list);

  return client.sismember(setKey, this.jobId).then(function(reply){
    return reply === 1;
  });
};
...
} else if (result === -2) {
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
}
});
};
Job.prototype.isCompleted = function(){
return this._isDone('completed');
};
Job.prototype.isFailed = function(){
return this._isDone('failed');
};
Job.prototype.isDelayed = function() {
...
/**
 * Delegates to `scripts.isJobInList` with the queue client, the key for
 * `list` and this job's id, and returns whatever that call returns.
 */
_isInList = function (list) {
  var queue = this.queue;
  var listKey = queue.toKey(list);

  return scripts.isJobInList(queue.client, listKey, this.jobId);
};
...
return this.queue.client
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
return rank !== null;
});
};
Job.prototype.isActive = function() {
return this._isInList('active');
};
Job.prototype.isWaiting = function() {
return this._isInList('wait');
};
Job.prototype.isPaused = function() {
...
/**
 * Delegates to `scripts.moveToSet` for this job and returns its result.
 *
 * @param {string} set - target set name.
 * @param {*} [context] - extra value forwarded unchanged to the script.
 */
_moveToSet = function (set, context){
  return scripts.moveToSet(this.queue, set, this.jobId, context);
};
...
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
Job.prototype.moveToDelayed = function(timestamp){
return this._moveToSet('delayed', timestamp);
};
...
/**
 * Runs a Lua script through `queue.client.eval` that, when the job key
 * exists, removes the job id from the 'active' list and pushes it onto the
 * 'wait' list (RPUSH when `opts.lifo` is set, LPUSH otherwise).
 *
 * @returns {Promise} rejects with 'Missing Job <id> during retry' when the
 *   script replies -1.
 */
_retryAtOnce = function (){
  var queue = this.queue;
  var jobId = this.jobId;

  var lua = [
    'if redis.call("EXISTS", KEYS[3]) == 1 then',
    ' redis.call("LREM", KEYS[1], 0, ARGV[2])',
    ' redis.call(ARGV[1], KEYS[2], ARGV[2])',
    ' return 0',
    'else',
    ' return -1',
    'end'
  ].join('\n');

  var keys = _.map(['active', 'wait', jobId], function(name){
    return queue.toKey(name);
  });

  var pushCmd = this.opts.lifo ? 'RPUSH' : 'LPUSH';

  function assertFound(result){
    if(result === -1){
      throw new Error('Missing Job ' + jobId + ' during retry');
    }
  }

  return queue.client.eval(
    lua, keys.length, keys[0], keys[1], keys[2], pushCmd, jobId
  ).then(assertFound);
};
...
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
...
/**
 * Records a failed attempt: bumps `attemptsMade`, appends the failure's stack
 * to `this.stacktrace` and writes attemptsMade / stacktrace / failedReason to
 * the job hash with `hmset`.
 *
 * `err` is normally an Error. A null or undefined `err` no longer throws
 * synchronously, and a failure value without a `message` (e.g. a string) is
 * stored as its string form.
 *
 * @param {*} err - failure reported for the attempt.
 * @returns {Promise} result of `queue.client.hmset`.
 */
_saveAttempt = function (err){
  if(isNaN(this.attemptsMade)){
    this.attemptsMade = 1;
  }else{
    this.attemptsMade++;
  }

  var hasFailure = err !== null && err !== undefined;
  var stack = hasFailure ? err.stack : undefined;
  var reason;
  if(hasFailure){
    reason = err.message !== undefined ? err.message : String(err);
  }

  var params = {
    attemptsMade: this.attemptsMade
  };

  this.stacktrace.push(stack);
  params.stacktrace = JSON.stringify(this.stacktrace);
  params.failedReason = reason;

  return this.queue.client.hmset(this.queue.toKey(this.jobId), params);
};
...
Job.prototype.discard = function(){
this._discarded = true;
}
Job.prototype.moveToFailed = function(err){
var _this = this;
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed
if(_this.attemptsMade < _this.attempts && !_this._discarded){
// Check if backoff is needed
var backoff = _this._getBackOff();
if(backoff){
// If so, move to delayed
return _this.moveToDelayed(Date.now() + backoff);
...
/**
 * Moves the job to the delayed set when `timestamp + delay` is still in the
 * future.
 *
 * @returns {Promise<boolean>} true when the job was moved, false otherwise.
 */
delayIfNeeded = function (){
  if(!this.delay){
    return Promise.resolve(false);
  }

  var dueAt = this.timestamp + this.delay;
  if(!(dueAt > Date.now())){
    return Promise.resolve(false);
  }

  return this.moveToDelayed(dueAt).then(function(){
    return true;
  });
};
n/a
/**
 * Flags the job as discarded by setting `_discarded` to true.
 */
discard = function (){
  var job = this;
  job._discarded = true;
};
n/a
/**
 * Returns a promise that settles when this job has finished.
 *
 * The job state is checked once up front. If the job is neither completed nor
 * failed, the promise waits for a matching 'completed' / 'failed' queue event
 * and re-checks the state every 5 seconds as a watchdog.
 *
 * @returns {Promise} resolves (with no value) when the job completed; rejects
 *   with the failure when it failed, or with the error raised by a state check.
 */
finished = function (){
  var _this = this;

  // Settles through resolve/reject when the job is done; resolves truthy then.
  function status(resolve, reject){
    return _this.isCompleted().then(function(completed){
      if(completed){
        resolve();
        return true;
      }
      return _this.isFailed().then(function(failed){
        if(!failed){
          return false;
        }
        return Job.fromId(_this.queue, _this.jobId).then(function(data){
          // The job hash may be gone already; still report the failure.
          var reason = data ?
            data.failedReason :
            'Job ' + _this.jobId + ' failed and its data is no longer available';
          reject(new Error(reason));
          return true;
        });
      });
    });
  }

  return new Promise(function(resolve, reject){
    var interval;

    function removeListeners(){
      _this.queue.removeListener('completed', onCompleted);
      _this.queue.removeListener('failed', onFailed);
    }

    function cleanup(){
      removeListeners();
      clearInterval(interval);
    }

    function onCompleted(job){
      if(String(job.jobId) === String(_this.jobId)){
        resolve();
        cleanup();
      }
    }

    function onFailed(job, err){
      if(String(job.jobId) === String(_this.jobId)){
        reject(err);
        cleanup();
      }
    }

    // A failing state check must settle the promise instead of leaving it
    // pending behind an unhandled rejection.
    function abort(err){
      cleanup();
      reject(err);
    }

    status(resolve, reject).then(function(finished){
      if(finished){
        return;
      }

      _this.queue.on('completed', onCompleted);
      _this.queue.on('failed', onFailed);

      //
      // Watchdog
      //
      interval = setInterval(function(){
        status(resolve, reject).then(function(finished){
          if(finished){
            cleanup();
          }
        }, abort);
      }, 5000);
    }).catch(abort);
  });
};
n/a
/**
 * Resolves with the name of the first state whose probe method reports true,
 * trying completed, failed, delayed, active, waiting and paused in that order;
 * resolves with 'stuck' when none matches.
 */
getState = function () {
  var job = this;

  // [probe method, state name], checked sequentially in this order.
  var probes = [
    ['isCompleted', 'completed'],
    ['isFailed', 'failed'],
    ['isDelayed', 'delayed'],
    ['isActive', 'active'],
    ['isWaiting', 'waiting'],
    ['isPaused', 'paused']
  ];

  return Promise.reduce(probes, function(found, probe) {
    if(found){
      return found;
    }
    return job[probe[0]]().then(function(hit) {
      return hit ? probe[1] : null;
    });
  }, null).then(function(state) {
    return state || 'stuck';
  });
};
...
};
Job.prototype.isPaused = function() {
return this._isInList('paused');
};
Job.prototype.isStuck = function() {
return this.getState().then(function(state) {
return state === 'stuck';
});
};
Job.prototype.getState = function() {
var _this = this;
var fns = [
...
/**
 * Returns the result of `_isInList('active')` for this job.
 */
isActive = function () {
  var listName = 'active';
  return this._isInList(listName);
};
n/a
/**
 * Returns the result of `_isDone('completed')` for this job.
 */
isCompleted = function (){
  var setName = 'completed';
  return this._isDone(setName);
};
...
* TODO: Add a watchdog to check if the job has finished periodically.
* since pubsub does not give any guarantees.
*/
Job.prototype.finished = function(){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
...
/**
 * Looks up this job's rank in the sorted set stored under `toKey('delayed')`.
 *
 * @returns {Promise<boolean>} true when `zrank` replies with a non-null rank.
 */
isDelayed = function () {
  var client = this.queue.client;
  var delayedKey = this.queue.toKey('delayed');

  return client.zrank(delayedKey, this.jobId).then(function(rank) {
    return rank !== null;
  });
};
n/a
/**
 * Returns the result of `_isDone('failed')` for this job.
 */
isFailed = function (){
  var setName = 'failed';
  return this._isDone(setName);
};
...
*/
Job.prototype.finished = function(){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
}
});
...
/**
 * Returns the result of `_isInList('paused')` for this job.
 */
isPaused = function () {
  var listName = 'paused';
  return this._isInList(listName);
};
n/a
/**
 * Resolves true when `getState()` reports the state 'stuck'.
 */
isStuck = function () {
  function isStuckState(state) {
    return state === 'stuck';
  }

  return this.getState().then(isStuckState);
};
n/a
/**
 * Returns the result of `_isInList('wait')` for this job.
 */
isWaiting = function () {
  var listName = 'wait';
  return this._isInList(listName);
};
n/a
/**
 * Builds the lock key of this job: the job's queue key followed by ':lock'.
 */
lockKey = function (){
  var jobKey = this.queue.toKey(this.jobId);
  return jobKey + ':lock';
};
...
extraLockArgs: [job.jobId]
};
redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock));
} else {
redlock = new Redlock(queue.clients, queue.redlock);
}
return redlock.lock(job.lockKey(), queue.LOCK_DURATION).catch(function(err){
//
// Failing to lock due to already locked is not an error.
//
if(err.name != 'LockError'){
throw err;
}
});
...
/**
 * Moves the job from `src` to `target` through `scripts.move`.
 *
 * When the target is 'completed' the return value is stored on the job
 * (falsy values become 0) and, if `opts.removeOnComplete` is set, the target
 * passed to the script is left undefined.
 */
move = function (src, target, returnValue){
  var destination = target;

  if(destination === 'completed'){
    this.returnvalue = returnValue || 0;
    if(this.opts.removeOnComplete){
      destination = void 0;
    }
  }

  return scripts.move(this, src, destination);
};
...
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, src, target);
}
Job.prototype.discard = function(){
this._discarded = true;
}
Job.prototype.moveToFailed = function(err){
...
/**
 * Stores the return value on the job (falsy values become 0) and delegates to
 * `scripts.moveToCompleted`, forwarding `opts.removeOnComplete`.
 */
moveToCompleted = function (returnValue){
  var shouldRemove = this.opts.removeOnComplete;

  this.returnvalue = returnValue || 0;
  return scripts.moveToCompleted(this, shouldRemove);
};
...
}
}
return Promise.resolve(false);
};
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, this.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
...
/**
 * Moves the job to the 'delayed' set, forwarding `timestamp` to `_moveToSet`.
 */
moveToDelayed = function (timestamp){
  var targetSet = 'delayed';
  return this._moveToSet(targetSet, timestamp);
};
...
.then(function() { _this.lock = null; });
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
return this.moveToDelayed(jobDelayedTimestamp).then(function(){
return true;
});
}
}
return Promise.resolve(false);
};
...
/**
 * Handles a failed attempt. After saving the attempt:
 *  - if attempts remain and the job was not discarded, the job is retried,
 *    delayed by the backoff when there is one, immediately otherwise;
 *  - else, if `opts.removeOnFail` is set, the lock is released and the job removed;
 *  - else the job is moved to the 'failed' set.
 *
 * @param {*} err - failure forwarded to `_saveAttempt`.
 * @returns {Promise} result of the chosen transition.
 */
moveToFailed = function (err){
  var _this = this;

  return this._saveAttempt(err).then(function() {
    // Check if an automatic retry should be performed
    var canRetry = _this.attemptsMade < _this.attempts && !_this._discarded;

    if(canRetry){
      // Move to delayed when a backoff applies, retry immediately otherwise
      var backoff = _this._getBackOff();
      return backoff ?
        _this.moveToDelayed(Date.now() + backoff) :
        _this._retryAtOnce();
    }

    if(_this.opts.removeOnFail){
      return _this.releaseLock().then(function(){
        return _this.remove();
      });
    }

    // If not, move to failed
    return _this._moveToSet('failed');
  });
};
n/a
/**
 * Getter/setter for the job's progress.
 *
 * Called without a value (undefined or null) it returns the current progress.
 * Called with a value, including 0, it stores the value, writes it to the
 * job hash and emits a distributed 'progress' event.
 *
 * @param {*} [progress] - new progress value.
 * @returns {*|Promise} current progress, or a promise for the update.
 */
progress = function (progress){
  // Test for "no value" explicitly so that a progress of 0 can be stored.
  if(progress === undefined || progress === null){
    return this._progress;
  }

  var _this = this;
  this._progress = progress;

  return this.queue.client.hset(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
    _this.queue.distEmit('progress', _this.toJSON(), progress);
  });
};
...
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or pass an error if something went wrong
done(Error('error transcoding'));
...
/**
 * Runs a Lua script through `queue.client.eval` that removes the job id from
 * the 'delayed' sorted set and, when it was present, pushes it onto 'wait'.
 *
 * @returns {Promise} rejects with 'Job <id> is not in a delayed state' when
 *   the script replies -1.
 */
promote = function (){
  var queue = this.queue;
  var jobId = this.jobId;

  var lua = [
    'if redis.call("ZREM", KEYS[1], ARGV[1]) == 1 then',
    ' redis.call("LPUSH", KEYS[2], ARGV[1])',
    ' return 0',
    'else',
    ' return -1',
    'end'
  ].join('\n');

  var keys = _.map(['delayed', 'wait'], function(name){
    return queue.toKey(name);
  });

  function assertWasDelayed(result){
    if(result === -1){
      throw new Error('Job ' + jobId + ' is not in a delayed state');
    }
  }

  return queue.client.eval(lua, keys.length, keys[0], keys[1], jobId).then(assertWasDelayed);
};
n/a
/**
 * Releases the job's lock through `scripts.releaseLock` and clears
 * `this.lock` once the script has finished.
 *
 * @returns {Promise} resolves with no value.
 */
releaseLock = function (){
  var job = this;

  function clearLock() {
    job.lock = null;
  }

  return scripts.releaseLock(job).then(clearLock);
};
...
};
/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
...
/**
 * Removes the job: takes its lock, runs `scripts.remove`, emits 'removed' on
 * the queue and finally releases the lock again.
 *
 * @returns {Promise} rejects when the lock could not be taken.
 */
remove = function (){
  var queue = this.queue;
  var job = this;

  function removeLocked(lock) {
    if (!lock) {
      throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
    }

    return scripts.remove(queue, job.jobId)
      .then(function() {
        queue.emit('removed', job.toJSON());
      })
      .finally(function () {
        // Release the lock whether or not the removal succeeded.
        return job.releaseLock();
      });
  }

  return job.takeLock().then(removeLocked);
};
...
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
...
/**
 * Renews the job's lock by calling `takeLock` with the renew flag set.
 */
renewLock = function (){
  var renew = true;
  return this.takeLock(renew);
};
n/a
/**
 * Re-processes a failed job through `scripts.reprocessJob`.
 *
 * Reply 1 emits 'waiting' on the queue; replies 0, -1 and -2 reject with an
 * explanatory error; any other reply resolves silently.
 */
retry = function (){
  var queue = this.queue;
  var job = this;

  return scripts.reprocessJob(job, { state: 'failed' }).then(function(result) {
    switch (result) {
      case 1:
        queue.emit('waiting', job);
        return;
      case 0:
        throw new Error('Couldn\'t retry job: The job doesn\'t exist');
      case -1:
        throw new Error('Couldn\'t retry job: The job is locked');
      case -2:
        throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
    }
  });
};
n/a
/**
 * Takes (or renews) the job's lock through `scripts.takeLock`.
 *
 * @param {boolean} [renew] - forwarded to the script.
 * @param {boolean} [ensureActive] - forwarded to the script.
 * @returns {Promise} resolves with the lock (also stored on `this.lock`), or
 *   with false when the script returned a falsy value.
 */
takeLock = function (renew, ensureActive){
  var job = this;

  function remember(lock) {
    if (!lock){
      return false;
    }
    job.lock = lock;
    return lock;
  }

  return scripts.takeLock(job.queue, job, renew, ensureActive).then(remember);
};
...
/**
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive).then(function(lock) {
if (lock){
_this.lock = lock;
}
return lock || false;
});
};
...
/**
 * Builds the flat field map of this job. `data`, `opts`, `stacktrace` and
 * `returnvalue` are JSON-encoded (falsy values are encoded as `{}` for
 * data/opts and `null` for stacktrace/returnvalue); the other fields are
 * copied as they are.
 */
toData = function (){
  var encode = JSON.stringify;
  var fields = {};

  fields.data = encode(this.data || {});
  fields.opts = encode(this.opts || {});
  fields.progress = this._progress;
  fields.delay = this.delay;
  fields.timestamp = this.timestamp;
  fields.attempts = this.attempts;
  fields.attemptsMade = this.attemptsMade;
  fields.failedReason = this.failedReason;
  fields.stacktrace = encode(this.stacktrace || null);
  fields.returnvalue = encode(this.returnvalue || null);

  return fields;
};
...
}
this.returnvalue = null;
this.attemptsMade = 0;
};
function addJob(queue, job){
var opts = job.opts;
var jobData = job.toData();
var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
});
}
...
/**
 * Builds the JSON form of this job. The options are copied (so the job's own
 * `opts` object is not modified) and the copy carries the job id as `jobId`.
 */
toJSON = function (){
  var optsCopy = _.extend({}, this.opts || {});
  optsCopy.jobId = this.jobId;

  var json = {};
  json.id = this.jobId;
  json.data = this.data || {};
  json.opts = optsCopy;
  json.progress = this._progress;
  json.delay = this.delay;
  json.timestamp = this.timestamp;
  json.attempts = this.attempts;
  json.attemptsMade = this.attemptsMade;
  json.failedReason = this.failedReason;
  json.stacktrace = this.stacktrace || null;
  json.returnvalue = this.returnvalue || null;

  return json;
};
...
}
Job.create = function(queue, data, opts){
var job = new Job(queue, data, opts);
return addJob(queue, job).then(function(jobId){
job.jobId = jobId;
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
return job;
});
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...
/**
 * Deprecated PriorityQueue constructor. Creates one underlying Queue per entry
 * of `PriorityQueue.priorities` and forwards their events to this instance.
 *
 * May be called without `new`; the arguments are passed through unchanged to
 * every underlying Queue.
 */
priority_queue = function (name, redisPort, redisHost, redisOptions) {
  if (!(this instanceof PriorityQueue)) {
    return new PriorityQueue(name, redisPort, redisHost, redisOptions);
  }

  console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority option instead.");

  var self = this;
  self.paused = false;
  self.queues = [];

  // One queue per priority, stored at the index given by the priorities table.
  for (var key in PriorityQueue.priorities) {
    var queueName = PriorityQueue.getQueueName(name, key);
    self.queues[PriorityQueue.priorities[key]] = Queue(queueName, redisPort, redisHost, redisOptions);
  }

  // Group events are re-emitted once, after every underlying queue emitted them.
  ['ready', 'paused', 'resumed'].forEach(function(event) {
    Promise.map(self.queues, function(queue) {
      return new Promise(function(resolve) {
        queue.once(event, resolve);
      });
    }).then(self.emit.bind(self, event));
  });

  // Single events are re-emitted every time any underlying queue emits them.
  ['error', 'active', 'stalled', 'progress', 'completed', 'failed', 'cleaned'].forEach(function(event) {
    self.queues.forEach(function(queue) {
      queue.on(event, self.emit.bind(self, event));
    });
  });

  self.strategy = Strategy.exponential;
};
n/a
/**
 * Creates a getter that calls `fnName` on every underlying queue with the
 * caller's arguments and resolves with all results concatenated into one array.
 *
 * @param {string} fnName - name of the queue method to call.
 * @returns {Function} the generated getter.
 */
genericGetter = function (fnName) {
  return function() {
    var callArgs = arguments;

    function callGetter(queue) {
      return queue[fnName].apply(queue, callArgs);
    }

    function mergeResults(results) {
      return results.reduce(function(all, part) {
        return all.concat(part);
      }, []);
    }

    return Promise.map(this.queues, callGetter).then(mergeResults);
  };
};
...
jobs = jobs.concat(val);
});
return jobs;
})
}
}
// Job list getters: each one calls the same-named method on every underlying
// queue (see PriorityQueue.genericGetter).
PriorityQueue.prototype.getWaiting = PriorityQueue.genericGetter("getWaiting");
PriorityQueue.prototype.getActive = PriorityQueue.genericGetter("getActive");
PriorityQueue.prototype.getDelayed = PriorityQueue.genericGetter("getDelayed");
PriorityQueue.prototype.getCompleted = PriorityQueue.genericGetter("getCompleted");
PriorityQueue.prototype.getFailed = PriorityQueue.genericGetter("getFailed");
// ---------------------------------------------------------------------
...
/**
 * Builds the name of the underlying queue for a priority:
 * `<name>:prio:<priority>`.
 */
getQueueName = function (name, priority) {
  var suffix = ':prio:' + priority;
  return name + suffix;
};
...
console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority
option instead.");
var _this = this;
this.paused = false;
this.queues = [];
for (var key in PriorityQueue.priorities) {
var queue = Queue(PriorityQueue.getQueueName(name, key), redisPort, redisHost, redisOptions
);
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
...
/**
 * EventEmitter constructor: runs `EventEmitter.init` on the new instance.
 */
function EventEmitter() {
  var initialise = EventEmitter.init;
  initialise.call(this);
}
n/a
/**
 * Adds a job to the underlying queue selected by `opts.priority`.
 *
 * @param {*} data - job payload.
 * @param {Object} [opts] - job options, forwarded unchanged.
 * @returns {*} whatever the selected queue's `add` returns.
 */
add = function (data, opts) {
  var priority = opts && opts.priority;
  var target = this.getQueue(priority);

  return target.add(data, opts);
};
...
});
pdfQueue.process(function(job){
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
```
Alternatively, you can return promises instead of using the `done` callback:
```javascript
...
/**
 * Cleans every underlying queue, emits 'cleaned' with the merged job list and
 * the type (default 'completed'), and resolves with that merged list.
 *
 * @param {number} grace - forwarded to each queue's `clean`.
 * @param {string} [type] - forwarded to each queue's `clean`.
 */
clean = function (grace, type) {
  var self = this;

  function cleanOne(queue) {
    return queue.clean(grace, type);
  }

  function report(results) {
    var cleaned = [].concat.apply([], results);
    self.emit('cleaned', cleaned, type || 'completed');
    return cleaned;
  }

  return Promise.map(this.queues, cleanOne).then(report);
};
...
Tells the queue to remove jobs of a specific type created outside of a grace period.
__Example__
```javascript
//cleans all jobs that completed over 5 seconds ago.
queue.clean(5000);
//clean all jobs that failed over 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
console.log('Cleaned %s %s jobs', job.length, type);
});
```
...
/**
 * Closes every underlying queue. The combined promise is stored on
 * `this.closing` and returned.
 *
 * @param {boolean} [doNotWaitJobs] - forwarded to each queue's `close`.
 */
close = function ( doNotWaitJobs ) {
  var closing = Promise.map(this.queues, function(queue) {
    return queue.close( doNotWaitJobs );
  });

  this.closing = closing;
  return closing;
};
...
shutdown.
```javascript
var Queue = require('bull');
var queue = Queue('example');
var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
});
queue.on('completed', after100);
```
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until *after*
...
/**
 * Sums `count()` over all sub-queues.
 *
 * @returns {Promise<number>} Total of the per-queue counts (0 with no queues).
 */
count = function () {
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue.count();
  });
  return perQueue.then(function (counts) {
    return counts.reduce(function (total, value) {
      return total + value;
    }, 0);
  });
};
...
return Promise.resolve(jobs);
});
}
PriorityQueue.prototype.count = function() {
return Promise.map(this.queues, function(queue) {
return queue.count();
}).then(function(results) {
var sum = 0;
results.forEach(function(val) {
sum += val;
});
return sum;
})
...
/**
 * Disconnects every sub-queue.
 *
 * @returns {Promise<Array>} Resolves once each queue's `disconnect` has settled.
 */
disconnect = function () {
  var disconnectOne = function (queue) {
    return queue.disconnect();
  };
  return Promise.map(this.queues, disconnectOne);
};
...
*
* @type {number}
*/
PriorityQueue.prototype.waitAfterEmptyLoop = 200;
PriorityQueue.prototype.disconnect = function() {
return Promise.map(this.queues, function(queue) {
return queue.disconnect();
})
}
PriorityQueue.prototype.close = function( doNotWaitJobs ) {
return this.closing = Promise.map(this.queues, function(queue) {
return queue.close( doNotWaitJobs );
});
...
/**
 * Empties every sub-queue.
 *
 * @returns {Promise<Array>} Per-queue results of `empty()`.
 */
empty = function () {
  var emptyOne = function (queue) {
    return queue.empty();
  };
  return Promise.map(this.queues, emptyOne);
};
...
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
PriorityQueue.prototype.pause = function(localOnly) {
var _this = this;
_this.paused = Promise.map(this.queues, function(queue) {
...
/**
 * Fan-out getter: calls the method named by `fnName` (a free variable supplied
 * by the enclosing scope, not visible in this block) on every sub-queue with
 * the caller's arguments and concatenates the per-queue results.
 *
 * @returns {Promise<Array>} Jobs from all sub-queues, in queue order.
 */
getActive = function () {
  var forwarded = arguments;
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue[fnName].apply(queue, forwarded);
  });
  return perQueue.then(function (lists) {
    return lists.reduce(function (all, list) {
      return all.concat(list);
    }, []);
  });
};
n/a
/**
 * Fan-out getter: calls the method named by `fnName` (a free variable supplied
 * by the enclosing scope, not visible in this block) on every sub-queue with
 * the caller's arguments and concatenates the per-queue results.
 *
 * @returns {Promise<Array>} Jobs from all sub-queues, in queue order.
 */
getCompleted = function () {
  var forwarded = arguments;
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue[fnName].apply(queue, forwarded);
  });
  return perQueue.then(function (lists) {
    return lists.reduce(function (all, list) {
      return all.concat(list);
    }, []);
  });
};
n/a
/**
 * Fan-out getter: calls the method named by `fnName` (a free variable supplied
 * by the enclosing scope, not visible in this block) on every sub-queue with
 * the caller's arguments and concatenates the per-queue results.
 *
 * @returns {Promise<Array>} Jobs from all sub-queues, in queue order.
 */
getDelayed = function () {
  var forwarded = arguments;
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue[fnName].apply(queue, forwarded);
  });
  return perQueue.then(function (lists) {
    return lists.reduce(function (all, list) {
      return all.concat(list);
    }, []);
  });
};
n/a
/**
 * Fan-out getter: calls the method named by `fnName` (a free variable supplied
 * by the enclosing scope, not visible in this block) on every sub-queue with
 * the caller's arguments and concatenates the per-queue results.
 *
 * @returns {Promise<Array>} Jobs from all sub-queues, in queue order.
 */
getFailed = function () {
  var forwarded = arguments;
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue[fnName].apply(queue, forwarded);
  });
  return perQueue.then(function (lists) {
    return lists.reduce(function (all, list) {
      return all.concat(list);
    }, []);
  });
};
n/a
/**
 * Returns the sub-queue registered for `priority`.
 *
 * Only own keys of `PriorityQueue.priorities` count as known priorities, so
 * inherited names such as 'toString' no longer slip through the check and
 * yield `undefined`. Any unknown priority falls back to 'normal'.
 *
 * @param {string} [priority] - Priority name.
 * @returns {Object} The queue stored at the index mapped to that priority.
 */
getQueue = function (priority) {
  var priorities = PriorityQueue.priorities;
  if (!Object.prototype.hasOwnProperty.call(priorities, priority)) {
    // in case of unknown priority, we use normal
    priority = 'normal';
  }
  return this.queues[priorities[priority]];
};
...
PriorityQueue.prototype.setLockRenewTime = function(lockRenewTime) {
this.queues.forEach(function(queue) {
queue.LOCK_RENEW_TIME = lockRenewTime;
})
}
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
...
/**
 * Fan-out getter: calls the method named by `fnName` (a free variable supplied
 * by the enclosing scope, not visible in this block) on every sub-queue with
 * the caller's arguments and concatenates the per-queue results.
 *
 * @returns {Promise<Array>} Jobs from all sub-queues, in queue order.
 */
getWaiting = function () {
  var forwarded = arguments;
  var perQueue = Promise.map(this.queues, function (queue) {
    return queue[fnName].apply(queue, forwarded);
  });
  return perQueue.then(function (lists) {
    return lists.reduce(function (all, list) {
      return all.concat(list);
    }, []);
  });
};
n/a
/**
 * Pauses every sub-queue, then emits `paused`. The in-flight promise is kept
 * on `this.paused`.
 *
 * @param {boolean} [localOnly] - Forwarded to each queue's `pause`
 *   (`false` when omitted or falsy).
 * @returns {Promise} The promise stored in `this.paused`.
 */
pause = function (localOnly) {
  var self = this;
  var pauseOne = function (queue) {
    return queue.pause(localOnly || false);
  };
  self.paused = Promise.map(this.queues, pauseOne).then(self.emit.bind(self, 'paused'));
  return self.paused;
};
...
return Promise.reject(new Error('some unexpected error'));
});
```
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
...
/**
 * Registers `handler` on this priority queue and on every sub-queue, then
 * starts the processing loop.
 *
 * @param {Function} handler - Job handler.
 * @returns {*} The result of `this.run()`.
 */
process = function (handler) {
  var install = function (queue) {
    queue.setHandler(handler);
  };
  this.handler = handler;
  this.queues.forEach(install);
  return this.run();
};
...
var Queue = require('bull');
var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
...
/**
 * Clears the paused flag, resumes every sub-queue, emits `resumed`, and
 * restarts the processing loop when a handler has been registered.
 *
 * @param {boolean} [localOnly] - Forwarded to each queue's `resume`
 *   (`false` when omitted or falsy).
 * @returns {Promise} Settles after the `resumed` event has been emitted.
 */
resume = function (localOnly) {
  var self = this;
  var resumeOne = function (queue) {
    return queue.resume(localOnly || false);
  };
  var restartIfHandled = function () {
    if (self.handler) {
      self.run();
    }
  };
  self.paused = false;
  return Promise.map(this.queues, resumeOne)
    .then(self.emit.bind(self, 'resumed'))
    .then(restartIfHandled);
};
...
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
A queue also emits some useful events:
```javascript
.on('ready', function() {
...
/**
 * Polling loop over the sub-queues, highest index first.
 *
 * Each pass visits every queue once; for a queue the number of jobs to take
 * is `this.strategy(PriorityQueue.priorities.critical - index)`. A queue is
 * left as soon as it has no job, the quota is reached, or `this.paused` is
 * set. After a pass the loop re-schedules itself (waiting
 * `this.waitAfterEmptyLoop` ms when the pass processed nothing) unless paused.
 *
 * The body is laid out on separate lines so the `//` comments no longer
 * swallow the statements that follow them.
 *
 * @returns {Promise} Settles when the loop stops.
 */
run = function () {
  var _this = this;

  // .reverse() is done in place and therefore mutating the queues array
  // so a copy is needed to prevent harmful side effects and general voodoo
  var reversedQueues = _this.queues.slice().reverse();

  var loop = function () {
    var emptyLoop = true;

    return Promise.each(reversedQueues, function (queue, index) {
      if (_this.closing) {
        return _this.closing;
      }

      // the index is reversed to the actual priority number (0 is 'critical')
      // so flip it to get the correct "priority index"
      var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
      var i = 0;

      var fn = function () {
        return queue.moveUnlockedJobsToWait()
          .then(queue.getNextJob.bind(queue, { block: false }))
          .then(function (job) {
            if (job) {
              emptyLoop = false;
              return queue.processJob(job).then(function () {
                if (++i < nbJobsToProcess && !_this.paused) {
                  return fn();
                }
              });
            }
            // No job: release this queue and move on to the next priority,
            // even if nbJobsToProcess has not been reached.
          });
      };

      return fn();
    }).then(function () {
      if (!_this.paused) {
        return Promise.delay(emptyLoop ? _this.waitAfterEmptyLoop : 0).then(loop);
      }
    });
  };

  return loop();
};
...
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
PriorityQueue.prototype.run = function() {
var _this = this;
...
/**
 * Sets `LOCK_RENEW_TIME` on every sub-queue.
 *
 * @param {number} lockRenewTime - Value assigned to each queue.
 */
setLockRenewTime = function (lockRenewTime) {
  var applyRenewTime = function (queue) {
    queue.LOCK_RENEW_TIME = lockRenewTime;
  };
  this.queues.forEach(applyRenewTime);
};
n/a
function Queue(name, redisPort, redisHost, redisOptions){ if(!(this instanceof Queue)){ return new Queue(name, redisPort, redisHost, redisOptions); } if(_.isObject(redisPort)) { var opts = redisPort; var redisOpts = opts.redis || {}; redisPort = redisOpts.port; redisHost = redisOpts.host; redisOptions = redisOpts.opts || {}; redisOptions.db = redisOpts.DB || redisOpts.DB; } else if(parseInt(redisPort) == redisPort) { redisPort = parseInt(redisPort); redisOptions = redisOptions || {}; } else if(_.isString(redisPort)) { try { var redisUrl = url.parse(redisPort); assert(_.isObject(redisHost) || _.isUndefined(redisHost), 'Expected an object as redis option'); redisOptions = redisHost || {}; redisPort = redisUrl.port; redisHost = redisUrl.hostname; if (redisUrl.auth) { redisOptions.password = redisUrl.auth.split(':')[1]; } } catch (e) { throw new Error(e.message); } } redisOptions = redisOptions || {}; function createClient(type) { var client; if(_.isFunction(redisOptions.createClient)){ client = redisOptions.createClient(type); }else{ client = new redis(redisPort, redisHost, redisOptions); } return client; } redisPort = redisPort || 6379; redisHost = redisHost || '127.0.0.1'; var _this = this; this.name = name; this.keyPrefix = redisOptions.keyPrefix || 'bull'; // // We cannot use ioredis keyPrefix feature until we // stop creating keys dynamically in lua scripts. // delete redisOptions.keyPrefix; // // Create queue client (used to add jobs, pause queues, etc); // this.client = createClient('client'); getRedisVersion(this.client).then(function(version){ if(semver.lt(version, MINIMUM_REDIS_VERSION)){ throw new Error('Redis version needs to be greater than ' + MINIMUM_REDIS_VERSION + '. 
Current: ' + version); } }).catch(function(err){ _this.emit('error', err); }); // // Keep track of cluster clients for redlock // this.clients = [this.client]; if (redisOptions.clients) { this.clients.push.apply(this.clients, redisOptions.clients); } this.redlock = { driftFactor: REDLOCK_DRIFT_FACTOR, retryCount: REDLOCK_RETRY_COUNT, retryDelay: REDLOCK_RETRY_DELAY }; _.extend(this.redlock, redisOptions.redlock || {}); // // Create blocking client (used to wait for jobs) // this.bclient = createClient('block'); // // Create event subscriber client (receive messages from other instance of the queue) // this.eclient = createClient('subscriber'); this.delayTimer = null; this.processing = 0; this.retrieving = 0; this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; this.LOCK_DURATION = LOCK_DURATION; this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; // bubble up Redis error events [this.client, this.bclient, this.eclient].forEach(function (client) { client.on('error', _this.emit.bind(_this, 'error')); }); // keeps track of active timers. used by close() to // ensure that disconnect() is deferred until all // scheduled redis commands have been executed this.timers = new TimerManager(); // emit ready when redis connections ready var initializers = [this.client, this.bclient, this.eclient].map(function (client) { return new Promise(function(resolve, reject) { client.once('ready', resolve); client.once('error', reject); }); }); this._initializing = Promise.all(initializers).then(function(){ return Promise.join( _this.eclient.subscribe(_this.toKey('delayed')), _this.eclient.subscribe(_this.toKey('paused')) ); }).then(function(){ debuglog(name + ' queue ready'); _this.emit('ready'); }, function(err){ console.error('Error initializing queue:', err); }); Disturbed.call(this, _this.client, _this.eclient); // // Listen distributed queue events // listenDistEvent('stalled'); // listenDistEvent('active'); // listenDist ...
n/a
/**
 * Distributed-emitter constructor: stores the pub/sub clients, assigns this
 * instance a uuid and re-emits messages received on `subClient` locally.
 *
 * A message is expected to be a JSON array whose first element is the
 * sender's uuid. Messages that fail to parse, or that are not arrays, are
 * logged and dropped; previously a parse failure left `args` undefined and
 * the handler threw on `args[0]`. Messages sent by this instance are ignored.
 *
 * @param {Object} pubClient - Stored on `this.pubClient`.
 * @param {Object} subClient - Stored on `this.subClient`; must expose `on`.
 */
super_ = function (pubClient, subClient) {
  var _this = this;
  EventEmitter.call(this);
  this.uuid = uuid();
  this.pubClient = pubClient;
  this.subClient = subClient;

  subClient.on('message', function (channel, msg) {
    // Skip the parsing work entirely when nobody listens on this channel.
    if (!_this.listenerCount(channel)) {
      return;
    }

    var args;
    try {
      args = JSON.parse(msg);
    } catch (err) {
      console.error('Parsing event message', err);
      return;
    }

    if (!Array.isArray(args)) {
      console.error('Parsing event message', new TypeError('Expected a JSON array'));
      return;
    }

    if (args[0] !== _this.uuid) {
      // Replace the sender id with the event name before re-emitting.
      args[0] = channel;
      _this.emit.apply(_this, args);
    }
  });
};
n/a
/**
 * Creates a job on this queue via `Job.create`.
 *
 * @param {*} data - Job payload.
 * @param {Object} [opts] - Job options.
 * @returns {*} Whatever `Job.create` returns.
 */
add = function (data, opts) {
  var queue = this;
  return Job.create(queue, data, opts);
};
...
});
pdfQueue.process(function(job){
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
```
Alternatively, you can return promises instead of using the `done` callback:
```javascript
...
/**
 * Removes jobs of one type that are older than a grace period.
 *
 * Rejects when `grace` is null/undefined or `type` is not one of
 * 'completed', 'wait', 'active', 'delayed', 'failed' (`type` defaults to
 * 'completed'). On success the `cleaned` event is emitted through
 * `distEmit`; on script failure `error` is emitted locally before rejecting.
 *
 * @param {number} grace - Age threshold, subtracted from `Date.now()`.
 * @param {string} [type] - Job set to clean.
 * @param {number} [limit] - Forwarded to `scripts.cleanJobsInSet`.
 * @returns {Promise<Array>} The jobs reported by the clean script.
 */
clean = function (grace, type, limit) {
  var self = this;
  var cleanable = ['completed', 'wait', 'active', 'delayed', 'failed'];

  return new Promise(function (resolve, reject) {
    if (grace === undefined || grace === null) {
      return reject(new Error('You must define a grace period.'));
    }

    type = type || 'completed';

    if (cleanable.indexOf(type) === -1) {
      return reject(new Error('Cannot clean unkown queue type'));
    }

    var olderThan = Date.now() - grace;
    return scripts.cleanJobsInSet(self, type, olderThan, limit).then(function (jobs) {
      self.distEmit('cleaned', jobs, type);
      resolve(jobs);
      return null;
    }).catch(function (err) {
      self.emit('error', err);
      reject(err);
    });
  });
};
...
Tells the queue to remove jobs of a specific type created outside of a grace period.
__Example__
```javascript
//cleans all jobs that completed over 5 seconds ago.
queue.clean(5000);
//clean all jobs that failed over 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
console.log('Cleaned %s %s jobs', job.length, type);
});
```
...
/**
 * Shuts the queue down once. After initialization it clears the delay,
 * guardian and stalled-check timers, waits for tracked timers to go idle,
 * pauses locally, disconnects, and finally sets `this.closed`.
 *
 * Repeated calls return the promise stored in `this.closing`.
 *
 * @param {boolean} [doNotWaitJobs] - Forwarded as the second argument of `pause`.
 * @returns {Promise} Resolves when shutdown has completed.
 */
close = function ( doNotWaitJobs ) {
  var self = this;

  if (this.closing) {
    return this.closing;
  }

  function shutdown() {
    clearTimeout(self.delayTimer);
    clearInterval(self.guardianTimer);
    clearInterval(self.moveUnlockedJobsToWaitInterval);
    self.timers.clearAll();

    return self.timers.whenIdle()
      .then(function () { return self.pause(true, doNotWaitJobs); })
      .then(function () { return self.disconnect(); })
      .then(function () { self.closed = true; });
  }

  this.closing = this._initializing.then(shutdown);
  return this.closing;
};
...
shutdown.
```javascript
var Queue = require('bull');
var queue = Queue('example');
var after100 = _.after(100, function () {
queue.close().then(function () { console.log('done') })
});
queue.on('completed', after100);
```
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until *after*
...
/**
 * Counts pending jobs: the larger of the `wait` and `paused` list lengths
 * plus the size of the `delayed` sorted set, read in one MULTI.
 *
 * @returns {Promise<number>}
 */
count = function () {
  var batch = this.multi();
  batch.llen(this.toKey('wait'));
  batch.llen(this.toKey('paused'));
  batch.zcard(this.toKey('delayed'));

  return batch.exec().then(function (replies) {
    var waiting = replies[0][1];
    var paused = replies[1][1];
    var delayed = replies[2][1];
    return Math.max(waiting, paused) + delayed;
  });
};
...
return Promise.resolve(jobs);
});
}
PriorityQueue.prototype.count = function() {
return Promise.map(this.queues, function(queue) {
return queue.count();
}).then(function(results) {
var sum = 0;
results.forEach(function(val) {
sum += val;
});
return sum;
})
...
/**
 * Closes the three redis connections. `client` and `eclient` are asked to
 * quit; whether or not that succeeds, the blocking client is then ended and
 * its stream's `close` event awaited for at most CLIENT_CLOSE_TIMEOUT_MS.
 * A timeout while waiting is swallowed; any other error is rethrown.
 *
 * @returns {Promise}
 */
disconnect = function () {
  var self = this;

  function ignoreTimeout(err) {
    if (!(err instanceof Promise.TimeoutError)) {
      throw err;
    }
  }

  function endClients() {
    var timeoutMsg = 'Timed out while waiting for redis clients to close';
    var blockingClosed = new Promise(function (resolve) {
      self.bclient.end(true);
      self.bclient.stream.once('close', resolve);
    });
    return blockingClosed
      .timeout(CLIENT_CLOSE_TIMEOUT_MS, timeoutMsg)
      .catch(ignoreTimeout);
  }

  var quitting = Promise.join(self.client.quit(), self.eclient.quit());
  return quitting.then(endClients, endClients);
};
...
*
* @type {number}
*/
PriorityQueue.prototype.waitAfterEmptyLoop = 200;
PriorityQueue.prototype.disconnect = function() {
return Promise.map(this.queues, function(queue) {
return queue.disconnect();
})
}
PriorityQueue.prototype.close = function( doNotWaitJobs ) {
return this.closing = Promise.map(this.queues, function(queue) {
return queue.close( doNotWaitJobs );
});
...
/**
 * Emits an event locally, then publishes it to other instances under the
 * name `<event>@<queue name>`.
 *
 * The statements are laid out on separate lines so the `//` comments no
 * longer swallow the code that follows them.
 *
 * @param {string} event - Event name, followed by any event arguments.
 * @returns {*} The result of `Disturbed.prototype.distEmit`.
 */
distEmit = function () {
  var args = Array.prototype.slice.call(arguments);

  // Emit local event
  this.emit.apply(this, args);

  // Emit global event
  args[0] = args[0] + '@' + this.name;
  return Disturbed.prototype.distEmit.apply(this, args);
};
...
}
/**
 * Builds a job, stores it through `addJob`, and announces it with a
 * `waiting` event on the queue.
 *
 * @param {Object} queue - Owning queue; must expose `distEmit`.
 * @param {*} data - Job payload.
 * @param {Object} [opts] - Job options.
 * @returns {Promise<Job>} The job, with `jobId` set to the id from `addJob`.
 */
Job.create = function(queue, data, opts){
  var created = new Job(queue, data, opts);

  function onAdded(jobId) {
    created.jobId = jobId;
    queue.distEmit('waiting', created.toJSON());
    debuglog('Job added', jobId);
    return created;
  }

  return addJob(queue, created).then(onAdded);
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...
/**
 * Empties the queue: reads the ids in the `wait` and `paused` lists and
 * deletes those lists plus the `meta-paused` and `delayed` keys in a single
 * MULTI, then deletes the keys of the jobs that were listed.
 *
 * The statements are laid out on separate lines so the `//` comment no
 * longer swallows the code that follows it.
 *
 * @returns {Promise} Settles after the job keys (if any) have been deleted.
 */
empty = function () {
  var _this = this;

  // Get all jobids and empty all lists atomically.
  var multi = this.multi();

  multi.lrange(this.toKey('wait'), 0, -1);
  multi.lrange(this.toKey('paused'), 0, -1);
  multi.del(this.toKey('wait'));
  multi.del(this.toKey('paused'));
  multi.del(this.toKey('meta-paused'));
  multi.del(this.toKey('delayed'));

  return multi.exec().spread(function (waiting, paused) {
    // Each reply is an [err, value] pair; keep only the id lists.
    waiting = waiting[1];
    paused = paused[1];
    var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);

    if (jobKeys.length) {
      multi = _this.multi();
      multi.del.apply(multi, jobKeys);
      return multi.exec();
    }
  });
};
...
PriorityQueue.prototype.add = function(data, opts) {
return this.getQueue(opts && opts.priority).add(data, opts);
}
PriorityQueue.prototype.empty = function() {
return Promise.map(this.queues, function(queue) {
return queue.empty();
});
}
PriorityQueue.prototype.pause = function(localOnly) {
var _this = this;
_this.paused = Promise.map(this.queues, function(queue) {
...
/**
 * Fetches the jobs in the `active` list.
 *
 * @returns {*} Result of `getJobs('active', 'LIST')`.
 */
getActive = function () {
  var activeJobs = this.getJobs('active', 'LIST');
  return activeJobs;
};
n/a
/**
 * Length of the `active` list.
 *
 * @returns {*} Result of the client's `llen` on the `active` key.
 */
getActiveCount = function () {
  var client = this.client;
  var key = this.toKey('active');
  return client.llen(key);
};
n/a
/**
 * Fetches the jobs in the `completed` set.
 *
 * @returns {*} Result of `getJobs('completed', 'SET')`.
 */
getCompleted = function () {
  var completedJobs = this.getJobs('completed', 'SET');
  return completedJobs;
};
n/a
/**
 * Cardinality of the `completed` set.
 *
 * @returns {*} Result of the client's `scard` on the `completed` key.
 */
getCompletedCount = function () {
  var client = this.client;
  var key = this.toKey('completed');
  return client.scard(key);
};
n/a
/**
 * Fetches the jobs in the `delayed` sorted set.
 *
 * @returns {*} Result of `getJobs('delayed', 'ZSET')`.
 */
getDelayed = function () {
  var delayedJobs = this.getJobs('delayed', 'ZSET');
  return delayedJobs;
};
n/a
/**
 * Cardinality of the `delayed` sorted set.
 *
 * @returns {*} Result of the client's `zcard` on the `delayed` key.
 */
getDelayedCount = function () {
  var client = this.client;
  var key = this.toKey('delayed');
  return client.zcard(key);
};
n/a
/**
 * Fetches the jobs in the `failed` set.
 *
 * @returns {*} Result of `getJobs('failed', 'SET')`.
 */
getFailed = function () {
  var failedJobs = this.getJobs('failed', 'SET');
  return failedJobs;
};
n/a
/**
 * Cardinality of the `failed` set.
 *
 * @returns {*} Result of the client's `scard` on the `failed` key.
 */
getFailedCount = function () {
  var client = this.client;
  var key = this.toKey('failed');
  return client.scard(key);
};
n/a
/**
 * Looks a job up by id on this queue.
 *
 * @param {string} jobId - Job identifier.
 * @returns {*} Result of `Job.fromId(this, jobId)`.
 */
getJob = function (jobId) {
  var queue = this;
  return Job.fromId(queue, jobId);
};
n/a
/**
 * Sums the number of jobs across the requested job types.
 *
 * Types may be given as separate arguments, arrays, or comma-separated
 * strings (spaces are ignored). Recognized types are 'completed', 'failed',
 * 'delayed', 'active', 'wait' and 'paused'; anything else is skipped.
 *
 * The sum is seeded with 0, so a call that matches no type resolves to 0.
 * The previous unseeded `reduce` threw on an empty reply list, and the
 * trailing `|| 0` on the promise could never apply.
 *
 * @returns {Promise<number>} Total count over the recognized types.
 */
getJobCountByTypes = function () {
  var _this = this;
  var args = Array.prototype.slice.call(arguments).filter(Boolean);
  var types = args.join(',').replace(/ /g, '').split(',').filter(Boolean);

  var multi = this.multi();

  types.forEach(function (type) {
    var key = _this.toKey(type);
    switch (type) {
      case 'completed':
      case 'failed':
        multi.scard(key);
        break;
      case 'delayed':
        multi.zcard(key);
        break;
      case 'active':
      case 'wait':
      case 'paused':
        multi.llen(key);
        break;
    }
  });

  return multi.exec().then(function (res) {
    // Each reply is an [err, value] pair.
    return res.reduce(function (sum, reply) {
      return sum + reply[1];
    }, 0);
  });
};
n/a
/**
 * Reads the sizes of the wait, active, completed, failed and delayed
 * collections in one MULTI.
 *
 * @returns {Promise<Object>} Counts keyed by type; a falsy reply becomes 0.
 */
getJobCounts = function () {
  var types = ['wait', 'active', 'completed', 'failed', 'delayed'];

  return this.client.multi()
    .llen(this.toKey('wait'))
    .llen(this.toKey('active'))
    .scard(this.toKey('completed'))
    .scard(this.toKey('failed'))
    .zcard(this.toKey('delayed'))
    .exec()
    .then(function (replies) {
      return replies.reduce(function (counts, reply, index) {
        counts[types[index]] = reply[1] || 0;
        return counts;
      }, {});
    });
};
n/a
/**
 * Number of commands currently queued on the blocking client.
 *
 * @returns {number} `bclient.commandQueue.length`.
 */
getJobMoveCount = function () {
  var pendingCommands = this.bclient.commandQueue;
  return pendingCommands.length;
};
n/a
/**
 * Loads the jobs stored in one of the queue's redis collections.
 *
 * @param {string} queueType - Collection name, passed through `toKey`.
 * @param {string} type - Redis structure of the collection: 'LIST', 'SET'
 *   or 'ZSET'.
 * @param {number} [start=0] - First index (inclusive).
 * @param {number} [end=-1] - Last index (inclusive); -1 means "to the end".
 * @returns {Promise<Array>} Jobs resolved through `getJobFromId`.
 * @throws {Error} When `type` is not one of the supported structures
 *   (previously this fell through to a TypeError on `undefined.then`).
 */
getJobs = function (queueType, type, start, end) {
  var _this = this;
  var key = this.toKey(queueType);
  var jobs;

  start = start === undefined ? 0 : start;
  end = end === undefined ? -1 : end;

  switch (type) {
    case 'LIST':
      jobs = this.client.lrange(key, start, end);
      break;
    case 'SET':
      jobs = this.client.smembers(key).then(function (jobIds) {
        // Can't set a range for smembers. So do the slice programmatically instead.
        // Note that redis ranges are inclusive, so handling for javascript accordingly
        if (end === -1) {
          return jobIds.slice(start);
        }
        return jobIds.slice(start, end + 1);
      });
      break;
    case 'ZSET':
      jobs = this.client.zrange(key, start, end);
      break;
    default:
      throw new Error('Unsupported collection type: ' + type);
  }

  return jobs.then(function (jobIds) {
    var jobsFromId = jobIds.map(_this.getJobFromId);
    return Promise.all(jobsFromId);
  });
};
n/a
/**
 * Moves the next job from `wait` to `active` and loads it.
 *
 * While the move is in flight `this.retrieving` is incremented; once it
 * settles the counter is decremented and either `this.processing` is
 * incremented (a job was found) or `no-job-retrieved` is emitted. Errors
 * are rethrown after the same bookkeeping.
 *
 * @param {Object} [opts] - Forwarded to `moveJob`.
 * @returns {Promise} The job (or an empty result); rejects with no reason
 *   when the queue is closing.
 */
getNextJob = function (opts) {
  var self = this;

  if (this.closing) {
    return Promise.reject();
  }

  this.retrieving++;

  return this.moveJob('wait', 'active', opts)
    .then(this.getJobFromId)
    .tap(function (job) {
      self.retrieving--;
      if (job) {
        self.processing++;
      } else {
        self.emit('no-job-retrieved');
      }
    })
    .catch(function (err) {
      self.retrieving--;
      self.emit('no-job-retrieved');
      throw err;
    });
};
n/a
/**
 * Length of the `paused` list.
 *
 * @returns {*} Result of the client's `llen` on the `paused` key.
 */
getPausedCount = function () {
  var client = this.client;
  var key = this.toKey('paused');
  return client.llen(key);
};
n/a
/**
 * Fetches the jobs of both the `wait` and `paused` lists.
 *
 * @returns {Promise<Array>} Waiting jobs followed by paused jobs.
 */
getWaiting = function () {
  var waitingJobs = this.getJobs('wait', 'LIST');
  var pausedJobs = this.getJobs('paused', 'LIST');

  return Promise.join(waitingJobs, pausedJobs).spread(function (waiting, paused) {
    return _.concat(waiting, paused);
  });
};
n/a
/**
 * Length of the `wait` list.
 *
 * @returns {*} Result of the client's `llen` on the `wait` key.
 */
getWaitingCount = function () {
  var client = this.client;
  var key = this.toKey('wait');
  return client.llen(key);
};
n/a
/**
 * Waits for queue initialization.
 *
 * @returns {Promise<Object>} Resolves with this queue once
 *   `this._initializing` has resolved.
 */
isReady = function () {
  var queue = this;
  var yieldQueue = function () {
    return queue;
  };
  return this._initializing.then(yieldQueue);
};
n/a
/**
 * Moves one job id from list `src` to list `dst` on the blocking client and
 * removes it from the `priority` sorted set.
 *
 * - `opts.block === false`: non-blocking RPOPLPUSH (rejected while closing).
 * - closing or paused: resolves immediately without moving anything.
 * - another move already queued on the blocking client: waits for it, then
 *   retries with the same arguments.
 * - otherwise: BRPOPLPUSH with a timeout of LOCK_RENEW_TIME in whole seconds.
 *
 * Errors are swallowed while the queue is closing and rethrown otherwise.
 * The statements are laid out on separate lines so the `//` comment no
 * longer swallows the code that follows it.
 *
 * @param {string} src - Source list name.
 * @param {string} dst - Destination list name.
 * @param {Object} [opts] - Only `block` is read.
 * @returns {Promise<string|undefined>} The moved job id, if any.
 */
moveJob = function (src, dst, opts) {
  var args = arguments;
  var _this = this;
  var move;

  if (opts && opts.block === false) {
    if (!this.closing) {
      move = this.bclient.rpoplpush(this.toKey(src), this.toKey(dst));
    } else {
      move = Promise.reject();
    }
  } else if (this.closing || this.paused) {
    move = Promise.resolve();
  } else if (this.getJobMoveCount()) {
    move = this.whenCurrentMoveFinished().then(function () {
      return _this.moveJob.apply(_this, args);
    });
  } else {
    move = this.bclient.brpoplpush(
      this.toKey(src),
      this.toKey(dst),
      Math.floor(this.LOCK_RENEW_TIME / 1000));
  }

  return move.then(function (jobId) {
    //
    // Unfortunately this cannot be performed atomically, which will lead to a
    // slight hazard for priority queues (will only affect its order).
    //
    if (jobId) {
      return _this.client.zrem(_this.toKey('priority'), jobId).then(function () {
        return jobId;
      });
    }
  }, function (err) {
    if (!_this.closing) {
      throw err;
    }
  });
};
n/a
/**
 * Runs the `moveUnlockedJobsToWait` script and announces its outcome: jobs
 * listed in the first response get a `failed` event with a stall error,
 * jobs in the second response get a `stalled` event (both via `distEmit`).
 *
 * Any failure is logged with `console.error` and not propagated.
 *
 * @returns {Promise} Settles once every listed job has been announced.
 */
moveUnlockedJobsToWait = function () {
  var self = this;

  // Loads each listed job and hands it to `announce`; each entry resolves to null.
  function forEachJob(jobIds, announce) {
    return jobIds.map(function (jobId) {
      return self.getJobFromId(jobId).then(function (job) {
        announce(job);
        return null;
      });
    });
  }

  return scripts.moveUnlockedJobsToWait(this).then(function (responses) {
    var failed = forEachJob(responses[0], function (job) {
      self.distEmit('failed', job, new Error('job stalled more than allowable limit'));
    });
    var stalled = forEachJob(responses[1], function (job) {
      self.distEmit('stalled', job);
    });
    return Promise.all(failed.concat(stalled));
  }).catch(function (err) {
    console.error('Failed to handle unlocked job in active:', err);
  });
};
...
// the index is reversed to the actual priority number (0 is 'critical')
// so flip it to get the correct "priority index"
var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
var i = 0;
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
...
/**
 * Opens a MULTI on the queue's main client.
 *
 * @returns {*} Result of `this.client.multi()`.
 */
multi = function () {
  var client = this.client;
  return client.multi();
};
...
};
/*
Queue.prototype.empty = function(){
var _this = this;
// Get all jobids and empty all lists atomically.
var multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('delayed'));
...
/**
 * Chainable `on`: registers the listener through `Disturbed.prototype.on`
 * and routes a rejection of the returned promise to the `error` event.
 *
 * @returns {Object} This queue.
 */
on = function () {
  var self = this;
  var listenerArgs = Array.prototype.slice.call(arguments);
  var subscribed = Disturbed.prototype.on.apply(self, listenerArgs);

  subscribed.catch(function (err) {
    self.emit('error', err);
  });

  return self;
};
...
queue.resume().then(function(){
// queue is resumed now
})
```
A queue also emits some useful events:
```javascript
.on('ready', function() {
// Queue ready for job
// All Redis connections are done
})
.on('error', function(error) {
// Error
})
.on('active', function(job, jobPromise){
...
/**
 * Chainable `once`: forwards all arguments to `Disturbed.prototype.once`.
 *
 * @returns {Object} This queue.
 */
once = function () {
  var listenerArgs = Array.prototype.slice.call(arguments);
  Disturbed.prototype.once.apply(this, listenerArgs);
  return this;
};
...
this.queues[PriorityQueue.priorities[key]] = queue;
}
var groupEvents = ['ready', 'paused', 'resumed']
groupEvents.forEach(function(event) {
Promise.map(_this.queues, function(queue) {
return new Promise(function(resolve, reject) {
queue.once(event, resolve);
});
}).then(_this.emit.bind(_this, event))
})
var singleEvents = ['error', 'active', 'stalled', 'progress', 'completed', '
;failed', 'cleaned']
singleEvents.forEach(function(event) {
_this.queues.forEach(function(queue) {
...
/**
 * Pauses the queue.
 *
 * Local pause: installs a pending promise on `this.paused` (unless one is
 * already there) together with `this.resumeLocal`, which resolves it and
 * resets `this.paused` to null. Global pause delegates to
 * `pauseResumeGlobal(this, true)`.
 *
 * The statements are laid out on separate lines so the trailing `//` comment
 * no longer swallows the code that follows it.
 *
 * @param {boolean} [isLocal] - Pause only this worker.
 * @param {boolean} [doNotWaitActive] - Local pause only: skip waiting for
 *   the jobs currently in flight.
 * @returns {Promise|boolean} For a local pause, `false` when
 *   `doNotWaitActive` is truthy, otherwise the result of
 *   `whenCurrentJobsFinished()`; for a global pause, the result of
 *   `pauseResumeGlobal`.
 */
pause = function (isLocal, doNotWaitActive) {
  if (isLocal) {
    var _this = this;

    if (!this.paused) {
      this.paused = new Promise(function (resolve) {
        _this.resumeLocal = function () {
          resolve();
          _this.paused = null; // Allow pause to be checked externally for paused state.
        };
      });
    }

    return !doNotWaitActive && this.whenCurrentJobsFinished();
  } else {
    return pauseResumeGlobal(this, true);
  }
};
...
return Promise.reject(new Error('some unexpected error'));
});
```
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
...
/**
 * Registers the job handler and starts processing.
 *
 * May be called as `process(handler)` (concurrency defaults to 1) or
 * `process(concurrency, handler)`. Listeners on the blocking client's
 * `error` and `end` events restart the run loop the next time that client
 * reports `ready`.
 *
 * The statements are laid out on separate lines so the `//` comments no
 * longer swallow the code that follows them.
 *
 * @param {number|Function} concurrency - Worker count, or the handler.
 * @param {Function} [handler] - Job handler when concurrency is given.
 * @returns {Promise} The run promise; failures are logged and rethrown.
 */
process = function (concurrency, handler) {
  var _this = this;

  if (typeof concurrency === 'function') {
    handler = concurrency;
    concurrency = 1;
  }

  this.setHandler(handler);

  var runQueueWhenReady = function () {
    _this.bclient.once('ready', function () {
      _this.run(concurrency).catch(function (err) {
        console.error(err);
      });
    });
  };

  // attempt to restart the queue when the client throws
  // an error or the connection is dropped by redis
  this.bclient.on('error', runQueueWhenReady);
  this.bclient.on('end', runQueueWhenReady);

  return this.run(concurrency).catch(function (err) {
    console.error(err);
    throw err;
  });
};
...
var Queue = require('bull');
var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
...
/**
 * Runs the registered handler for one job while keeping its lock renewed.
 *
 * Flow: take the lock (re-arming a `lockRenewer` timer every
 * LOCK_RENEW_TIME), call `this.handler(job)` (wrapped in a timeout when
 * `job.opts.timeout` is set), emit `active`, then move the job to completed
 * or failed and emit the matching event.
 *
 * Fixes over the previous revision:
 * - the body is laid out on separate lines so `//` comments no longer
 *   swallow the code that follows them;
 * - `moveToFailed` is now invoked from a `then` callback, so the lock is
 *   released and `failed` is emitted only after the move has finished
 *   (before, a promise was passed to `then` and silently ignored);
 * - a handler rejection with no reason no longer throws on `err.cause`.
 *
 * @param {Object} job - Job to process; a falsy value resolves immediately.
 * @returns {Promise} Settles when the handler outcome has been dispatched.
 */
processJob = function (job) {
  var _this = this;
  var lockRenewId;
  var timmerStopped = false;

  if (!job) {
    return Promise.resolve();
  }

  //
  // TODO:
  // There are two cases to take into consideration regarding locks.
  // 1) The lock renewer fails to renew a lock, this should make this job
  // unable to complete, since some other worker is also working on it.
  // 2) The lock renewer is called more seldom than the check for stalled
  // jobs, so we can assume the job has been stalled and is already being processed
  // by another worker. See #308
  //
  var renew = false;
  var lockRenewer = function () {
    return job.takeLock(renew, true).then(function (lock) {
      if (lock && !timmerStopped) {
        renew = true;
        lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME, lockRenewer);
      }
      // TODO: if we failed to re-acquire the lock while trying to renew, should we let the job
      // handler know and cancel the timer?
      return lock;
    }, function (err) {
      console.error('Error renewing lock ' + err);
    });
  };

  var timeoutMs = job.opts.timeout;

  function stopTimer() {
    timmerStopped = true;
    _this.timers.clear(lockRenewId);
    return Promise.resolve();
  }

  function handleCompleted(data) {
    // A result that cannot be serialized is treated as a failure.
    try {
      JSON.stringify(data);
    } catch (err) {
      return handleFailed(err);
    }
    // This subtraction is duplicated in handleCompleted and handleFailed because it has to
    // be made before emitting any completed or failed event, so that pause() works
    // correctly without getting stuck.
    _this.processing--;
    stopTimer();

    if (_this.closed) {
      return;
    }
    return job.moveToCompleted(data).then(function () {
      return _this.distEmit('completed', job, data);
    });
  }

  function handleFailed(err) {
    var error = (err && err.cause) || err; // Handle explicit rejection

    // TODO: Should moveToFailed ensure the lock atomically in one of its Lua scripts?
    // See https://github.com/OptimalBits/bull/pull/415#issuecomment-269744735
    job.takeLock(true /* renew */, false /* ensureActive */).then(function () {
      return stopTimer()
        .then(function () {
          return job.moveToFailed(err);
        })
        .then(function () {
          return job.releaseLock();
        })
        .then(function () {
          return _this.distEmit('failed', job, error);
        });
    }, function (err) {
      console.error('failed to re-obtain lock before moving to failed, bailing: ', err);
      stopTimer();
    });

    _this.processing--;
  }

  return lockRenewer().then(function (locked) {
    if (locked) {
      var jobPromise = _this.handler(job);

      if (timeoutMs) {
        jobPromise = jobPromise.timeout(timeoutMs);
      }

      _this.distEmit('active', job, jobPromise);

      return jobPromise.then(handleCompleted, handleFailed);
    }
  });
};
...
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
return fn();
}
})
} else {
//nothing It will release loop and call next priority queue even if we have no reach nbJobsToProcess
}
...
/**
 * One worker iteration, written as a promise executor: on the next tick it
 * waits for any local pause, fetches a job, processes it and then schedules
 * itself again. A processing error is logged and the loop continues. When
 * the queue is closing the worker stops by resolving with `this.closing`.
 *
 * @param {Function} resolve - Called with `this.closing` to end the worker.
 * @param {Function} reject - Called if rescheduling itself fails.
 */
processJobs = function (resolve, reject) {
  var self = this;
  var next = this.processJobs.bind(this, resolve, reject);

  if (this.closing) {
    resolve(this.closing);
    return;
  }

  process.nextTick(function () {
    var ready = self.paused || Promise.resolve();

    ready
      .then(self.getNextJob)
      .then(self.processJob)
      .then(next, function (err) {
        console.error('Error processing job:', err);
        next();
      })
      .catch(reject);
  });
};
n/a
/**
 * Resumes the queue.
 *
 * @param {boolean} [isLocal] - Resume only this worker: calls
 *   `this.resumeLocal()` when present. Otherwise delegates to
 *   `pauseResumeGlobal(this, false)`.
 * @returns {Promise} A resolved promise for a local resume, else the result
 *   of `pauseResumeGlobal`.
 */
resume = function (isLocal) {
  if (!isLocal) {
    return pauseResumeGlobal(this, false);
  }

  if (this.resumeLocal) {
    this.resumeLocal();
  }
  return Promise.resolve();
};
...
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
```javascript
queue.pause().then(function(){
// queue is paused now
});
queue.resume().then(function(){
// queue is resumed now
})
```
A queue also emits some useful events:
```javascript
.on('ready', function() {
...
/**
 * Retries a job.
 *
 * @param {Object} job - Job exposing `retry()`.
 * @returns {*} Whatever `job.retry()` returns.
 */
retryJob = function (job) {
  var outcome = job.retry();
  return outcome;
};
n/a
/**
 * Starts `concurrency` workers after recovering unlocked jobs, then starts
 * the periodic stalled-job check.
 *
 * The worker count is normalized to a non-negative integer. The previous
 * `while (concurrency--)` loop never terminated for negative or fractional
 * values; those, like a missing value, now yield a safe count (0 workers
 * for anything not positive and finite).
 *
 * @param {number} concurrency - Number of workers to start.
 * @returns {Promise<Array>} Resolves when every worker has finished.
 */
run = function (concurrency) {
  var promises = [];
  var _this = this;

  var workers = Math.floor(concurrency);
  if (!(workers > 0) || !isFinite(workers)) {
    workers = 0;
  }

  return this.moveUnlockedJobsToWait().then(function () {
    for (var i = 0; i < workers; i++) {
      promises.push(new Promise(_this.processJobs));
    }

    _this.startMoveUnlockedJobsToWait();

    return Promise.all(promises);
  });
};
...
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
PriorityQueue.prototype.run = function() {
var _this = this;
...
/**
 * Installs the job handler, bound to this queue. A handler declaring more
 * than one parameter is wrapped with `Promise.promisify`; otherwise it is
 * wrapped with `Promise.method`.
 *
 * @param {Function} handler - Job handler.
 * @throws {Error} When a handler has already been set on this instance.
 */
setHandler = function (handler) {
  if (this.handler) {
    throw new Error('Cannot define a handler more than once per Queue instance');
  }

  var bound = handler.bind(this);
  var takesCallback = bound.length > 1;

  this.handler = takesCallback ? Promise.promisify(bound) : Promise.method(bound);
};
...
return queue.close( doNotWaitJobs );
});
}
PriorityQueue.prototype.process = function(handler) {
this.handler = handler;
this.queues.forEach(function(queue, key) {
queue.setHandler(handler);
});
return this.run();
}
//
// TODO: Remove the polling mechanism using pub/sub.
//
...
/**
 * (Re)starts the periodic stalled-job check. Does nothing unless
 * STALLED_JOB_CHECK_INTERVAL is greater than zero; otherwise any existing
 * interval is cleared and a new one calling `this.moveUnlockedJobsToWait`
 * is stored on `this.moveUnlockedJobsToWaitInterval`.
 */
startMoveUnlockedJobsToWait = function () {
  if (!(this.STALLED_JOB_CHECK_INTERVAL > 0)) {
    return;
  }

  clearInterval(this.moveUnlockedJobsToWaitInterval);
  this.moveUnlockedJobsToWaitInterval = setInterval(
    this.moveUnlockedJobsToWait,
    this.STALLED_JOB_CHECK_INTERVAL
  );
};
n/a
/**
 * Builds a redis key of the form `<keyPrefix>:<queue name>:<queueType>`.
 *
 * @param {string} queueType - Last key segment.
 * @returns {string}
 */
toKey = function (queueType) {
  var segments = [this.keyPrefix, this.name, queueType];
  return segments.join(':');
};
...
};
/**
 * Loads a job from its redis hash.
 *
 * @param {Object} queue - Queue exposing `client.hgetall` and `toKey`.
 * @param {string} [jobId] - Job id; may be undefined when no job was moved.
 * @returns {Promise} Resolves with nothing for a falsy id, `null` when the
 *   hash is empty, otherwise the result of `Job.fromData`.
 */
Job.fromId = function(queue, jobId){
  // jobId can be undefined if moveJob returns undefined
  if (!jobId) {
    return Promise.resolve();
  }

  var key = queue.toKey(jobId);
  return queue.client.hgetall(key).then(function (jobData) {
    return _.isEmpty(jobData) ? null : Job.fromData(queue, jobId, jobData);
  });
};
...
/**
 * Re-arms the delayed-jobs timer when `newDelayedTimestamp` is earlier than
 * the currently tracked one and lies within MAX_TIMEOUT_MS from now.
 *
 * When the timer fires it runs `scripts.updateDelaySet` and re-enters this
 * method with the timestamp it reports, or Number.MAX_VALUE when none is
 * reported.
 *
 * @param {Number} newDelayedTimestamp - Candidate timestamp (ms since epoch).
 */
updateDelayTimer = function (newDelayedTimestamp){
  var _this = this;

  var isEarlier = newDelayedTimestamp < _this.delayedTimestamp;
  if(!isEarlier || !(newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now()))){
    return;
  }

  clearTimeout(this.delayTimer);
  this.delayedTimestamp = newDelayedTimestamp;

  // Never schedule in the past.
  var msUntilDue = newDelayedTimestamp - Date.now();
  if(msUntilDue < 0){
    msUntilDue = 0;
  }

  this.delayTimer = setTimeout(function(){
    scripts.updateDelaySet(_this, _this.delayedTimestamp).then(function(nextTimestamp){
      if(!nextTimestamp){
        nextTimestamp = Number.MAX_VALUE;
      }else{
        nextTimestamp = nextTimestamp < Date.now() ? Date.now() : nextTimestamp;
      }
      _this.updateDelayTimer(nextTimestamp);
    }).catch(function(err){
      console.error('Error updating the delay timer', err);
    });
    // Reset so that the follow-up call above is always accepted.
    _this.delayedTimestamp = Number.MAX_VALUE;
  }, msUntilDue);
};
n/a
/**
 * Returns a promise that resolves once the jobs currently being processed or
 * retrieved have each reported 'stalled', 'completed', 'failed' or
 * 'no-job-retrieved'. Resolves immediately when nothing is in flight.
 *
 * @returns {Promise}
 */
whenCurrentJobsFinished = function (){
  var _this = this;
  var pending = this.processing + this.retrieving;
  var events = ['stalled', 'completed', 'failed', 'no-job-retrieved'];

  return new Promise(function(resolve) {
    if(pending === 0){
      resolve();
      return;
    }

    // Fires its callback only on the `pending`-th event.
    var resolver = _.after(pending, function(){
      events.forEach(function(evt){
        _this.removeListener(evt, resolver);
      });
      clearInterval(_this.moveUnlockedJobsToWaitInterval);
      resolve();
    });

    events.forEach(function(evt){
      _this.on(evt, resolver);
    });

    _this.startMoveUnlockedJobsToWait();
  });
};
n/a
/**
 * Returns the promise of the command at the front of the blocking client's
 * command queue, or an already resolved promise when there is none.
 *
 * The first statement is now explicitly terminated: on a single line,
 * automatic semicolon insertion cannot separate it from the `return`.
 *
 * @returns {Promise}
 */
whenCurrentMoveFinished = function (){
  var currentMove = this.bclient.commandQueue.peekFront();
  return currentMove && currentMove.command.promise || Promise.resolve();
};
n/a
/**
 * Builds a Lua snippet that scans a Redis list for an item.
 *
 * @param {String} [keyVar='KEYS[1]'] - Lua expression naming the list key.
 * @param {String} [argVar='ARGV[1]'] - Lua expression naming the item.
 * @param {String} [operator='return'] - Lua keyword placed before the
 *   `item_in_list(...)` call (for example 'return' or 'if').
 * @returns {String} The Lua source, one statement per line.
 */
_isJobInList = function (keyVar, argVar, operator) {
  var listExpr = keyVar || 'KEYS[1]';
  var itemExpr = argVar || 'ARGV[1]';
  var prefix = operator || 'return';

  var lines = [
    'local function item_in_list (list, item)',
    ' for _, v in pairs(list) do',
    ' if v == item then',
    ' return 1',
    ' end',
    ' end',
    ' return nil',
    'end'
  ];
  lines.push('local items = redis.call("LRANGE",' + listExpr + ' , 0, -1)');
  lines.push(prefix + ' item_in_list(items, ' + itemExpr + ')');

  return lines.join('\n');
};
...
' return nil',
'end',
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''),
[operator, ' item_in_list(items, ', argVar, ')'].join('')
].join('\n');
},
isJobInList: function(client, listKey, jobId){
return execScript(client, 'isJobInList', this._isJobInList(), 1, listKey,
jobId).then(function(result){
return result === 1;
});
},
addJob: function(client, toKey, job, opts){
var delayed;
var scriptName;
...
/**
 * Builds and runs the Lua script that stores a job hash and enqueues its id.
 *
 * Depending on the job and options the id goes to the delayed set, or to the
 * wait/paused list (LIFO, priority-ordered or FIFO).
 *
 * Changes from the previous version: `delayTimestamp` is declared once, the
 * caller's `opts` object is no longer written to, and dead commented-out
 * code was dropped.
 *
 * @param {Object} client - Redis client handed to `execScript`.
 * @param {Function} toKey - Maps a queue-relative name to a full Redis key.
 * @param {Object} job - Flat job data; every pair is written with HMSET.
 *   `timestamp` and `delay` are read to decide whether the job is delayed.
 * @param {Object} [opts]
 * @param {Boolean} [opts.lifo] - Use RPUSH instead of LPUSH.
 * @param {Number} [opts.priority] - Passed as ARGV[3]; enables the priority branch.
 * @param {String} [opts.customJobId] - Id to use instead of the counter value.
 * @returns {*} Whatever `execScript` returns.
 */
addJob = function (client, toKey, job, opts){
  var delayed;
  var scriptName;

  opts = opts || {};
  // Normalised locally so the caller's options object is left untouched.
  var lifo = !!opts.lifo;

  var delayTimestamp = job.timestamp + job.delay;
  if(job.delay && delayTimestamp > Date.now()){
    delayed = true;
    scriptName = 'addJob:delayed';
  } else {
    scriptName = 'addJob'+(lifo?':lifo':'') + (opts.priority?':priority':'');
  }

  var jobArgs = _.flatten(_.toPairs(job));

  var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed', 'priority'], function(name){
    return toKey(name);
  });
  var baseKey = toKey('');

  // ARGV[1..3] are baseKey, custom id and priority; job fields start at ARGV[4].
  var argvs = _.map(jobArgs, function(arg, index){
    return ', ARGV['+(index+4)+']';
  });

  var script = [
    'local jobCounter = redis.call("INCR", KEYS[5])',
    'local jobId',
    'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end',
    'local jobIdKey = ARGV[1] .. jobId',
    'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end',
    'redis.call("HMSET", jobIdKey' + argvs.join('') + ')',
  ];

  if(delayed){
    // The delay timestamp is appended after the job fields (see args below).
    script.push.apply(script, [
      ' local timestamp = tonumber(ARGV[' + (argvs.length + 4) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
      ' redis.call("ZADD", KEYS[6], timestamp, jobId)',
      ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))',
      ' return jobId',
    ]);
  }else{
    var push, pushPaused;
    var add = _.template('redis.call("<%= direction %>", <%= waitQueue %>, jobId)');
    if(lifo){
      push = add({direction: 'RPUSH', waitQueue: 'KEYS[1]'});
      pushPaused = add({direction: 'RPUSH', waitQueue: 'KEYS[2]'});
    }else if(opts.priority){
      script.push.apply(script, [
        ' redis.call("ZADD", KEYS[7], ARGV[3], jobId)',
        ' local count = redis.call("ZCOUNT", KEYS[7], 0, ARGV[3])',
      ]);

      var priorityAdd = _.template([
        ' local len = redis.call("LLEN", <%= waitQueue %>)',
        ' local id = redis.call("LINDEX", <%= waitQueue %>, len - (count-1))',
        ' if id then',
        ' redis.call("LINSERT", <%= waitQueue %>, "BEFORE", id, jobId)',
        ' else',
        ' redis.call("RPUSH", <%= waitQueue %>, jobId)',
        ' end',
      ].join('\n'));

      push = priorityAdd({waitQueue: 'KEYS[1]'});
      pushPaused = priorityAdd({waitQueue: 'KEYS[2]'});
    }else{
      push = add({direction: 'LPUSH', waitQueue: 'KEYS[1]'});
      pushPaused = add({direction: 'LPUSH', waitQueue: 'KEYS[2]'});
    }

    //
    // We check for the meta-paused key to decide if we are paused or not
    // (since an empty list and !EXISTS are not really the same)
    script.push.apply(script, [
      'if redis.call("EXISTS", KEYS[3]) ~= 1 then',
      push,
      'else',
      pushPaused,
      'end',
      'redis.call("PUBLISH", KEYS[4], jobId)',
      'return jobId .. ""'
    ]);
  }

  var args = [
    client,
    scriptName,
    script.join('\n'),
    keys.length
  ];

  args.push.apply(args, keys);
  args.push(baseKey);
  args.push(opts.customJobId || '');
  args.push(opts.priority);
  args.push.apply(args, jobArgs);
  args.push(delayTimestamp);

  return execScript.apply(scripts, args);
};
...
this.attemptsMade = 0;
};
/**
 * Adds the job to the queue through `scripts.addJob`.
 *
 * @param {Object} queue - Queue providing `client` and `toKey`.
 * @param {Object} job - Job providing `opts` and `toData()`.
 * @returns {*} Whatever `scripts.addJob` returns.
 */
function addJob(queue, job){
  var jobOpts = job.opts;
  var payload = job.toData();
  // Bound so the script helper can build keys without holding the queue.
  var keyFor = _.bind(queue.toKey, queue);

  var scriptOpts = {
    lifo: jobOpts.lifo,
    customJobId: jobOpts.jobId,
    priority: jobOpts.priority
  };

  return scripts.addJob(queue.client, keyFor, payload, scriptOpts);
}
Job.create = function(queue, data, opts){
...
/**
 * Runs a Lua script that deletes unlocked jobs from `set` whose stored
 * "timestamp" is missing or compares below `ts`.
 *
 * @param {Object} queue - Queue providing `client` and `toKey`.
 * @param {String} set - 'completed' | 'failed' | 'wait' | 'active' |
 *   'paused' | 'delayed'; selects the Redis commands used.
 * @param {Number} ts - Passed to the script as ARGV[2].
 * @param {Number} [limit=0] - When greater than zero the script stops after
 *   this many deletions.
 * @returns {*} Whatever `execScript` returns.
 */
cleanJobsInSet = function (queue, set, ts, limit) {
  var fetchCommand;
  var dropCommand;
  var scriptName;
  var limitGuard = '';

  limit = limit || 0;

  if(set === 'completed' || set === 'failed'){
    fetchCommand = 'local jobs = redis.call("SMEMBERS", KEYS[1])';
    dropCommand = 'redis.call("SREM", KEYS[1], job)';
    scriptName = 'cleanSet';
  }else if(set === 'wait' || set === 'active' || set === 'paused'){
    fetchCommand = 'local jobs = redis.call("LRANGE", KEYS[1], 0, -1)';
    dropCommand = 'redis.call("LREM", KEYS[1], 0, job)';
    scriptName = 'cleanList';
  }else if(set === 'delayed'){
    fetchCommand = 'local jobs = redis.call("ZRANGE", KEYS[1], 0, -1)';
    dropCommand = 'redis.call("ZREM", KEYS[1], job)';
    scriptName = 'cleanOSet';
  }

  if(limit > 0) {
    limitGuard = [
      'if deletedCount >= limit then',
      ' break',
      'end',
    ].join('\n');
    // The limited variant is a different script, so it gets its own name.
    scriptName = scriptName + 'WithLimit';
  }

  var lua = [
    fetchCommand,
    'local deleted = {}',
    'local deletedCount = 0',
    'local limit = tonumber(ARGV[3])',
    'local jobTS',
    'for _, job in ipairs(jobs) do',
    limitGuard,
    ' local jobKey = ARGV[1] .. job',
    ' if (redis.call("EXISTS", jobKey .. ":lock") == 0) then',
    ' jobTS = redis.call("HGET", jobKey, "timestamp")',
    ' if(not jobTS or jobTS < ARGV[2]) then',
    dropCommand,
    ' redis.call("DEL", jobKey)',
    ' deletedCount = deletedCount + 1',
    ' table.insert(deleted, job)',
    ' end',
    ' end',
    'end',
    'return deleted'
  ].join('\n');

  return execScript.call(
    scripts,
    queue.client,
    scriptName,
    lua,
    1,
    queue.toKey(set),
    queue.toKey(''),
    ts,
    limit
  );
};
n/a
/**
 * Checks whether `jobId` is present in the Redis list at `listKey`.
 *
 * @param {Object} client - Redis client handed to `execScript`.
 * @param {String} listKey - Full key of the list to scan.
 * @param {String|Number} jobId - Item to look for.
 * @returns {Promise} Resolves to true only when the script result is exactly 1.
 */
isJobInList = function (client, listKey, jobId){
  var script = this._isJobInList();
  var lookup = execScript(client, 'isJobInList', script, 1, listKey, jobId);
  return lookup.then(function(result){
    return result === 1;
  });
};
...
return this.queue.client
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;
});
};
/**
 * Checks whether this job's id is in the named list of its queue.
 *
 * @param {String} list - Queue-relative list name, resolved with `queue.toKey`.
 * @returns {*} Whatever `scripts.isJobInList` returns.
 */
Job.prototype._isInList = function(list) {
  var queue = this.queue;
  var client = queue.client;
  var listKey = queue.toKey(list);
  return scripts.isJobInList(client, listKey, this.jobId);
};
Job.prototype._moveToSet = function(set, context){
var queue = this.queue;
var jobId = this.jobId;
return scripts.moveToSet(queue, set, jobId, context);
...
/**
 * Removes the job id from the `src` list and either adds it to the `target`
 * set (storing the return value) or, when no target is given, deletes the
 * job hash. The job lock is taken first and released afterwards.
 *
 * Fixes over the previous version:
 *  - the body is laid out across lines again; on a single line the leading
 *    `//` comment turned the whole function into a comment;
 *  - the "missing job" message branched on `src`, which is always set, so
 *    removals were reported as a move "to undefined"; it now branches on
 *    `target`;
 *  - a swallowed lock error is mapped to the -2 "cannot get lock" path
 *    instead of falling through to `releaseLock()`.
 *
 * @param {Object} job - Job with `jobId`, `queue`, `lock`, `returnvalue`,
 *   `takeLock` and `releaseLock`.
 * @param {String} src - Queue-relative name of the source list.
 * @param {String} [target] - Queue-relative name of the target set.
 * @returns {Promise} Resolves with the result of `job.releaseLock()`.
 * @throws {Error} (as rejection) when the job is missing or cannot be locked.
 */
move = function (job, src, target){
  // TODO: Depending on the source we should use LREM, SREM or ZREM.
  // TODO: Depending on the target we should use LPUSH, SADD, etc.
  var keys = _.map([
    src,
    target,
    job.jobId
    ], function(name){
      return job.queue.toKey(name);
    }
  );

  var deleteJob = 'redis.call("DEL", KEYS[3])';

  var moveJob = [
    'redis.call("SADD", KEYS[2], ARGV[1])',
    'redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])',
  ].join('\n');

  var script = [
    'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists
    ' redis.call("LREM", KEYS[1], -1, ARGV[1])',
    target ? moveJob : deleteJob,
    ' return 0',
    'else',
    ' return -1',
    'end'
  ].join('\n');

  var args = [
    job.queue.client,
    'move' + src + (target ? target : ''),
    script,
    keys.length,
    keys[0],
    keys[1],
    keys[2],
    job.jobId,
    job.returnvalue ? JSON.stringify(job.returnvalue) : ''
  ];

  var returnLockOrErrorCode = function(lock) {
    return lock ? execScript.apply(scripts, args) : -2;
  };

  var throwUnexpectedErrors = function(err) {
    if (!(err instanceof Redlock.LockError)) {
      throw err;
    }
    // Failing to lock is reported through the same code as "no lock".
    return -2;
  };

  return job.takeLock(!!job.lock)
    .then(returnLockOrErrorCode, throwUnexpectedErrors)
    .then(function(result){
      switch (result){
        case -1:
          if(target){
            throw new Error('Missing Job ' + job.jobId + ' when trying to move from ' + src + ' to ' + target);
          } else {
            throw new Error('Missing Job ' + job.jobId + ' when trying to remove it from ' + src);
          }
        case -2:
          throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src);
        default:
          return job.releaseLock();
      }
    });
};
...
/**
 * Moves this job from `src` to `target` through `scripts.move`.
 *
 * When the target is 'completed' the return value is stored on the job
 * (falsy values become 0) and, if `opts.removeOnComplete` is set, the target
 * is dropped so that no target is passed on.
 *
 * @param {String} src - Source list name.
 * @param {String} [target] - Target set name.
 * @param {*} [returnValue] - Handler result, only used for 'completed'.
 * @returns {*} Whatever `scripts.move` returns.
 */
Job.prototype.move = function(src, target, returnValue){
  var destination = target;

  if(destination === 'completed'){
    this.returnvalue = returnValue || 0;
    destination = this.opts.removeOnComplete ? void 0 : destination;
  }

  return scripts.move(this, src, destination);
};
/**
 * Flags this job instance as discarded by setting `_discarded` to true.
 * Only the in-memory flag is touched here.
 */
Job.prototype.discard = function(){
  var job = this;
  job._discarded = true;
};
Job.prototype.moveToFailed = function(err){
...
/**
 * Moves a job out of 'active': into 'completed', or with no target when
 * `removeOnComplete` is truthy.
 *
 * @param {Object} job - Job to move.
 * @param {Boolean} [removeOnComplete] - Drop the target when truthy.
 * @returns {*} Whatever `scripts.move` returns.
 */
moveToCompleted = function (job, removeOnComplete){
  var target;
  if(removeOnComplete){
    target = void 0;
  }else{
    target = 'completed';
  }
  return scripts.move(job, 'active', target);
};
...
}
}
return Promise.resolve(false);
};
/**
 * Stores the handler's return value on the job (falsy values become 0) and
 * moves the job to completed through `scripts.moveToCompleted`.
 *
 * @param {*} [returnValue] - Value produced by the job handler.
 * @returns {*} Whatever `scripts.moveToCompleted` returns.
 */
Job.prototype.moveToCompleted = function(returnValue){
  var job = this;
  job.returnvalue = returnValue || 0;
  return scripts.moveToCompleted(job, job.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
...
/**
 * Runs `moveToSetScript` to move a job from 'active' into `set`.
 *
 * The body is laid out across lines again: on a single line the leading `//`
 * comment turned the whole function body into a comment.
 *
 * @param {Object} queue - Queue providing `client` and `toKey`.
 * @param {String} set - Queue-relative name of the target set.
 * @param {String|Number} jobId - Id of the job to move.
 * @param {*} [context] - Extra value passed to the script as JSON; for the
 *   'delayed' set it is treated as a timestamp (see below). Defaults to 0.
 * @returns {Promise} Rejects with "Missing Job ..." when the script returns -1.
 */
moveToSet = function (queue, set, jobId, context){
  //
  // Bake in the job id first 12 bits into the timestamp
  // to guarantee correct execution order of delayed jobs
  // (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
  //
  // WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
  //
  context = _.isUndefined(context) ? 0 : context;
  if(set === 'delayed') {
    // Coerce to a non-negative number before encoding.
    context = +context || 0;
    context = context < 0 ? 0 : context;
    if(context > 0){
      context = context * 0x1000 + (jobId & 0xfff);
    }
  }

  var keys = _.map([
    'active',
    set,
    jobId
    ], function(name){
      return queue.toKey(name);
    }
  );

  var args = [
    queue.client,
    'moveToSet',
    moveToSetScript,
    keys.length,
    keys[0],
    keys[1],
    keys[2],
    JSON.stringify(context),
    jobId
  ];

  return execScript.apply(scripts, args).then(function(result){
    if(result === -1){
      throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + set);
    }
  });
};
...
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
};
/**
 * Moves this job from active into `set` through `scripts.moveToSet`.
 *
 * @param {String} set - Target set name.
 * @param {*} [context] - Forwarded unchanged to the script helper.
 * @returns {*} Whatever `scripts.moveToSet` returns.
 */
Job.prototype._moveToSet = function(set, context){
  return scripts.moveToSet(this.queue, set, this.jobId, context);
};
Job.prototype._getBackOff = function() {
var backoff = 0;
var delay;
if(this.opts.backoff){
if(!isNaN(this.opts.backoff)){
...
/**
 * Runs a Lua script over the active list: every job without a lock key is
 * removed from active and either pushed back to wait or, once its stalled
 * counter exceeds `queue.MAX_STALLED_JOB_COUNT`, added to failed.
 *
 * The body is laid out across lines again: on a single line the first `//`
 * comment inside the script array commented out the rest of the function.
 *
 * @param {Object} queue - Queue providing `client`, `toKey` and
 *   `MAX_STALLED_JOB_COUNT`.
 * @returns {*} Whatever `execScript` returns; the script itself returns
 *   `{failed, stalled}`.
 */
moveUnlockedJobsToWait = function (queue){
  var script = [
    'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])',
    'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
    'local stalled = {}',
    'local failed = {}',
    'for _, job in ipairs(activeJobs) do',
    ' local jobKey = ARGV[2] .. job',
    ' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
    // Remove from the active queue.
    ' redis.call("LREM", KEYS[1], 1, job)',
    ' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")',
    ' if(lockAcquired) then',
    // If it was previously locked then we consider it 'stalled'. If this job
    // has been stalled too many times, such as if it crashes the worker, then fail it.
    ' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)',
    ' if(stalledCount > MAX_STALLED_JOB_COUNT) then',
    ' redis.call("SADD", KEYS[3], job)',
    ' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
    ' table.insert(failed, job)',
    ' else',
    // Move the job back to the wait queue, to immediately be picked up by a waiting worker.
    ' redis.call("RPUSH", KEYS[2], job)',
    ' table.insert(stalled, job)',
    ' end',
    ' else',
    // Move the job back to the wait queue, to immediately be picked up by a waiting worker.
    ' redis.call("RPUSH", KEYS[2], job)',
    ' end',
    ' end',
    'end',
    'return {failed, stalled}'
  ].join('\n');

  var args = [
    queue.client,
    'moveUnlockedJobsToWait',
    script,
    3,
    queue.toKey('active'),
    queue.toKey('wait'),
    queue.toKey('failed'),
    queue.MAX_STALLED_JOB_COUNT,
    queue.toKey('')
  ];

  return execScript.apply(scripts, args);
};
...
// the index is reversed to the actual priority number (0 is 'critical')
// so flip it to get the correct "priority index"
var nbJobsToProcess = _this.strategy(PriorityQueue.priorities.critical - index);
var i = 0;
var fn = function() {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false
}))
.then(function(job) {
if (job) {
emptyLoop = false;
return queue.processJob(job).then(function() {
if (++i < nbJobsToProcess && !_this.paused) {
...
/**
 * Releases the lock currently held by the job.
 *
 * @param {Object} job - Job whose `lock` exposes `unlock()`.
 * @returns {*} Whatever `lock.unlock()` returns.
 * @throws {Error} If the job holds no lock.
 */
releaseLock = function (job){
  var heldLock = job.lock;
  if (heldLock) {
    return heldLock.unlock();
  }
  throw new Error('Unable to release nonexisting lock');
};
...
};
/**
Releases the lock. Only locks owned by the queue instance can be released.
*/
/**
 * Releases this job's lock through `scripts.releaseLock` and clears the
 * `lock` property once the release has completed.
 *
 * @returns {Promise} Resolves with no value after `lock` is set to null.
 */
Job.prototype.releaseLock = function(){
  var job = this;

  function forgetLock() {
    job.lock = null;
  }

  return scripts.releaseLock(job).then(forgetLock);
};
Job.prototype.delayIfNeeded = function(){
if(this.delay){
var jobDelayedTimestamp = this.timestamp + this.delay;
if(jobDelayedTimestamp > Date.now()){
...
/**
 * Runs a Lua script that removes a job id from the active, wait, delayed,
 * paused, completed and failed collections and deletes the job's own key.
 *
 * @param {Object} queue - Queue providing `client` and `toKey`.
 * @param {String|Number} jobId - Id of the job to remove.
 * @returns {*} Whatever `execScript` returns.
 */
remove = function (queue, jobId){
  var lua = [
    'redis.call("LREM", KEYS[1], 0, ARGV[1])',
    'redis.call("LREM", KEYS[2], 0, ARGV[1])',
    'redis.call("ZREM", KEYS[3], ARGV[1])',
    'redis.call("LREM", KEYS[4], 0, ARGV[1])',
    'redis.call("SREM", KEYS[5], ARGV[1])',
    'redis.call("SREM", KEYS[6], ARGV[1])',
    'redis.call("DEL", KEYS[7])'
  ].join('\n');

  // Order matters: it must line up with KEYS[1]..KEYS[7] in the script.
  var names = ['active', 'wait', 'delayed', 'paused', 'completed', 'failed', jobId];
  var keys = _.map(names, function(name){
    return queue.toKey(name);
  });

  return execScript.call(
    scripts,
    queue.client,
    'remove',
    lua,
    keys.length,
    keys[0],
    keys[1],
    keys[2],
    keys[3],
    keys[4],
    keys[5],
    keys[6],
    jobId
  );
};
...
return _this.moveToDelayed(Date.now() + backoff);
}else{
// If not, retry immediately
return _this._retryAtOnce();
}
} else if(_this.opts.removeOnFail){
return _this.releaseLock().then(function(){
return _this.remove();
});
}
// If not, move to failed
return _this._moveToSet('failed');
});
};
...
/**
 * Runs a Lua script that moves a job from the `options.state` set back to
 * the wait list and publishes its id.
 *
 * Script return codes: 1 moved, 0 job key missing, -1 lock key exists,
 * -2 job was not a member of the state set.
 *
 * @param {Object} job - Job with `jobId`, `queue` and `opts.lifo`.
 * @param {Object} options
 * @param {String} options.state - Queue-relative name of the set to leave.
 * @returns {*} Whatever `execScript` returns.
 */
reprocessJob = function (job, options) {
  var push = job.opts.lifo ? 'RPUSH' : 'LPUSH';

  var lua = [
    'if (redis.call("EXISTS", KEYS[1]) == 1) then',
    ' if (redis.call("EXISTS", KEYS[2]) == 0) then',
    ' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then',
    ' redis.call("' + push + '", KEYS[4], ARGV[1])',
    ' redis.call("PUBLISH", KEYS[5], ARGV[1])',
    ' return 1',
    ' else',
    ' return -2',
    ' end',
    ' else',
    ' return -1',
    ' end',
    'else',
    ' return 0',
    'end'
  ].join('\n');

  var queue = job.queue;

  var jobKey = queue.toKey(job.jobId);
  var lockKey = queue.toKey(job.jobId) + ':lock';
  var stateKey = queue.toKey(options.state);
  var waitKey = queue.toKey('wait');
  var jobsKey = queue.toKey('jobs');

  return execScript.call(
    scripts,
    queue.client,
    'retryJob',
    lua,
    5,
    jobKey,
    lockKey,
    stateKey,
    waitKey,
    jobsKey,
    job.jobId
  );
};
...
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
Job.prototype.retry = function(){
var queue = this.queue;
var _this = this;
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result
) {
if (result === 1) {
queue.emit('waiting', _this);
} else if (result === 0) {
throw new Error('Couldn\'t retry job: The job doesn\'t exist');
} else if (result === -1) {
throw new Error('Couldn\'t retry job: The job is locked');
} else if (result === -2) {
...
/**
 * Takes, renews or reuses the lock for a job.
 *
 * The body is laid out across lines again: on a single line the `//`
 * comment inside the catch handler commented out the LockError check and
 * the closing braces. The error-name comparison is now strict.
 *
 * @param {Object} queue - Queue providing `clients`, `redlock`,
 *   `LOCK_DURATION` and `toKey`.
 * @param {Object} job - Job with `lock`, `jobId`, `queue` and `lockKey()`.
 * @param {Boolean} [renew] - Extend the lock the job already holds.
 * @param {Boolean} [ensureActive] - Wrap the lock script so the lock is only
 *   taken when the job id is in the active list, and mark the job hash with
 *   "lockAcquired".
 * @returns {Promise} The lock; resolves to undefined when locking failed
 *   with a 'LockError'.
 * @throws {Error} If `renew` is requested but the job holds no lock.
 */
takeLock = function (queue, job, renew, ensureActive){
  var lock = job.lock;
  if (renew && !lock) {
    throw new Error('Unable to renew nonexisting lock');
  }
  if (renew) {
    return lock.extend(queue.LOCK_DURATION);
  }
  if (lock) {
    return Promise.resolve(lock);
  }

  var redlock;
  if (ensureActive) {
    var isJobInList = this._isJobInList('KEYS[2]', 'ARGV[3]', 'if');
    var lockAcquired = ['and redis.call("HSET", KEYS[3], "lockAcquired", "1")'].join('');
    var success = 'then return 1 else return 0 end';
    var opts = {
      // Turns the stock lock script into one condition of a larger "if".
      lockScript: function(lockScript) {
        return [
          isJobInList,
          lockScript.replace('return', 'and'),
          lockAcquired,
          success
        ].join('\n');
      },
      extraLockKeys: [job.queue.toKey('active'), queue.toKey(job.jobId)],
      extraLockArgs: [job.jobId]
    };
    redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock));
  } else {
    redlock = new Redlock(queue.clients, queue.redlock);
  }

  return redlock.lock(job.lockKey(), queue.LOCK_DURATION).catch(function(err){
    //
    // Failing to lock due to already locked is not an error.
    //
    if(err.name !== 'LockError'){
      throw err;
    }
  });
};
...
/**
Takes a lock for this job so that no other queue worker can process it at the
same time.
*/
/**
 * Takes (or renews) the lock for this job through `scripts.takeLock` and
 * remembers it on the instance.
 *
 * @param {Boolean} [renew] - Forwarded to `scripts.takeLock`.
 * @param {Boolean} [ensureActive] - Forwarded to `scripts.takeLock`.
 * @returns {Promise} Resolves to the lock, or to false when none was obtained.
 */
Job.prototype.takeLock = function(renew, ensureActive){
  var job = this;
  return scripts.takeLock(job.queue, job, renew, ensureActive).then(function(lock) {
    if (!lock){
      return false;
    }
    job.lock = lock;
    return lock;
  });
};
...
/**
 * Runs a Lua script that inspects the first entry of the delayed set. When
 * its decoded score is at or before `delayedTimestamp` the job is moved to
 * the wait list, published, and the next delayed timestamp (if any) is
 * published and returned; otherwise the decoded score is returned.
 *
 * @param {Object} queue - Queue providing `client` and `toKey`.
 * @param {Number} delayedTimestamp - Passed to the script as ARGV[2].
 * @returns {*} Whatever `execScript` returns.
 */
updateDelaySet = function (queue, delayedTimestamp){
  var lua = [
    'local RESULT = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")',
    'local jobId = RESULT[1]',
    'local score = RESULT[2]',
    'if (score ~= nil) then',
    ' score = score / 0x1000 ',
    ' if (score <= tonumber(ARGV[2])) then',
    ' redis.call("ZREM", KEYS[1], jobId)',
    ' redis.call("LREM", KEYS[2], 0, jobId)',
    ' redis.call("LPUSH", KEYS[3], jobId)',
    ' redis.call("PUBLISH", KEYS[4], jobId)',
    ' redis.call("HSET", ARGV[1] .. jobId, "delay", 0)',
    ' local nextTimestamp = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]',
    ' if(nextTimestamp ~= nil) then',
    ' nextTimestamp = nextTimestamp / 0x1000',
    ' redis.call("PUBLISH", KEYS[1], nextTimestamp)',
    ' end',
    ' return nextTimestamp',
    ' end',
    ' return score',
    'end'
  ].join('\n');

  // Order matters: it must line up with KEYS[1]..KEYS[4] in the script.
  var names = ['delayed', 'active', 'wait', 'jobs'];
  var keys = _.map(names, function(name){
    return queue.toKey(name);
  });

  return execScript.call(
    scripts,
    queue.client,
    'updateDelaySet',
    lua,
    keys.length,
    keys[0],
    keys[1],
    keys[2],
    keys[3],
    queue.toKey(''),
    delayedTimestamp
  );
};
n/a
/**
 * EventEmitter constructor. All set-up is delegated to the static
 * `EventEmitter.init`, invoked with the new instance as `this`.
 */
function EventEmitter() {
  var initialise = EventEmitter.init;
  initialise.call(this);
}
n/a
/**
 * Emits an event locally and publishes it through `pubClient`.
 *
 * The body is laid out across lines again: on a single line the
 * `// Emit to other nodes` comment commented out the whole publish step.
 *
 * The published payload is the JSON-encoded argument list with the event
 * name (first element) replaced by this instance's `uuid`.
 *
 * @param {String} evt - Event name; further arguments are forwarded as-is.
 * @returns {Promise} Resolves once the publish callback reports success,
 *   rejects with the error it reports otherwise.
 */
distEmit = function (evt) {
  var _this = this;
  var args = Array.prototype.slice.call(arguments);

  // Local listeners first, with the original arguments.
  this.emit.apply(this, args);

  args[0] = this.uuid;

  // Emit to other nodes
  return new Promise(function (resolve, reject) {
    _this.pubClient.publish(evt, JSON.stringify(args), function (err) {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
};
...
}
/**
 * Creates a job, stores it through `addJob` and announces it with a
 * 'waiting' event via `queue.distEmit`.
 *
 * @param {Object} queue - Queue the job belongs to.
 * @param {*} data - Job payload handed to the Job constructor.
 * @param {Object} [opts] - Job options handed to the Job constructor.
 * @returns {Promise} Resolves to the job once its `jobId` has been assigned.
 */
Job.create = function(queue, data, opts){
  var created = new Job(queue, data, opts);

  function onAdded(jobId){
    created.jobId = jobId;
    queue.distEmit('waiting', created.toJSON());
    debuglog('Job added', jobId);
    return created;
  }

  return addJob(queue, created).then(onAdded);
};
Job.fromId = function(queue, jobId){
// jobId can be undefined if moveJob returns undefined
...
/**
 * Removes a listener and, when no listener is left for the event,
 * unsubscribes the subscriber client from it.
 *
 * The body is laid out across lines again: on a single line the `// TODO`
 * comment commented out the unsubscribe logic and the closing brace.
 *
 * @param {String} evt - Event name.
 * @param {Function} listener - Listener to remove.
 */
off = function (evt, listener) {
  var _this = this;
  var args = Array.prototype.slice.call(arguments);

  EventEmitter.prototype.removeListener.apply(this, args);

  // TODO: we should take into consideration isGlobal.
  if (!_this.listenerCount(evt)) {
    _this.subClient.unsubscribe(evt);
  }
};
n/a
/**
 * Registers a listener and, for global listeners, subscribes the subscriber
 * client to the event.
 *
 * @param {String} evt - Event name.
 * @param {Function} listener - Listener to register.
 * @param {Boolean} [isGlobal] - Also subscribe through `subClient`.
 * @returns {Promise} Resolved immediately for local listeners; for global
 *   ones it settles with the outcome reported by the subscribe callback.
 */
on = function (evt, listener, isGlobal) {
  var _this = this;
  var args = Array.prototype.slice.call(arguments);

  EventEmitter.prototype.on.apply(this, args);

  if (!isGlobal) {
    return Promise.resolve(void 0);
  }

  return new Promise(function (resolve, reject) {
    _this.subClient.subscribe(args[0], function (err) {
      if (err) {
        reject(err);
        return;
      }
      resolve();
    });
  });
};
...
queue.resume().then(function(){
// queue is resumed now
})
```
A queue also emits some useful events:
```javascript
.on('ready', function() {
// Queue ready for job
// All Redis connections are done
})
.on('error', function(error) {
// Error
})
.on('active', function(job, jobPromise){
...
/**
 * Removes a listener and, when no listener is left for the event,
 * unsubscribes the subscriber client from it.
 *
 * The body is laid out across lines again: on a single line the `// TODO`
 * comment commented out the unsubscribe logic and the closing brace.
 *
 * @param {String} evt - Event name.
 * @param {Function} listener - Listener to remove.
 */
removeListener = function (evt, listener) {
  var _this = this;
  var args = Array.prototype.slice.call(arguments);

  EventEmitter.prototype.removeListener.apply(this, args);

  // TODO: we should take into consideration isGlobal.
  if (!_this.listenerCount(evt)) {
    _this.subClient.unsubscribe(evt);
  }
};
...
reject(err);
removeListeners();
clearInterval(interval);
}
}
function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
}
_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);
//
...
/**
 * Keeps track of pending timers.
 *
 * Initial state: `idle` is true, `listeners` (resolvers waiting for the idle
 * state) is empty and `timers` (entries keyed by timer id) is empty.
 */
function TimerManager(){
  var manager = this;
  manager.idle = true;
  manager.listeners = [];
  manager.timers = {};
}
n/a
/**
 * Cancels and forgets the timer with the given id. When this removes the
 * last tracked timer, every pending idle listener is called and the manager
 * becomes idle.
 *
 * @param {String} id - Timer id; unknown ids are ignored.
 */
clear = function (id){
  var registry = this.timers;
  var entry = registry[id];
  if(!entry) {
    return;
  }

  clearTimeout(entry.timer);
  delete registry[id];

  if(this.idle || _.size(registry) !== 0){
    return;
  }

  // Listeners are taken from the end of the array.
  while(this.listeners.length){
    this.listeners.pop()();
  }
  this.idle = true;
};
...
@param {Function} fn - Function to execute after delay
@returns {String} id - The timer id (a v4 UUID). Used to clear the timer
*/
TimerManager.prototype.set = function(name, delay, fn){
var id = uuid.v4();
var now = Date.now();
var timer = setTimeout(function (timerInstance, timeoutId) {
timerInstance.clear(timeoutId);
try{
fn();
}catch(err){
console.error(err);
}
}, delay, this, id);
...
/**
 * Clears every tracked timer by calling `clear` with each timer id.
 */
clearAll = function (){
  var manager = this;

  function clearOne(timer, id){
    manager.clear(id);
  }

  _.each(this.timers, clearOne);
};
n/a
/**
 * Schedules `fn` after `delay` milliseconds and tracks the timer.
 *
 * The body is laid out across lines again: on a single line the `// XXX`
 * comment commented out the timer registration and the return statement.
 *
 * When the timer fires its entry is cleared first, then `fn` runs; anything
 * `fn` throws is logged with console.error instead of propagating.
 *
 * @param {String} name - Label stored with the timer entry.
 * @param {Number} delay - Milliseconds before `fn` runs.
 * @param {Function} fn - Function to execute after the delay.
 * @returns {String} The timer id, as produced by `uuid.v4()`.
 */
set = function (name, delay, fn){
  var id = uuid.v4();
  var now = Date.now();
  var timer = setTimeout(function (timerInstance, timeoutId) {
    timerInstance.clear(timeoutId);
    try{
      fn();
    }catch(err){
      console.error(err);
    }
  }, delay, this, id);

  // XXX only the timer is used, but the
  // other fields are useful for
  // troubleshooting/debugging
  this.timers[id] = {
    name: name,
    created: now,
    timer: timer
  };

  this.idle = false;
  return id;
};
n/a
/**
 * Returns a promise that resolves when the manager is idle: immediately if
 * it already is, otherwise once the resolver queued on `listeners` is called.
 *
 * @returns {Promise}
 */
whenIdle = function () {
  var manager = this;
  return new Promise(function(resolve){
    if(!manager.idle) {
      // Added at the front of the listeners array.
      manager.listeners.unshift(resolve);
      return;
    }
    resolve();
  });
};
n/a