function ACL(permission, id) {
if (typeof permission !== 'number' || permission < 1 || permission > 31) {
throw new Error('permission must be a valid integer.');
}
if (!(id instanceof Id)) {
throw new Error('id must be an instance of Id class.');
}
this.permission = permission;
this.id = id;
}...
**Example**
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
...function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
this.updateTimeout(options.sessionTimeout);
this.connectTimeoutHandler = null;
this.xid = 0;
this.sessionId = new Buffer(8);
if (Buffer.isBuffer(options.sessionId)) {
options.sessionId.copy(this.sessionId);
} else {
this.sessionId.fill(0);
}
this.sessionPassword = new Buffer(16);
if (Buffer.isBuffer(options.sessionPassword)) {
options.sessionPassword.copy(this.sessionPassword);
} else {
this.sessionPassword.fill(0);
}
// scheme:auth pairs
this.credentials = [];
// Last seen zxid.
this.zxid = new Buffer(8);
this.zxid.fill(0);
this.pendingBuffer = null;
this.packetQueue = new PacketQueue();
this.packetQueue.on('readable', this.onPacketQueueReadable.bind(this));
this.pendingQueue = [];
this.on('state', stateListener);
}n/a
function ConnectionStringParser(connectionString) {
assert(
connectionString && typeof connectionString === 'string',
'connectionString must be a non-empty string.'
);
this.connectionString = connectionString;
// Handle chroot
var index = connectionString.indexOf('/'),
hostList = [],
servers = [];
if (index !== -1 && index !== (connectionString.length - 1)) {
this.chrootPath = connectionString.substring(index);
Path.validate(this.chrootPath);
} else {
this.chrootPath = undefined;
}
if (index !== -1) {
hostList = connectionString.substring(0, index).split(',');
} else {
hostList = connectionString.split(',');
}
hostList.filter(function (item) {
// Filter out empty string.
return item;
}).forEach(function (item) {
var parts = item.split(':');
servers.push({
host : parts[0],
port : parts[1] || DEFAULT_PORT
});
});
assert(
servers.length > 0,
'connectionString must contain at least one server.'
);
// Randomize the list.
this.servers = u.shuffle(servers);
}n/a
function Event(type, name, path) {
validateType(type);
assert(
name && typeof name === 'string',
'name must be a non-empty string.'
);
this.type = type;
this.name = name;
this.path = path;
}n/a
function Exception(code, name, path, ctor) {
if (!ctor) {
ctor = path;
path = undefined;
}
validateCode(code);
assert(
name && typeof name === 'string',
'name must be a non-empty string.'
);
assert(typeof ctor === 'function', 'ctor must be a function.');
Error.captureStackTrace(this, ctor || Exception);
this.code = code;
this.name = name;
this.path = path;
this.message = 'Exception: ' + name + '[' + code + ']';
if (path) {
this.message += '@' + path;
}
}n/a
function Error() { [native code] }n/a
function Id(scheme, id) {
if (!scheme || typeof scheme !== 'string') {
throw new Error('scheme must be a non-empty string.');
}
if (typeof id !== 'string') {
throw new Error('id must be a string.');
}
this.scheme = scheme;
this.id = id;
}...
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
return;
}
...function PacketQueue() {
events.EventEmitter.call(this);
this.queue = [];
}n/a
function Transaction(connectionManager) {
if (!(this instanceof Transaction)) {
return new Transaction(connectionManager);
}
assert(
connectionManager instanceof ConnectionManager,
'connectionManager must be an instance of ConnectionManager.'
);
this.ops = [];
this.connectionManager = connectionManager;
}n/a
function WatcherManager() {
this.dataWatchers = {};
this.childWatchers = {};
this.existenceWatchers = {};
}n/a
function createClient(connectionString, options) {
return new Client(connectionString, options);
}...
## Example
1\. Create a node using given path:
```javascript
var zookeeper = require('node-zookeeper-client');
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
...function ACL(permission, id) {
if (typeof permission !== 'number' || permission < 1 || permission > 31) {
throw new Error('permission must be a valid integer.');
}
if (!(id instanceof Id)) {
throw new Error('id must be an instance of Id class.');
}
this.permission = permission;
this.id = id;
}...
**Example**
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
...function fromRecord(record) {
if (!(record instanceof jute.data.ACL)) {
throw new Error('record must be an instace of jute.data.ACL.');
}
return new ACL(record.perms, Id.fromRecord(record.id));
}...
return;
}
var acls;
if (Array.isArray(response.payload.acl)) {
acls = response.payload.acl.map(function (item) {
return ACL.fromRecord(item);
});
}
next(null, acls, response.payload.stat);
});
},
callback
...toRecord = function () {
return new jute.data.ACL(
this.permission,
this.id.toRecord()
);
}...
header = new jute.protocol.RequestHeader();
header.type = jute.OP_CODES.CREATE;
payload = new jute.protocol.CreateRequest();
payload.path = path;
payload.acl = acls.map(function (item) {
return item.toRecord();
});
payload.flags = mode;
if (Buffer.isBuffer(data)) {
payload.data = new Buffer(data.length);
data.copy(payload.data);
}
...function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
this.updateTimeout(options.sessionTimeout);
this.connectTimeoutHandler = null;
this.xid = 0;
this.sessionId = new Buffer(8);
if (Buffer.isBuffer(options.sessionId)) {
options.sessionId.copy(this.sessionId);
} else {
this.sessionId.fill(0);
}
this.sessionPassword = new Buffer(16);
if (Buffer.isBuffer(options.sessionPassword)) {
options.sessionPassword.copy(this.sessionPassword);
} else {
this.sessionPassword.fill(0);
}
// scheme:auth pairs
this.credentials = [];
// Last seen zxid.
this.zxid = new Buffer(8);
this.zxid.fill(0);
this.pendingBuffer = null;
this.packetQueue = new PacketQueue();
this.packetQueue.on('readable', this.onPacketQueueReadable.bind(this));
this.pendingQueue = [];
this.on('state', stateListener);
}n/a
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
addAuthInfo = function (scheme, auth) {
if (!scheme || typeof scheme !== 'string') {
throw new Error('scheme must be a non-empty string.');
}
if (!Buffer.isBuffer(auth)) {
throw new Error('auth must be a valid instance of Buffer');
}
var header,
payload,
request;
this.credentials.push({
scheme : scheme,
auth : auth
});
switch (this.state) {
case STATES.CONNECTED:
case STATES.CONNECTED_READ_ONLY:
// Only queue the auth request when connected.
header = new jute.protocol.RequestHeader();
payload = new jute.protocol.AuthPacket();
header.xid = jute.XID_AUTHENTICATION;
header.type = jute.OP_CODES.AUTH;
payload.type = 0;
payload.scheme = scheme;
payload.auth = auth;
this.queue(new jute.Request(header, payload));
break;
case STATES.DISCONNECTED:
case STATES.CONNECTING:
case STATES.CLOSING:
case STATES.CLOSED:
case STATES.SESSION_EXPIRED:
case STATES.AUTHENTICATION_FAILED:
// Skip when we are not in a live state.
return;
default:
throw new Error('Unknown state: ' + this.state);
}
}...
* scheme `String` - The authentication scheme.
* auth `Buffer` - The authentication data buffer.
**Example**
```javascript
zookeeper.addAuthInfo('ip', new Buffer('127.0.0.1'));
```
---
#### State getState()
Return the current client [state](#state).
...cleanupPendingQueue = function (errorCode) {
var pendingPacket = this.pendingQueue.shift();
while (pendingPacket) {
if (pendingPacket.callback) {
pendingPacket.callback(Exception.create(errorCode));
}
pendingPacket = this.pendingQueue.shift();
}
}...
retry = false;
break;
default:
errorCode = Exception.CONNECTION_LOSS;
retry = true;
}
this.cleanupPendingQueue(errorCode);
this.setState(STATES.DISCONNECTED);
if (retry) {
this.connect();
} else {
this.setState(STATES.CLOSED);
}
...close = function () {
var self = this,
header = new jute.protocol.RequestHeader(),
request;
self.setState(STATES.CLOSING);
header.type = jute.OP_CODES.CLOSE_SESSION;
request = new jute.Request(header, null);
self.queue(request);
}...
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
});
});
client.connect();
```
2\. List and watch the children of given node:
...connect = function () {
var self = this;
self.setState(STATES.CONNECTING);
self.findNextServer(function (server) {
self.socket = net.connect(server);
self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
);
// Disable the Nagle algorithm.
self.socket.setNoDelay();
self.socket.on('connect', self.onSocketConnected.bind(self));
self.socket.on('data', self.onSocketData.bind(self));
self.socket.on('drain', self.onSocketDrain.bind(self));
self.socket.on('close', self.onSocketClosed.bind(self));
self.socket.on('error', self.onSocketError.bind(self));
});
}...
console.log('Node: %s is successfully created.', path);
}
client.close();
});
});
client.connect();
```
2\. List and watch the children of given node:
```javascript
var zookeeper = require('node-zookeeper-client');
...findNextServer = function (callback) {
var self = this;
self.nextServerIndex %= self.servers.length;
if (self.serverAttempts === self.servers.length) {
setTimeout(function () {
callback(self.servers[self.nextServerIndex]);
self.nextServerIndex += 1;
// reset attempts since we already waited for enough time.
self.serverAttempts = 0;
}, Math.random() * self.spinDelay);
} else {
self.serverAttempts += 1;
process.nextTick(function () {
callback(self.servers[self.nextServerIndex]);
self.nextServerIndex += 1;
});
}
}...
};
ConnectionManager.prototype.connect = function () {
var self = this;
self.setState(STATES.CONNECTING);
self.findNextServer(function (server) {
self.socket = net.connect(server);
self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
);
...getSessionId = function () {
var result = new Buffer(8);
this.sessionId.copy(result);
return result;
}...
[`Buffer`](http://nodejs.org/api/buffer.html) since Javascript does not support
long integer natively.
**Example**
```javascript
var client = zookeeper.createClient({...});
var id = client.getSessionId();
console.log('Session id is: %s', id.toString('hex'));
```
---
#### Buffer getSessionPassword()
...getSessionPassword = function () {
var result = new Buffer(16);
this.sessionPassword.copy(result);
return result;
}...
The value returned is an instance of
[`Buffer`](http://nodejs.org/api/buffer.html).
**Example**
```javascript
var client = zookeeper.createClient({...});
var pwd = client.getSessionPassword();
```
---
#### Number getSessionTimeout()
Returns the *negotiated* session timeout (in milliseconds) for this client
...getSessionTimeout = function () {
return this.sessionTimeout;
}...
instance. The value returned is not valid until the client connects to a server
and may change after a re-connect.
**Example**
```javascript
var client = zookeeper.createClient({...});
var sessionTimeout = client.getSessionTimeout();
```
---
### State
After the `connect()` method is invoked, the ZooKeeper client starts its
...onPacketQueueReadable = function () {
var packet,
header;
switch (this.state) {
case STATES.CONNECTED:
case STATES.CONNECTED_READ_ONLY:
case STATES.CLOSING:
// Continue
break;
case STATES.DISCONNECTED:
case STATES.CONNECTING:
case STATES.CLOSED:
case STATES.SESSION_EXPIRED:
case STATES.AUTHENTICATION_FAILED:
// Skip since we can not send traffic out
return;
default:
throw new Error('Unknown state: ' + this.state);
}
while ((packet = this.packetQueue.shift()) !== undefined) {
header = packet.request.header;
if (header !== null &&
header.type !== jute.OP_CODES.PING &&
header.type !== jute.OP_CODES.AUTH) {
header.xid = this.xid;
this.xid += 1;
// Only put requests that are not connect, ping and auth into
// the pending queue.
this.pendingQueue.push(packet);
}
if (!this.socket.write(packet.request.toBuffer())) {
// Back pressure is handled here, when the socket emit
// drain event, this method will be invoked again.
break;
}
if (header.type === jute.OP_CODES.CLOSE_SESSION) {
// The close session should be the final packet sent to the
// server.
break;
}
}
}...
self.sessionId = connectResponse.sessionId;
self.sessionPassword = connectResponse.passwd;
self.updateTimeout(connectResponse.timeOut);
self.setState(STATES.CONNECTED);
// Check if we have anything to send out just in case.
self.onPacketQueueReadable();
self.socket.setTimeout(
self.pingTimeout,
self.onSocketTimeout.bind(self)
);
}
...onSocketClosed = function (hasError) {
var retry = false,
errorCode,
pendingPacket;
switch (this.state) {
case STATES.CLOSING:
errorCode = Exception.CONNECTION_LOSS;
retry = false;
break;
case STATES.SESSION_EXPIRED:
errorCode = Exception.SESSION_EXPIRED;
retry = false;
break;
case STATES.AUTHENTICATION_FAILED:
errorCode = Exception.AUTH_FAILED;
retry = false;
break;
default:
errorCode = Exception.CONNECTION_LOSS;
retry = true;
}
this.cleanupPendingQueue(errorCode);
this.setState(STATES.DISCONNECTED);
if (retry) {
this.connect();
} else {
this.setState(STATES.CLOSED);
}
}n/a
onSocketConnectTimeout = function () {
// Destroy the current socket so the socket closed event
// will be trigger.
this.socket.destroy();
}n/a
onSocketConnected = function () {
var connectRequest,
authRequest,
setWatchesRequest,
header,
payload;
if (this.connectTimeoutHandler) {
clearTimeout(this.connectTimeoutHandler);
}
connectRequest = new jute.Request(null, new jute.protocol.ConnectRequest(
jute.PROTOCOL_VERSION,
this.zxid,
this.sessionTimeout,
this.sessionId,
this.sessionPassword
));
// XXX No read only support yet.
this.socket.write(connectRequest.toBuffer());
// Set auth info
if (this.credentials.length > 0) {
this.credentials.forEach(function (credential) {
header = new jute.protocol.RequestHeader();
payload = new jute.protocol.AuthPacket();
header.xid = jute.XID_AUTHENTICATION;
header.type = jute.OP_CODES.AUTH;
payload.type = 0;
payload.scheme = credential.scheme;
payload.auth = credential.auth;
authRequest = new jute.Request(header, payload);
this.queue(authRequest);
}, this);
}
// Reset the watchers if we have any.
if (!this.watcherManager.isEmpty()) {
header = new jute.protocol.RequestHeader();
payload = new jute.protocol.SetWatches();
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
}n/a
onSocketData = function (buffer) {
var self = this,
offset = 0,
size = 0,
connectResponse,
pendingPacket,
responseHeader,
responsePayload,
response,
event;
// Combine the pending buffer with the new buffer.
if (self.pendingBuffer) {
buffer = Buffer.concat(
[self.pendingBuffer, buffer],
self.pendingBuffer.length + buffer.length
);
}
// We need at least 4 bytes
if (buffer.length < 4) {
self.pendingBuffer = buffer;
return;
}
size = buffer.readInt32BE(offset);
offset += 4;
if (buffer.length < size + 4) {
// More data are coming.
self.pendingBuffer = buffer;
return;
}
if (buffer.length === size + 4) {
// The size is perfect.
self.pendingBuffer = null;
} else {
// We have extra bytes, splice them out as pending buffer.
self.pendingBuffer = buffer.slice(size + 4);
buffer = buffer.slice(0, size + 4);
}
if (self.state === STATES.CONNECTING) {
// Handle connect response.
connectResponse = new jute.protocol.ConnectResponse();
offset += connectResponse.deserialize(buffer, offset);
if (connectResponse.timeOut <= 0) {
self.setState(STATES.SESSION_EXPIRED);
} else {
// Reset the server connection attempts since we connected now.
self.serverAttempts = 0;
self.sessionId = connectResponse.sessionId;
self.sessionPassword = connectResponse.passwd;
self.updateTimeout(connectResponse.timeOut);
self.setState(STATES.CONNECTED);
// Check if we have anything to send out just in case.
self.onPacketQueueReadable();
self.socket.setTimeout(
self.pingTimeout,
self.onSocketTimeout.bind(self)
);
}
} else {
// Handle all other repsonses.
responseHeader = new jute.protocol.ReplyHeader();
offset += responseHeader.deserialize(buffer, offset);
//TODO BETTTER LOGGING
switch (responseHeader.xid) {
case jute.XID_PING:
break;
case jute.XID_AUTHENTICATION:
if (responseHeader.err === Exception.AUTH_FAILED) {
self.setState(STATES.AUTHENTICATION_FAILED);
}
break;
case jute.XID_NOTIFICATION:
event = new jute.protocol.WatcherEvent();
if (self.chrootPath) {
event.setChrootPath(self.chrootPath);
}
offset += event.deserialize(buffer, offset);
self.watcherManager.emit(event);
break;
default:
pendingPacket = self.pendingQueue.shift();
if (!pendingPacket) {
// TODO, better error handling and logging need to be done.
// Need to clean up and do a reconnect.
// throw new Error(
// 'Nothing in pending queue but got data from server.'
// );
self.socket.destroy(); // this will trigger reconnect
return;
}
if (pendingPacket.request.header.xid !== responseHeader.xid) {
// TODO, better error handling/logging need to bee done here.
// Need to clean up and do a reconnect.
//throw new Error(
//'Xid out of order. Got xid: ' +
//responseHeader.xid + ' with error code: ' +
//responseHeader.err + ', expected xid: ' +
//pendingPacket.request.header.xid + '.'
//);
self.socket.destroy(); // this will trigger reconnect
return;
}
if (responseHeader.zxid) {
// TODO, In Java implementation, the condition is to
// check whether the long zxid is greater than 0, here
// use buffer so we simplify. ......
}
}
}
}
// We have more data to process, need to recursively process it.
if (self.pendingBuffer) {
self.onSocketData(new Buffer(0));
}
};
ConnectionManager.prototype.onSocketDrain = function () {
// Trigger write on socket.
this.onPacketQueueReadable();
};
...onSocketDrain = function () {
// Trigger write on socket.
this.onPacketQueueReadable();
}n/a
onSocketError = function (error) {
if (this.connectTimeoutHandler) {
clearTimeout(this.connectTimeoutHandler);
}
// After socket error, the socket closed event will be triggered,
// we will retry connect in that listener function.
}n/a
onSocketTimeout = function () {
var header,
request;
if (this.socket &&
(this.state === STATES.CONNECTED ||
this.state === STATES.CONNECTED_READ_ONLY)) {
header = new jute.protocol.RequestHeader(
jute.XID_PING,
jute.OP_CODES.PING
);
request = new jute.Request(header, null);
this.queue(request);
// Re-register the timeout handler since it only fired once.
this.socket.setTimeout(
this.pingTimeout,
this.onSocketTimeout.bind(this)
);
}
}n/a
queue = function (request, callback) {
if (typeof request !== 'object') {
throw new Error('request must be a valid instance of jute.Request.');
}
if (this.chrootPath && request.payload) {
request.payload.setChrootPath(this.chrootPath);
}
callback = callback || function () {};
switch (this.state) {
case STATES.DISCONNECTED:
case STATES.CONNECTING:
case STATES.CONNECTED:
case STATES.CONNECTED_READ_ONLY:
// queue the packet
this.packetQueue.push({
request : request,
callback : callback
});
break;
case STATES.CLOSING:
if (request.header &&
request.header.type === jute.OP_CODES.CLOSE_SESSION) {
this.packetQueue.push({
request : request,
callback : callback
});
} else {
callback(Exception.create(Exception.CONNECTION_LOSS));
}
break;
case STATES.CLOSED:
callback(Exception.create(Exception.CONNECTION_LOSS));
return;
case STATES.SESSION_EXPIRED:
callback(Exception.create(Exception.SESSION_EXPIRED));
return;
case STATES.AUTHENTICATION_FAILED:
callback(Exception.create(Exception.AUTH_FAILED));
return;
default:
throw new Error('Unknown state: ' + this.state);
}
}...
}
request = new jute.Request(header, payload);
attempt(
self,
function (attempts, next) {
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
next(null, response.payload.path);
});
...registerChildWatcher = function (path, watcher) {
this.watcherManager.registerChildWatcher(path, watcher);
}...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerChildWatcher(path, watcher);
}
next(null, response.payload.children, response.payload.stat);
});
},
callback
);
...registerDataWatcher = function (path, watcher) {
this.watcherManager.registerDataWatcher(path, watcher);
}...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerDataWatcher(path, watcher);
}
next(null, response.payload.data, response.payload.stat);
});
},
callback
);
...registerExistenceWatcher = function (path, watcher) {
this.watcherManager.registerExistenceWatcher(path, watcher);
}...
if (watcher) {
if (existence) {
self.connectionManager.registerDataWatcher(
path,
watcher
);
} else {
self.connectionManager.registerExistenceWatcher(
path,
watcher
);
}
}
next(
...setState = function (state) {
if (typeof state !== 'number') {
throw new Error('state must be a valid number.');
}
if (this.state !== state) {
this.state = state;
this.emit('state', this.state);
}
}...
ConnectionManager.prototype.getSessionTimeout = function () {
return this.sessionTimeout;
};
ConnectionManager.prototype.connect = function () {
var self = this;
self.setState(STATES.CONNECTING);
self.findNextServer(function (server) {
self.socket = net.connect(server);
self.connectTimeoutHandler = setTimeout(
self.onSocketConnectTimeout.bind(self),
self.connectTimeout
...updateTimeout = function (sessionTimeout) {
this.sessionTimeout = sessionTimeout;
// Designed to have time to try all the servers.
this.connectTimeout = Math.floor(sessionTimeout / this.servers.length);
// We at least send out one ping one third of the session timeout, so
// the read timeout is two third of the session timeout.
this.pingTimeout = Math.floor(this.sessionTimeout / 3);
//this.readTimeout = Math.floor(sessionTimeout * 2 / 3);
}...
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
this.updateTimeout(options.sessionTimeout);
this.connectTimeoutHandler = null;
this.xid = 0;
this.sessionId = new Buffer(8);
if (Buffer.isBuffer(options.sessionId)) {
options.sessionId.copy(this.sessionId);
...function ConnectionStringParser(connectionString) {
assert(
connectionString && typeof connectionString === 'string',
'connectionString must be a non-empty string.'
);
this.connectionString = connectionString;
// Handle chroot
var index = connectionString.indexOf('/'),
hostList = [],
servers = [];
if (index !== -1 && index !== (connectionString.length - 1)) {
this.chrootPath = connectionString.substring(index);
Path.validate(this.chrootPath);
} else {
this.chrootPath = undefined;
}
if (index !== -1) {
hostList = connectionString.substring(0, index).split(',');
} else {
hostList = connectionString.split(',');
}
hostList.filter(function (item) {
// Filter out empty string.
return item;
}).forEach(function (item) {
var parts = item.split(':');
servers.push({
host : parts[0],
port : parts[1] || DEFAULT_PORT
});
});
assert(
servers.length > 0,
'connectionString must contain at least one server.'
);
// Randomize the list.
this.servers = u.shuffle(servers);
}n/a
getChrootPath = function () {
return this.chrootPath;
}...
function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
this.spinDelay = options.spinDelay;
...getConnectionString = function () {
return this.connectionString;
}n/a
getServers = function () {
return this.servers.slice(0);
}...
*/
function ConnectionManager(connectionString, options, stateListener) {
events.EventEmitter.call(this);
this.watcherManager = new WatcherManager();
this.connectionStringParser = new ConnectionStringParser(connectionString);
this.servers = this.connectionStringParser.getServers();
this.chrootPath = this.connectionStringParser.getChrootPath();
this.nextServerIndex = 0;
this.serverAttempts = 0;
this.state = STATES.DISCONNECTED;
this.options = options;
...function Event(type, name, path) {
validateType(type);
assert(
name && typeof name === 'string',
'name must be a non-empty string.'
);
this.type = type;
this.name = name;
this.path = path;
}n/a
function create(watcherEvent) {
assert(watcherEvent, 'watcherEvent must be a valid object.');
validateType(watcherEvent.type);
var name,
i = 0,
keys = Object.keys(TYPES);
while (i < keys.length) {
if (TYPES[keys[i]] === watcherEvent.type) {
name = keys[i];
break;
}
i += 1;
}
return new Event(watcherEvent.type, name, watcherEvent.path);
}...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...getName = function () {
return this.name;
}n/a
getPath = function () {
return this.path;
}n/a
getType = function () {
return this.type;
}n/a
toString = function () {
var result = this.name + '[' + this.type + ']';
if (this.path) {
result += '@' + this.path;
}
return result;
}...
},
function (error, data, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Got data: %s', data.toString('utf8'));
}
);
```
---
#### void setData(path, data, [version], callback)
...function Exception(code, name, path, ctor) {
if (!ctor) {
ctor = path;
path = undefined;
}
validateCode(code);
assert(
name && typeof name === 'string',
'name must be a non-empty string.'
);
assert(typeof ctor === 'function', 'ctor must be a function.');
Error.captureStackTrace(this, ctor || Exception);
this.code = code;
this.name = name;
this.path = path;
this.message = 'Exception: ' + name + '[' + code + ']';
if (path) {
this.message += '@' + path;
}
}n/a
function create(code, path) {
validateCode(code);
var name,
i = 0,
keys = Object.keys(CODES);
while (i < keys.length) {
if (CODES[keys[i]] === code) {
name = keys[i];
break;
}
i += 1;
}
return new Exception(code, name, path, create);
}...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...function Error() { [native code] }n/a
getCode = function () {
return this.code;
}...
* `Exception.AUTH_FAILED`
**Example**
```javascript
zookeeper.create('/test/demo', function (error, path) {
if (error) {
if (error.getCode() == zookeeper.Exception.NODE_EXISTS) {
console.log('Node exists.');
} else {
console.log(error.stack);
}
return;
}
...getName = function () {
return this.name;
}n/a
getPath = function () {
return this.path;
}n/a
toString = function () {
return this.message;
}...
},
function (error, data, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Got data: %s', data.toString('utf8'));
}
);
```
---
#### void setData(path, data, [version], callback)
...function Error() { [native code] }n/a
function captureStackTrace() { [native code] }n/a
function Id(scheme, id) {
if (!scheme || typeof scheme !== 'string') {
throw new Error('scheme must be a non-empty string.');
}
if (typeof id !== 'string') {
throw new Error('id must be a string.');
}
this.scheme = scheme;
this.id = id;
}...
```javascript
zookeeper.setACL(
'/test/demo',
[
new zookeeper.ACL(
zookeeeper.Permission.ADMIN,
new zookeeper.Id('ip', '127.0.0.1')
)
],
function (error, acls, stat) {
if (error) {
console.log(error.stack);
return;
}
...function fromRecord(record) {
if (!(record instanceof jute.data.Id)) {
throw new Error('record must be an instace of jute.data.Id.');
}
return new Id(record.scheme, record.id);
}...
return;
}
var acls;
if (Array.isArray(response.payload.acl)) {
acls = response.payload.acl.map(function (item) {
return ACL.fromRecord(item);
});
}
next(null, acls, response.payload.stat);
});
},
callback
...toRecord = function () {
return new jute.data.Id(
this.scheme,
this.id
);
}...
header = new jute.protocol.RequestHeader();
header.type = jute.OP_CODES.CREATE;
payload = new jute.protocol.CreateRequest();
payload.path = path;
payload.acl = acls.map(function (item) {
return item.toRecord();
});
payload.flags = mode;
if (Buffer.isBuffer(data)) {
payload.data = new Buffer(data.length);
data.copy(payload.data);
}
...function PacketQueue() {
events.EventEmitter.call(this);
this.queue = [];
}n/a
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
push = function (packet) {
if (typeof packet !== 'object') {
throw new Error('packet must be a valid object.');
}
this.queue.push(packet);
this.emit('readable');
}...
});
},
function (error) {
var args = [],
result = results[count - 1];
if (callback) {
args.push(result.error);
Array.prototype.push.apply(args, result.args);
callback.apply(null, args);
}
}
);
}
...shift = function () {
return this.queue.shift();
}...
};
ConnectionManager.prototype.registerExistenceWatcher = function (path, watcher) {
this.watcherManager.registerExistenceWatcher(path, watcher);
};
ConnectionManager.prototype.cleanupPendingQueue = function (errorCode) {
var pendingPacket = this.pendingQueue.shift();
while (pendingPacket) {
if (pendingPacket.callback) {
pendingPacket.callback(Exception.create(errorCode));
}
pendingPacket = this.pendingQueue.shift();
...unshift = function (packet) {
if (typeof packet !== 'object') {
throw new Error('packet must be a valid object.');
}
this.queue.unshift(packet);
this.emit('readable');
}...
PacketQueue.prototype.unshift = function (packet) {
if (typeof packet !== 'object') {
throw new Error('packet must be a valid object.');
}
this.queue.unshift(packet);
this.emit('readable');
};
PacketQueue.prototype.shift = function () {
return this.queue.shift();
};
...function validate(path) {
assert(
path && typeof path === 'string',
'Node path must be a non-empty string.'
);
assert(path[0] === '/', 'Node path must start with / character.');
// Shortcut, no need to check more since the path is the root.
if (path.length === 1) {
return;
}
assert(
path[path.length - 1] !== '/',
'Node path must not end with / character.'
);
assert(
!/\/\//.test(path),
'Node path must not contain empty node name.'
);
assert(
!/\/\.(\.)?(\/|$)/.test(path),
'Node path must not contain relative path(s).'
);
// TODO filter out special characters
}...
Client.prototype.create = function (path, data, acls, mode, callback) {
var self = this,
optionalArgs = [data, acls, mode, callback],
header,
payload,
request;
Path.validate(path);
// Reset arguments so we can reassign correct value to them.
data = acls = mode = callback = undefined;
optionalArgs.forEach(function (arg, index) {
if (Array.isArray(arg)) {
acls = arg;
} else if (typeof arg === 'number') {
...function Transaction(connectionManager) {
if (!(this instanceof Transaction)) {
return new Transaction(connectionManager);
}
assert(
connectionManager instanceof ConnectionManager,
'connectionManager must be an instance of ConnectionManager.'
);
this.ops = [];
this.connectionManager = connectionManager;
}n/a
check = function (path, version) {
version = version || -1;
Path.validate(path);
assert(typeof version === 'number', 'version must be a number.');
this.ops.push({
type : jute.OP_CODES.CHECK,
path : path,
version : version
});
return this;
}n/a
commit = function (callback) {
assert(typeof callback === 'function', 'callback must be a function');
var self = this,
header = new jute.protocol.RequestHeader(),
payload = new jute.TransactionRequest(this.ops),
request;
header.type = jute.OP_CODES.MULTI;
request = new jute.Request(header, payload);
this.connectionManager.queue(request, function (error, response) {
if (error) {
callback(error);
return;
}
var result,
i;
for (i = 0; i < response.payload.results.length; i += 1) {
result = response.payload.results[i];
// Find if there is an op which caused the transaction to fail.
if (result.type === jute.OP_CODES.ERROR &&
result.err !== Exception.OK) {
error = Exception.create(result.err);
break;
}
}
callback(error, response.payload.results);
});
}n/a
create = function (path, data, acls, mode) {
var optionalArgs = [data, acls, mode],
self = this,
currentPath = '',
nodes;
Path.validate(path);
// Reset arguments so we can reassign correct value to them.
data = acls = mode = undefined;
optionalArgs.forEach(function (arg, index) {
if (Array.isArray(arg)) {
acls = arg;
} else if (typeof arg === 'number') {
mode = arg;
} else if (Buffer.isBuffer(arg)) {
data = arg;
}
});
acls = Array.isArray(acls) ? acls : ACL.OPEN_ACL_UNSAFE;
mode = typeof mode === 'number' ? mode : CreateMode.PERSISTENT;
assert(
data === null || data === undefined || Buffer.isBuffer(data),
'data must be a valid buffer, null or undefined.'
);
assert(acls.length > 0, 'acls must be a non-empty array.');
this.ops.push({
type : jute.OP_CODES.CREATE,
path : path,
data : data,
acls : acls,
mode : mode
});
return this;
}...
var client = zookeeper.createClient('localhost:2181');
var path = process.argv[2];
client.once('connected', function () {
console.log('Connected to the server.');
client.create(path, function (error) {
if (error) {
console.log('Failed to create node: %s due to: %s.', path, error);
} else {
console.log('Node: %s is successfully created.', path);
}
client.close();
...remove = function (path, version) {
version = version || -1;
Path.validate(path);
assert(typeof version === 'number', 'version must be a number.');
this.ops.push({
type : jute.OP_CODES.DELETE,
path : path,
version : version
});
return this;
}...
* path `String` - Path of the node.
* version `Number` - The version of the node, optional, defaults to -1.
* callback(error) `Function` - The callback function.
**Example**
```javascript
zookeeper.remove('/test/demo', -1, function (error) {
if (error) {
console.log(error.stack);
return;
}
console.log('Node is deleted.');
});
...setData = function (path, data, version) {
version = version || -1;
Path.validate(path);
assert(
data === null || data === undefined || Buffer.isBuffer(data),
'data must be a valid buffer, null or undefined.'
);
assert(typeof version === 'number', 'version must be a number.');
this.ops.push({
type : jute.OP_CODES.SET_DATA,
path : path,
data : data,
version : version
});
return this;
}...
* version `Number` - The version of the node, optional, defaults to -1.
* callback(error, stat) `Function` - The callback function. The `stat` is an
instance of [`Stat`](#stat).
**Example**
```javascript
zookeeper.setData('/test/demo', null, 2, function (error, stat) {
if (error) {
console.log(error.stack);
return;
}
console.log('Data is set.');
});
...function WatcherManager() {
this.dataWatchers = {};
this.childWatchers = {};
this.existenceWatchers = {};
}n/a
emit = function (watcherEvent) {
if (!watcherEvent) {
throw new Error('watcherEvent must be a valid object.');
}
var emitters = [],
event;
switch (watcherEvent.type) {
case Event.NODE_DATA_CHANGED:
case Event.NODE_CREATED:
if (this.dataWatchers[watcherEvent.path]) {
emitters.push(this.dataWatchers[watcherEvent.path]);
delete this.dataWatchers[watcherEvent.path];
}
if (this.existenceWatchers[watcherEvent.path]) {
emitters.push(this.existenceWatchers[watcherEvent.path]);
delete this.existenceWatchers[watcherEvent.path];
}
break;
case Event.NODE_CHILDREN_CHANGED:
if (this.childWatchers[watcherEvent.path]) {
emitters.push(this.childWatchers[watcherEvent.path]);
delete this.childWatchers[watcherEvent.path];
}
break;
case Event.NODE_DELETED:
if (this.dataWatchers[watcherEvent.path]) {
emitters.push(this.dataWatchers[watcherEvent.path]);
delete this.dataWatchers[watcherEvent.path];
}
if (this.childWatchers[watcherEvent.path]) {
emitters.push(this.childWatchers[watcherEvent.path]);
delete this.childWatchers[watcherEvent.path];
}
break;
default:
throw new Error('Unknown event type: ' + watcherEvent.type);
}
if (emitters.length < 1) {
return;
}
event = Event.create(watcherEvent);
emitters.forEach(function (emitter) {
emitter.emit('notification', event);
});
}...
/**
* Default state listener to emit user-friendly events.
*/
function defaultStateListener(state) {
switch (state) {
case State.DISCONNECTED:
this.emit('disconnected');
break;
case State.SYNC_CONNECTED:
this.emit('connected');
break;
case State.CONNECTED_READ_ONLY:
this.emit('connectedReadOnly');
break;
...getChildWatcherPaths = function () {
return getWatcherPaths(this, 'child');
}...
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
ConnectionManager.prototype.onSocketTimeout = function () {
...getDataWatcherPaths = function () {
return getWatcherPaths(this, 'data');
}...
payload = new jute.protocol.SetWatches();
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
...getExistenceWatcherPaths = function () {
return getWatcherPaths(this, 'existence');
}...
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
payload.relativeZxid = this.zxid;
payload.dataWatches = this.watcherManager.getDataWatcherPaths();
payload.existWatches = this.watcherManager.getExistenceWatcherPaths();
payload.childWatches = this.watcherManager.getChildWatcherPaths();
setWatchesRequest = new jute.Request(header, payload);
this.queue(setWatchesRequest);
}
};
...isEmpty = function () {
var empty = true,
watchers,
paths,
i,
j;
watchers = [this.dataWatchers, this.existenceWatchers, this.childWatchers];
for (i = 0; i < watchers.length; i += 1) {
paths = Object.keys(watchers[i]);
for (j = 0; j < paths.length; j += 1) {
if (watchers[i][paths[j]].listeners('notification').length > 0) {
empty = false;
break;
}
}
}
return empty;
}...
authRequest = new jute.Request(header, payload);
this.queue(authRequest);
}, this);
}
// Reset the watchers if we have any.
if (!this.watcherManager.isEmpty()) {
header = new jute.protocol.RequestHeader();
payload = new jute.protocol.SetWatches();
header.type = jute.OP_CODES.SET_WATCHES;
header.xid = jute.XID_SET_WATCHES;
payload.setChrootPath(this.chrootPath);
...registerChildWatcher = function (path, watcher) {
registerWatcher(this, 'child', path, watcher);
}...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerChildWatcher(path, watcher);
}
next(null, response.payload.children, response.payload.stat);
});
},
callback
);
...registerDataWatcher = function (path, watcher) {
registerWatcher(this, 'data', path, watcher);
}...
self.connectionManager.queue(request, function (error, response) {
if (error) {
next(error);
return;
}
if (watcher) {
self.connectionManager.registerDataWatcher(path, watcher);
}
next(null, response.payload.data, response.payload.stat);
});
},
callback
);
...registerExistenceWatcher = function (path, watcher) {
registerWatcher(this, 'existence', path, watcher);
}...
if (watcher) {
if (existence) {
self.connectionManager.registerDataWatcher(
path,
watcher
);
} else {
self.connectionManager.registerExistenceWatcher(
path,
watcher
);
}
}
next(
...