description and source-codefunction adapter(uri, opts) {
opts = opts || {};
// handle options only
if ('object' == typeof uri) {
opts = uri;
uri = null;
}
// opts
var pub = opts.pubClient;
var sub = opts.subClient;
var prefix = opts.key || 'socket.io';
var subEvent = opts.subEvent || 'messageBuffer';
var requestsTimeout = opts.requestsTimeout || 1000;
var withChannelMultiplexing = false !== opts.withChannelMultiplexing;
// init clients if needed
function createClient() {
if (uri) {
// handle uri string
return redis(uri, opts);
} else {
return redis(opts);
}
}
if (!pub) pub = createClient();
if (!sub) sub = createClient();
// this server's key
var uid = uid2(6);
function Redis(nsp){
Adapter.call(this, nsp);
this.uid = uid;
this.prefix = prefix;
this.requestsTimeout = requestsTimeout;
this.withChannelMultiplexing = withChannelMultiplexing;
this.channel = prefix + '#' + nsp.name + '#';
this.requestChannel = prefix + '-request#' + this.nsp.name + '#';
this.responseChannel = prefix + '-response#' + this.nsp.name + '#';
this.requests = {};
this.customHook = function(data, cb){ cb(null); }
if (String.prototype.startsWith) {
this.channelMatches = function (messageChannel, subscribedChannel) {
return messageChannel.startsWith(subscribedChannel);
}
} else { // Fallback to other impl for older Node.js
this.channelMatches = function (messageChannel, subscribedChannel) {
return messageChannel.substr(0, subscribedChannel.length) === subscribedChannel;
}
}
this.pubClient = pub;
this.subClient = sub;
var self = this;
sub.subscribe([this.channel, this.requestChannel, this.responseChannel], function(err){
if (err) self.emit('error', err);
});
sub.on(subEvent, this.onmessage.bind(this));
function onError(err) {
self.emit('error', err);
}
pub.on('error', onError);
sub.on('error', onError);
}
/**
* Inherits from `Adapter`.
*/
Redis.prototype.__proto__ = Adapter.prototype;
/**
* Called with a subscription message
*
* @api private
*/
Redis.prototype.onmessage = function(channel, msg){
channel = channel.toString();
if (this.channelMatches(channel, this.requestChannel)) {
return this.onrequest(channel, msg);
} else if (this.channelMatches(channel, this.responseChannel)) {
return this.onresponse(channel, msg);
} else if (!this.channelMatches(channel, this.channel)) {
return debug('ignore different channel');
}
var args = msgpack.decode(msg);
var packet;
if (uid == args.shift()) return debug('ignore same uid');
packet = args[0];
if (packet && packet.nsp === undefined) {
packet.nsp = '/';
}
if (!packet || packet.nsp != this.nsp.name) {
return debug('ignore different namespace');
}
args.push(true);
this.broadcast.apply(this, args);
};
/**
* Called on request from another node
*
* @api private
*/
Redis.prototype.onrequest = function(channel, msg){
var self = this;
var request;
try {
request = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}
debug('received request %j', request);
switch (request.type) {
case requestTypes.clients:
Adapter.prototype.clients.call(self, request.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}
var response = JSON.stringify({
requestid: request.requestid,
clients: clients
});
pub.publish(self.responseChannel, response);
});
break;
case requestTypes.clientRooms:
Adapter.prototype.clientRooms.call(self, request.sid, function(err, rooms){
if(err){
self.emit('error', err);
return;
}
if (!rooms) ...