/**
 * Access control entry pairing a permission value with an identity.
 *
 * @class ACL
 * @constructor
 * @param {Number} permission An integer between 1 and 31 inclusive.
 * @param {Id} id The identity the permission applies to.
 * @throws {Error} When permission is not an integer in [1, 31] or id is not
 *                 an instance of Id.
 */
function ACL(permission, id) {
    // The range comparison alone lets NaN and fractional numbers such as 1.5
    // through, so integrality is checked explicitly.
    if (typeof permission !== 'number' ||
            Math.floor(permission) !== permission ||
            permission < 1 ||
            permission > 31) {
        throw new Error('permission must be a valid integer.');
    }

    if (!(id instanceof Id)) {
        throw new Error('id must be an instance of Id class.');
    }

    this.permission = permission;
    this.id = id;
}
...
**Example**
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
...
/**
 * Holds the connection state for one client: server list, session
 * credentials, packet queues and watcher bookkeeping.
 *
 * @class ConnectionManager
 * @constructor
 * @param {String} connectionString Passed to ConnectionStringParser.
 * @param {Object} options Reads spinDelay, sessionTimeout and the optional
 *                         sessionId / sessionPassword buffers.
 * @param {Function} stateListener Registered for the 'state' event.
 */
function ConnectionManager(connectionString, options, stateListener) {
    events.EventEmitter.call(this);

    this.watcherManager = new WatcherManager();
    this.connectionStringParser = new ConnectionStringParser(connectionString);
    this.servers = this.connectionStringParser.getServers();
    this.chrootPath = this.connectionStringParser.getChrootPath();
    this.nextServerIndex = 0;
    this.serverAttempts = 0;
    this.state = STATES.DISCONNECTED;

    this.options = options;
    this.spinDelay = options.spinDelay;

    this.updateTimeout(options.sessionTimeout);
    this.connectTimeoutHandler = null;

    this.xid = 0;

    // Buffer.alloc() zero-fills, so a caller-supplied buffer shorter than the
    // target never leaves uninitialised bytes behind the copied prefix.
    this.sessionId = Buffer.alloc(8);
    if (Buffer.isBuffer(options.sessionId)) {
        options.sessionId.copy(this.sessionId);
    }

    this.sessionPassword = Buffer.alloc(16);
    if (Buffer.isBuffer(options.sessionPassword)) {
        options.sessionPassword.copy(this.sessionPassword);
    }

    // scheme:auth pairs
    this.credentials = [];

    // Last seen zxid.
    this.zxid = Buffer.alloc(8);

    this.pendingBuffer = null;

    this.packetQueue = new PacketQueue();
    this.packetQueue.on('readable', this.onPacketQueueReadable.bind(this));
    this.pendingQueue = [];

    this.on('state', stateListener);
}
n/a
/**
 * Splits a connection string of the form `host[:port][,host[:port]...][/chroot]`
 * into a server list and an optional chroot path.
 *
 * @class ConnectionStringParser
 * @constructor
 * @param {String} connectionString Non-empty connection string.
 * @throws {AssertionError} When the string is empty, not a string, or
 *                          contains no server entry.
 */
function ConnectionStringParser(connectionString) {
    assert(
        connectionString && typeof connectionString === 'string',
        'connectionString must be a non-empty string.'
    );

    this.connectionString = connectionString;

    // Handle chroot
    var index = connectionString.indexOf('/'),
        hostList = [],
        servers = [];

    // A single trailing slash is treated as "no chroot".
    if (index !== -1 && index !== (connectionString.length - 1)) {
        this.chrootPath = connectionString.substring(index);
        Path.validate(this.chrootPath);
    } else {
        this.chrootPath = undefined;
    }

    if (index !== -1) {
        hostList = connectionString.substring(0, index).split(',');
    } else {
        hostList = connectionString.split(',');
    }

    hostList.filter(function (item) {
        // Filter out empty string.
        return item;
    }).forEach(function (item) {
        var parts = item.split(':');

        servers.push({
            host : parts[0],
            port : parts[1] || DEFAULT_PORT
        });
    });

    assert(
        servers.length > 0,
        'connectionString must contain at least one server.'
    );

    // Randomize the list.
    this.servers = u.shuffle(servers);
}
n/a
/**
 * Watcher event carrying a type code, its symbolic name and the affected path.
 *
 * @class Event
 * @constructor
 * @param {Number} type Checked by validateType().
 * @param {String} name Non-empty name of the event.
 * @param {String} [path] Path the event refers to.
 */
function Event(type, name, path) {
    validateType(type);

    assert(
        name && typeof name === 'string',
        'name must be a non-empty string.'
    );

    var self = this;

    self.type = type;
    self.name = name;
    self.path = path;
}
n/a
/**
 * Error type carrying a numeric code, a symbolic name and an optional path.
 *
 * May be called as (code, name, ctor) or (code, name, path, ctor).
 *
 * @class Exception
 * @constructor
 * @param {Number} code Checked by validateCode().
 * @param {String} name Non-empty name of the exception.
 * @param {String} [path] Node path related to the exception.
 * @param {Function} ctor Function above which the stack trace is captured.
 */
function Exception(code, name, path, ctor) {
    // In the three-argument form the third argument is the constructor.
    var stackAnchor = ctor || path,
        nodePath = ctor ? path : undefined,
        text;

    validateCode(code);

    assert(
        name && typeof name === 'string',
        'name must be a non-empty string.'
    );
    assert(typeof stackAnchor === 'function', 'ctor must be a function.');

    Error.captureStackTrace(this, stackAnchor || Exception);

    this.code = code;
    this.name = name;
    this.path = nodePath;

    text = 'Exception: ' + name + '[' + code + ']';
    this.message = nodePath ? text + '@' + nodePath : text;
}
n/a
function Error() { [native code] }
n/a
/**
 * Identity made of an authentication scheme and an id within that scheme.
 *
 * @class Id
 * @constructor
 * @param {String} scheme Non-empty scheme name.
 * @param {String} id Id string; may be empty.
 * @throws {Error} When scheme is not a non-empty string or id is not a string.
 */
function Id(scheme, id) {
    var schemeIsValid = typeof scheme === 'string' && scheme !== '';

    if (!schemeIsValid) {
        throw new Error('scheme must be a non-empty string.');
    }

    if (typeof id !== 'string') {
        throw new Error('id must be a string.');
    }

    this.scheme = scheme;
    this.id = id;
}
...
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
return;
}
...
/**
 * Event-emitting queue of packets, backed by a plain array.
 *
 * @class PacketQueue
 * @constructor
 */
function PacketQueue() {
    // Run the emitter constructor on this instance before adding own state.
    events.EventEmitter.call(this);

    this.queue = [];
}
n/a
/**
 * Collects operations to be submitted together through a connection manager.
 * Works with or without the `new` keyword.
 *
 * @class Transaction
 * @constructor
 * @param {ConnectionManager} connectionManager Used later to send the request.
 */
function Transaction(connectionManager) {
    var calledWithNew = this instanceof Transaction;

    if (!calledWithNew) {
        return new Transaction(connectionManager);
    }

    assert(
        connectionManager instanceof ConnectionManager,
        'connectionManager must be an instance of ConnectionManager.'
    );

    this.ops = [];
    this.connectionManager = connectionManager;
}
n/a
/**
 * Keeps three separate registries of watchers: data, child and existence.
 *
 * @class WatcherManager
 * @constructor
 */
function WatcherManager() {
    var self = this;

    self.dataWatchers = {};
    self.childWatchers = {};
    self.existenceWatchers = {};
}
n/a
/**
 * Factory for Client instances.
 *
 * @method createClient
 * @param {String} connectionString Forwarded to the Client constructor.
 * @param {Object} [options] Forwarded to the Client constructor.
 * @return {Client} A newly constructed client.
 */
function createClient(connectionString, options) {
    var client = new Client(connectionString, options);

    return client;
}
...
## Example
1\. Create a node using given path:
```javascript
var zookeeper = require('node-zookeeper-client');
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
...
/**
 * Access control entry pairing a permission value with an identity.
 *
 * @class ACL
 * @constructor
 * @param {Number} permission An integer between 1 and 31 inclusive.
 * @param {Id} id The identity the permission applies to.
 * @throws {Error} When permission is not an integer in [1, 31] or id is not
 *                 an instance of Id.
 */
function ACL(permission, id) {
    // The range comparison alone lets NaN and fractional numbers such as 1.5
    // through, so integrality is checked explicitly.
    if (typeof permission !== 'number' ||
            Math.floor(permission) !== permission ||
            permission < 1 ||
            permission > 31) {
        throw new Error('permission must be a valid integer.');
    }

    if (!(id instanceof Id)) {
        throw new Error('id must be an instance of Id class.');
    }

    this.permission = permission;
    this.id = id;
}
...
**Example**
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
...
/**
 * Builds an ACL from its jute record form.
 *
 * @method fromRecord
 * @param {jute.data.ACL} record Record providing `perms` and `id`.
 * @return {ACL} ACL built from record.perms and Id.fromRecord(record.id).
 * @throws {Error} When record is not an instance of jute.data.ACL.
 */
function fromRecord(record) {
    if (!(record instanceof jute.data.ACL)) {
        // Message previously read "instace".
        throw new Error('record must be an instance of jute.data.ACL.');
    }

    return new ACL(record.perms, Id.fromRecord(record.id));
}
...
return;
}
var acls;
if (Array.isArray(response.payload.acl)) {
acls = response.payload.acl.map(function (item) {
return ACL.fromRecord(item);
});
}
next(null, acls, response.payload.stat);
});
},
callback
...
/**
 * Converts this ACL into a jute.data.ACL record.
 *
 * @method toRecord
 * @return {jute.data.ACL} Record built from this.permission and the record
 *                         form of this.id.
 */
toRecord = function () {
    var record = new jute.data.ACL(this.permission, this.id.toRecord());

    return record;
};
...
header = new jute.protocol.RequestHeader();
header.type = jute.OP_CODES.CREATE;
payload = new jute.protocol.CreateRequest();
payload.path = path;
payload.acl = acls.map(function (item) {
return item.toRecord();
});
payload.flags = mode;
if (Buffer.isBuffer(data)) {
payload.data = new Buffer(data.length);
data.copy(payload.data);
}
...
/**
 * Holds the connection state for one client: server list, session
 * credentials, packet queues and watcher bookkeeping.
 *
 * @class ConnectionManager
 * @constructor
 * @param {String} connectionString Passed to ConnectionStringParser.
 * @param {Object} options Reads spinDelay, sessionTimeout and the optional
 *                         sessionId / sessionPassword buffers.
 * @param {Function} stateListener Registered for the 'state' event.
 */
function ConnectionManager(connectionString, options, stateListener) {
    events.EventEmitter.call(this);

    this.watcherManager = new WatcherManager();
    this.connectionStringParser = new ConnectionStringParser(connectionString);
    this.servers = this.connectionStringParser.getServers();
    this.chrootPath = this.connectionStringParser.getChrootPath();
    this.nextServerIndex = 0;
    this.serverAttempts = 0;
    this.state = STATES.DISCONNECTED;

    this.options = options;
    this.spinDelay = options.spinDelay;

    this.updateTimeout(options.sessionTimeout);
    this.connectTimeoutHandler = null;

    this.xid = 0;

    // Buffer.alloc() zero-fills, so a caller-supplied buffer shorter than the
    // target never leaves uninitialised bytes behind the copied prefix.
    this.sessionId = Buffer.alloc(8);
    if (Buffer.isBuffer(options.sessionId)) {
        options.sessionId.copy(this.sessionId);
    }

    this.sessionPassword = Buffer.alloc(16);
    if (Buffer.isBuffer(options.sessionPassword)) {
        options.sessionPassword.copy(this.sessionPassword);
    }

    // scheme:auth pairs
    this.credentials = [];

    // Last seen zxid.
    this.zxid = Buffer.alloc(8);

    this.pendingBuffer = null;

    this.packetQueue = new PacketQueue();
    this.packetQueue.on('readable', this.onPacketQueueReadable.bind(this));
    this.pendingQueue = [];

    this.on('state', stateListener);
}
n/a
/**
 * Emitter constructor; all set-up is delegated to EventEmitter.init, invoked
 * with the new instance as `this`.
 *
 * @class EventEmitter
 * @constructor
 */
function EventEmitter() {
    var instance = this;

    EventEmitter.init.call(instance);
}
n/a
/**
 * Remembers an authentication credential and, when the connection is live,
 * queues an auth request for it.
 *
 * @method addAuthInfo
 * @param {String} scheme The authentication scheme.
 * @param {Buffer} auth The authentication data buffer.
 * @throws {Error} On invalid arguments or an unknown connection state.
 */
addAuthInfo = function (scheme, auth) {
    if (!scheme || typeof scheme !== 'string') {
        throw new Error('scheme must be a non-empty string.');
    }

    if (!Buffer.isBuffer(auth)) {
        throw new Error('auth must be a valid instance of Buffer');
    }

    var header,
        payload;

    // Stored unconditionally; onSocketConnected() queues one auth request per
    // stored credential.
    this.credentials.push({
        scheme : scheme,
        auth : auth
    });

    switch (this.state) {
    case STATES.CONNECTED:
    case STATES.CONNECTED_READ_ONLY:
        // Only queue the auth request when connected.
        header = new jute.protocol.RequestHeader();
        payload = new jute.protocol.AuthPacket();

        header.xid = jute.XID_AUTHENTICATION;
        header.type = jute.OP_CODES.AUTH;

        payload.type = 0;
        payload.scheme = scheme;
        payload.auth = auth;

        this.queue(new jute.Request(header, payload));
        break;
    case STATES.DISCONNECTED:
    case STATES.CONNECTING:
    case STATES.CLOSING:
    case STATES.CLOSED:
    case STATES.SESSION_EXPIRED:
    case STATES.AUTHENTICATION_FAILED:
        // Skip when we are not in a live state.
        return;
    default:
        throw new Error('Unknown state: ' + this.state);
    }
};
...
* scheme `String` - The authentication scheme.
* auth `Buffer` - The authentication data buffer.
**Example**
```javascript
zookeeper.addAuthInfo('ip', new Buffer('127.0.0.1'));
```
---
#### State getState()
Return the current client [state](#state).
...
/**
 * Drains the pending queue, calling each packet's callback (when it has one)
 * with an exception created from the given error code.
 *
 * @method cleanupPendingQueue
 * @param {Number} errorCode Code handed to Exception.create().
 */
cleanupPendingQueue = function (errorCode) {
    var packet;

    // Stops at the first falsy entry, which includes the `undefined` that
    // shift() returns once the queue is empty.
    for (packet = this.pendingQueue.shift();
            packet;
            packet = this.pendingQueue.shift()) {
        if (packet.callback) {
            packet.callback(Exception.create(errorCode));
        }
    }
};
...
retry = false;
break;
default:
errorCode = Exception.CONNECTION_LOSS;
retry = true;
}
this.cleanupPendingQueue(errorCode);
this.setState(STATES.DISCONNECTED);
if (retry) {
this.connect();
} else {
this.setState(STATES.CLOSED);
}
...
/**
 * Switches to the CLOSING state and queues a close-session request.
 *
 * @method close
 */
close = function () {
    var closeHeader = new jute.protocol.RequestHeader();

    this.setState(STATES.CLOSING);

    closeHeader.type = jute.OP_CODES.CLOSE_SESSION;
    this.queue(new jute.Request(closeHeader, null));
};
...
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
});
});
client.connect();
```
2\. List and watch the children of given node:
...
/**
 * Switches to the CONNECTING state, picks the next server and opens a socket
 * to it with a connect timeout and all socket event handlers attached.
 *
 * @method connect
 */
connect = function () {
    var self = this;

    self.setState(STATES.CONNECTING);

    self.findNextServer(function (server) {
        self.socket = net.connect(server);

        self.connectTimeoutHandler = setTimeout(
            self.onSocketConnectTimeout.bind(self),
            self.connectTimeout
        );

        // Disable the Nagle algorithm.
        self.socket.setNoDelay();

        self.socket.on('connect', self.onSocketConnected.bind(self));
        self.socket.on('data', self.onSocketData.bind(self));
        self.socket.on('drain', self.onSocketDrain.bind(self));
        self.socket.on('close', self.onSocketClosed.bind(self));
        self.socket.on('error', self.onSocketError.bind(self));
    });
};
...
console.log('Node: %s is successfully created.', path);
}
client.close();
});
});
client.connect();
```
2\. List and watch the children of given node:
```javascript
var zookeeper = require('node-zookeeper-client');
...
/**
 * Picks the next server in round-robin order and hands it to the callback
 * asynchronously. Once every server has been attempted, the next pick is
 * delayed by a random time of up to `spinDelay` milliseconds.
 *
 * @method findNextServer
 * @param {Function} callback Called with the selected server object.
 */
findNextServer = function (callback) {
    var self = this;

    self.nextServerIndex %= self.servers.length;

    if (self.serverAttempts === self.servers.length) {
        setTimeout(function () {
            callback(self.servers[self.nextServerIndex]);
            self.nextServerIndex += 1;

            // reset attempts since we already waited for enough time.
            self.serverAttempts = 0;
        }, Math.random() * self.spinDelay);
    } else {
        self.serverAttempts += 1;

        process.nextTick(function () {
            callback(self.servers[self.nextServerIndex]);
            self.nextServerIndex += 1;
        });
    }
};
...
};
ConnectionManager.prototype.connect = function () {
var self = this;
self.setState(STATES.CONNECTING);
self.findNextServer(function (server) {
self.socket = net.connect(server);
self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
);
...
/**
 * Returns a copy of the session id, so callers cannot modify the stored one.
 *
 * @method getSessionId
 * @return {Buffer} A new 8-byte buffer holding the session id.
 */
getSessionId = function () {
    // Buffer.alloc() replaces the deprecated `new Buffer(size)` and zero-fills,
    // so a stored id shorter than 8 bytes cannot expose uninitialised memory.
    var result = Buffer.alloc(8);

    this.sessionId.copy(result);

    return result;
};
...
[`Buffer`](http://nodejs.org/api/buffer.html) since Javascript does not support
long integer natively.
**Example**
```javascript
var client = zookeeper.createClient({...});
var id = client.getSessionId();
console.log('Session id is: %s', id.toString('hex'));
```
---
#### Buffer getSessionPassword()
...
/**
 * Returns a copy of the session password, so callers cannot modify the
 * stored one.
 *
 * @method getSessionPassword
 * @return {Buffer} A new 16-byte buffer holding the session password.
 */
getSessionPassword = function () {
    // Buffer.alloc() replaces the deprecated `new Buffer(size)` and zero-fills,
    // so a stored password shorter than 16 bytes cannot expose uninitialised
    // memory.
    var result = Buffer.alloc(16);

    this.sessionPassword.copy(result);

    return result;
};
...
The value returned is an instance of
[`Buffer`](http://nodejs.org/api/buffer.html).
**Example**
```javascript
var client = zookeeper.createClient({...});
var pwd = client.getSessionPassword();
```
---
#### Number getSessionTimeout()
Returns the *negotiated* session timeout (in milliseconds) for this client
...
/**
 * Returns the session timeout currently stored on this object.
 *
 * @method getSessionTimeout
 * @return {Number} The value of `this.sessionTimeout`.
 */
getSessionTimeout = function () {
    var timeout = this.sessionTimeout;

    return timeout;
};
...
instance. The value returned is not valid until the client connects to a server
and may change after a re-connect.
**Example**
```javascript
var client = zookeeper.createClient({...});
var sessionTimeout = client.getSessionTimeout();
```
---
### State
After the `connect()` method is invoked, the ZooKeeper client starts its
...
/**
 * Writes queued packets to the socket while the connection is in a state
 * that allows outgoing traffic.
 *
 * Packets with a header whose type is neither PING nor AUTH get the next xid
 * and are moved to the pending queue. Sending stops when the socket reports
 * back pressure or after a CLOSE_SESSION packet has been written.
 *
 * @method onPacketQueueReadable
 * @throws {Error} When the connection state is unknown.
 */
onPacketQueueReadable = function () {
    var packet,
        header;

    switch (this.state) {
    case STATES.CONNECTED:
    case STATES.CONNECTED_READ_ONLY:
    case STATES.CLOSING:
        // Continue
        break;
    case STATES.DISCONNECTED:
    case STATES.CONNECTING:
    case STATES.CLOSED:
    case STATES.SESSION_EXPIRED:
    case STATES.AUTHENTICATION_FAILED:
        // Skip since we can not send traffic out
        return;
    default:
        throw new Error('Unknown state: ' + this.state);
    }

    while ((packet = this.packetQueue.shift()) !== undefined) {
        header = packet.request.header;

        if (header !== null &&
                header.type !== jute.OP_CODES.PING &&
                header.type !== jute.OP_CODES.AUTH) {
            header.xid = this.xid;
            this.xid += 1;

            // Only put requests that are not connect, ping and auth into
            // the pending queue.
            this.pendingQueue.push(packet);
        }

        if (!this.socket.write(packet.request.toBuffer())) {
            // Back pressure is handled here, when the socket emit
            // drain event, this method will be invoked again.
            break;
        }

        // A null header is tolerated above, so it must be tolerated here too
        // instead of raising a TypeError on `header.type`.
        if (header !== null && header.type === jute.OP_CODES.CLOSE_SESSION) {
            // The close session should be the final packet sent to the
            // server.
            break;
        }
    }
};
...
self.sessionId = connectResponse.sessionId;
self.sessionPassword = connectResponse.passwd;
self.updateTimeout(connectResponse.timeOut);
self.setState(STATES.CONNECTED);
// Check if we have anything to send out just in case.
self.onPacketQueueReadable();
self.socket.setTimeout(
self.pingTimeout,
self.onSocketTimeout.bind(self)
);
}
...
/**
 * Socket 'close' handler. Fails every pending request with an error code
 * chosen from the current state, moves to DISCONNECTED, then either
 * reconnects or moves on to CLOSED.
 *
 * No reconnect is attempted when the state was CLOSING, SESSION_EXPIRED or
 * AUTHENTICATION_FAILED.
 *
 * @method onSocketClosed
 * @param {Boolean} hasError Not used by this handler.
 */
onSocketClosed = function (hasError) {
    var current = this.state,
        errorCode,
        retry;

    if (current === STATES.CLOSING) {
        errorCode = Exception.CONNECTION_LOSS;
        retry = false;
    } else if (current === STATES.SESSION_EXPIRED) {
        errorCode = Exception.SESSION_EXPIRED;
        retry = false;
    } else if (current === STATES.AUTHENTICATION_FAILED) {
        errorCode = Exception.AUTH_FAILED;
        retry = false;
    } else {
        errorCode = Exception.CONNECTION_LOSS;
        retry = true;
    }

    this.cleanupPendingQueue(errorCode);
    this.setState(STATES.DISCONNECTED);

    if (!retry) {
        this.setState(STATES.CLOSED);
        return;
    }

    this.connect();
};
n/a
/**
 * Connect-timeout handler: gives up on the current connection attempt.
 *
 * @method onSocketConnectTimeout
 */
onSocketConnectTimeout = function () {
    // Destroy the current socket so the socket closed event
    // will be trigger.
    this.socket.destroy();
};
n/a
/**
 * Socket 'connect' handler. Cancels the connect timeout, writes the connect
 * request directly to the socket, then queues one auth request per stored
 * credential and a set-watches request when watchers are registered.
 *
 * @method onSocketConnected
 */
onSocketConnected = function () {
    var connectRequest,
        authRequest,
        setWatchesRequest,
        header,
        payload;

    if (this.connectTimeoutHandler) {
        clearTimeout(this.connectTimeoutHandler);
    }

    connectRequest = new jute.Request(null, new jute.protocol.ConnectRequest(
        jute.PROTOCOL_VERSION,
        this.zxid,
        this.sessionTimeout,
        this.sessionId,
        this.sessionPassword
    ));

    // XXX No read only support yet.
    this.socket.write(connectRequest.toBuffer());

    // Set auth info
    if (this.credentials.length > 0) {
        this.credentials.forEach(function (credential) {
            header = new jute.protocol.RequestHeader();
            payload = new jute.protocol.AuthPacket();

            header.xid = jute.XID_AUTHENTICATION;
            header.type = jute.OP_CODES.AUTH;

            payload.type = 0;
            payload.scheme = credential.scheme;
            payload.auth = credential.auth;

            authRequest = new jute.Request(header, payload);
            this.queue(authRequest);
        }, this);
    }

    // Reset the watchers if we have any.
    if (!this.watcherManager.isEmpty()) {
        header = new jute.protocol.RequestHeader();
        payload = new jute.protocol.SetWatches();

        header.type = jute.OP_CODES.SET_WATCHES;
        header.xid = jute.XID_SET_WATCHES;

        payload.setChrootPath(this.chrootPath);
        payload.relativeZxid = this.zxid;
        payload.dataWatches = this.watcherManager.getDataWatcherPaths();
        payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
        payload.childWatches = this.watcherManager.getChildWatcherPaths();

        setWatchesRequest = new jute.Request(header, payload);
        this.queue(setWatchesRequest);
    }
};
n/a
onSocketData = function (buffer) { var self = this, offset = 0, size = 0, connectResponse, pendingPacket, responseHeader, responsePayload, response, event; // Combine the pending buffer with the new buffer. if (self.pendingBuffer) { buffer = Buffer.concat( [self.pendingBuffer, buffer], self.pendingBuffer.length + buffer.length ); } // We need at least 4 bytes if (buffer.length < 4) { self.pendingBuffer = buffer; return; } size = buffer.readInt32BE(offset); offset += 4; if (buffer.length < size + 4) { // More data are coming. self.pendingBuffer = buffer; return; } if (buffer.length === size + 4) { // The size is perfect. self.pendingBuffer = null; } else { // We have extra bytes, splice them out as pending buffer. self.pendingBuffer = buffer.slice(size + 4); buffer = buffer.slice(0, size + 4); } if (self.state === STATES.CONNECTING) { // Handle connect response. connectResponse = new jute.protocol.ConnectResponse(); offset += connectResponse.deserialize(buffer, offset); if (connectResponse.timeOut <= 0) { self.setState(STATES.SESSION_EXPIRED); } else { // Reset the server connection attempts since we connected now. self.serverAttempts = 0; self.sessionId = connectResponse.sessionId; self.sessionPassword = connectResponse.passwd; self.updateTimeout(connectResponse.timeOut); self.setState(STATES.CONNECTED); // Check if we have anything to send out just in case. self.onPacketQueueReadable(); self.socket.setTimeout( self.pingTimeout, self.onSocketTimeout.bind(self) ); } } else { // Handle all other repsonses. 
responseHeader = new jute.protocol.ReplyHeader(); offset += responseHeader.deserialize(buffer, offset); //TODO BETTTER LOGGING switch (responseHeader.xid) { case jute.XID_PING: break; case jute.XID_AUTHENTICATION: if (responseHeader.err === Exception.AUTH_FAILED) { self.setState(STATES.AUTHENTICATION_FAILED); } break; case jute.XID_NOTIFICATION: event = new jute.protocol.WatcherEvent(); if (self.chrootPath) { event.setChrootPath(self.chrootPath); } offset += event.deserialize(buffer, offset); self.watcherManager.emit(event); break; default: pendingPacket = self.pendingQueue.shift(); if (!pendingPacket) { // TODO, better error handling and logging need to be done. // Need to clean up and do a reconnect. // throw new Error( // 'Nothing in pending queue but got data from server.' // ); self.socket.destroy(); // this will trigger reconnect return; } if (pendingPacket.request.header.xid !== responseHeader.xid) { // TODO, better error handling/logging need to bee done here. // Need to clean up and do a reconnect. //throw new Error( //'Xid out of order. Got xid: ' + //responseHeader.xid + ' with error code: ' + //responseHeader.err + ', expected xid: ' + //pendingPacket.request.header.xid + '.' //); self.socket.destroy(); // this will trigger reconnect return; } if (responseHeader.zxid) { // TODO, In Java implementation, the condition is to // check whether the long zxid is greater than 0, here // use buffer so we simplify. ...
...
}
}
}
}
// We have more data to process, need to recursively process it.
if (self.pendingBuffer) {
self.onSocketData(new Buffer(0));
}
};
/**
 * Socket 'drain' handler: resumes sending queued packets by calling
 * onPacketQueueReadable().
 *
 * @method onSocketDrain
 */
ConnectionManager.prototype.onSocketDrain = function () {
// Trigger write on socket.
this.onPacketQueueReadable();
};
...
/**
 * Socket 'drain' handler: resumes sending queued packets.
 *
 * @method onSocketDrain
 */
onSocketDrain = function () {
    // Trigger write on socket.
    this.onPacketQueueReadable();
};
n/a
/**
 * Socket 'error' handler: cancels the pending connect timeout, if any.
 *
 * @method onSocketError
 * @param {Error} error The socket error; not inspected here.
 */
onSocketError = function (error) {
    if (this.connectTimeoutHandler) {
        clearTimeout(this.connectTimeoutHandler);
    }

    // After socket error, the socket closed event will be triggered,
    // we will retry connect in that listener function.
};
n/a
/**
 * Socket idle-timeout handler. While connected, queues a ping request and
 * re-arms the socket timeout with `pingTimeout`.
 *
 * @method onSocketTimeout
 */
onSocketTimeout = function () {
    var header,
        request;

    if (this.socket &&
            (this.state === STATES.CONNECTED ||
            this.state === STATES.CONNECTED_READ_ONLY)) {
        header = new jute.protocol.RequestHeader(
            jute.XID_PING,
            jute.OP_CODES.PING
        );

        request = new jute.Request(header, null);
        this.queue(request);

        // Re-register the timeout handler since it only fired once.
        this.socket.setTimeout(
            this.pingTimeout,
            this.onSocketTimeout.bind(this)
        );
    }
};
n/a
/**
 * Queues a request for sending, or fails it immediately when the connection
 * state no longer allows traffic.
 *
 * While CLOSING only a CLOSE_SESSION request is queued; any other request is
 * rejected with CONNECTION_LOSS.
 *
 * @method queue
 * @param {Object} request The request; its payload gets the chroot path
 *                         applied when one is configured.
 * @param {Function} [callback] Called with an exception when the request is
 *                              rejected; stored with the packet otherwise.
 * @throws {Error} When request is not an object or the state is unknown.
 */
queue = function (request, callback) {
    // `typeof null` is 'object', so null is rejected explicitly instead of
    // failing later with a TypeError.
    if (request === null || typeof request !== 'object') {
        throw new Error('request must be a valid instance of jute.Request.');
    }

    if (this.chrootPath && request.payload) {
        request.payload.setChrootPath(this.chrootPath);
    }

    callback = callback || function () {};

    switch (this.state) {
    case STATES.DISCONNECTED:
    case STATES.CONNECTING:
    case STATES.CONNECTED:
    case STATES.CONNECTED_READ_ONLY:
        // queue the packet
        this.packetQueue.push({
            request : request,
            callback : callback
        });
        break;
    case STATES.CLOSING:
        if (request.header &&
                request.header.type === jute.OP_CODES.CLOSE_SESSION) {
            this.packetQueue.push({
                request : request,
                callback : callback
            });
        } else {
            callback(Exception.create(Exception.CONNECTION_LOSS));
        }
        break;
    case STATES.CLOSED:
        callback(Exception.create(Exception.CONNECTION_LOSS));
        return;
    case STATES.SESSION_EXPIRED:
        callback(Exception.create(Exception.SESSION_EXPIRED));
        return;
    case STATES.AUTHENTICATION_FAILED:
        callback(Exception.create(Exception.AUTH_FAILED));
        return;
    default:
        throw new Error('Unknown state: ' + this.state);
    }
};
...
}
request = new jute.Request(header, payload);
attempt(
self,
function (attempts, next) {
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
next(null, response.payload.path);
});
...
/**
 * Forwards a child watcher registration to the watcher manager.
 *
 * @method registerChildWatcher
 * @param {String} path Path to watch.
 * @param {Function} watcher The watcher function.
 */
registerChildWatcher = function (path, watcher) {
    var manager = this.watcherManager;

    manager.registerChildWatcher(path, watcher);
};
...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerChildWatcher(path, watcher);
}
next(null, response.payload.children, response.payload.stat);
});
},
callback
);
...
/**
 * Forwards a data watcher registration to the watcher manager.
 *
 * @method registerDataWatcher
 * @param {String} path Path to watch.
 * @param {Function} watcher The watcher function.
 */
registerDataWatcher = function (path, watcher) {
    var manager = this.watcherManager;

    manager.registerDataWatcher(path, watcher);
};
...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerDataWatcher(path, watcher);
}
next(null, response.payload.data, response.payload.stat);
});
},
callback
);
...
/**
 * Forwards an existence watcher registration to the watcher manager.
 *
 * @method registerExistenceWatcher
 * @param {String} path Path to watch.
 * @param {Function} watcher The watcher function.
 */
registerExistenceWatcher = function (path, watcher) {
    var manager = this.watcherManager;

    manager.registerExistenceWatcher(path, watcher);
};
...
if (watcher) {
if (existence) {
self.connectionManager.registerDataWatcher(
path,
watcher
);
} else {
self.connectionManager.registerExistenceWatcher(
path,
watcher
);
}
}
next(
...
/**
 * Stores a new state and emits a 'state' event, but only when the state
 * actually changes.
 *
 * @method setState
 * @param {Number} state The new state.
 * @throws {Error} When state is not a number.
 */
setState = function (state) {
    if (typeof state !== 'number') {
        throw new Error('state must be a valid number.');
    }

    // Setting the current state again is a silent no-op.
    if (this.state === state) {
        return;
    }

    this.state = state;
    this.emit('state', this.state);
};
...
/**
 * Returns the session timeout currently stored on this connection manager.
 *
 * @method getSessionTimeout
 * @return {Number} The value of `this.sessionTimeout`.
 */
ConnectionManager.prototype.getSessionTimeout = function () {
return this.sessionTimeout;
};
ConnectionManager.prototype.connect = function () {
var self = this;
self.setState(STATES.CONNECTING);
self.findNextServer(function (server) {
self.socket = net.connect(server);
self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
...
/**
 * Stores the session timeout and derives the connect and ping timeouts
 * from it.
 *
 * @method updateTimeout
 * @param {Number} sessionTimeout Session timeout in milliseconds.
 */
updateTimeout = function (sessionTimeout) {
    this.sessionTimeout = sessionTimeout;

    // Designed to have time to try all the servers.
    this.connectTimeout = Math.floor(sessionTimeout / this.servers.length);

    // We at least send out one ping one third of the session timeout, so
    // the read timeout is two third of the session timeout.
    this.pingTimeout = Math.floor(this.sessionTimeout / 3);
    //this.readTimeout = Math.floor(sessionTimeout * 2 / 3);
};
...
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
this.updateTimeout(options.sessionTimeout);
this.connectTimeoutHandler = null;
this.xid = 0;
this.sessionId = new Buffer(8);
if (Buffer.isBuffer(options.sessionId)) {
options.sessionId.copy(this.sessionId);
...
/**
 * Splits a connection string of the form `host[:port][,host[:port]...][/chroot]`
 * into a server list and an optional chroot path.
 *
 * @class ConnectionStringParser
 * @constructor
 * @param {String} connectionString Non-empty connection string.
 * @throws {AssertionError} When the string is empty, not a string, or
 *                          contains no server entry.
 */
function ConnectionStringParser(connectionString) {
    assert(
        connectionString && typeof connectionString === 'string',
        'connectionString must be a non-empty string.'
    );

    this.connectionString = connectionString;

    // Handle chroot
    var index = connectionString.indexOf('/'),
        hostList = [],
        servers = [];

    // A single trailing slash is treated as "no chroot".
    if (index !== -1 && index !== (connectionString.length - 1)) {
        this.chrootPath = connectionString.substring(index);
        Path.validate(this.chrootPath);
    } else {
        this.chrootPath = undefined;
    }

    if (index !== -1) {
        hostList = connectionString.substring(0, index).split(',');
    } else {
        hostList = connectionString.split(',');
    }

    hostList.filter(function (item) {
        // Filter out empty string.
        return item;
    }).forEach(function (item) {
        var parts = item.split(':');

        servers.push({
            host : parts[0],
            port : parts[1] || DEFAULT_PORT
        });
    });

    assert(
        servers.length > 0,
        'connectionString must contain at least one server.'
    );

    // Randomize the list.
    this.servers = u.shuffle(servers);
}
n/a
/**
 * Returns the chroot path stored on this object.
 *
 * @method getChrootPath
 * @return {String|undefined} The value of `this.chrootPath`.
 */
getChrootPath = function () {
    var chroot = this.chrootPath;

    return chroot;
};
...
function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
...
/**
 * Returns the connection string stored on this object.
 *
 * @method getConnectionString
 * @return {String} The value of `this.connectionString`.
 */
getConnectionString = function () {
    var original = this.connectionString;

    return original;
};
n/a
/**
 * Returns a shallow copy of the server list, so callers cannot reorder or
 * resize the stored array.
 *
 * @method getServers
 * @return {Array} A new array containing the same server objects.
 */
getServers = function () {
    var snapshot = this.servers.slice(0);

    return snapshot;
};
...
*/
function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
...
/**
 * Watcher event carrying a type code, its symbolic name and the affected path.
 *
 * @class Event
 * @constructor
 * @param {Number} type Checked by validateType().
 * @param {String} name Non-empty name of the event.
 * @param {String} [path] Path the event refers to.
 */
function Event(type, name, path) {
    validateType(type);

    assert(
        name && typeof name === 'string',
        'name must be a non-empty string.'
    );

    var self = this;

    self.type = type;
    self.name = name;
    self.path = path;
}
n/a
/**
 * Builds an Event from a watcher event record, looking up the event name
 * as the first key of TYPES whose value equals the record's type.
 *
 * @method create
 * @param {Object} watcherEvent Record providing `type` and `path`.
 * @return {Event} The corresponding Event instance.
 */
function create(watcherEvent) {
    assert(watcherEvent, 'watcherEvent must be a valid object.');
    validateType(watcherEvent.type);

    var typeNames = Object.keys(TYPES),
        eventName,
        idx;

    for (idx = 0; idx < typeNames.length; idx += 1) {
        if (TYPES[typeNames[idx]] === watcherEvent.type) {
            eventName = typeNames[idx];
            break;
        }
    }

    return new Event(watcherEvent.type, eventName, watcherEvent.path);
}
...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...
/**
 * Returns the name stored on this event.
 *
 * @method getName
 * @return {String} The value of `this.name`.
 */
getName = function () {
    var eventName = this.name;

    return eventName;
};
n/a
/**
 * Returns the path stored on this event.
 *
 * @method getPath
 * @return {String} The value of `this.path`.
 */
getPath = function () {
    var eventPath = this.path;

    return eventPath;
};
n/a
/**
 * Returns the type stored on this event.
 *
 * @method getType
 * @return {Number} The value of `this.type`.
 */
getType = function () {
    var eventType = this.type;

    return eventType;
};
n/a
/**
 * Formats the event as `NAME[type]`, followed by `@path` when a path is set.
 *
 * @method toString
 * @return {String} The formatted event.
 */
toString = function () {
    var text = this.name + '[' + this.type + ']';

    return this.path ? text + '@' + this.path : text;
};
...
},
function (error, data, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Got data: %s', data.toString('utf8'));
}
);
```
---
#### void setData(path, data, [version], callback)
...
/**
 * Error type carrying a numeric code, a symbolic name and an optional path.
 *
 * May be called as (code, name, ctor) or (code, name, path, ctor).
 *
 * @class Exception
 * @constructor
 * @param {Number} code Checked by validateCode().
 * @param {String} name Non-empty name of the exception.
 * @param {String} [path] Node path related to the exception.
 * @param {Function} ctor Function above which the stack trace is captured.
 */
function Exception(code, name, path, ctor) {
    // In the three-argument form the third argument is the constructor.
    var stackAnchor = ctor || path,
        nodePath = ctor ? path : undefined,
        text;

    validateCode(code);

    assert(
        name && typeof name === 'string',
        'name must be a non-empty string.'
    );
    assert(typeof stackAnchor === 'function', 'ctor must be a function.');

    Error.captureStackTrace(this, stackAnchor || Exception);

    this.code = code;
    this.name = name;
    this.path = nodePath;

    text = 'Exception: ' + name + '[' + code + ']';
    this.message = nodePath ? text + '@' + nodePath : text;
}
n/a
/**
 * Builds an Exception for an error code, looking up the exception name as
 * the first key of CODES whose value equals the code.
 *
 * @method create
 * @param {Number} code Checked by validateCode().
 * @param {String} [path] Node path related to the exception.
 * @return {Exception} The corresponding Exception instance.
 */
function create(code, path) {
    validateCode(code);

    var codeNames = Object.keys(CODES),
        exceptionName,
        idx;

    for (idx = 0; idx < codeNames.length; idx += 1) {
        if (CODES[codeNames[idx]] === code) {
            exceptionName = codeNames[idx];
            break;
        }
    }

    // `create` is passed as the constructor argument of Exception.
    return new Exception(code, exceptionName, path, create);
}
...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...
function Error() { [native code] }
n/a
/**
 * Returns the error code stored on this exception.
 *
 * @method getCode
 * @return {Number} The value of `this.code`.
 */
getCode = function () {
    var errorCode = this.code;

    return errorCode;
};
...
* `Exception.AUTH_FAILED`
**Example**
```javascript
zookeeper.create('/test/demo', function (error, path) {
if (error) {
if (error.getCode() == zookeeper.Exception.NODE_EXISTS) {
console.log('Node exists.');
} else {
console.log(error.stack);
}
return;
}
...
/**
 * Returns the name stored on this exception.
 *
 * @method getName
 * @return {String} The value of `this.name`.
 */
getName = function () {
    var exceptionName = this.name;

    return exceptionName;
};
n/a
/**
 * Returns the path stored on this exception.
 *
 * @method getPath
 * @return {String|undefined} The value of `this.path`.
 */
getPath = function () {
    var nodePath = this.path;

    return nodePath;
};
n/a
/**
 * Returns the message stored on this exception.
 *
 * @method toString
 * @return {String} The value of `this.message`.
 */
toString = function () {
    var text = this.message;

    return text;
};
...
},
function (error, data, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Got data: %s', data.toString('utf8'));
}
);
```
---
#### void setData(path, data, [version], callback)
...
function Error() { [native code] }
n/a
function captureStackTrace() { [native code] }
n/a
/**
 * Identity made of an authentication scheme and an id within that scheme.
 *
 * @class Id
 * @constructor
 * @param {String} scheme Non-empty scheme name.
 * @param {String} id Id string; may be empty.
 * @throws {Error} When scheme is not a non-empty string or id is not a string.
 */
function Id(scheme, id) {
    var schemeIsValid = typeof scheme === 'string' && scheme !== '';

    if (!schemeIsValid) {
        throw new Error('scheme must be a non-empty string.');
    }

    if (typeof id !== 'string') {
        throw new Error('id must be a string.');
    }

    this.scheme = scheme;
    this.id = id;
}
...
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
return;
}
...
/**
 * Builds an Id from its jute record form.
 *
 * @method fromRecord
 * @param {jute.data.Id} record Record providing `scheme` and `id`.
 * @return {Id} Id built from record.scheme and record.id.
 * @throws {Error} When record is not an instance of jute.data.Id.
 */
function fromRecord(record) {
    if (!(record instanceof jute.data.Id)) {
        // Message previously read "instace".
        throw new Error('record must be an instance of jute.data.Id.');
    }

    return new Id(record.scheme, record.id);
}
...
return;
}
var acls;
if (Array.isArray(response.payload.acl)) {
acls = response.payload.acl.map(function (item) {
return ACL.fromRecord(item);
});
}
next(null, acls, response.payload.stat);
});
},
callback
...
/**
 * Converts this Id into a jute.data.Id record.
 *
 * @method toRecord
 * @return {jute.data.Id} Record built from this.scheme and this.id.
 */
toRecord = function () {
    var record = new jute.data.Id(this.scheme, this.id);

    return record;
};
...
header = new jute.protocol.RequestHeader();
header.type = jute.OP_CODES.CREATE;
payload = new jute.protocol.CreateRequest();
payload.path = path;
payload.acl = acls.map(function (item) {
return item.toRecord();
});
payload.flags = mode;
if (Buffer.isBuffer(data)) {
payload.data = new Buffer(data.length);
data.copy(payload.data);
}
...
/**
 * Event-emitting queue of packets, backed by a plain array.
 *
 * @class PacketQueue
 * @constructor
 */
function PacketQueue() {
    // Run the emitter constructor on this instance before adding own state.
    events.EventEmitter.call(this);

    this.queue = [];
}
n/a
/**
 * Emitter constructor; all set-up is delegated to EventEmitter.init, invoked
 * with the new instance as `this`.
 *
 * @class EventEmitter
 * @constructor
 */
function EventEmitter() {
    var instance = this;

    EventEmitter.init.call(instance);
}
n/a
/**
 * Appends a packet to the end of the queue and emits 'readable'.
 *
 * @method push
 * @param {Object} packet The packet to enqueue.
 * @throws {Error} When packet is not of type object.
 */
push = function (packet) {
    var isObject = typeof packet === 'object';

    if (!isObject) {
        throw new Error('packet must be a valid object.');
    }

    this.queue.push(packet);
    this.emit('readable');
};
...
});
},
function (error) {
var args = [],
result = results[count - 1];
if (callback) {
args.push(result.error);
Array.prototype.push.apply(args, result.args);
callback.apply(null, args);
}
}
);
}
...
/**
 * Removes and returns the packet at the head of the queue.
 *
 * @method shift
 * @return {Object|undefined} The removed packet, or undefined when the queue
 *                            is empty.
 */
shift = function () {
    var head = this.queue.shift();

    return head;
};
...
};
/**
 * Forwards an existence watcher registration to the watcher manager.
 *
 * @method registerExistenceWatcher
 * @param {String} path Path to watch.
 * @param {Function} watcher The watcher function.
 */
ConnectionManager.prototype.registerExistenceWatcher = function (path, watcher) {
this.watcherManager.registerExistenceWatcher(path, watcher);
};
ConnectionManager.prototype.cleanupPendingQueue = function (errorCode) {
var pendingPacket = this.pendingQueue.shift();
while (pendingPacket) {
if (pendingPacket.callback) {
pendingPacket.callback(Exception.create(errorCode));
}
pendingPacket = this.pendingQueue.shift();
...
/**
 * Inserts a packet at the head of the queue and emits 'readable'.
 *
 * @method unshift
 * @param {Object} packet The packet to enqueue.
 * @throws {Error} When packet is not of type object.
 */
unshift = function (packet) {
    var isObject = typeof packet === 'object';

    if (!isObject) {
        throw new Error('packet must be a valid object.');
    }

    this.queue.unshift(packet);
    this.emit('readable');
};
...
/**
 * Inserts a packet at the head of the queue and emits 'readable'.
 *
 * @method unshift
 * @param {Object} packet The packet to enqueue.
 * @throws {Error} When packet is not of type object.
 */
PacketQueue.prototype.unshift = function (packet) {
if (typeof packet !== 'object') {
throw new Error('packet must be a valid object.');
}
this.queue.unshift(packet);
this.emit('readable');
};
/**
 * Removes and returns the packet at the head of the queue.
 *
 * @method shift
 * @return {Object|undefined} The removed packet, or undefined when the queue
 *                            is empty.
 */
PacketQueue.prototype.shift = function () {
return this.queue.shift();
};
...
/**
 * Validate a node path, raising an assertion error for the first rule it
 * breaks. Rules, in the order they are checked:
 *
 *  - must be a non-empty string;
 *  - must start with `/`;
 *  - unless it is exactly `/`: must not end with `/`, must not contain `//`,
 *    and must not contain a `.` or `..` segment.
 *
 * @method validate
 * @param path {String} The node path to check.
 * @throws {AssertionError} When any of the rules above is violated.
 */
function validate(path) {
    assert(
        path && typeof path === 'string',
        'Node path must be a non-empty string.'
    );

    assert(path[0] === '/', 'Node path must start with / character.');

    // Shortcut, no need to check more since the path is the root.
    if (path.length === 1) {
        return;
    }

    assert(
        path[path.length - 1] !== '/',
        'Node path must not end with / character.'
    );

    assert(
        !/\/\//.test(path),
        'Node path must not contain empty node name.'
    );

    // Matches a segment that is exactly "." or "..", either followed by
    // another segment or sitting at the end of the path.
    assert(
        !/\/\.(\.)?(\/|$)/.test(path),
        'Node path must not contain relative path(s).'
    );

    // TODO filter out special characters
}
...
Client.prototype.create = function (path, data, acls, mode, callback) {
var self = this,
optionalArgs = [data, acls, mode, callback],
header,
payload,
request;
Path.validate(path);
// Reset arguments so we can reassign correct value to them.
data = acls = mode = callback = undefined;
optionalArgs.forEach(function (arg, index) {
if (Array.isArray(arg)) {
acls = arg;
} else if (typeof arg === 'number') {
...
/**
 * Transaction constructor. May be called with or without `new`; a plain call
 * is redirected to a `new` call with the same argument.
 *
 * @class Transaction
 * @constructor
 * @param connectionManager {ConnectionManager} Stored on the instance as
 *     `connectionManager`.
 * @throws {AssertionError} If `connectionManager` is not a ConnectionManager.
 */
function Transaction(connectionManager) {
    var calledWithNew = this instanceof Transaction;

    if (!calledWithNew) {
        return new Transaction(connectionManager);
    }

    assert(
        connectionManager instanceof ConnectionManager,
        'connectionManager must be an instance of ConnectionManager.'
    );

    // Operations accumulated by the builder methods.
    this.ops = [];
    this.connectionManager = connectionManager;
}
n/a
/**
 * Append a CHECK operation for `path` to this transaction's `ops` list.
 *
 * @method check
 * @param path {String} The node path; validated with `Path.validate`.
 * @param [version=-1] {Number} The version to record for the operation.
 * @return {Object} `this`, so calls can be chained.
 * @throws {AssertionError} If `version` is not a number.
 */
check = function (path, version) {
    // 0 is falsy but is a number callers can legitimately pass, so it must
    // not be replaced by the default. Other falsy values still become -1.
    if (version !== 0) {
        version = version || -1;
    }

    Path.validate(path);
    assert(typeof version === 'number', 'version must be a number.');

    this.ops.push({
        type : jute.OP_CODES.CHECK,
        path : path,
        version : version
    });

    return this;
};
n/a
/**
 * Send the accumulated operations as a single MULTI request through
 * `this.connectionManager.queue`.
 *
 * @method commit
 * @param callback {Function} Called as `callback(error)` when the request
 *     itself fails, otherwise as `callback(error, results)`, where `error` is
 *     built from the first result of type ERROR whose `err` is not
 *     `Exception.OK`, if there is one.
 * @throws {AssertionError} If `callback` is not a function.
 */
commit = function (callback) {
    assert(typeof callback === 'function', 'callback must be a function');

    var header = new jute.protocol.RequestHeader(),
        payload = new jute.TransactionRequest(this.ops),
        request;

    header.type = jute.OP_CODES.MULTI;
    request = new jute.Request(header, payload);

    this.connectionManager.queue(request, function (error, response) {
        if (error) {
            callback(error);
            return;
        }

        var results = response.payload.results,
            result,
            i;

        for (i = 0; i < results.length; i += 1) {
            result = results[i];

            // Find if there is an op which caused the transaction to fail.
            if (result.type === jute.OP_CODES.ERROR &&
                    result.err !== Exception.OK) {
                error = Exception.create(result.err);
                break;
            }
        }

        callback(error, results);
    });
};
n/a
/**
 * Append a CREATE operation for `path` to this transaction's `ops` list.
 *
 * The three optional arguments may be passed in any order; each is recognised
 * by its type (array = acls, number = mode, Buffer = data). Arguments of any
 * other type are ignored.
 *
 * @method create
 * @param path {String} The node path; validated with `Path.validate`.
 * @param [data] {Buffer} The data to record for the operation.
 * @param [acls=ACL.OPEN_ACL_UNSAFE] {Array} A non-empty array of ACLs.
 * @param [mode=CreateMode.PERSISTENT] {Number} The creation mode.
 * @return {Object} `this`, so calls can be chained.
 * @throws {AssertionError} If `acls` is an empty array.
 */
create = function (path, data, acls, mode) {
    var optionalArgs = [data, acls, mode];

    Path.validate(path);

    // Reset arguments so we can reassign correct value to them.
    data = acls = mode = undefined;

    optionalArgs.forEach(function (arg) {
        if (Array.isArray(arg)) {
            acls = arg;
        } else if (typeof arg === 'number') {
            mode = arg;
        } else if (Buffer.isBuffer(arg)) {
            data = arg;
        }
    });

    acls = Array.isArray(acls) ? acls : ACL.OPEN_ACL_UNSAFE;
    mode = typeof mode === 'number' ? mode : CreateMode.PERSISTENT;

    assert(
        data === null || data === undefined || Buffer.isBuffer(data),
        'data must be a valid buffer, null or undefined.'
    );

    assert(acls.length > 0, 'acls must be a non-empty array.');

    this.ops.push({
        type : jute.OP_CODES.CREATE,
        path : path,
        data : data,
        acls : acls,
        mode : mode
    });

    return this;
};
...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...
/**
 * Append a DELETE operation for `path` to this transaction's `ops` list.
 *
 * @method remove
 * @param path {String} The node path; validated with `Path.validate`.
 * @param [version=-1] {Number} The version to record for the operation.
 * @return {Object} `this`, so calls can be chained.
 * @throws {AssertionError} If `version` is not a number.
 */
remove = function (path, version) {
    // 0 is falsy but is a number callers can legitimately pass, so it must
    // not be replaced by the default. Other falsy values still become -1.
    if (version !== 0) {
        version = version || -1;
    }

    Path.validate(path);
    assert(typeof version === 'number', 'version must be a number.');

    this.ops.push({
        type : jute.OP_CODES.DELETE,
        path : path,
        version : version
    });

    return this;
};
...
* path `String` - Path of the node.
* version `Number` - The version of the node, optional, defaults to -1.
* callback(error) `Function` - The callback function.
**Example**
```javascript
zookeeper.remove('/test/demo', -1, function (error) {
if (error) {
console.log(error.stack);
return;
}
console.log('Node is deleted.');
});
...
/**
 * Append a SET_DATA operation for `path` to this transaction's `ops` list.
 *
 * @method setData
 * @param path {String} The node path; validated with `Path.validate`.
 * @param data {Buffer} The data to record; may also be null or undefined.
 * @param [version=-1] {Number} The version to record for the operation.
 * @return {Object} `this`, so calls can be chained.
 * @throws {AssertionError} If `data` is not a Buffer, null or undefined, or
 *     if `version` is not a number.
 */
setData = function (path, data, version) {
    // 0 is falsy but is a number callers can legitimately pass, so it must
    // not be replaced by the default. Other falsy values still become -1.
    if (version !== 0) {
        version = version || -1;
    }

    Path.validate(path);

    assert(
        data === null || data === undefined || Buffer.isBuffer(data),
        'data must be a valid buffer, null or undefined.'
    );

    assert(typeof version === 'number', 'version must be a number.');

    this.ops.push({
        type : jute.OP_CODES.SET_DATA,
        path : path,
        data : data,
        version : version
    });

    return this;
};
...
* version `Number` - The version of the node, optional, defaults to -1.
* callback(error, stat) `Function` - The callback function. The `stat` is an
instance of [`Stat`](#stat).
**Example**
```javascript
zookeeper.setData('/test/demo', null, 2, function (error, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Data is set.');
});
...
/**
 * Construct a watcher manager holding three initially empty registries:
 * `dataWatchers`, `childWatchers` and `existenceWatchers`.
 *
 * @class WatcherManager
 * @constructor
 */
function WatcherManager() {
    var self = this,
        registryNames = ['dataWatchers', 'childWatchers', 'existenceWatchers'];

    registryNames.forEach(function (name) {
        self[name] = {};
    });
}
n/a
/**
 * Deliver a watcher event to the emitters registered for its path.
 *
 * The event type selects which registries are consulted:
 *  - NODE_DATA_CHANGED / NODE_CREATED: data, then existence;
 *  - NODE_CHILDREN_CHANGED: child;
 *  - NODE_DELETED: data, then child.
 *
 * Each emitter that is found is removed from its registry before it is sent
 * a `notification` carrying the object built by `Event.create`.
 *
 * @method emit
 * @param watcherEvent {Object} Object with `type` and `path` properties.
 * @throws {Error} If `watcherEvent` is falsy or its type is not recognised.
 */
emit = function (watcherEvent) {
    if (!watcherEvent) {
        throw new Error('watcherEvent must be a valid object.');
    }

    var self = this,
        type = watcherEvent.type,
        registryNames,
        targets = [],
        notification;

    if (type === Event.NODE_DATA_CHANGED || type === Event.NODE_CREATED) {
        registryNames = ['dataWatchers', 'existenceWatchers'];
    } else if (type === Event.NODE_CHILDREN_CHANGED) {
        registryNames = ['childWatchers'];
    } else if (type === Event.NODE_DELETED) {
        registryNames = ['dataWatchers', 'childWatchers'];
    } else {
        throw new Error('Unknown event type: ' + watcherEvent.type);
    }

    // Collect the matching emitters and drop them from their registries.
    registryNames.forEach(function (name) {
        var registry = self[name],
            registered = registry[watcherEvent.path];

        if (registered) {
            targets.push(registered);
            delete registry[watcherEvent.path];
        }
    });

    if (targets.length < 1) {
        return;
    }

    notification = Event.create(watcherEvent);

    targets.forEach(function (target) {
        target.emit('notification', notification);
    });
};
...
/**
* Default state listener to emit user-friendly events.
*/
function defaultStateListener(state) {
switch (state) {
case State.DISCONNECTED:
this.emit('disconnected');
break;
case State.SYNC_CONNECTED:
this.emit('connected');
break;
case State.CONNECTED_READ_ONLY:
this.emit('connectedReadOnly');
break;
...
/**
 * Return whatever `getWatcherPaths` yields for this manager's `child`
 * watchers.
 *
 * @method getChildWatcherPaths
 * @return The result of `getWatcherPaths(this, 'child')`.
 */
getChildWatcherPaths = function () {
    var watcherType = 'child';

    return getWatcherPaths(this, watcherType);
};
...
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
ConnectionManager.prototype.onSocketTimeout = function () {
...
/**
 * Return whatever `getWatcherPaths` yields for this manager's `data`
 * watchers.
 *
 * @method getDataWatcherPaths
 * @return The result of `getWatcherPaths(this, 'data')`.
 */
getDataWatcherPaths = function () {
    var watcherType = 'data';

    return getWatcherPaths(this, watcherType);
};
...
payload = new jute.protocol.SetWatches();
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
...
/**
 * Return whatever `getWatcherPaths` yields for this manager's `existence`
 * watchers.
 *
 * @method getExistenceWatcherPaths
 * @return The result of `getWatcherPaths(this, 'existence')`.
 */
getExistenceWatcherPaths = function () {
    var watcherType = 'existence';

    return getWatcherPaths(this, watcherType);
};
...
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
...
/**
 * Tell whether no registered watcher has a `notification` listener.
 *
 * The data, existence and child registries are examined in that order, and
 * the scan stops at the first watcher that reports at least one listener.
 *
 * @method isEmpty
 * @return {Boolean} `false` as soon as any watcher has a `notification`
 *     listener, `true` otherwise.
 */
isEmpty = function () {
    var registries = [
        this.dataWatchers,
        this.existenceWatchers,
        this.childWatchers
    ];

    // `some` short-circuits, so nothing further is inspected once the
    // answer is known.
    return !registries.some(function (registry) {
        return Object.keys(registry).some(function (path) {
            return registry[path].listeners('notification').length > 0;
        });
    });
};
...
authRequest = new jute.Request(header, payload);
this.queue(authRequest);
}, this);
}
// Reset the watchers if we have any.
if (!this.watcherManager.isEmpty()) {
header = new jute.protocol.RequestHeader();
payload = new jute.protocol.SetWatches();
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
...
/**
 * Register `watcher` for `path` by handing both to `registerWatcher` with
 * the `child` watcher type.
 *
 * @method registerChildWatcher
 * @param path {String} Passed through unchanged.
 * @param watcher {Function} Passed through unchanged.
 */
registerChildWatcher = function (path, watcher) {
    var watcherType = 'child';

    registerWatcher(this, watcherType, path, watcher);
};
...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerChildWatcher(path, watcher);
}
next(null, response.payload.children, response.payload.stat);
});
},
callback
);
...
/**
 * Register `watcher` for `path` by handing both to `registerWatcher` with
 * the `data` watcher type.
 *
 * @method registerDataWatcher
 * @param path {String} Passed through unchanged.
 * @param watcher {Function} Passed through unchanged.
 */
registerDataWatcher = function (path, watcher) {
    var watcherType = 'data';

    registerWatcher(this, watcherType, path, watcher);
};
...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerDataWatcher(path, watcher);
}
next(null, response.payload.data, response.payload.stat);
});
},
callback
);
...
/**
 * Register `watcher` for `path` by handing both to `registerWatcher` with
 * the `existence` watcher type.
 *
 * @method registerExistenceWatcher
 * @param path {String} Passed through unchanged.
 * @param watcher {Function} Passed through unchanged.
 */
registerExistenceWatcher = function (path, watcher) {
    var watcherType = 'existence';

    registerWatcher(this, watcherType, path, watcher);
};
...
if (watcher) {
if (existence) {
self.connectionManager.registerDataWatcher(
path,
watcher
);
} else {
self.connectionManager.registerExistenceWatcher(
path,
watcher
);
}
}
next(
...