class Receiver {
/**
* Creates a Receiver instance.
*
* @param {Object} extensions An object containing the negotiated extensions
* @param {Number} maxPayload The maximum allowed message length
* @param {String} binaryType The type for binary data
*/
constructor (extensions, maxPayload, binaryType) {
this.binaryType = binaryType || constants.BINARY_TYPES[0];
this.extensions = extensions || {};
this.maxPayload = maxPayload | 0;
this.bufferedBytes = 0;
this.buffers = [];
this.compressed = false;
this.payloadLength = 0;
this.fragmented = 0;
this.masked = false;
this.fin = false;
this.mask = null;
this.opcode = 0;
this.totalPayloadLength = 0;
this.messageLength = 0;
this.fragments = [];
this.cleanupCallback = null;
this.hadError = false;
this.dead = false;
this.loop = false;
this.onmessage = null;
this.onclose = null;
this.onerror = null;
this.onping = null;
this.onpong = null;
this.state = GET_INFO;
}
/**
* Consumes bytes from the available buffered data.
*
* @param {Number} bytes The number of bytes to consume
* @return {Buffer} Consumed bytes
* @private
*/
readBuffer (bytes) {
var offset = 0;
var dst;
var l;
this.bufferedBytes -= bytes;
if (bytes === this.buffers[0].length) return this.buffers.shift();
if (bytes < this.buffers[0].length) {
dst = this.buffers[0].slice(0, bytes);
this.buffers[0] = this.buffers[0].slice(bytes);
return dst;
}
dst = Buffer.allocUnsafe(bytes);
while (bytes > 0) {
l = this.buffers[0].length;
if (bytes >= l) {
this.buffers[0].copy(dst, offset);
offset += l;
this.buffers.shift();
} else {
this.buffers[0].copy(dst, offset, 0, bytes);
this.buffers[0] = this.buffers[0].slice(bytes);
}
bytes -= l;
}
return dst;
}
/**
* Checks if the number of buffered bytes is bigger or equal than `n` and
* calls `cleanup` if necessary.
*
* @param {Number} n The number of bytes to check against
* @return {Boolean} `true` if `bufferedBytes >= n`, else `false`
* @private
*/
hasBufferedBytes (n) {
if (this.bufferedBytes >= n) return true;
this.loop = false;
if (this.dead) this.cleanup(this.cleanupCallback);
return false;
}
/**
* Adds new data to the parser.
*
* @public
*/
add (data) {
if (this.dead) return;
this.bufferedBytes += data.length;
this.buffers.push(data);
this.startLoop();
}
/**
* Starts the parsing loop.
*
* @private
*/
startLoop () {
this.loop = true;
while (this.loop) {
switch (this.state) {
case GET_INFO:
this.getInfo();
break;
case GET_PAYLOAD_LENGTH_16:
this.getPayloadLength16();
break;
case GET_PAYLOAD_LENGTH_64:
this.getPayloadLength64();
break;
case GET_MASK:
this.getMask();
break;
case GET_DATA:
this.getData();
break;
default: // `INFLATING`
this.loop = false;
}
}
}
/**
* Reads the first two bytes of a frame.
*
* @private
*/
getInfo () {
if (!this.hasBufferedBytes(2)) return;
const buf = this.readBuffer(2);
if ((buf[0] & 0x30) !== 0x00) {
this.error(new Error('RSV2 and RSV3 must be clear'), 1002);
return;
}
const compressed = (buf[0] & 0x40) === 0x40;
if (compressed && !this.extensions[PerMessageDeflate.extensionName]) {
this.error(new Error('RSV1 must be clear'), 1002);
return;
}
this.fin = (buf[0] & 0x80) === 0x80;
this.opcode = buf[0] & 0x0f;
this.payloadLength = buf[1] & 0x7f;
if (this.opcode === 0x00) {
if (compressed) {
this.error(new Error('RSV1 must be clear'), 1002);
return;
}
if (!this.fragmented) {
this.error(new Error(`invalid opcode: ${this.opcode}`), 1002);
return;
} else { ...
n/a
class Sender {
/**
* Creates a Sender instance.
*
* @param {net.Socket} socket The connection socket
* @param {Object} extensions An object containing the negotiated extensions
*/
constructor (socket, extensions) {
this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName];
this._socket = socket;
this.firstFragment = true;
this.compress = false;
this.bufferedBytes = 0;
this.deflating = false;
this.queue = [];
this.onerror = null;
}
/**
* Frames a piece of data according to the HyBi WebSocket protocol.
*
* @param {Buffer} data The data to frame
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
* @param {Boolean} options.fin Specifies whether or not to set the FIN bit
* @param {Boolean} options.mask Specifies whether or not to mask `data`
* @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
* @return {Buffer[]} The framed data as a list of `Buffer` instances
* @public
*/
static frame (data, options) {
const merge = data.length < 1024 || (options.mask && options.readOnly);
var offset = options.mask ? 6 : 2;
var payloadLength = data.length;
if (data.length >= 65536) {
offset += 8;
payloadLength = 127;
} else if (data.length > 125) {
offset += 2;
payloadLength = 126;
}
const target = Buffer.allocUnsafe(merge ? data.length + offset : offset);
target[0] = options.fin ? options.opcode | 0x80 : options.opcode;
if (options.rsv1) target[0] |= 0x40;
if (payloadLength === 126) {
target.writeUInt16BE(data.length, 2, true);
} else if (payloadLength === 127) {
target.writeUInt32BE(0, 2, true);
target.writeUInt32BE(data.length, 6, true);
}
if (!options.mask) {
target[1] = payloadLength;
if (merge) {
data.copy(target, offset);
return [target];
}
return [target, data];
}
const mask = crypto.randomBytes(4);
target[1] = payloadLength | 0x80;
target[offset - 4] = mask[0];
target[offset - 3] = mask[1];
target[offset - 2] = mask[2];
target[offset - 1] = mask[3];
if (merge) {
bufferUtil.mask(data, mask, target, offset, data.length);
return [target];
}
bufferUtil.mask(data, mask, data, 0, data.length);
return [target, data];
}
/**
* Sends a close message to the other peer.
*
* @param {(Number|undefined)} code The status code component of the body
* @param {String} data The message component of the body
* @param {Boolean} mask Specifies whether or not to mask the message
* @param {Function} cb Callback
* @public
*/
close (code, data, mask, cb) {
if (code !== undefined && (typeof code !== 'number' || !ErrorCodes.isValidErrorCode(code))) {
throw new Error('first argument must be a valid error code number');
}
const buf = Buffer.allocUnsafe(2 + (data ? Buffer.byteLength(data) : 0));
buf.writeUInt16BE(code || 1000, 0, true);
if (buf.length > 2) buf.write(data, 2);
if (this.deflating) {
this.enqueue([this.doClose, buf, mask, cb]);
} else {
this.doClose(buf, mask, cb);
}
}
/**
* Frames and sends a close message.
*
* @param {Buffer} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Function} cb Callback
* @private
*/
doClose (data, mask, cb) {
this.sendFrame(Sender.frame(data, {
readOnly: false,
opcode: 0x08,
rsv1: false,
fin: true,
mask
}), cb);
}
/**
* Sends a ping message to the other peer.
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @public
*/
ping (data, mask) {
var readOnly = true;
if (!Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isV ...
n/a
class WebSocketServer extends EventEmitter {
/**
* Create a `WebSocketServer` instance.
*
* @param {Object} options Configuration options
* @param {String} options.host The hostname where to bind the server
* @param {Number} options.port The port where to bind the server
* @param {http.Server} options.server A pre-created HTTP/S server to use
* @param {Function} options.verifyClient An hook to reject connections
* @param {Function} options.handleProtocols An hook to handle protocols
* @param {String} options.path Accept only connections matching this path
* @param {Boolean} options.noServer Enable no server mode
* @param {Boolean} options.clientTracking Specifies whether or not to track clients
* @param {(Boolean|Object)} options.perMessageDeflate Enable/disable permessage-deflate
* @param {Number} options.maxPayload The maximum allowed message size
* @param {Function} callback A listener for the `listening` event
*/
constructor (options, callback) {
super();
options = Object.assign({
maxPayload: 100 * 1024 * 1024,
perMessageDeflate: true,
handleProtocols: null,
clientTracking: true,
verifyClient: null,
noServer: false,
backlog: null, // use default (511 as implemented in net.js)
server: null,
host: null,
path: null,
port: null
}, options);
if (options.port == null && !options.server && !options.noServer) {
throw new TypeError('missing or invalid options');
}
if (options.port != null) {
this._server = http.createServer((req, res) => {
const body = http.STATUS_CODES[426];
res.writeHead(426, {
'Content-Length': body.length,
'Content-Type': 'text/plain'
});
res.end(body);
});
this._server.allowHalfOpen = false;
this._server.listen(options.port, options.host, options.backlog, callback);
} else if (options.server) {
this._server = options.server;
}
if (this._server) {
this._ultron = new Ultron(this._server);
this._ultron.on('listening', () => this.emit('listening'));
this._ultron.on('error', (err) => this.emit('error', err));
this._ultron.on('upgrade', (req, socket, head) => {
this.handleUpgrade(req, socket, head, (client) => {
this.emit(`connection${req.url}`, client);
this.emit('connection', client);
});
});
}
if (options.clientTracking) this.clients = new Set();
this.options = options;
this.path = options.path;
}
/**
* Close the server.
*
* @param {Function} cb Callback
* @public
*/
close (cb) {
//
// Terminate all associated clients.
//
if (this.clients) {
for (const client of this.clients) client.terminate();
}
const server = this._server;
if (server) {
this._ultron.destroy();
this._ultron = this._server = null;
//
// Close the http server if it was internally created.
//
if (this.options.port != null) return server.close(cb);
}
if (cb) cb();
}
/**
* See if a given request should be handled by this server instance.
*
* @param {http.IncomingMessage} req Request object to inspect
* @return {Boolean} `true` if the request is valid, else `false`
* @public
*/
shouldHandle (req) {
if (this.options.path && url.parse(req.url).pathname !== this.options.path) {
return false;
}
return true;
}
/**
* Handle a HTTP Upgrade request.
*
* @param {http.IncomingMessage} req The request object
* @param {net.Socket} socket The network socket between the server and client
* @param {Buffer} head The first packet of the upgraded stream
* @param {Function} cb Callback
* @public
*/
handleUpgrade (req, socket, head, cb) {
socket.on('error', socketError);
const version = +req.headers['sec-websocket-version'];
if (
req.method !== 'GET' || req.headers.upgrade.toLowerCase() !== 'websocket' ||
!req.headers['sec-websocket-key'] || ...
...
To disable the extension you can set the `perMessageDeflate` option to `false`.
On the server:
```js
const WebSocket = require('ws');
const wss = new WebSocket.Server({
perMessageDeflate: false,
port: 8080
});
```
On the client:
...
(list, totalLength) => { const target = Buffer.allocUnsafe(totalLength); var offset = 0; for (var i = 0; i < list.length; i++) { const buf = list[i]; buf.copy(target, offset); offset += buf.length; } return target; }
...
* @public
*/
const format = (value) => {
return Object.keys(value).map((token) => {
var paramsList = value[token];
if (!Array.isArray(paramsList)) paramsList = [paramsList];
return paramsList.map((params) => {
return [token].concat(Object.keys(params).map((k) => {
var p = params[k];
if (!Array.isArray(p)) p = [p];
return p.map((v) => v === true ? k : `${k}=${v}`).join('; ');
})).join('; ');
}).join(', ');
}).join(', ');
};
...
(source, mask, output, offset, length) => { for (var i = 0; i < length; i++) { output[offset + i] = source[i] ^ mask[i & 3]; } }
...
target[1] = payloadLength | 0x80;
target[offset - 4] = mask[0];
target[offset - 3] = mask[1];
target[offset - 2] = mask[2];
target[offset - 1] = mask[3];
if (merge) {
bufferUtil.mask(data, mask, target, offset, data.length);
return [target];
}
bufferUtil.mask(data, mask, data, 0, data.length);
return [target, data];
}
...
(buffer, mask) => { // Required until https://github.com/nodejs/node/issues/9006 is resolved. const length = buffer.length; for (var i = 0; i < length; i++) { buffer[i] ^= mask[i & 3]; } }
...
getData () {
var data = constants.EMPTY_BUFFER;
if (this.payloadLength) {
if (!this.hasBufferedBytes(this.payloadLength)) return;
data = this.readBuffer(this.payloadLength);
if (this.masked) bufferUtil.unmask(data, this.mask);
}
if (this.opcode > 0x07) {
this.controlMessage(data);
} else if (this.compressed) {
this.state = INFLATING;
this.decompress(data);
...
() => {}
n/a
isValidErrorCode = function (code) { return (code >= 1000 && code <= 1013 && code !== 1004 && code !== 1005 && code !== 1006) || (code >= 3000 && code <= 4999); }
...
this.loop = false;
this.cleanup(this.cleanupCallback);
} else if (data.length === 1) {
this.error(new Error('invalid payload length'), 1002);
} else {
const code = data.readUInt16BE(0, true);
if (!ErrorCodes.isValidErrorCode(code)) {
this.error(new Error(`invalid status code: ${code}`), 1002);
return;
}
const buf = data.slice(2);
if (!isValidUTF8(buf)) {
...
addEventListener(method, listener) { if (typeof listener !== 'function') return; function onMessage (data, flags) { listener.call(this, new MessageEvent(data, !!flags.binary, this)); } function onClose (code, message) { listener.call(this, new CloseEvent(code, message, this)); } function onError (event) { event.type = 'error'; event.target = this; listener.call(this, event); } function onOpen () { listener.call(this, new OpenEvent(this)); } if (method === 'message') { onMessage._listener = listener; this.on(method, onMessage); } else if (method === 'close') { onClose._listener = listener; this.on(method, onClose); } else if (method === 'error') { onError._listener = listener; this.on(method, onError); } else if (method === 'open') { onOpen._listener = listener; this.on(method, onOpen); } else { this.on(method, listener); } }
n/a
removeEventListener(method, listener) { const listeners = this.listeners(method); for (var i = 0; i < listeners.length; i++) { if (listeners[i] === listener || listeners[i]._listener === listener) { this.removeListener(method, listeners[i]); } } }
n/a
(value) => { return Object.keys(value).map((token) => { var paramsList = value[token]; if (!Array.isArray(paramsList)) paramsList = [paramsList]; return paramsList.map((params) => { return [token].concat(Object.keys(params).map((k) => { var p = params[k]; if (!Array.isArray(p)) p = [p]; return p.map((v) => v === true ? k : `${k}=${v}`).join('; '); })).join('; '); }).join(', '); }).join(', '); }
...
if (props.length) {
const serverExtensions = props.reduce((obj, key) => {
obj[key] = [extensions[key].params];
return obj;
}, {});
headers.push(`Sec-WebSocket-Extensions: ${Extensions.format(serverExtensions)}`);
}
//
// Allow external modification/inspection of handshake headers.
//
this.emit('headers', headers);
...
(value) => { value = value || ''; const extensions = {}; value.split(',').forEach((v) => { const params = v.split(';'); const token = params.shift().trim(); const paramsList = extensions[token] = extensions[token] || []; const parsedParams = {}; params.forEach((param) => { const parts = param.trim().split('='); const key = parts[0]; var value = parts[1]; if (value === undefined) { value = true; } else { // unquote value if (value[0] === '"') { value = value.slice(1); } if (value[value.length - 1] === '"') { value = value.slice(0, value.length - 1); } } (parsedParams[key] = parsedParams[key] || []).push(value); }); paramsList.push(parsedParams); }); return extensions; }
...
res.send({ msg: "hello" });
});
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
wss.on('connection', function connection(ws) {
const location = url.parse(ws.upgradeReq.url, true);
// You might use location.query.access_token to authenticate or share sessions
// or ws.upgradeReq.headers.cookie (see http://stackoverflow.com/a/16395220/151312)
ws.on('message', function incoming(message) {
console.log('received: %s', message);
});
...