// Promise-returning connect: opens the raw connection and resolves with a
// ChannelModel wrapping it.
// NOTE(review): `Promise.fromCallback` is not part of the native Promise API;
// presumably `Promise` is a promise library bound in this file -- confirm.
function connect(url, connOptions) {
  function openRaw(done) {
    return raw_connect(url, connOptions, done);
  }

  function wrapInModel(conn) {
    return new ChannelModel(conn);
  }

  return Promise.fromCallback(openRaw).then(wrapInModel);
}
...
ch.ack(msg);
}
});
}
}
require('amqplib/callback_api')
.connect('amqp://localhost', function(err, conn) {
if (err != null) bail(err);
consumer(conn);
publisher(conn);
});
```
## Promise API example
...
// Builds the field object sent with defs.BasicAck.
// `allUpTo` is coerced to a strict boolean under the `multiple` key.
ack = function (tag, allUpTo) {
  var fields = {};
  fields.deliveryTag = tag;
  fields.multiple = Boolean(allUpTo);
  return fields;
};
...
var ok = conn.createChannel(on_open);
function on_open(err, ch) {
if (err != null) bail(err);
ch.assertQueue(q);
ch.consume(q, function(msg) {
if (msg !== null) {
console.log(msg.content.toString());
ch.ack(msg);
}
});
}
}
require('amqplib/callback_api')
.connect('amqp://localhost', function(err, conn) {
...
// Builds the field object sent with defs.ExchangeDeclare when asserting an
// exchange. `durable` defaults to true unless explicitly supplied; the
// argument table inherits from `options.arguments` when given.
assertExchange = function (exchange, type, options) {
  options = options || EMPTY_OPTIONS;

  var table = Object.create(options.arguments || null);
  setIfDefined(table, 'alternate-exchange', options.alternateExchange);

  var durable = true;
  if (options.durable !== undefined) {
    durable = options.durable;
  }

  return {
    exchange: exchange,
    ticket: 0,
    type: type,
    passive: false,
    durable: durable,
    autoDelete: Boolean(options.autoDelete),
    internal: Boolean(options.internal),
    nowait: false,
    arguments: table
  };
};
...
Args.unbindQueue(queue, source, pattern, argt),
defs.QueueUnbindOk, cb);
};
// Declares an exchange via an ExchangeDeclare RPC. The wrapped callback is
// handed the error (if any) and an object naming the exchange; the channel
// itself is returned for chaining.
Channel.prototype.assertExchange = function(ex, type, options, cb0) {
  var cb = callbackWrapper(this, cb0);
  var fields = Args.assertExchange(ex, type, options);

  function onDeclared(e, _) {
    cb(e, {exchange: ex});
  }

  this._rpc(defs.ExchangeDeclare, fields, defs.ExchangeDeclareOk, onDeclared);
  return this;
};
Channel.prototype.checkExchange = function(exchange, cb) {
return this.rpc(defs.ExchangeDeclare,
...
// Builds the field object sent with defs.QueueDeclare when asserting a queue.
// A falsy queue name becomes '', `durable` defaults to true unless explicitly
// supplied, and the recognised options are copied into the argument table
// under their `x-` keys when defined.
assertQueue = function (queue, options) {
  queue = queue || '';
  options = options || EMPTY_OPTIONS;

  var argt = Object.create(options.arguments || null);
  setIfDefined(argt, 'x-expires', options.expires);
  setIfDefined(argt, 'x-message-ttl', options.messageTtl);
  setIfDefined(argt, 'x-dead-letter-exchange', options.deadLetterExchange);
  setIfDefined(argt, 'x-dead-letter-routing-key', options.deadLetterRoutingKey);
  setIfDefined(argt, 'x-max-length', options.maxLength);
  setIfDefined(argt, 'x-max-priority', options.maxPriority);

  return {
    queue: queue,
    exclusive: !!options.exclusive,
    durable: (options.durable === undefined) ? true : options.durable,
    autoDelete: !!options.autoDelete,
    arguments: argt,
    passive: false,
    // deprecated but we have to include it
    ticket: 0,
    nowait: false
  };
};
...
}
// Publisher
// Opens a channel on `conn`, asserts queue `q` and sends one message to it.
// Errors from channel creation are passed to `bail`.
function publisher(conn) {
  conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(q);
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    ch.sendToQueue(q, Buffer.from('something to do'));
  }
}
// Consumer
function consumer(conn) {
var ok = conn.createChannel(on_open);
...
// Builds the field object sent with defs.ExchangeBind.
bindExchange = function (dest, source, pattern, argt) {
  var fields = {};
  fields.source = source;
  fields.destination = dest;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
Args.deleteExchange(exchange, options),
defs.ExchangeDeleteOk, cb);
};
// Issues an ExchangeBind RPC and returns whatever `this.rpc` returns.
Channel.prototype.bindExchange =
  function(dest, source, pattern, argt, cb) {
    var fields = Args.bindExchange(dest, source, pattern, argt);
    return this.rpc(defs.ExchangeBind, fields, defs.ExchangeBindOk, cb);
  };
Channel.prototype.unbindExchange =
function(dest, source, pattern, argt, cb) {
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
...
// Builds the field object sent with defs.QueueBind.
bindQueue = function (queue, source, pattern, argt) {
  var fields = {};
  fields.queue = queue;
  fields.exchange = source;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
Args.purgeQueue(queue),
defs.QueuePurgeOk, cb);
};
// Issues a QueueBind RPC and returns whatever `this.rpc` returns.
Channel.prototype.bindQueue =
  function(queue, source, pattern, argt, cb) {
    var fields = Args.bindQueue(queue, source, pattern, argt);
    return this.rpc(defs.QueueBind, fields, defs.QueueBindOk, cb);
  };
Channel.prototype.unbindQueue =
function(queue, source, pattern, argt, cb) {
return this.rpc(defs.QueueUnbind,
Args.unbindQueue(queue, source, pattern, argt),
...
// Builds the field object sent with defs.BasicCancel.
cancel = function (consumerTag) {
  var fields = {};
  fields.consumerTag = consumerTag;
  fields.nowait = false;
  return fields;
};
...
return this;
};
Channel.prototype.cancel = function(consumerTag, cb0) {
var cb = callbackWrapper(this, cb0);
var self = this;
this._rpc(
defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
function(err, ok) {
if (err === null) {
self.unregisterConsumer(consumerTag);
cb(null, ok.fields);
}
else cb(err);
});
...
// Builds the field object sent with defs.ExchangeDeclare when only checking
// for an exchange: `passive` is true and the remaining declaration fields
// are fixed placeholder values.
checkExchange = function (exchange) {
  return {
    exchange: exchange,
    passive: true, // switch to 'may as well be another method' mode
    nowait: false,
    // ff are ignored
    durable: true,
    internal: false,
    type: '',
    autoDelete: false,
    ticket: 0
  };
};
...
defs.ExchangeDeclareOk,
function(e, _) { cb(e, {exchange: ex}); });
return this;
};
// Issues an ExchangeDeclare RPC using the passive "check" fields and returns
// whatever `this.rpc` returns.
Channel.prototype.checkExchange = function(exchange, cb) {
  var fields = Args.checkExchange(exchange);
  return this.rpc(defs.ExchangeDeclare, fields, defs.ExchangeDeclareOk, cb);
};
Channel.prototype.deleteExchange = function(exchange, options, cb) {
return this.rpc(defs.ExchangeDelete,
Args.deleteExchange(exchange, options),
defs.ExchangeDeleteOk, cb);
...
// Builds the field object sent with defs.QueueDeclare when only checking for
// a queue: `passive` is true and the remaining declaration fields are fixed
// placeholder values.
checkQueue = function (queue) {
  return {
    queue: queue,
    passive: true, // switch to "completely different" mode
    nowait: false,
    durable: true, autoDelete: false, exclusive: false, // ignored
    ticket: 0
  };
};
...
return this.rpc(defs.QueueDeclare,
Args.assertQueue(queue, options),
defs.QueueDeclareOk, cb);
};
// Issues a QueueDeclare RPC using the passive "check" fields and returns
// whatever `this.rpc` returns.
Channel.prototype.checkQueue = function(queue, cb) {
  var fields = Args.checkQueue(queue);
  return this.rpc(defs.QueueDeclare, fields, defs.QueueDeclareOk, cb);
};
Channel.prototype.deleteQueue = function(queue, options, cb) {
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk, cb);
...
// Builds the field object for a consume request. `options.priority` is
// copied into the argument table as 'x-priority' when defined; a missing
// consumer tag becomes ''.
consume = function (queue, options) {
  options = options || EMPTY_OPTIONS;

  var table = Object.create(options.arguments || null);
  setIfDefined(table, 'x-priority', options.priority);

  var fields = {};
  fields.ticket = 0;
  fields.queue = queue;
  fields.consumerTag = options.consumerTag || '';
  fields.noLocal = Boolean(options.noLocal);
  fields.noAck = Boolean(options.noAck);
  fields.exclusive = Boolean(options.exclusive);
  fields.nowait = false;
  fields.arguments = table;
  return fields;
};
...
// Consumer
// Opens a channel on `conn`, asserts queue `q` and logs then acknowledges
// every non-null message delivered from it. Errors from channel creation are
// passed to `bail`.
function consumer(conn) {
  // The return value of createChannel was stored in an unused local; dropped.
  conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(q);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  }
}
...
// Builds the field object sent with defs.ExchangeDelete.
deleteExchange = function (exchange, options) {
  var opts = options || EMPTY_OPTIONS;
  var fields = {};
  fields.exchange = exchange;
  fields.ifUnused = Boolean(opts.ifUnused);
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
return this.rpc(defs.ExchangeDeclare,
Args.checkExchange(exchange),
defs.ExchangeDeclareOk, cb);
};
// Issues an ExchangeDelete RPC and returns whatever `this.rpc` returns.
Channel.prototype.deleteExchange = function(exchange, options, cb) {
  var fields = Args.deleteExchange(exchange, options);
  return this.rpc(defs.ExchangeDelete, fields, defs.ExchangeDeleteOk, cb);
};
Channel.prototype.bindExchange =
function(dest, source, pattern, argt, cb) {
return this.rpc(defs.ExchangeBind,
Args.bindExchange(dest, source, pattern, argt),
...
// Builds the field object sent with defs.QueueDelete.
deleteQueue = function (queue, options) {
  var opts = options || EMPTY_OPTIONS;
  var fields = {};
  fields.queue = queue;
  fields.ifUnused = Boolean(opts.ifUnused);
  fields.ifEmpty = Boolean(opts.ifEmpty);
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
return this.rpc(defs.QueueDeclare,
Args.checkQueue(queue),
defs.QueueDeclareOk, cb);
};
// Issues a QueueDelete RPC and returns whatever `this.rpc` returns.
Channel.prototype.deleteQueue = function(queue, options, cb) {
  var fields = Args.deleteQueue(queue, options);
  return this.rpc(defs.QueueDelete, fields, defs.QueueDeleteOk, cb);
};
Channel.prototype.purgeQueue = function(queue, cb) {
return this.rpc(defs.QueuePurge,
Args.purgeQueue(queue),
defs.QueuePurgeOk, cb);
...
// Builds the field object sent with defs.BasicGet.
get = function (queue, options) {
  var opts = options || EMPTY_OPTIONS;
  var fields = {};
  fields.ticket = 0;
  fields.queue = queue;
  fields.noAck = Boolean(opts.noAck);
  return fields;
};
...
else cb(err);
});
return this;
};
Channel.prototype.get = function(queue, options, cb0) {
var self = this;
var fields = Args.get(queue, options);
var cb = callbackWrapper(this, cb0);
this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
if (err === null) {
if (f.id === defs.BasicGetEmpty) {
cb(null, false);
}
else if (f.id === defs.BasicGetOk) {
...
// Builds the field object sent with defs.BasicNack.
// `requeue` defaults to true only when it is exactly undefined.
nack = function (tag, allUpTo, requeue) {
  var shouldRequeue = requeue;
  if (shouldRequeue === undefined) {
    shouldRequeue = true;
  }
  return {
    deliveryTag: tag,
    multiple: Boolean(allUpTo),
    requeue: shouldRequeue
  };
};
...
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
return this;
};
// Sends a BasicNack for `message` immediately, using the delivery tag from
// the message's fields. Returns the channel for chaining.
Channel.prototype.nack = function(message, allUpTo, requeue) {
  var fields = Args.nack(message.fields.deliveryTag, allUpTo, requeue);
  this.sendImmediately(defs.BasicNack, fields);
  return this;
};
Channel.prototype.nackAll = function(requeue) {
this.sendImmediately(
defs.BasicNack, Args.nack(0, true, requeue))
return this;
...
// Builds the field object sent with defs.BasicQos.
// A falsy count becomes 0; `global` is coerced to a strict boolean.
prefetch = function (count, global) {
  var limit = count ? count : 0;
  return {
    prefetchCount: limit,
    prefetchSize: 0,
    global: Boolean(global)
  };
};
...
defs.BasicReject,
Args.reject(message.fields.deliveryTag, requeue));
return this;
};
// Issues a BasicQos RPC and returns whatever `this.rpc` returns.
Channel.prototype.prefetch = function(count, global, cb) {
  var fields = Args.prefetch(count, global);
  return this.rpc(defs.BasicQos, fields, defs.BasicQosOk, cb);
};
Channel.prototype.recover = function(cb) {
return this.rpc(defs.BasicRecover,
Args.recover(),
defs.BasicRecoverOk, cb);
...
// Builds one object holding both the method fields and the message
// properties for a publish (callers pass it as both arguments to
// sendMessage).
//
// - `options.CC` / `options.BCC` are normalised to arrays of strings and
//   stored in the headers table, which inherits from `options.headers`.
// - `deliveryMode` is derived from `options.persistent` if supplied,
//   otherwise from `options.deliveryMode` (a number is used as-is, any other
//   truthy value means 2).
// - `options.expiration`, when defined, is converted with `.toString()`.
publish = function (exchange, routingKey, options) {
  options = options || EMPTY_OPTIONS;

  // The CC and BCC fields expect an array of "longstr", which would
  // normally be buffer values in JavaScript; however, since a field
  // array (or table) cannot have shortstr values, the codec will
  // encode all strings as longstrs anyway.
  function convertCC(cc) {
    if (cc === undefined) {
      return undefined;
    }
    else if (Array.isArray(cc)) {
      return cc.map(String);
    }
    else return [String(cc)];
  }

  var headers = Object.create(options.headers || null);
  setIfDefined(headers, 'CC', convertCC(options.CC));
  setIfDefined(headers, 'BCC', convertCC(options.BCC));

  var deliveryMode; // undefined will default to 1 (non-persistent)

  // Previously I overloaded deliveryMode be a boolean meaning
  // 'persistent or not'; better is to name this option for what it
  // is, but I need to have backwards compatibility for applications
  // that either supply a numeric or boolean value.
  if (options.persistent !== undefined)
    deliveryMode = (options.persistent) ? 2 : 1;
  else if (typeof options.deliveryMode === 'number')
    deliveryMode = options.deliveryMode;
  else if (options.deliveryMode) // is supplied and truthy
    deliveryMode = 2;

  var expiration = options.expiration;
  if (expiration !== undefined) expiration = expiration.toString();

  return {
    // method fields
    exchange: exchange,
    routingKey: routingKey,
    mandatory: !!options.mandatory,
    immediate: false, // RabbitMQ doesn't implement this any more
    ticket: undefined,
    // properties
    contentType: options.contentType,
    contentEncoding: options.contentEncoding,
    headers: headers,
    deliveryMode: deliveryMode,
    priority: options.priority,
    correlationId: options.correlationId,
    replyTo: options.replyTo,
    expiration: expiration,
    messageId: options.messageId,
    timestamp: options.timestamp,
    type: options.type,
    userId: options.userId,
    appId: options.appId,
    clusterId: undefined
  };
};
...
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk, cb);
};
// Publishes `content`. Args.publish returns a single object that serves as
// both the method fields and the message properties, so it is passed twice
// to sendMessage. Returns whatever sendMessage returns.
Channel.prototype.publish =
  function(exchange, routingKey, content, options) {
    var combined = Args.publish(exchange, routingKey, options);
    var methodFields = combined;
    var properties = combined;
    return this.sendMessage(methodFields, properties, content);
  };
// Publishes `content` with an empty exchange name and the queue name as the
// routing key. Returns whatever `publish` returns.
Channel.prototype.sendToQueue = function(queue, content, options) {
  var exchange = '';
  var routingKey = queue;
  return this.publish(exchange, routingKey, content, options);
};
...
// Builds the field object sent with defs.QueuePurge.
purgeQueue = function (queue) {
  var fields = {};
  fields.queue = queue;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk, cb);
};
// Issues a QueuePurge RPC and returns whatever `this.rpc` returns.
Channel.prototype.purgeQueue = function(queue, cb) {
  var fields = Args.purgeQueue(queue);
  return this.rpc(defs.QueuePurge, fields, defs.QueuePurgeOk, cb);
};
Channel.prototype.bindQueue =
function(queue, source, pattern, argt, cb) {
return this.rpc(defs.QueueBind,
Args.bindQueue(queue, source, pattern, argt),
...
// Builds the field object sent with defs.BasicRecover; `requeue` is always
// true. A fresh object is returned on every call.
recover = function () {
  var fields = {};
  fields.requeue = true;
  return fields;
};
...
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk, cb);
};
// Issues a BasicRecover RPC and returns whatever `this.rpc` returns.
Channel.prototype.recover = function(cb) {
  var fields = Args.recover();
  return this.rpc(defs.BasicRecover, fields, defs.BasicRecoverOk, cb);
};
// Constructor that delegates all initialisation to Channel, applied to the
// new instance with the same single `connection` argument.
function ConfirmChannel(connection) {
  Channel.apply(this, [connection]);
}
inherits(ConfirmChannel, Channel);
...
// Builds the field object sent with defs.BasicReject.
// `requeue` defaults to true only when it is exactly undefined.
reject = function (tag, requeue) {
  var shouldRequeue = requeue;
  if (shouldRequeue === undefined) {
    shouldRequeue = true;
  }
  return {deliveryTag: tag, requeue: shouldRequeue};
};
...
defs.BasicNack, Args.nack(0, true, requeue))
return this;
};
// Sends a BasicReject for `message` immediately, using the delivery tag from
// the message's fields. Returns the channel for chaining.
Channel.prototype.reject = function(message, requeue) {
  var fields = Args.reject(message.fields.deliveryTag, requeue);
  this.sendImmediately(defs.BasicReject, fields);
  return this;
};
Channel.prototype.prefetch = function(count, global, cb) {
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk, cb);
...
// Builds the field object sent with defs.ExchangeUnbind.
unbindExchange = function (dest, source, pattern, argt) {
  var fields = {};
  fields.source = source;
  fields.destination = dest;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
Args.bindExchange(dest, source, pattern, argt),
defs.ExchangeBindOk, cb);
};
// Issues an ExchangeUnbind RPC and returns whatever `this.rpc` returns.
Channel.prototype.unbindExchange =
  function(dest, source, pattern, argt, cb) {
    var fields = Args.unbindExchange(dest, source, pattern, argt);
    return this.rpc(defs.ExchangeUnbind, fields, defs.ExchangeUnbindOk, cb);
  };
Channel.prototype.publish =
function(exchange, routingKey, content, options) {
var fieldsAndProps = Args.publish(exchange, routingKey, options);
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
...
// Builds the field object sent with defs.QueueUnbind.
unbindQueue = function (queue, source, pattern, argt) {
  var fields = {};
  fields.queue = queue;
  fields.exchange = source;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
...
Args.bindQueue(queue, source, pattern, argt),
defs.QueueBindOk, cb);
};
// Issues a QueueUnbind RPC and returns whatever `this.rpc` returns.
Channel.prototype.unbindQueue =
  function(queue, source, pattern, argt, cb) {
    var fields = Args.unbindQueue(queue, source, pattern, argt);
    return this.rpc(defs.QueueUnbind, fields, defs.QueueUnbindOk, cb);
  };
Channel.prototype.assertExchange = function(ex, type, options, cb0) {
var cb = callbackWrapper(this, cb0);
this._rpc(defs.ExchangeDeclare,
Args.assertExchange(ex, type, options),
...
// Constructor for a bit set stored in an array of words.
// When `size` is truthy, the word array is pre-sized to ceil(size / 32)
// slots; otherwise it starts empty. `wordsInUse` starts at 0.
function BitSet(size) {
  if (size) {
    var numWords = Math.ceil(size / 32);
    this.words = new Array(numWords);
  }
  else {
    this.words = [];
  }
  this.wordsInUse = 0; // = number, not index
}
n/a
// Callback-style connect. Accepts (cb), (url, cb) or (url, options, cb);
// omitted leading arguments are passed to raw_connect as `false`. On success
// the callback receives (null, CallbackModel); on failure it receives the
// error alone.
function connect(url, options, cb) {
  if (typeof url === 'function') {
    cb = url;
    url = false;
    options = false;
  }
  else if (typeof options === 'function') {
    cb = options;
    options = false;
  }

  raw_connect(url, options, function(err, c) {
    if (err !== null) {
      cb(err);
      return;
    }
    cb(null, new CallbackModel(c));
  });
}
...
ch.ack(msg);
}
});
}
}
require('amqplib/callback_api')
.connect('amqp://localhost', function(err, conn) {
if (err != null) bail(err);
consumer(conn);
publisher(conn);
});
```
## Promise API example
...
// Wraps a connection in an event emitter that re-emits the connection's
// 'error', 'close', 'blocked' and 'unblocked' events. Works with or without
// `new`.
function CallbackModel(connection) {
  if (!(this instanceof CallbackModel)) {
    return new CallbackModel(connection);
  }
  EventEmitter.call( this );
  this.connection = connection;

  var model = this;
  var forwarded = ['error', 'close', 'blocked', 'unblocked'];
  for (var i = 0; i < forwarded.length; i++) {
    var ev = forwarded[i];
    connection.on(ev, model.emit.bind(model, ev));
  }
}
n/a
// Channel constructor: initialises via BaseChannel, then subscribes its own
// bound handleDelivery / handleCancel methods to the 'delivery' and 'cancel'
// events.
function Channel(connection) {
  BaseChannel.call(this, connection);

  var onDelivery = this.handleDelivery.bind(this);
  this.on('delivery', onDelivery);

  var onCancel = this.handleCancel.bind(this);
  this.on('cancel', onCancel);
}
n/a
// Constructor that delegates all initialisation to Channel, applied to the
// new instance with the same single `connection` argument.
function ConfirmChannel(connection) {
  Channel.apply(this, [connection]);
}
n/a
// Constructor that initialises via Channel and then gives the instance an
// empty `consumers` object.
function BaseChannel(connection) {
  Channel.apply(this, [connection]);
  this.consumers = {};
}
n/a
// Base channel constructor. Sets up the RPC bookkeeping (`reply`,
// `pending`), the unconfirmed-message window (`lwm`, `unconfirmed`), wires
// the 'ack' and 'nack' events to handleConfirm, and sets the initial
// message-frame handler.
function Channel(connection) {
  EventEmitter.call( this );
  this.connection = connection;
  // for the presently outstanding RPC
  this.reply = null;
  // for the RPCs awaiting action
  this.pending = [];
  // for unconfirmed messages
  this.lwm = 1; // the least, unconfirmed deliveryTag
  this.unconfirmed = []; // rolling window of delivery callbacks
  // handleConfirm is bound with a handler as its first argument: on 'ack'
  // the handler calls a stored callback with null, on 'nack' with an Error.
  this.on('ack', this.handleConfirm.bind(this, function(cb) {
    if (cb) cb(null);
  }));
  this.on('nack', this.handleConfirm.bind(this, function(cb) {
    if (cb) cb(new Error('message nacked'));
  }));
  // message frame state machine
  this.handleMessage = acceptDeliveryOrReturn;
}
n/a
// Returns the first state of a small frame state machine that assembles one
// message. Each state is a function taking a decoded frame and returning the
// next state:
//   headers -> content (repeated until all bytes arrive) -> done
// When the message is complete, `continuation` is called with
// {fields, properties, content} and acceptDeliveryOrReturn is returned as the
// next state.
//
// Throws (string values, kept as-is for compatibility) when the frame is not
// the expected kind or more content arrives than the header announced.
function acceptMessage(continuation) {
  var totalSize = 0, remaining = 0;
  var buffers = null;

  var message = {
    fields: null,
    properties: null,
    content: null
  };

  return headers;

  // expect a headers frame
  function headers(f) {
    if (f.id === defs.BasicProperties) {
      message.properties = f.fields;
      totalSize = remaining = f.size;

      // for zero-length messages, content frames aren't required.
      if (totalSize === 0) {
        // Buffer.alloc replaces the deprecated `new Buffer(0)`.
        message.content = Buffer.alloc(0);
        continuation(message);
        return acceptDeliveryOrReturn;
      }
      else {
        return content;
      }
    }
    else {
      throw "Expected headers frame after delivery";
    }
  }

  // expect a content frame
  // %%% TODO cancelled messages (sent as zero-length content frame)
  function content(f) {
    if (f.content) {
      var size = f.content.length;
      remaining -= size;
      if (remaining === 0) {
        if (buffers !== null) {
          buffers.push(f.content);
          message.content = Buffer.concat(buffers);
        }
        else {
          // single-frame body: use the frame's buffer directly, no copy
          message.content = f.content;
        }
        continuation(message);
        return acceptDeliveryOrReturn;
      }
      else if (remaining < 0) {
        throw fmt("Too much content sent! Expected %d bytes",
                  totalSize);
      }
      else {
        if (buffers !== null)
          buffers.push(f.content);
        else
          buffers = [f.content];
        return content;
      }
    }
    else throw "Expected content frame after headers";
  }
}
n/a
// Channel constructor: initialises via BaseChannel, then subscribes its own
// bound handleDelivery / handleCancel methods to the 'delivery' and 'cancel'
// events.
function Channel(connection) {
  BaseChannel.call(this, connection);

  var onDelivery = this.handleDelivery.bind(this);
  this.on('delivery', onDelivery);

  var onCancel = this.handleCancel.bind(this);
  this.on('cancel', onCancel);
}
n/a
// Wraps a connection in an event emitter that re-emits the connection's
// 'error', 'close', 'blocked' and 'unblocked' events. Works with or without
// `new`.
function ChannelModel(connection) {
  if (!(this instanceof ChannelModel)) {
    return new ChannelModel(connection);
  }
  EventEmitter.call( this );
  this.connection = connection;

  var model = this;
  var forwarded = ['error', 'close', 'blocked', 'unblocked'];
  for (var i = 0; i < forwarded.length; i++) {
    var ev = forwarded[i];
    connection.on(ev, model.emit.bind(model, ev));
  }
}
n/a
// Constructor that delegates all initialisation to Channel, applied to the
// new instance with the same single `connection` argument.
function ConfirmChannel(connection) {
  Channel.apply(this, [connection]);
}
n/a
// Decodes a field table from `slice` (a Buffer) into a plain object.
// The buffer is read as a sequence of entries: a one-byte key length, the
// UTF-8 key, a one-byte type tag and a tag-specific value. Nested tables
// ('F') recurse; arrays ('A') decode values until their byte length is
// consumed.
//
// Throws TypeError on an unrecognised type tag.
function decodeFields(slice) {
  var fields = {}, offset = 0, size = slice.length;
  var len, key, val;

  // Reads one tagged value at `offset`, leaving the result in `val` and
  // advancing `offset` past it.
  function decodeFieldValue() {
    var tag = String.fromCharCode(slice[offset]); offset++;
    switch (tag) {
    case 'b':
      val = slice.readInt8(offset); offset++;
      break;
    case 'S':
      len = slice.readUInt32BE(offset); offset += 4;
      val = slice.toString('utf8', offset, offset + len);
      offset += len;
      break;
    case 'I':
      val = slice.readInt32BE(offset); offset += 4;
      break;
    case 'D': // only positive decimals, apparently.
      var places = slice[offset]; offset++;
      var digits = slice.readUInt32BE(offset); offset += 4;
      val = {'!': 'decimal', value: {places: places, digits: digits}};
      break;
    case 'T':
      val = ints.readUInt64BE(slice, offset); offset += 8;
      val = {'!': 'timestamp', value: val};
      break;
    case 'F':
      len = slice.readUInt32BE(offset); offset += 4;
      val = decodeFields(slice.slice(offset, offset + len));
      offset += len;
      break;
    case 'A':
      len = slice.readUInt32BE(offset); offset += 4;
      decodeArray(offset + len);
      // NB decodeArray will itself update offset and val
      break;
    case 'd':
      val = slice.readDoubleBE(offset); offset += 8;
      break;
    case 'f':
      val = slice.readFloatBE(offset); offset += 4;
      break;
    case 'l':
      val = ints.readInt64BE(slice, offset); offset += 8;
      break;
    case 's':
      val = slice.readInt16BE(offset); offset += 2;
      break;
    case 't':
      val = slice[offset] != 0; offset++;
      break;
    case 'V':
      val = null;
      break;
    case 'x':
      len = slice.readUInt32BE(offset); offset += 4;
      val = slice.slice(offset, offset + len);
      offset += len;
      break;
    default:
      throw new TypeError('Unexpected type tag "' + tag +'"');
    }
  }

  // Decodes values until `offset` reaches `until`; leaves the array in `val`.
  function decodeArray(until) {
    var vals = [];
    while (offset < until) {
      decodeFieldValue();
      vals.push(val);
    }
    val = vals;
  }

  while (offset < size) {
    len = slice.readUInt8(offset); offset++;
    key = slice.toString('utf8', offset, offset + len);
    offset += len;
    decodeFieldValue();
    fields[key] = val;
  }
  return fields;
}
n/a
// Writes `val` as a field table into `buffer` starting at `offset` and
// returns the total number of bytes written, including the 4-byte length
// prefix. Entries whose value is undefined are skipped.
//
// `for...in` is used on purpose: it also visits inherited enumerable keys,
// which matters for tables created with Object.create(baseTable).
function encodeTable(buffer, val, offset) {
  var start = offset;
  offset += 4; // leave room for the table length
  for (var key in val) {
    if (val[key] !== undefined) {
      var len = Buffer.byteLength(key);
      buffer.writeUInt8(len, offset); offset++;
      buffer.write(key, offset, 'utf8'); offset += len;
      offset += encodeFieldValue(buffer, val[key], offset);
    }
  }
  var size = offset - start;
  // the prefix holds the payload length, excluding the prefix itself
  buffer.writeUInt32BE(size - 4, start);
  return size;
}
n/a
// Opens a socket to the broker described by `url` (a URL string or an object
// with protocol/hostname/port/username/password/vhost) and performs the
// connection open handshake.
//
// `socketOptions` is copied and passed to net.connect / tls.connect; the
// keys noDelay, timeout, keepAlive, keepAliveDelay, clientProperties and
// credentials are also read here. `openCallback(err, connection)` is invoked
// once the connection is open, or with an error if the socket fails or
// times out first.
//
// Throws Error synchronously if the protocol is neither amqp: nor amqps:.
function connect(url, socketOptions, openCallback) {
  // tls.connect uses `util._extend()` on the options given it, which
  // copies only properties mentioned in `Object.keys()`, when
  // processing the options. So I have to make copies too, rather
  // than using `Object.create()`.
  var sockopts = clone(socketOptions || {});
  url = url || 'amqp://localhost';

  var noDelay = !!sockopts.noDelay;
  var timeout = sockopts.timeout;
  var keepAlive = !!sockopts.keepAlive;
  // 0 is default for node
  var keepAliveDelay = sockopts.keepAliveDelay || 0;

  var extraClientProperties = sockopts.clientProperties || {};

  var protocol, fields;
  if (typeof url === 'object') {
    protocol = (url.protocol || 'amqp') + ':';
    sockopts.host = url.hostname;
    sockopts.port = url.port || ((protocol === 'amqp:') ? 5672 : 5671);

    var user, pass;
    if (!url.username) {
      user = 'guest';
      pass = url.password || 'guest';
    }
    else {
      user = url.username;
      pass = url.password;
    }

    fields = openFrames(url.vhost, null,
                        sockopts.credentials || credentials.plain(user, pass),
                        extraClientProperties);
  }
  else {
    var parts = URL.parse(url, true); // yes, parse the query string
    protocol = parts.protocol;
    sockopts.host = parts.hostname;
    // explicit radix so the port is always read as decimal
    sockopts.port = parseInt(parts.port, 10) ||
      ((protocol === 'amqp:') ? 5672 : 5671);
    var vhost = parts.pathname ? parts.pathname.substr(1) : null;
    fields = openFrames(vhost, parts.query,
                        sockopts.credentials || credentialsFromUrl(parts),
                        extraClientProperties);
  }

  // set once the socket has connected; later socket errors are then no
  // longer reported through openCallback from here
  var sockok = false;
  var sock;

  function onConnect() {
    sockok = true;
    sock.setNoDelay(noDelay);
    if (keepAlive) sock.setKeepAlive(keepAlive, keepAliveDelay);

    var c = new Connection(sock);
    c.open(fields, function(err, ok) {
      // disable timeout once the connection is open, we don't want
      // it fouling things
      if (timeout) sock.setTimeout(0);
      if (err === null) {
        openCallback(null, c);
      }
      else openCallback(err);
    });
  }

  if (protocol === 'amqp:') {
    sock = require('net').connect(sockopts, onConnect);
  }
  else if (protocol === 'amqps:') {
    sock = require('tls').connect(sockopts, onConnect);
  }
  else {
    throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
  }

  if (timeout) {
    sock.setTimeout(
      timeout, openCallback.bind(this, new Error('connect ETIMEDOUT')));
  }

  sock.once('error', function(err) {
    if (!sockok) openCallback(err);
  });
}
...
ch.ack(msg);
}
});
}
}
require('amqplib/callback_api')
.connect('amqp://localhost', function(err, conn) {
if (err != null) bail(err);
consumer(conn);
publisher(conn);
});
```
## Promise API example
...
// Connection constructor. Wraps `underlying` with wrapStream, creates a Mux
// over the wrapped stream, and initialises frame-parsing state, the
// sent/received activity flags, the free-channel bit set and the channel
// table (slot 0 is the channel0 handler).
function Connection(underlying) {
  EventEmitter.call( this );
  var stream = this.stream = wrapStream(underlying);
  this.muxer = new Mux(stream);

  // frames
  // Buffer.alloc replaces the deprecated `new Buffer(0)`.
  this.rest = Buffer.alloc(0);
  this.frameMax = constants.FRAME_MIN_SIZE;
  this.sentSinceLastCheck = false;
  this.recvSinceLastCheck = false;

  this.expectSocketClose = false;
  this.freeChannels = new BitSet();
  this.channels = [{channel: {accept: channel0(this)},
                    buffer: underlying}];
}
n/a
// Returns false when `error.code` is CONNECTION_FORCED or REPLY_SUCCESS, and
// true for anything else (including a missing error or code).
function isFatalError(error) {
  var code = error && error.code;
  if (code === defs.constants.CONNECTION_FORCED) {
    return false;
  }
  if (code === defs.constants.REPLY_SUCCESS) {
    return false;
  }
  return true;
}
n/a
// Returns a credentials object for the 'EXTERNAL' mechanism, whose response
// is an empty Buffer.
external = function () {
  return {
    mechanism: 'EXTERNAL',
    response: function() {
      // Buffer.from replaces the deprecated `new Buffer(string)`.
      return Buffer.from('');
    }
  };
};
n/a
// Returns a credentials object for the 'PLAIN' mechanism. The response is a
// Buffer holding an empty string, the user and the password joined by NUL
// characters.
plain = function (user, passwd) {
  return {
    mechanism: 'PLAIN',
    response: function() {
      // Buffer.from replaces the deprecated `new Buffer(string)`.
      return Buffer.from(['', user, passwd].join(String.fromCharCode(0)));
    }
  };
};
...
// Derives PLAIN credentials from the `auth` part of a parsed URL, falling
// back to guest/guest when there is none.
//
// The auth string is split on the FIRST colon only, so a password that
// itself contains ':' is kept whole (splitting on every colon truncated it).
// With no colon at all, the password is left undefined, as before.
function credentialsFromUrl(parts) {
  var user = 'guest', passwd = 'guest';
  if (parts.auth) {
    var sep = parts.auth.indexOf(':');
    if (sep === -1) {
      user = parts.auth;
      passwd = undefined;
    }
    else {
      user = parts.auth.slice(0, sep);
      passwd = parts.auth.slice(sep + 1);
    }
  }
  return credentials.plain(user, passwd);
}
function connect(url, socketOptions, openCallback) {
// tls.connect uses `util._extend()` on the options given it, which
// copies only properties mentioned in `Object.keys()`, when
// processing the options. So I have to make copies too, rather
// than using `Object.create()`.
...
decode = function (id, buf) { switch (id) { case 655370: return decodeConnectionStart(buf); case 655371: return decodeConnectionStartOk(buf); case 655380: return decodeConnectionSecure(buf); case 655381: return decodeConnectionSecureOk(buf); case 655390: return decodeConnectionTune(buf); case 655391: return decodeConnectionTuneOk(buf); case 655400: return decodeConnectionOpen(buf); case 655401: return decodeConnectionOpenOk(buf); case 655410: return decodeConnectionClose(buf); case 655411: return decodeConnectionCloseOk(buf); case 655420: return decodeConnectionBlocked(buf); case 655421: return decodeConnectionUnblocked(buf); case 1310730: return decodeChannelOpen(buf); case 1310731: return decodeChannelOpenOk(buf); case 1310740: return decodeChannelFlow(buf); case 1310741: return decodeChannelFlowOk(buf); case 1310760: return decodeChannelClose(buf); case 1310761: return decodeChannelCloseOk(buf); case 1966090: return decodeAccessRequest(buf); case 1966091: return decodeAccessRequestOk(buf); case 2621450: return decodeExchangeDeclare(buf); case 2621451: return decodeExchangeDeclareOk(buf); case 2621460: return decodeExchangeDelete(buf); case 2621461: return decodeExchangeDeleteOk(buf); case 2621470: return decodeExchangeBind(buf); case 2621471: return decodeExchangeBindOk(buf); case 2621480: return decodeExchangeUnbind(buf); case 2621491: return decodeExchangeUnbindOk(buf); case 3276810: return decodeQueueDeclare(buf); case 3276811: return decodeQueueDeclareOk(buf); case 3276820: return decodeQueueBind(buf); case 3276821: return decodeQueueBindOk(buf); case 3276830: return decodeQueuePurge(buf); case 3276831: return decodeQueuePurgeOk(buf); case 3276840: return decodeQueueDelete(buf); case 3276841: return decodeQueueDeleteOk(buf); case 3276850: return decodeQueueUnbind(buf); case 3276851: return decodeQueueUnbindOk(buf); case 3932170: return decodeBasicQos(buf); case 3932171: return decodeBasicQosOk(buf); case 3932180: return decodeBasicConsume(buf); case 3932181: 
return decodeBasicConsumeOk(buf); case 3932190: return decodeBasicCancel(buf); case 3932191: return decodeBasicCancelOk(buf); case 3932200: return decodeBasicPublish(buf); case 3932210: return decodeBasicReturn(buf); case 3932220: return decodeBasicDeliver(buf); case 3932230: return decodeBasicGet(buf); case 3932231: return decodeBasicGetOk(buf); case 3932232: return decodeBasicGetEmpty(buf); case 3932240: return decodeBasicAck(buf); case 3932250: return decodeBasicReject(buf); case 3932260: return decodeBasicRecoverAsync(buf); case 3932270: return decodeBasicRecover(buf); case 3932271: return decodeBasicRecoverOk(buf); case 3932280: return decodeBasicNack(buf); case 5898250: return decodeTxSelect(buf); case 5898251: return decodeTxSelectOk(buf); case 5898260: return decodeTxCommit(buf); case 5898261: return decodeTxCommitOk(buf); case 5898270: return decodeTxRollback(buf); case 5898271: return decodeTxRollbackOk(buf); case 5570570: return decodeConfirmSelect(buf); case 5570571: return decodeConfirmSelectOk(buf); case 60: return decodeBasicProperties(buf); default: throw new Error("Unknown class/method ID"); } }
n/a
encodeMethod = function (id, channel, fields) { switch (id) { case 655370: return encodeConnectionStart(channel, fields); case 655371: return encodeConnectionStartOk(channel, fields); case 655380: return encodeConnectionSecure(channel, fields); case 655381: return encodeConnectionSecureOk(channel, fields); case 655390: return encodeConnectionTune(channel, fields); case 655391: return encodeConnectionTuneOk(channel, fields); case 655400: return encodeConnectionOpen(channel, fields); case 655401: return encodeConnectionOpenOk(channel, fields); case 655410: return encodeConnectionClose(channel, fields); case 655411: return encodeConnectionCloseOk(channel, fields); case 655420: return encodeConnectionBlocked(channel, fields); case 655421: return encodeConnectionUnblocked(channel, fields); case 1310730: return encodeChannelOpen(channel, fields); case 1310731: return encodeChannelOpenOk(channel, fields); case 1310740: return encodeChannelFlow(channel, fields); case 1310741: return encodeChannelFlowOk(channel, fields); case 1310760: return encodeChannelClose(channel, fields); case 1310761: return encodeChannelCloseOk(channel, fields); case 1966090: return encodeAccessRequest(channel, fields); case 1966091: return encodeAccessRequestOk(channel, fields); case 2621450: return encodeExchangeDeclare(channel, fields); case 2621451: return encodeExchangeDeclareOk(channel, fields); case 2621460: return encodeExchangeDelete(channel, fields); case 2621461: return encodeExchangeDeleteOk(channel, fields); case 2621470: return encodeExchangeBind(channel, fields); case 2621471: return encodeExchangeBindOk(channel, fields); case 2621480: return encodeExchangeUnbind(channel, fields); case 2621491: return encodeExchangeUnbindOk(channel, fields); case 3276810: return encodeQueueDeclare(channel, fields); case 3276811: return encodeQueueDeclareOk(channel, fields); case 3276820: return encodeQueueBind(channel, fields); case 3276821: return encodeQueueBindOk(channel, fields); case 3276830: 
return encodeQueuePurge(channel, fields); case 3276831: return encodeQueuePurgeOk(channel, fields); case 3276840: return encodeQueueDelete(channel, fields); case 3276841: return encodeQueueDeleteOk(channel, fields); case 3276850: return encodeQueueUnbind(channel, fields); case 3276851: return encodeQueueUnbindOk(channel, fields); case 3932170: return encodeBasicQos(channel, fields); case 3932171: return encodeBasicQosOk(channel, fields); case 3932180: return encodeBasicConsume(channel, fields); case 3932181: return encodeBasicConsumeOk(channel, fields); case 3932190: return encodeBasicCancel(channel, fields); case 3932191: return encodeBasicCancelOk(channel, fields); case 3932200: return encodeBasicPublish(channel, fields); case 3932210: return encodeBasicReturn(channel, fields); case 3932220: return encodeBasicDeliver(channel, fields); case 3932230: return encodeBasicGet(channel, fields); case 3932231: return encodeBasicGetOk(channel, fields); case 3932232: return encodeBasicGetEmpty(channel, fields); case 3932240: return encodeBasicAck(channel, fields); case 3932250: return encodeBasicReject(channel, fields); case 3932260: return encodeBasicRecoverAsync(channel, fields); case 3932270: return encodeBasicRecover(channel, fields); case 3932271: return encodeBasicRecoverOk(channel, fields); case 3932280: return encodeBasicNack(channel, fields); case 5898250: return encodeTxSelect(channel, fields); case 5898251: return encodeTxSelectOk(channel, fields); case 5898260: return encodeTxCommit(channel, fields); case 5898261: return encodeTxCommitOk(channel, fields); case 5898270: return encodeTxRollback(cha ...
n/a
// Dispatches properties encoding by class id. Only id 60 is known; any other
// id throws.
encodeProperties = function (id, channel, size, fields) {
  if (id === 60) {
    return encodeBasicProperties(channel, size, fields);
  }
  throw new Error("Unknown class/properties ID");
};
n/a
info = function (id) { switch (id) { case 655370: return methodInfoConnectionStart; case 655371: return methodInfoConnectionStartOk; case 655380: return methodInfoConnectionSecure; case 655381: return methodInfoConnectionSecureOk; case 655390: return methodInfoConnectionTune; case 655391: return methodInfoConnectionTuneOk; case 655400: return methodInfoConnectionOpen; case 655401: return methodInfoConnectionOpenOk; case 655410: return methodInfoConnectionClose; case 655411: return methodInfoConnectionCloseOk; case 655420: return methodInfoConnectionBlocked; case 655421: return methodInfoConnectionUnblocked; case 1310730: return methodInfoChannelOpen; case 1310731: return methodInfoChannelOpenOk; case 1310740: return methodInfoChannelFlow; case 1310741: return methodInfoChannelFlowOk; case 1310760: return methodInfoChannelClose; case 1310761: return methodInfoChannelCloseOk; case 1966090: return methodInfoAccessRequest; case 1966091: return methodInfoAccessRequestOk; case 2621450: return methodInfoExchangeDeclare; case 2621451: return methodInfoExchangeDeclareOk; case 2621460: return methodInfoExchangeDelete; case 2621461: return methodInfoExchangeDeleteOk; case 2621470: return methodInfoExchangeBind; case 2621471: return methodInfoExchangeBindOk; case 2621480: return methodInfoExchangeUnbind; case 2621491: return methodInfoExchangeUnbindOk; case 3276810: return methodInfoQueueDeclare; case 3276811: return methodInfoQueueDeclareOk; case 3276820: return methodInfoQueueBind; case 3276821: return methodInfoQueueBindOk; case 3276830: return methodInfoQueuePurge; case 3276831: return methodInfoQueuePurgeOk; case 3276840: return methodInfoQueueDelete; case 3276841: return methodInfoQueueDeleteOk; case 3276850: return methodInfoQueueUnbind; case 3276851: return methodInfoQueueUnbindOk; case 3932170: return methodInfoBasicQos; case 3932171: return methodInfoBasicQosOk; case 3932180: return methodInfoBasicConsume; case 3932181: return methodInfoBasicConsumeOk; case 3932190: 
return methodInfoBasicCancel; case 3932191: return methodInfoBasicCancelOk; case 3932200: return methodInfoBasicPublish; case 3932210: return methodInfoBasicReturn; case 3932220: return methodInfoBasicDeliver; case 3932230: return methodInfoBasicGet; case 3932231: return methodInfoBasicGetOk; case 3932232: return methodInfoBasicGetEmpty; case 3932240: return methodInfoBasicAck; case 3932250: return methodInfoBasicReject; case 3932260: return methodInfoBasicRecoverAsync; case 3932270: return methodInfoBasicRecover; case 3932271: return methodInfoBasicRecoverOk; case 3932280: return methodInfoBasicNack; case 5898250: return methodInfoTxSelect; case 5898251: return methodInfoTxSelectOk; case 5898260: return methodInfoTxCommit; case 5898261: return methodInfoTxCommitOk; case 5898270: return methodInfoTxRollback; case 5898271: return methodInfoTxRollbackOk; case 5570570: return methodInfoConfirmSelect; case 5570571: return methodInfoConfirmSelectOk; case 60: return propertiesInfoBasicProperties; default: throw new Error("Unknown class/method ID"); } }
...
var code = close.fields.replyCode;
return format('%d (%s) with message "%s"',
code, defs.constant_strs[code],
close.fields.replyText);
}
// Resolve a numeric id to the `name` of the record returned by defs.info.
// Whatever defs.info throws for an unrecognised id propagates to the caller.
module.exports.methodName = function(id) {
  var methodInfo = defs.info(id);
  return methodInfo.name;
};
module.exports.inspect = function(frame, showFields) {
if (frame === HEARTBEAT) {
return '<Heartbeat>';
}
else if (!frame.id) {
...
// Error-like constructor recording both where the illegal operation was
// attempted and a caller-supplied stack from an earlier point in time.
//
//   msg   - stored as `message`
//   stack - stored verbatim as `stackAtStateChange`
//
// `stack` is built from this object's own toString() followed by the current
// call stack passed through trimStack(…, 2).
function IllegalOperationError(msg, stack) {
  // Capture the stack here, directly in the constructor, so the frames that
  // trimStack sees are the same as before.
  var here = new Error();

  // message must be assigned before toString() is consulted below.
  this.message = msg;

  var heading = this.toString();
  this.stack = heading + '\n' + trimStack(here.stack, 2);
  this.stackAtStateChange = stack;
}
n/a
// Produce a string of the form "Stack capture: <reason>\n<trimmed stack>",
// where the trimmed stack is the current call stack passed through
// trimStack(…, 2).
function stackCapture(reason) {
  // Created directly in this function so the captured frames are unchanged.
  var here = new Error();
  var heading = 'Stack capture: ' + reason;
  return heading + '\n' + trimStack(here.stack, 2);
}
n/a
// Render a close frame as `<code> (<constant name>) with message "<text>"`.
// The code and text come from close.fields; the constant name is looked up
// in defs.constant_strs by reply code.
closeMessage = function (close) {
  var replyCode = close.fields.replyCode;
  var codeName = defs.constant_strs[replyCode];
  var replyText = close.fields.replyText;
  return format('%d (%s) with message "%s"', replyCode, codeName, replyText);
};
n/a
// Build a short, human-readable description of a decoded frame.
//
//   frame      - the HEARTBEAT sentinel, a frame without an `id` (reported
//                as content), or a frame whose `id` defs.info can resolve
//   showFields - when truthy, append the frame's fields as indented JSON
inspect = function (frame, showFields) {
  if (frame === HEARTBEAT) {
    return '<Heartbeat>';
  }

  if (!frame.id) {
    return format('<Content channel:%d size:%d>', frame.channel, frame.size);
  }

  var methodInfo = defs.info(frame.id);
  var fieldDump = '';
  if (showFields) {
    fieldDump = ' ' + JSON.stringify(frame.fields, undefined, 2);
  }
  return format('<%s channel:%d%s>', methodInfo.name, frame.channel, fieldDump);
};
n/a
// Resolve a numeric id to the `name` of the record returned by defs.info.
// Whatever defs.info throws for an unrecognised id propagates to the caller.
methodName = function (id) {
  var methodInfo = defs.info(id);
  return methodInfo.name;
};
n/a
// Turn a parsed frame ({type, channel, payload}) into its decoded form.
//
//   method frame    -> {id, channel, fields}
//   header frame    -> {id, channel, size, fields}
//   body frame      -> {channel, content}
//   heartbeat frame -> the HEARTBEAT sentinel
//
// Any other frame type raises an Error naming the type.
decodeFrame = function (frame) {
  var body = frame.payload;
  var kind = frame.type;

  if (kind === FRAME_METHOD) {
    var method = methodPattern(body);
    var methodId = method.id;
    var methodFields = decode(methodId, method.args);
    return {id: methodId, channel: frame.channel, fields: methodFields};
  }

  if (kind === FRAME_HEADER) {
    var header = headerPattern(body);
    // For content headers the id is the class id.
    var classId = header['class'];
    var properties = decode(classId, header.flagsAndfields);
    return {
      id: classId,
      channel: frame.channel,
      size: header.size,
      fields: properties
    };
  }

  if (kind === FRAME_BODY) {
    return {channel: frame.channel, content: frame.payload};
  }

  if (kind === FRAME_HEARTBEAT) {
    return HEARTBEAT;
  }

  throw new Error('Unknown frame type ' + frame.type);
};
n/a
// Build a body frame for `channel` carrying `payload`, by handing
// {channel, size, payload} to bodyCons. `size` is payload.length.
makeBodyFrame = function (channel, payload) {
  var frame = {
    channel: channel,
    size: payload.length,
    payload: payload
  };
  return bodyCons(frame);
};
n/a
// Try to split one complete frame off the front of `bin`.
//
//   bin - input handed to frameHeaderPattern
//   max - largest payload size accepted
//
// Returns {type, channel, size, payload, rest} when a whole frame (payload
// plus end marker) is available, or false when the header does not match or
// more input is needed. Throws when the declared size exceeds `max`, or when
// the byte after the payload is not FRAME_END.
function parseFrame(bin, max) {
  var header = frameHeaderPattern(bin);
  if (!header) {
    return false;
  }

  var size = header.size;
  var rest = header.rest;

  if (size > max) {
    throw new Error('Frame size exceeds frame max');
  }

  // Need the payload plus one trailing end-marker byte before going on.
  if (!(rest.length > size)) {
    return false;
  }

  if (rest[size] !== FRAME_END) {
    throw new Error('Invalid frame');
  }

  return {
    type: header.type,
    channel: header.channel,
    size: size,
    payload: rest.slice(0, size),
    rest: rest.slice(size + 1)
  };
}
n/a
// Heartbeat monitor: an EventEmitter driving two repeating timers.
//
//   interval  - heartbeat interval; multiplied by
//               module.exports.UNITS_TO_MS to obtain milliseconds
//   checkSend - function polled by the send timer (every half interval)
//   checkRecv - function polled by the receive timer (every full interval);
//               a falsy result counts as a missed interval
//
// Each timer calls this.runHeartbeat(check, fail), where `fail` emits 'beat'
// (send timer) or 'timeout' (receive timer). runHeartbeat is defined outside
// this block; presumably it invokes `fail` when `check` returns a falsy
// value -- confirm there.
//
// The timer handles are kept on `sendTimer` and `recvTimer`.
//
// NOTE: the statements after each `//` comment must stay on their own lines.
// When this constructor is collapsed onto a single line the comments swallow
// the remainder of the body and it no longer parses.
function Heart(interval, checkSend, checkRecv) {
  EventEmitter.call(this);
  this.interval = interval;

  var intervalMs = interval * module.exports.UNITS_TO_MS;
  // Function#bind is my new best friend
  var beat = this.emit.bind(this, 'beat');
  var timeout = this.emit.bind(this, 'timeout');

  this.sendTimer = setInterval(
    this.runHeartbeat.bind(this, checkSend, beat), intervalMs / 2);

  // A timeout occurs if I see nothing for *two consecutive* intervals
  var recvMissed = 0;
  // Returns true while fewer than two consecutive intervals have been
  // missed; any successful checkRecv() resets the count.
  function missedTwo() {
    if (!checkRecv()) return (++recvMissed < 2);
    else { recvMissed = 0; return true; }
  }
  this.recvTimer = setInterval(
    this.runHeartbeat.bind(this, missedTwo, timeout), intervalMs);
}
n/a
// Multiplexer state for a single downstream writable.
//
//   downstream - object exposing on('drain', fn); stored as `out`
//
// Starts with empty `newStreams` / `oldStreams` lists and with `blocked` and
// `scheduledRead` both false. When downstream emits 'drain', `blocked` is
// cleared and this._readIncoming() is called.
function Mux(downstream) {
  var mux = this;

  mux.newStreams = [];
  mux.oldStreams = [];
  mux.blocked = false;
  mux.scheduledRead = false;
  mux.out = downstream;

  function resumeAfterDrain() {
    mux.blocked = false;
    mux._readIncoming();
  }
  downstream.on('drain', resumeAfterDrain);
}
n/a