BrokerNotAvailableError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
...
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
...
BrokerReadable = function (options) { Readable.call(this, options); }
n/a
BrokerTransform = function (options) { Transform.call(this, options); this.noAckBatchSize = options ? options.noAckBatchSize : null; this.noAckBatchAge = options ? options.noAckBatchAge : null; this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge); }
n/a
BrokerWrapper = function (socket, noAckBatchOptions) { this.socket = socket; var self = this; var readable = new BrokerReadable(); var transform = new BrokerTransform(noAckBatchOptions); readable.pipe(transform); transform.on('readable', function () { var bulkMessage = null; while (bulkMessage = transform.read()) { // eslint-disable-line no-cond-assign self.socket.write(bulkMessage); } }); this.readableSocket = readable; }
n/a
Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) { if (this instanceof Client === false) { return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions); } this.sslOptions = sslOptions; this.ssl = !!sslOptions; if (clientId) { validateConfig('clientId', clientId); } this.connectionString = connectionString || 'localhost:2181/'; this.clientId = clientId || 'kafka-node-client'; this.zkOptions = zkOptions; this.noAckBatchOptions = noAckBatchOptions; this.brokers = {}; this.longpollingBrokers = {}; this.topicMetadata = {}; this.topicPartitions = {}; this.correlationId = 0; this._socketId = 0; this.cbqueue = {}; this.brokerMetadata = {}; this.ready = false; this.connect(); }
...
partitionerType: 2
}
```
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
```
### Events
- `ready`: this event is emitted when the producer is ready to send messages.
- `error`: this event propagates errors from the internal client; the producer should always listen for it.
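A minimal sketch of wiring up both events (assuming a local ZooKeeper reachable at the default `localhost:2181/`):
```js
var kafka = require('kafka-node');
var client = new kafka.Client(); // connectionString defaults to 'localhost:2181/'
var producer = new kafka.Producer(client);

producer.on('ready', function () {
  // safe to start calling producer.send() from here
});
producer.on('error', function (err) {
  // without this listener, an unhandled 'error' event would crash the process
  console.error(err);
});
```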
...
Consumer = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } utils.validateTopics(topics); this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); this.ready = false; this.paused = this.options.paused; this.id = nextId(); this.payloads = this.buildPayloads(topics); this.connect(); this.encoding = this.options.encoding; if (this.options.groupId) { utils.validateConfig('options.groupId', this.options.groupId); } }
n/a
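A minimal usage sketch, assuming a topic named `topic1` with a partition 0 already exists:
```js
var kafka = require('kafka-node');
var client = new kafka.Client();
var consumer = new kafka.Consumer(client, [
  { topic: 'topic1', partition: 0 } // one payload per topic/partition
], { autoCommit: true });

consumer.on('message', function (message) {
  console.log(message.topic, message.partition, message.offset, message.value);
});
consumer.on('error', function (err) {
  console.error(err);
});
```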
function ConsumerGroup(memberOptions, topics) { EventEmitter.call(this); const self = this; this.options = _.defaults((memberOptions || {}), DEFAULTS); if (!this.options.heartbeatInterval) { this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3); } if (memberOptions.ssl === true) { memberOptions.ssl = {}; } if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) { throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); } if (!(this.options.outOfRangeOffset in ACCEPTED_FROM_OFFSET)) { throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); } this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, memberOptions.batch, memberOptions.ssl); if (_.isString(topics)) { topics = [topics]; } assert(Array.isArray(topics), 'Array of topics is required'); this.topics = topics; this.recovery = new ConsumerGroupRecovery(this); this.setupProtocols(this.options.protocol); if (this.options.connectOnReady && !this.options.migrateHLC) { this.client.once('ready', this.connect.bind(this)); } if (this.options.migrateHLC) { const ConsumerGroupMigrator = require('./consumerGroupMigrator'); this.migrator = new ConsumerGroupMigrator(this); this.migrator.on('error', function (error) { self.emit('error', error); }); } this.client.on('error', function (err) { logger.error('Error from %s', self.client.clientId, err); self.emit('error', err); }); const recoverFromBrokerChange = _.debounce(function () { logger.debug('brokersChanged refreshing metadata'); self.client.refreshMetadata(self.topics, function (error) { if (error) { self.emit('error', error); return; } self.paused = false; if (!self.ready && !self.connecting) { if (self.reconnectTimer) { // brokers changed so bypass backoff retry and reconnect now clearTimeout(self.reconnectTimer); self.reconnectTimer = null; } self.connect(); } else if (!self.connecting) { self.fetch(); } }); }, 200); this.client.on('brokersChanged', function () { self.pause(); recoverFromBrokerChange(); }); this.client.on('reconnect', function (lastError) { self.fetch(); }); this.on('offsetOutOfRange', topic => { this.pause(); if (this.options.outOfRangeOffset === 'none') { this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`)); return; } topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset]; this.getOffset().fetch([topic], (error, result) => { if (error) { this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error)); return; } const offset = _.head(result[topic.topic][topic.partition]); const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset; logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset); this.setOffset(topic.topic, topic.partition, offset); this.resume(); }); }); // 'done' is emitted when a message fetch request completes this.on('done', function (topics) { self.updateOffsets(topics); if (!self.paused) { setImmediate(function () { self.fetch(); }); } }); if (this.options.groupId) { validateConfig('options.groupId', this.options.groupId); } this.isLeader = false; this.coordinatorId = null; this.generationId = null; this.ready = false; this.topicPayloads = []; }
n/a
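A usage sketch based on the constructor above: `host` becomes `memberOptions.host` for the internal `Client`, and `fromOffset`/`outOfRangeOffset` must be keys of `ACCEPTED_FROM_OFFSET`. The group id and topic name are placeholders:
```js
var kafka = require('kafka-node');

var consumerGroup = new kafka.ConsumerGroup({
  host: 'localhost:2181',
  groupId: 'ExampleTestGroup',
  sessionTimeout: 15000, // heartbeatInterval defaults to a third of this
  protocol: ['roundrobin'],
  fromOffset: 'latest'
}, ['topic1']);

consumerGroup.on('message', function (message) {
  console.log(message);
});
consumerGroup.on('error', function (err) {
  console.error(err);
});
```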
CustomPartitioner = function (partitioner) { this.getPartition = partitioner; }
n/a
CyclicPartitioner = function () { this.c = 0; }
n/a
DefaultPartitioner = function () {}
n/a
FailedToRebalanceConsumerError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
...
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString()));
} else {
var topicNames = self.topicPayloads.map(function (p) {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
if (err) {
...
FailedToRegisterConsumerError = function (message, nested) { NestedError.call(this, message, nested); this.message = message; }
...
} else {
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
}
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString(), error));
}
});
}
}, 20000);
function fetchAndUpdateOffsets (cb) {
self.fetchOffset(self.topicPayloads, function (err, topics) {
...
GroupCoordinatorNotAvailableError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
GroupLoadInProgressError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
HeartbeatTimeoutError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
HighLevelConsumer = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); this.initialised = false; this.ready = false; this.closing = false; this.paused = this.options.paused; this.rebalancing = false; this.pendingRebalances = 0; this.committing = false; this.needToCommit = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); this.payloads = this.buildPayloads(topics); this.topicPayloads = this.buildTopicPayloads(topics); this.connect(); if (this.options.groupId) { validateConfig('options.groupId', this.options.groupId); } }
n/a
function HighLevelProducer(client, options, customPartitioner) { BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner); }
n/a
function BaseProducer(client, options, defaultPartitionerType, customPartitioner) { options = options || {}; this.ready = false; this.client = client; this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks; this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs; if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) { throw new Error('Partitioner Type must be custom if providing a customPartitioner.'); } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) { throw new Error('No customer partitioner defined'); } var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType]; // eslint-disable-next-line this.partitioner = new partitionerType(customPartitioner); this.connect(); }
n/a
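The custom-partitioner checks above imply the following pattern. The partitioner function is installed as `CustomPartitioner`'s `getPartition` and is called with the topic's partition list and the message key; the hashing scheme below is a made-up placeholder, and `4` is assumed to be the `custom` entry of `PARTITIONER_MAP`:
```js
var kafka = require('kafka-node');
var client = new kafka.Client();

// called as getPartition(partitions, key); must return one of the partitions
function myPartitioner (partitions, key) {
  return partitions[String(key).length % partitions.length]; // placeholder scheme
}

// partitionerType must be custom when a customPartitioner is supplied,
// otherwise the BaseProducer constructor throws
var producer = new kafka.Producer(client, { partitionerType: 4 }, myPartitioner);
```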
IllegalGenerationError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
InvalidConfigError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
InvalidConsumerOffsetError = function (message, nested) { NestedError.apply(this, arguments); }
...
this.client.on('reconnect', function (lastError) {
self.fetch();
});
this.on('offsetOutOfRange', topic => {
this.pause();
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
...
KafkaBuffer = function (batchSize, batchAge) { this._batch_size = batchSize; this._batch_age = batchAge; this._batch_age_timer = null; this._buffer = null; }
n/a
function KeyedMessage(key, value) { exports.Message.call(this, 0, 0, key, value); }
n/a
KeyedPartitioner = function () {}
n/a
NotCoordinatorForGroupError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
Offset = function (client) { var self = this; this.client = client; this.ready = this.client.ready; this.client.on('ready', function () { self.ready = true; self.emit('ready'); }); this.client.once('connect', function () { self.emit('connect'); }); this.client.on('error', function (err) { self.emit('error', err); }); }
...
* `cb`: *Function*, the callback
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...
function Producer(client, options, customPartitioner) { BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default, customPartitioner); }
n/a
RandomPartitioner = function () {}
n/a
RebalanceInProgressError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
TopicsNotExistError = function (topics) { Error.captureStackTrace(this, this); this.topics = topics; this.message = 'The topic(s) ' + topics.toString() + ' do not exist'; }
...
*/
Client.prototype.topicExists = function (topics, cb) {
var notExistsTopics = [];
var self = this;
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
...
UnknownMemberIdError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function ConsumerGroupMigrator(consumerGroup) { EventEmitter.call(this); assert(consumerGroup); const self = this; this.consumerGroup = consumerGroup; this.client = consumerGroup.client; var verified = 0; if (consumerGroup.options.migrateRolling) { this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10}); this.zk.on('connected', function () { self.filterByExistingZkTopics(function (error, topics) { if (error) { return self.emit('error', error); } if (topics.length) { self.checkForOwnersAndListenForChange(topics); } else { logger.debug('No HLC topics exist in zookeeper.'); self.connectConsumerGroup(); } }); }); this.on('noOwnersForTopics', function (topics) { logger.debug('No owners for topics %s reported.', topics); if (++verified <= NUMER_OF_TIMES_TO_VERIFY) { logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified, NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS); setTimeout(function () { self.checkForOwners(topics); }, VERIFY_WAIT_TIME_MS); } else { self.connectConsumerGroup(); } }); this.on('topicOwnerChange', _.debounce(function (topics) { verified = 0; self.checkForOwnersAndListenForChange(topics); }, 250)); this.zk.connect(); } else { this.connectConsumerGroup(); } }
n/a
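The migrator is driven entirely by `ConsumerGroup` options; a sketch of a group taking over from an old HighLevelConsumer, using the `migrateHLC`/`migrateRolling` options referenced in the constructors above (group id and topic are placeholders):
```js
var kafka = require('kafka-node');

var consumerGroup = new kafka.ConsumerGroup({
  host: 'localhost:2181',
  groupId: 'ExistingHLCGroup', // the groupId the HighLevelConsumer used
  migrateHLC: true,            // pick up the HLC offsets stored in zookeeper
  migrateRolling: true         // wait for existing HLC owners to release topics first
}, ['topic1']);
```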
function ConsumerGroupRecovery(consumerGroup) { this.consumerGroup = consumerGroup; this.options = consumerGroup.options; }
n/a
BrokerNotAvailableError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
...
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
...
function Error() { [native code] }
n/a
BrokerReadable = function (options) { Readable.call(this, options); }
n/a
function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); this._readableState = new ReadableState(options, this); // legacy this.readable = true; if (options && typeof options.read === 'function') this._read = options.read; Stream.call(this); }
n/a
_read = function (size) {}
n/a
BrokerTransform = function (options) { Transform.call(this, options); this.noAckBatchSize = options ? options.noAckBatchSize : null; this.noAckBatchAge = options ? options.noAckBatchAge : null; this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge); }
n/a
function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); Duplex.call(this, options); this._transformState = new TransformState(this); var stream = this; // start out asking for a readable event once data is transformed. this._readableState.needReadable = true; // we have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. this._readableState.sync = false; if (options) { if (typeof options.transform === 'function') this._transform = options.transform; if (typeof options.flush === 'function') this._flush = options.flush; } // When the writable side finishes, then flush out anything remaining. this.once('prefinish', function() { if (typeof this._flush === 'function') this._flush(function(er) { done(stream, er); }); else done(stream); }); }
n/a
_transform = function (chunk, enc, done) { this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this)); done(); }
n/a
_transformNext = function () { this.push(this._KafkaBuffer.getBatch()); this._KafkaBuffer.truncateBatch(); }
n/a
BrokerWrapper = function (socket, noAckBatchOptions) { this.socket = socket; var self = this; var readable = new BrokerReadable(); var transform = new BrokerTransform(noAckBatchOptions); readable.pipe(transform); transform.on('readable', function () { var bulkMessage = null; while (bulkMessage = transform.read()) { // eslint-disable-line no-cond-assign self.socket.write(bulkMessage); } }); this.readableSocket = readable; }
n/a
write = function (buffer) { this.socket.write(buffer); }
...
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
this.sendGroupRequest(protocol.encodeGroupCoordinatorRequest, protocol.decodeGroupCoordinatorResponse, arguments);
};
Client.prototype.sendJoinGroupRequest = function (groupId, memberId, sessionTimeout, groupProtocol, cb) {
...
writeAsync = function (buffer) { this.readableSocket.push(buffer); }
...
if (longpolling) {
if (broker.socket.waiting) continue;
broker.socket.waiting = true;
}
if (decoder.requireAcks === 0) {
broker.writeAsync(request);
cb(null, { result: 'no ack' });
} else {
this.queueCallback(broker.socket, correlationId, [decoder, cb]);
broker.write(request);
}
}
};
...
Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) { if (this instanceof Client === false) { return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions); } this.sslOptions = sslOptions; this.ssl = !!sslOptions; if (clientId) { validateConfig('clientId', clientId); } this.connectionString = connectionString || 'localhost:2181/'; this.clientId = clientId || 'kafka-node-client'; this.zkOptions = zkOptions; this.noAckBatchOptions = noAckBatchOptions; this.brokers = {}; this.longpollingBrokers = {}; this.topicMetadata = {}; this.topicPartitions = {}; this.correlationId = 0; this._socketId = 0; this.cbqueue = {}; this.brokerMetadata = {}; this.ready = false; this.connect(); }
...
partitionerType: 2
}
```
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
```
### Events
- `ready`: this event is emitted when the producer is ready to send messages.
- `error`: this event propagates errors from the internal client; the producer should always listen for it.
...
function EventEmitter() { EventEmitter.init.call(this); }
n/a
addTopics = function (topics, cb) { var self = this; this.topicExists(topics, function (err) { if (err) return cb(err); self.loadMetadataForTopics(topics, function (err, resp) { if (err) return cb(err); self.updateMetadatas(resp); cb(null, topics); }); }); }
...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
// or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...
brokerForLeader = function (leader, longpolling) { var addr; var brokers = this.getBrokers(longpolling); // If leader is not given, choose the first broker as leader if (typeof leader === 'undefined') { if (!_.isEmpty(brokers)) { addr = Object.keys(brokers)[0]; return brokers[addr]; } else if (!_.isEmpty(this.brokerMetadata)) { leader = Object.keys(this.brokerMetadata)[0]; } else { return; } } var broker = _.find(this.brokerProfiles, {id: leader}); if (!broker) { return; } addr = broker.host + ':' + broker.port; return brokers[addr] || this.setupBroker(broker.host, broker.port, longpolling, brokers); }
...
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
var correlationId = this.nextId();
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
...
checkMetadatas = function (payloads) { if (_.isEmpty(this.topicMetadata)) return [ [], payloads ]; // out: [ [metadata exists], [metadata not exists] ] var out = [ [], [] ]; payloads.forEach(function (p) { if (this.hasMetadata(p.topic, p.partition)) out[0].push(p); else out[1].push(p); }.bind(this)); return out; }
...
}
};
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;
var _payloads = payloads;
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
this.loadMetadataForTopics(topicNames, function (err, resp) {
...
clearCallbackQueue = function (socket, error) { var socketId = socket.socketId; var longpolling = socket.longpolling; if (!this.cbqueue.hasOwnProperty(socketId)) { return; } var queue = this.cbqueue[socketId]; if (!longpolling) { Object.keys(queue).forEach(function (key) { var handlers = queue[key]; var cb = handlers[1]; cb(error); }); } delete this.cbqueue[socketId]; }
...
socket.on('error', function (err) {
this.error = err;
self.emit('error', err);
});
socket.on('close', function (hadError) {
self.emit('close', this);
if (hadError && this.error) {
self.clearCallbackQueue(this, this.error);
} else {
self.clearCallbackQueue(this, new errors.BrokerNotAvailableError('Broker not available'));
}
retry(this);
});
socket.on('end', function () {
retry(this);
...
close = function (cb) { this.closeBrokers(this.brokers); this.closeBrokers(this.longpollingBrokers); this.zk.close(); cb && cb(); }
...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); // force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...
closeBrokers = function (brokers) { _.each(brokers, function (broker) { broker.socket.closing = true; broker.socket.end(); }); }
...
self.brokerProfiles[addr] = brokerProfile;
self.brokerProfiles[addr].id = key;
});
};
Client.prototype.close = function (cb) {
this.closeBrokers(this.brokers);
this.closeBrokers(this.longpollingBrokers);
this.zk.close();
cb && cb();
};
Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
...
connect = function () { var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions); var self = this; zk.once('init', function (brokers) { try { self.ready = true; self.brokerMetadata = brokers; self.setupBrokerProfiles(brokers); Object.keys(self.brokerProfiles).some(function (key, index) { var broker = self.brokerProfiles[key]; self.setupBroker(broker.host, broker.port, false, self.brokers); // Only connect one broker return !index; }); self.emit('ready'); } catch (error) { self.ready = false; self.emit('error', error); } }); zk.on('brokersChanged', function (brokerMetadata) { try { self.brokerMetadata = brokerMetadata; logger.debug('brokersChanged', brokerMetadata); self.setupBrokerProfiles(brokerMetadata); self.refreshBrokers(); // Emit after 3 seconds setTimeout(function () { self.emit('brokersChanged'); }, 3000); } catch (error) { self.emit('error', error); } }); zk.once('disconnected', function () { if (!zk.closed) { zk.close(); self.connect(); self.emit('zkReconnect'); } }); zk.on('error', function (err) { self.emit('error', err); }); }
...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emitter...
var self = this;
...
createBroker = function (host, port, longpolling) { var self = this; var socket; if (self.ssl) { socket = tls.connect(port, host, self.sslOptions); } else { socket = net.createConnection(port, host); } socket.addr = host + ':' + port; socket.host = host; socket.port = port; socket.socketId = this.nextSocketId(); if (longpolling) socket.longpolling = true; socket.on('connect', function () { var lastError = this.error; this.error = null; if (lastError) { this.waiting = false; self.emit('reconnect'); } else { self.emit('connect'); } }); socket.on('error', function (err) { this.error = err; self.emit('error', err); }); socket.on('close', function (hadError) { self.emit('close', this); if (hadError && this.error) { self.clearCallbackQueue(this, this.error); } else { self.clearCallbackQueue(this, new errors.BrokerNotAvailableError('Broker not available')); } retry(this); }); socket.on('end', function () { retry(this); }); socket.buffer = new Buffer([]); socket.on('data', function (data) { this.buffer = Buffer.concat([this.buffer, data]); self.handleReceivedData(this); }); socket.setKeepAlive(true, 60000); function retry (s) { if (s.retrying || s.closing) return; s.retrying = true; s.retryTimer = setTimeout(function () { if (s.closing) return; self.reconnectBroker(s); }, 1000); } return new BrokerWrapper(socket, this.noAckBatchOptions); }
...
Client.prototype.getBrokers = function (longpolling) {
return longpolling ? this.longpollingBrokers : this.brokers;
};
Client.prototype.setupBroker = function (host, port, longpolling, brokers) {
var brokerKey = host + ':' + port;
brokers[brokerKey] = this.createBroker(host, port, longpolling);
return brokers[brokerKey];
};
Client.prototype.createBroker = function (host, port, longpolling) {
var self = this;
var socket;
if (self.ssl) {
...
createTopics = function (topics, isAsync, cb) { topics = typeof topics === 'string' ? [topics] : topics; if (typeof isAsync === 'function' && typeof cb === 'undefined') { cb = isAsync; isAsync = true; } try { validateKafkaTopics(topics); } catch (e) { if (isAsync) return cb(e); throw e; } cb = _.once(cb); const getTopicsFromKafka = (topics, callback) => { this.loadMetadataForTopics(topics, function (error, resp) { if (error) { return callback(error); } callback(null, Object.keys(resp[1].metadata)); }); }; const operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 }); operation.attempt(currentAttempt => { logger.debug('create topics currentAttempt', currentAttempt); getTopicsFromKafka(topics, function (error, kafkaTopics) { if (error) { if (operation.retry(error)) { return; } } logger.debug('kafka reported topics', kafkaTopics); const left = _.difference(topics, kafkaTopics); if (left.length === 0) { logger.debug(`Topics created ${kafkaTopics}`); return cb(null, kafkaTopics); } logger.debug(`Topics left ${left.join(', ')}`); if (!operation.retry(new Error(`Topics not created ${left}`))) { cb(operation.mainError()); } }); }); if (!isAsync) { cb(null); } }
...
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
```
...
getBrokers = function (longpolling) { return longpolling ? this.longpollingBrokers : this.brokers; }
...
Client.prototype.leaderByPartition = function (topic, partition) {
var topicMetadata = this.topicMetadata;
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition].leader;
};
Client.prototype.brokerForLeader = function (leader, longpolling) {
var addr;
var brokers = this.getBrokers(longpolling);
// If leader is not given, choose the first broker as leader
if (typeof leader === 'undefined') {
if (!_.isEmpty(brokers)) {
addr = Object.keys(brokers)[0];
return brokers[addr];
} else if (!_.isEmpty(this.brokerMetadata)) {
leader = Object.keys(this.brokerMetadata)[0];
...
handleReceivedData = function (socket) { var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars; var size = vars.size + 4; var correlationId = vars.correlationId; if (socket.buffer.length >= size) { var resp = socket.buffer.slice(0, size); var handlers = this.unqueueCallback(socket, correlationId); if (!handlers) return; var decoder = handlers[0]; var cb = handlers[1]; var result = decoder(resp); (result instanceof Error) ? cb.call(this, result) : cb.call(this, null, result); socket.buffer = socket.buffer.slice(size); if (socket.longpolling) socket.waiting = false; } else { return; } if (socket.buffer.length) { setImmediate(function () { this.handleReceivedData(socket); }.bind(this)); } }
...
});
socket.on('end', function () {
retry(this);
});
socket.buffer = new Buffer([]);
socket.on('data', function (data) {
this.buffer = Buffer.concat([this.buffer, data]);
self.handleReceivedData(this);
});
socket.setKeepAlive(true, 60000);
function retry (s) {
if (s.retrying || s.closing) return;
s.retrying = true;
s.retryTimer = setTimeout(function () {
...
hasMetadata = function (topic, partition) { var brokerMetadata = this.brokerMetadata; var leader = this.leaderByPartition(topic, partition); return (leader !== undefined) && brokerMetadata[leader]; }
...
};
Client.prototype.checkMetadatas = function (payloads) {
if (_.isEmpty(this.topicMetadata)) return [ [], payloads ];
// out: [ [metadata exists], [metadata not exists] ]
var out = [ [], [] ];
payloads.forEach(function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p);
else out[1].push(p);
}.bind(this));
return out;
};
Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata;
...
leaderByPartition = function (topic, partition) { var topicMetadata = this.topicMetadata; return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition].leader; }
...
else out[1].push(p);
}.bind(this));
return out;
};
Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata;
var leader = this.leaderByPartition(topic, partition);
return (leader !== undefined) && brokerMetadata[leader];
};
Client.prototype.updateMetadatas = function (metadatas) {
// _.extend(this.brokerMetadata, metadatas[0])
_.extend(this.topicMetadata, metadatas[1].metadata);
...
loadMetadataForTopics = function (topics, cb) { var correlationId = this.nextId(); var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics); var broker = this.brokerForLeader(); if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Broker not available')); } this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]); broker.write(request); }
...
## How do I get a list of all topics?
Call `client.loadMetadataForTopics` with a blank topic array to get the entire list of available topics (and available brokers).
```js
client.once('connect', function () {
client.loadMetadataForTopics([], function (error, results) {
if (error) {
return console.error(error);
}
console.log('%j', _.get(results, '1.metadata'));
});
});
```
...
nextId = function () { if (this.correlationId >= MAX_INT32) { this.correlationId = 0; } return this.correlationId++; }
...
var decoder = protocol.decodeOffsetResponse;
this.send(payloads, encoder, decoder, cb);
};
Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) {
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
var correlationId = this.nextId();
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
...
nextSocketId = function () { return this._socketId++; }
...
socket = tls.connect(port, host, self.sslOptions);
} else {
socket = net.createConnection(port, host);
}
socket.addr = host + ':' + port;
socket.host = host;
socket.port = port;
socket.socketId = this.nextSocketId();
if (longpolling) socket.longpolling = true;
socket.on('connect', function () {
var lastError = this.error;
this.error = null;
if (lastError) {
this.waiting = false;
...
payloadsByLeader = function (payloads) { return payloads.reduce(function (out, p) { var leader = this.leaderByPartition(p.topic, p.partition); out[leader] = out[leader] || []; out[leader].push(p); return out; }.bind(this), {}); }
...
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});
}
};
Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
var longpolling = encoder.name === 'encodeFetchRequest';
payloads = this.payloadsByLeader(payloads);
if (!longpolling) {
cb = wrap(payloads, cb);
}
for (var leader in payloads) {
if (!payloads.hasOwnProperty(leader)) {
continue;
}
...
queueCallback = function (socket, id, data) { var socketId = socket.socketId; var queue; if (this.cbqueue.hasOwnProperty(socketId)) { queue = this.cbqueue[socketId]; } else { queue = {}; this.cbqueue[socketId] = queue; } queue[id] = data; }
...
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
this.sendGroupRequest(protocol.encodeGroupCoordinatorRequest, protocol.decodeGroupCoordinatorResponse, arguments);
};
...
reconnectBroker = function (oldSocket) { oldSocket.retrying = false; if (oldSocket.error) { oldSocket.destroy(); } var brokers = this.getBrokers(oldSocket.longpolling); var newBroker = this.setupBroker(oldSocket.host, oldSocket.port, oldSocket.longpolling, brokers); newBroker.socket.error = oldSocket.error; }
...
socket.setKeepAlive(true, 60000);
function retry (s) {
if (s.retrying || s.closing) return;
s.retrying = true;
s.retryTimer = setTimeout(function () {
if (s.closing) return;
self.reconnectBroker(s);
}, 1000);
}
return new BrokerWrapper(socket, this.noAckBatchOptions);
};
Client.prototype.reconnectBroker = function (oldSocket) {
oldSocket.retrying = false;
...
refreshBrokers = function () { var self = this; var validBrokers = Object.keys(this.brokerProfiles); function closeDeadBrokers (brokers) { var deadBrokerKeys = _.difference(Object.keys(brokers), validBrokers); if (deadBrokerKeys.length) { self.closeBrokers(deadBrokerKeys.map(function (key) { var broker = brokers[key]; delete brokers[key]; return broker; })); } } closeDeadBrokers(this.brokers); closeDeadBrokers(this.longpollingBrokers); }
...
}
});
zk.on('brokersChanged', function (brokerMetadata) {
try {
self.brokerMetadata = brokerMetadata;
logger.debug('brokersChanged', brokerMetadata);
self.setupBrokerProfiles(brokerMetadata);
self.refreshBrokers();
// Emit after 3 seconds
setTimeout(function () {
self.emit('brokersChanged');
}, 3000);
} catch (error) {
self.emit('error', error);
}
...
refreshMetadata = function (topicNames, cb) { var self = this; if (!topicNames.length) return cb(); attemptRequestMetadata(topicNames, cb); function attemptRequestMetadata (topics, cb) { var operation = retry.operation({ minTimeout: 200, maxTimeout: 1000 }); operation.attempt(function (currentAttempt) { logger.debug('refresh metadata currentAttempt', currentAttempt); self.loadMetadataForTopics(topics, function (err, resp) { err = err || resp[1].error; if (operation.retry(err)) { return; } if (err) { logger.debug('refresh metadata error', err.message); return cb(err); } self.updateMetadatas(resp); cb(); }); }); } }
...
Error:
```
BrokerNotAvailableError: Could not find the leader
```
Call `client.refreshMetadata()` before sending the first message. Reference issue [#354](https://github.com/SOHU-Co/kafka-node/issues/354).
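A sketch of the workaround (the topic name is a placeholder):
```js
var kafka = require('kafka-node');
var client = new kafka.Client();
var producer = new kafka.Producer(client);

producer.on('ready', function () {
  // pre-load leader metadata so the first send doesn't race the lookup
  client.refreshMetadata(['topic1'], function (err) {
    if (err) return console.error(err);
    producer.send([{ topic: 'topic1', messages: 'hello' }], function (err, data) {
      if (err) return console.error(err);
      console.log(data);
    });
  });
});
```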
## How do I debug an issue?
This module uses the [debug module](https://github.com/visionmedia/debug), so you can just run the command below before starting your app.
```bash
...
removeTopicMetadata = function (topics, cb) { topics.forEach(function (t) { if (this.topicMetadata[t]) delete this.topicMetadata[t]; }.bind(this)); cb(null, topics.length); }
...
Consumer.prototype.removeTopics = function (topics, cb) {
topics = typeof topics === 'string' ? [topics] : topics;
this.payloads = this.payloads.filter(function (p) {
return !~topics.indexOf(p.topic);
});
this.client.removeTopicMetadata(topics, cb);
};
Consumer.prototype.close = function (force, cb) {
this.ready = false;
if (typeof force === 'function') {
cb = force;
force = false;
...
send = function (payloads, encoder, decoder, cb) { var self = this; var _payloads = payloads; // payloads: [ [metadata exists], [metadata not exists] ] payloads = this.checkMetadatas(payloads); if (payloads[0].length && !payloads[1].length) { this.sendToBroker(_.flatten(payloads), encoder, decoder, cb); return; } if (payloads[1].length) { var topicNames = payloads[1].map(function (p) { return p.topic; }); this.loadMetadataForTopics(topicNames, function (err, resp) { if (err) { return cb(err); } var error = resp[1].error; if (error) { return cb(error); } self.updateMetadatas(resp); // check payloads again payloads = self.checkMetadatas(_payloads); if (payloads[1].length) { return cb(new errors.BrokerNotAvailableError('Could not find the leader')); } self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb); }); } }
...
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
{ topic: 'topic1', messages: 'hi', partition: 0 },
{ topic: 'topic2', messages: ['hello', 'world', km] }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
```
> ⚠️ **WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute, otherwise you may lose messages!
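For example (topic and messages are placeholders):
```js
// safe: one payload entry carrying all messages for topic1/partition 0
producer.send([
  { topic: 'topic1', partition: 0, messages: ['one', 'two', 'three'] }
], function (err, data) {});

// risky: several entries for the same topic/partition in a single send();
// per the warning above, messages may be lost
producer.send([
  { topic: 'topic1', partition: 0, messages: 'one' },
  { topic: 'topic1', partition: 0, messages: 'two' }
], function (err, data) {});
```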
...
sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) { var self = this; var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes); var decoder = protocol.decodeFetchResponse(function (err, type, message) { if (err) { if (err.message === 'OffsetOutOfRange') { return consumer.emit('offsetOutOfRange', err); } else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') { return self.emit('brokersChanged'); } return consumer.emit('error', err); } var encoding = consumer.options.encoding; if (type === 'message') { if (encoding !== 'buffer' && message.value) { message.value = message.value.toString(encoding); } consumer.emit('message', message); } else { consumer.emit('done', message); } }, maxTickMessages); this.send(payloads, encoder, decoder, function (err) { if (err) { Array.prototype.unshift.call(arguments, 'error'); consumer.emit.apply(consumer, arguments); } }); }
...
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
if (!this.ready || this.paused) return;
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);
};
Consumer.prototype.fetchOffset = function (payloads, cb) {
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
};
Consumer.prototype.addTopics = function (topics, cb, fromOffset) {
...
sendGroupCoordinatorRequest = function (groupId, cb) { this.sendGroupRequest(protocol.encodeGroupCoordinatorRequest, protocol.decodeGroupCoordinatorResponse, arguments); }
...
this.emit('rebalancing');
async.waterfall([
function (callback) {
if (self.client.coordinatorId) {
return callback(null, null);
}
self.client.sendGroupCoordinatorRequest(self.options.groupId, callback);
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
...
sendGroupRequest = function (encode, decode, requestArgs) { requestArgs = _.values(requestArgs); var cb = requestArgs.pop(); var correlationId = this.nextId(); requestArgs.unshift(this.clientId, correlationId); var request = encode.apply(null, requestArgs); var broker = this.brokerForLeader(this.coordinatorId); if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Broker not available')); } this.queueCallback(broker.socket, correlationId, [decode, cb]); broker.write(request); }
...
var decoder = protocol.decodeOffsetCommitResponse;
this.send(payloads, encoder, decoder, cb);
};
Client.prototype.sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) {
var encoder = protocol.encodeOffsetCommitV2Request;
var decoder = protocol.decodeOffsetCommitResponse;
this.sendGroupRequest(encoder, decoder, arguments);
};
Client.prototype.sendOffsetFetchV1Request = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchV1Request;
var decoder = protocol.decodeOffsetFetchV1Response;
this.sendGroupRequest(encoder, decoder, arguments);
};
...
sendHeartbeatRequest = function (groupId, generationId, memberId, cb) { this.sendGroupRequest(protocol.encodeGroupHeartbeat, protocol.decodeGroupHeartbeat, arguments); }
...
constructor (client, handler) {
this.client = client;
this.handler = handler;
this.pending = true;
}
send (groupId, generationId, memberId) {
this.client.sendHeartbeatRequest(groupId, generationId, memberId, (error) => {
if (this.canceled) {
logger.debug('heartbeat yielded after being canceled', error);
return;
}
this.pending = false;
this.handler(error);
});
...
sendJoinGroupRequest = function (groupId, memberId, sessionTimeout, groupProtocol, cb) { this.sendGroupRequest(protocol.encodeJoinGroupRequest, protocol.decodeJoinGroupResponse, arguments); }
...
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
...
sendLeaveGroupRequest = function (groupId, memberId, cb) { this.sendGroupRequest(protocol.encodeLeaveGroupRequest, protocol.decodeLeaveGroupResponse, arguments); }
...
};
ConsumerGroup.prototype.leaveGroup = function (callback) {
logger.debug('%s leaving group', this.client.clientId);
var self = this;
this.stopHeartbeats();
if (self.generationId != null && self.memberId) {
this.client.sendLeaveGroupRequest(this.options.groupId, this.memberId, function (error) {
self.generationId = null;
callback(error);
});
} else {
callback(null);
}
};
...
sendOffsetCommitRequest = function (group, payloads, cb) { var encoder = protocol.encodeOffsetCommitRequest(group); var decoder = protocol.decodeOffsetCommitResponse; this.send(payloads, encoder, decoder, cb); }
...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...
sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) { var encoder = protocol.encodeOffsetCommitV2Request; var decoder = protocol.decodeOffsetCommitResponse; this.sendGroupRequest(encoder, decoder, arguments); }
...
ConsumerGroup.prototype.fetchOffset = function (payloads, cb) {
this.client.sendOffsetFetchV1Request(this.options.groupId, payloads, cb);
};
ConsumerGroup.prototype.sendOffsetCommitRequest = function (commits, cb) {
if (this.generationId && this.memberId) {
this.client.sendOffsetCommitV2Request(this.options.groupId, this.generationId, this.memberId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
};
ConsumerGroup.prototype.close = function (force, cb) {
var self = this;
...
sendOffsetFetchRequest = function (group, payloads, cb) { var encoder = protocol.encodeOffsetFetchRequest(group); var decoder = protocol.decodeOffsetFetchResponse; this.send(payloads, encoder, decoder, cb); }
...
}
}
);
};
ConsumerGroupMigrator.prototype.saveHighLevelConsumerOffsets = function (topicPartitionList, callback) {
const self = this;
this.client.sendOffsetFetchRequest(this.consumerGroup.options.groupId, topicPartitionList, function (error, results) {
logger.debug('sendOffsetFetchRequest response:', results, error);
if (error) {
return callback(error);
}
self.offsets = results;
callback(null);
});
...
sendOffsetFetchV1Request = function (group, payloads, cb) { var encoder = protocol.encodeOffsetFetchV1Request; var decoder = protocol.decodeOffsetFetchV1Response; this.sendGroupRequest(encoder, decoder, arguments); }
...
const heartbeat = new Heartbeat(this.client, heartbeatCallback);
heartbeat.send(this.options.groupId, this.generationId, this.memberId);
return heartbeat;
};
ConsumerGroup.prototype.fetchOffset = function (payloads, cb) {
this.client.sendOffsetFetchV1Request(this.options.groupId, payloads, cb);
};
ConsumerGroup.prototype.sendOffsetCommitRequest = function (commits, cb) {
if (this.generationId && this.memberId) {
this.client.sendOffsetCommitV2Request(this.options.groupId, this.generationId, this.memberId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
...
sendOffsetRequest = function (payloads, cb) { var encoder = protocol.encodeOffsetRequest; var decoder = protocol.decodeOffsetResponse; this.send(payloads, encoder, decoder, cb); }
...
HighLevelConsumer.prototype.fetchOffset = function (payloads, cb) {
logger.debug('in fetchOffset %s payloads: %j', this.id, payloads);
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
};
HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
this.client.sendOffsetRequest(payloads, cb);
};
/**
* Register a consumer against a group
*
* @param consumer to register
*
...
sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) { var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs); var decoder = protocol.decodeProduceResponse; var self = this; decoder.requireAcks = requireAcks; async.each(payloads, buildRequest, function (err) { if (err) return cb(err); self.send(payloads, encoder, decoder, function (err, result) { if (err) { if (err.message === 'NotLeaderForPartition') { self.emit('brokersChanged'); } cb(err); } else { cb(null, result); } }); }); function buildRequest (payload, cb) { var attributes = payload.attributes; var codec = getCodec(attributes); if (!codec) return cb(); var innerSet = encodeMessageSet(payload.messages); codec.encode(innerSet, function (err, message) { if (err) return cb(err); payload.messages = [ new Message(0, attributes, '', message) ]; cb(); }); } }
...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...
sendSyncGroupRequest = function (groupId, generationId, memberId, groupAssignment, cb) { this.sendGroupRequest(protocol.encodeSyncGroupRequest, protocol.decodeSyncGroupResponse, arguments); }
...
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
...
sendToBroker = function (payloads, encoder, decoder, cb) { var longpolling = encoder.name === 'encodeFetchRequest'; payloads = this.payloadsByLeader(payloads); if (!longpolling) { cb = wrap(payloads, cb); } for (var leader in payloads) { if (!payloads.hasOwnProperty(leader)) { continue; } var correlationId = this.nextId(); var request = encoder(this.clientId, correlationId, payloads[leader]); var broker = this.brokerForLeader(leader, longpolling); if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) { return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]); } if (longpolling) { if (broker.socket.waiting) continue; broker.socket.waiting = true; } if (decoder.requireAcks === 0) { broker.writeAsync(request); cb(null, { result: 'no ack' }); } else { this.queueCallback(broker.socket, correlationId, [decoder, cb]); broker.write(request); } } }
...
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;
var _payloads = payloads;
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
this.loadMetadataForTopics(topicNames, function (err, resp) {
if (err) {
return cb(err);
...
setupBroker = function (host, port, longpolling, brokers) { var brokerKey = host + ':' + port; brokers[brokerKey] = this.createBroker(host, port, longpolling); return brokers[brokerKey]; }
...
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
});
self.emit('ready');
} catch (error) {
self.ready = false;
self.emit('error', error);
...
setupBrokerProfiles = function (brokers) { this.brokerProfiles = Object.create(null); var self = this; var protocol = self.ssl ? 'ssl:' : 'plaintext:'; Object.keys(brokers).forEach(function (key) { var brokerProfile = brokers[key]; var addr; if (brokerProfile.endpoints && brokerProfile.endpoints.length) { var endpoint = _.find(brokerProfile.endpoints, function (endpoint) { return url.parse(endpoint).protocol === protocol; }); if (endpoint == null) { throw new Error(['No kafka endpoint found for broker: ', key, ' with protocol ', protocol].join('')); } var endpointUrl = url.parse(endpoint); addr = endpointUrl.hostname + ':' + endpointUrl.port; brokerProfile.host = endpointUrl.hostname; brokerProfile.port = endpointUrl.port; } else { addr = brokerProfile.host + ':' + brokerProfile.port; } assert(brokerProfile.host && brokerProfile.port, 'kafka host or port is empty'); self.brokerProfiles[addr] = brokerProfile; self.brokerProfiles[addr].id = key; }); }
...
Client.prototype.connect = function () {
var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions);
var self = this;
zk.once('init', function (brokers) {
try {
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
...
topicExists = function (topics, cb) { var notExistsTopics = []; var self = this; async.each(topics, checkZK, function (err) { if (err) return cb(err); if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics)); cb(); }); function checkZK (topic, cb) { self.zk.topicExists(topic, function (err, existed, topic) { if (err) return cb(err); if (!existed) notExistsTopics.push(topic); cb(); }); } }
...
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
cb();
});
}
};
...
unqueueCallback = function (socket, id) { var socketId = socket.socketId; if (!this.cbqueue.hasOwnProperty(socketId)) { return null; } var queue = this.cbqueue[socketId]; if (!queue.hasOwnProperty(id)) { return null; } var result = queue[id]; // cleanup socket queue delete queue[id]; if (!Object.keys(queue).length) { delete this.cbqueue[socketId]; } return result; }
...
Client.prototype.handleReceivedData = function (socket) {
var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars;
var size = vars.size + 4;
var correlationId = vars.correlationId;
if (socket.buffer.length >= size) {
var resp = socket.buffer.slice(0, size);
var handlers = this.unqueueCallback(socket, correlationId);
if (!handlers) return;
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
(result instanceof Error)
? cb.call(this, result)
...
updateMetadatas = function (metadatas) { // _.extend(this.brokerMetadata, metadatas[0]) _.extend(this.topicMetadata, metadatas[1].metadata); for (var topic in this.topicMetadata) { if (!this.topicMetadata.hasOwnProperty(topic)) { continue; } this.topicPartitions[topic] = Object.keys(this.topicMetadata[topic]).map(function (val) { return parseInt(val, 10); }); } }
...
Client.prototype.addTopics = function (topics, cb) {
var self = this;
this.topicExists(topics, function (err) {
if (err) return cb(err);
self.loadMetadataForTopics(topics, function (err, resp) {
if (err) return cb(err);
self.updateMetadatas(resp);
cb(null, topics);
});
});
};
Client.prototype.nextId = function () {
if (this.correlationId >= MAX_INT32) {
...
Consumer = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } utils.validateTopics(topics); this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); this.ready = false; this.paused = this.options.paused; this.id = nextId(); this.payloads = this.buildPayloads(topics); this.connect(); this.encoding = this.options.encoding; if (this.options.groupId) { utils.validateConfig('options.groupId', this.options.groupId); } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
addTopics = function (topics, cb, fromOffset) { fromOffset = !!fromOffset; var self = this; if (!this.ready) { setTimeout(function () { self.addTopics(topics, cb, fromOffset); }, 100); return; } // The default is that the topics is a string array of topic names var topicNames = topics; // If the topics is actually an object and not string we assume it is an array of payloads if (typeof topics[0] === 'object') { topicNames = topics.map(function (p) { return p.topic; }); } this.client.addTopics(topicNames, function (err, added) { if (err) return cb && cb(err, added); var payloads = self.buildPayloads(topics); var reFetch = !self.payloads.length; if (fromOffset) { payloads.forEach(function (p) { self.payloads.push(p); }); if (reFetch) self.fetch(); cb && cb(null, added); return; } // update offset of topics that will be added self.fetchOffset(payloads, function (err, offsets) { if (err) return cb(err); payloads.forEach(function (p) { var offset = offsets[p.topic][p.partition]; if (offset === -1) offset = 0; p.offset = offset; self.payloads.push(p); }); if (reFetch) self.fetch(); cb && cb(null, added); }); }); }
...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
```
or
``` js
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...
function autoCommit(force, cb) { if (arguments.length === 1) { cb = force; force = false; } if (this.committing && !force) return cb(null, 'Offset committing'); this.committing = true; setTimeout(function () { this.committing = false; }.bind(this), this.options.autoCommitIntervalMs); var payloads = this.payloads; if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads); var commits = payloads.filter(function (p) { return p.offset !== 0; }); if (commits.length) { this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb); } else { cb(null, 'Nothing to be committed'); } }
...
if (offset === -1) offset = 0;
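// after a normal fetch, continue from one past the last consumed offset; during init the fetched committed offset is used as-is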
if (!initing) p.offset = offset + 1;
else p.offset = offset;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
};
function autoCommit (force, cb) {
if (arguments.length === 1) {
...
buildPayloads = function (payloads) { var self = this; return payloads.map(function (p) { if (typeof p !== 'object') p = { topic: p }; p.partition = p.partition || 0; p.offset = p.offset || 0; p.maxBytes = self.options.fetchMaxBytes; p.metadata = 'm'; // metadata can be arbitrary return p; }); }
...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
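// use an explicitly provided partition; otherwise let the partitioner pick from the topic's known partitions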
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...
close = function (force, cb) { this.ready = false; if (typeof force === 'function') { cb = force; force = false; } if (force) { this.commit(force, function (err) { if (err) { return cb(err); } this.client.close(cb); }.bind(this)); } else { this.client.close(cb); } }
...
### close(force, cb)
* `force`: **Boolean**, if set to `true`, the consumer commits the current offset before closing; default `false`
* `cb`: **Function**, the callback
Example:
```js
consumer.close(true, cb);
consumer.close(cb); // force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
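A minimal ConsumerGroup sketch for comparison (option names as in the ConsumerGroup constructor dumped elsewhere in this reference; host, group, and topic values are illustrative):
``` js
var kafka = require('kafka-node');
var consumerGroup = new kafka.ConsumerGroup({
  host: 'localhost:2181', // zookeeper connection string
  groupId: 'ExampleGroup',
  sessionTimeout: 15000,
  protocol: ['roundrobin'], // built-in assignment protocol
  fromOffset: 'latest' // used when the group has no saved offsets
}, ['topic1']);
consumerGroup.on('message', function (message) {
  console.log(message);
});
consumerGroup.on('error', function (err) {
  console.error(err);
});
```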
### HighLevelConsumer(client, payloads, options)
...
function autoCommit(force, cb) { if (arguments.length === 1) { cb = force; force = false; } if (this.committing && !force) return cb(null, 'Offset committing'); this.committing = true; setTimeout(function () { this.committing = false; }.bind(this), this.options.autoCommitIntervalMs); var payloads = this.payloads; if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads); var commits = payloads.filter(function (p) { return p.offset !== 0; }); if (commits.length) { this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb); } else { cb(null, 'Nothing to be committed'); } }
...
Commit the current topic offsets manually; this method should be called when a consumer leaves.
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function(err, data) {
});
```
### setOffset(topic, partition, offset)
Set the offset for the given topic partition
* `topic`: **String**
...
connect = function () { var self = this; // Client already exists this.ready = this.client.ready; if (this.ready) this.init(); this.client.on('ready', function () { logger.debug('consumer ready'); if (!self.ready) self.init(); self.ready = true; }); this.client.on('error', function (err) { logger.error('client error %s', err.message); self.emit('error', err); }); this.client.on('close', function () { logger.debug('connection closed'); }); this.client.on('brokersChanged', function () { var topicNames = self.payloads.map(function (p) { return p.topic; }); this.refreshMetadata(topicNames, function (err) { if (err) return self.emit('error', err); self.fetch(); }); }); // 'done' will be emit when a message fetch request complete this.on('done', function (topics) { self.updateOffsets(topics); setImmediate(function () { self.fetch(); }); }); }
...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emitter...
var self = this;
...
fetch = function () { if (!this.ready || this.paused) return; this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes); }
...
Example:
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
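Note: per the Kafka offset API, `time` may also be `-1` to request the latest available offsets or `-2` to request the earliest.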
...
fetchOffset = function (payloads, cb) { this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb); }
...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...
init = function () { if (!this.payloads.length) { return; } var self = this; var topics = self.payloads.map(function (p) { return p.topic; }); self.client.topicExists(topics, function (err) { if (err) { return self.emit('error', err); } if (self.options.fromOffset) { return self.fetch(); } self.fetchOffset(self.payloads, function (err, topics) { if (err) { return self.emit('error', err); } self.updateOffsets(topics, true); self.fetch(); }); }); }
...
});
};
Consumer.prototype.connect = function () {
var self = this;
// Client already exists
this.ready = this.client.ready;
if (this.ready) this.init();
this.client.on('ready', function () {
logger.debug('consumer ready');
if (!self.ready) self.init();
self.ready = true;
});
...
pause = function () { this.paused = true; }
...
} else if (!self.connecting) {
self.fetch();
}
});
}, 200);
this.client.on('brokersChanged', function () {
self.pause();
recoverFromBrokerChange();
});
this.client.on('reconnect', function (lastError) {
self.fetch();
});
...
pauseTopics = function (topics) { if (!this.pausedPayloads) this.pausedPayloads = []; pauseOrResume(this.payloads, this.pausedPayloads, topics); }
...
### resume()
Resume the consumer. Resumes the fetch loop.
### pauseTopics(topics)
Pause the specified topics
``` js
consumer.pauseTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
```
### resumeTopics(topics)
Resume the specified topics
...
removeTopics = function (topics, cb) { topics = typeof topics === 'string' ? [topics] : topics; this.payloads = this.payloads.filter(function (p) { return !~topics.indexOf(p.topic); }); this.client.removeTopicMetadata(topics, cb); }
...
### removeTopics(topics, cb)
* `topics`: **Array**, array of topics to remove
* `cb`: **Function**, the callback
Example:
``` js
consumer.removeTopics(['t1', 't2'], function (err, removed) {
});
```
### commit(cb)
Commit the current topic offsets manually; this method should be called when a consumer leaves.
* `cb`: **Function**, the callback
...
resume = function () { this.paused = false; this.fetch(); }
...
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' is emitted when a message fetch request completes
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
...
resumeTopics = function (topics) { if (!this.pausedPayloads) this.pausedPayloads = []; var reFetch = !this.payloads.length; pauseOrResume(this.pausedPayloads, this.payloads, topics); reFetch = reFetch && this.payloads.length; if (reFetch) this.fetch(); }
...
]);
```
### resumeTopics(topics)
Resume the specified topics
``` js
consumer.resumeTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
```
### close(force, cb)
* `force`: **Boolean**, if set to `true`, the consumer commits the current offset before closing; default `false`
...
setOffset = function (topic, partition, offset) { this.payloads.every(function (p) { if (p.topic === topic && p.partition == partition) { // eslint-disable-line eqeqeq p.offset = offset; return false; } return true; }); }
...
* `partition`: **Number**
* `offset`: **Number**
Example:
``` js
consumer.setOffset('topic', 0, 0);
```
### pause()
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops
the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).
### resume()
Resume the consumer. Resumes the fetch loop.
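For example, a sketch that stops fetching while a batch is handled (the `processAsync` handler here is hypothetical):
``` js
consumer.on('message', function (message) {
  consumer.pause(); // stops the next fetch iteration; already-fetched messages may still arrive
  processAsync(message, function (err) { // hypothetical async handler
    if (err) console.error(err);
    consumer.resume(); // restarts the fetch loop
  });
});
```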
...
updateOffsets = function (topics, initing) { this.payloads.forEach(function (p) { if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined) { var offset = topics[p.topic][p.partition]; if (offset === -1) offset = 0; if (!initing) p.offset = offset + 1; else p.offset = offset; } }); if (this.options.autoCommit && !initing) { this.autoCommit(false, function (err) { err && logger.debug('auto commit offset', err); }); } }
...
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' is emitted when a message fetch request completes
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
setImmediate(function () {
self.fetch();
});
}
});
...
function ConsumerGroup(memberOptions, topics) { EventEmitter.call(this); const self = this; this.options = _.defaults((memberOptions || {}), DEFAULTS); if (!this.options.heartbeatInterval) { this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3); } if (memberOptions.ssl === true) { memberOptions.ssl = {}; } if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) { throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`); } if (!(this.options.outOfRangeOffset in ACCEPTED_FROM_OFFSET)) { throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join (', ')}`); } this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk, memberOptions.batch, memberOptions.ssl); if (_.isString(topics)) { topics = [topics]; } assert(Array.isArray(topics), 'Array of topics is required'); this.topics = topics; this.recovery = new ConsumerGroupRecovery(this); this.setupProtocols(this.options.protocol); if (this.options.connectOnReady && !this.options.migrateHLC) { this.client.once('ready', this.connect.bind(this)); } if (this.options.migrateHLC) { const ConsumerGroupMigrator = require('./consumerGroupMigrator'); this.migrator = new ConsumerGroupMigrator(this); this.migrator.on('error', function (error) { self.emit('error', error); }); } this.client.on('error', function (err) { logger.error('Error from %s', self.client.clientId, err); self.emit('error', err); }); const recoverFromBrokerChange = _.debounce(function () { logger.debug('brokersChanged refreshing metadata'); self.client.refreshMetadata(self.topics, function (error) { if (error) { self.emit(error); return; } self.paused = false; if (!self.ready && !self.connecting) { if (self.reconnectTimer) { // brokers changed so bypass backoff retry and reconnect now clearTimeout(self.reconnectTimer); self.reconnectTimer = null; } self.connect(); } else if (!self.connecting) { self.fetch(); } }); }, 200); this.client.on('brokersChanged', function () { self.pause(); recoverFromBrokerChange(); }); this.client.on('reconnect', function (lastError) { self.fetch(); }); this.on('offsetOutOfRange', topic => { this.pause(); if (this.options.outOfRangeOffset === 'none') { this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic .partition}`)); return; } topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset]; this.getOffset().fetch([topic], (error, result) => { if (error) { this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error )); return; } const offset = _.head(result[topic.topic][topic.partition]); const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset; logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset); this.setOffset(topic.topic, topic.partition, offset); this.resume(); }); }); // 'done' will be emit when a message fetch request complete this.on('done', function (topics) { self.updateOffsets(topics); if (!self.paused) { setImmediate(function () { self.fetch(); }); } }); if (this.options.groupId) { validateConfig('options.groupId', this.options.groupId); } this.isLeader = false; this.coordinatorId = null; this.generationId = null; this.ready = false; this.topicPayloads = []; }
n/a
super_ = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); this.initialised = false; this.ready = false; this.closing = false; this.paused = this.options.paused; this.rebalancing = false; this.pendingRebalances = 0; this.committing = false; this.needToCommit = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); this.payloads = this.buildPayloads(topics); this.topicPayloads = this.buildTopicPayloads(topics); this.connect(); if (this.options.groupId) { validateConfig('options.groupId', this.options.groupId); } }
n/a
assignPartitions = function (protocol, groupMembers, callback) { logger.debug('Assigning Partitions to members', groupMembers); logger.debug('Using group protocol', protocol); protocol = _.find(this.protocols, {name: protocol}); var self = this; var topics = _(groupMembers).map('subscription').flatten().uniq().value(); async.waterfall([ function (callback) { logger.debug('loadingMetadata for topics:', topics); self.client.loadMetadataForTopics(topics, callback); }, function (metadataResponse, callback) { var metadata = mapTopicToPartitions(metadataResponse[1].metadata); logger.debug('mapTopicToPartitions', metadata); protocol.assign(metadata, groupMembers, callback); } ], callback); }
...
this.isLeader = (joinGroupResponse.leaderId === joinGroupResponse.memberId);
this.generationId = joinGroupResponse.generationId;
this.memberId = joinGroupResponse.memberId;
var groupAssignment;
if (this.isLeader) {
// assign partitions
return this.assignPartitions(joinGroupResponse.groupProtocol, joinGroupResponse.members, callback);
}
callback(null, groupAssignment);
};
ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) {
var self = this;
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
...
close = function (force, cb) { var self = this; this.ready = false; this.stopHeartbeats(); if (typeof force === 'function') { cb = force; force = false; } async.series([ function (callback) { if (force) { self.commit(true, callback); return; } callback(null); }, function (callback) { self.leaveGroup(function (error) { if (error) { logger.error('Leave group failed with', error); } callback(null); }); }, function (callback) { self.client.close(callback); } ], cb); }
...
### close(force, cb)
* `force`: **Boolean**, if set to `true`, the consumer commits the current offset before closing; default `false`
* `cb`: **Function**, the callback
Example:
```js
consumer.close(true, cb);
consumer.close(cb); // force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...
connect = function () { if (this.connecting) { logger.warn('Connect ignored. Currently connecting.'); return; } logger.debug('Connecting %s', this.client.clientId); var self = this; this.connecting = true; this.emit('rebalancing'); async.waterfall([ function (callback) { if (self.client.coordinatorId) { return callback(null, null); } self.client.sendGroupCoordinatorRequest(self.options.groupId, callback); }, function (coordinatorInfo, callback) { logger.debug('GroupCoordinator Response:', coordinatorInfo); if (coordinatorInfo) { self.setCoordinatorId(coordinatorInfo.coordinatorId); } self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols , callback); }, function (joinGroupResponse, callback) { self.handleJoinGroup(joinGroupResponse, callback); }, function (groupAssignment, callback) { logger.debug('SyncGroup Request from %s', self.memberId); self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback); }, function (syncGroupResponse, callback) { self.handleSyncGroup(syncGroupResponse, callback); } ], function (error, startFetch) { self.connecting = false; self.rebalancing = false; if (error) { return self.recovery.tryToRecoverFrom(error, 'connect'); } self.ready = true; self.recovery.clearError(); logger.debug('generationId', self.generationId); if (startFetch) { self.fetch(); } self.startHeartbeats(); self.emit('connect'); self.emit('rebalanced'); }); }
...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emitter...
var self = this;
...
fetchOffset = function (payloads, cb) { this.client.sendOffsetFetchV1Request(this.options.groupId, payloads, cb); }
...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...
getDefaultOffset = function (tp, defaultOffset) { return _.get(this.defaultOffsets, [tp.topic, tp.partition], defaultOffset); }
...
callback(null, offsets);
}
},
function (offsets, callback) {
self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) { // -1 means no offset was saved for this topic/partition combo
offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0;
if (self.migrator) {
offset = self.migrator.getOffset(p, offset);
}
}
p.offset = offset;
return p;
});
...
getOffset = function () { if (this.offset) { return this.offset; } this.offset = new Offset(this.client); // we can ignore this since we are already forwarding error event emitted from client this.offset.on('error', _.noop); return this.offset; }
...
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error));
return;
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
...
handleJoinGroup = function (joinGroupResponse, callback) { logger.debug('joinGroupResponse %j from %s', joinGroupResponse, this.client.clientId); this.isLeader = (joinGroupResponse.leaderId === joinGroupResponse.memberId); this.generationId = joinGroupResponse.generationId; this.memberId = joinGroupResponse.memberId; var groupAssignment; if (this.isLeader) { // assign partitions return this.assignPartitions(joinGroupResponse.groupProtocol, joinGroupResponse.members, callback); } callback(null, groupAssignment); }
...
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
...
handleSyncGroup = function (syncGroupResponse, callback) { logger.debug('SyncGroup Response'); var self = this; var ownedTopics = Object.keys(syncGroupResponse.partitions); if (ownedTopics.length) { logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions); const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions); const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET; async.waterfall([ function (callback) { self.fetchOffset(syncGroupResponse.partitions, callback); }, function (offsets, callback) { logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets); var noOffset = topicPartitionList.some(function (tp) { return offsets[tp.topic][tp.partition] === -1; }); if (noOffset) { logger.debug('No saved offsets'); if (self.options.fromOffset === 'none') { return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'`)); } async.parallel([ function (callback) { if (self.migrator) { return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback); } callback(null); }, function (callback) { if (useDefaultOffsets) { return self.saveDefaultOffsets(topicPartitionList, callback); } callback(null); } ], function (error) { if (error) { return callback(error); } logger.debug('%s defaultOffset Response for %s: %j', self.client.clientId, self.options.fromOffset, self.defaultOffsets ); callback(null, offsets); }); } else { logger.debug('Has saved offsets'); callback(null, offsets); } }, function (offsets, callback) { self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) { var offset = offsets[p.topic][p.partition]; if (offset === -1) { // -1 means no offset was saved for this topic/partition combo offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0; if (self.migrator) { offset = self.migrator.getOffset(p, offset); } } p.offset = offset; return p; }); callback(null, true); } ], callback); } else { // no partitions assigned callback(null, false); } }
...
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
...
leaveGroup = function (callback) { logger.debug('%s leaving group', this.client.clientId); var self = this; this.stopHeartbeats(); if (self.generationId != null && self.memberId) { this.client.sendLeaveGroupRequest(this.options.groupId, this.memberId, function (error) { self.generationId = null; callback(error); }); } else { callback(null); } }
...
if (force) {
self.commit(true, callback);
return;
}
callback(null);
},
function (callback) {
self.leaveGroup(function (error) {
if (error) {
logger.error('Leave group failed with', error);
}
callback(null);
});
},
function (callback) {
...
saveDefaultOffsets = function (topicPartitionList, callback) { var self = this; const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => { tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset]; return tp; }); self.getOffset().fetch(offsetPayload, function (error, result) { if (error) { return callback(error); } self.defaultOffsets = _.mapValues(result, function (partitionOffsets) { return _.mapValues(partitionOffsets, _.head); }); callback(null); }); }
...
if (self.migrator) {
return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback);
}
callback(null);
},
function (callback) {
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
callback(null);
}
], function (error) {
if (error) {
return callback(error);
}
...
scheduleReconnect = function (timeout) { assert(timeout); this.rebalancing = true; if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); } var self = this; this.reconnectTimer = setTimeout(function () { self.reconnectTimer = null; self.connect(); }, timeout); }
...
if (retry) {
retryTimeout = this.getRetryTimeout(error);
}
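// reconnect only when the error is retriable and a backoff timeout was computed; otherwise surface the error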
if (retry && retryTimeout) {
logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error);
this.consumerGroup.scheduleReconnect(retryTimeout);
} else {
this.consumerGroup.emit('error', error);
}
this.lastError = error;
};
ConsumerGroupRecovery.prototype.clearError = function () {
...
sendHeartbeat = function () { assert(this.memberId, 'invalid memberId'); assert(this.generationId >= 0, 'invalid generationId'); // logger.debug('%s ❤️ ->', this.client.clientId); var self = this; function heartbeatCallback (error) { if (error) { logger.warn('%s Heartbeat error:', self.client.clientId, error); self.recovery.tryToRecoverFrom(error, 'heartbeat'); } // logger.debug('%s 💚 <-', self.client.clientId, error); } const heartbeat = new Heartbeat(this.client, heartbeatCallback); heartbeat.send(this.options.groupId, this.generationId, this.memberId); return heartbeat; }
...
assert(this.ready, 'consumerGroup is not ready');
const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3));
logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs);
this.stopHeartbeats();
let heartbeat = this.sendHeartbeat();
this.heartbeatInterval = setInterval(() => {
// only send another heartbeat if we got a response from the last one
if (heartbeat.verifyResolved()) {
heartbeat = this.sendHeartbeat();
}
}, heartbeatIntervalMs);
...
sendOffsetCommitRequest = function (commits, cb) { if (this.generationId && this.memberId) { this.client.sendOffsetCommitV2Request(this.options.groupId, this.generationId, this.memberId, commits, cb); } else { cb(null, 'Nothing to be committed'); } }
...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
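// only partitions whose offset has advanced past the initial 0 have anything to commit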
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...
setCoordinatorId = function (coordinatorId) { this.client.coordinatorId = String(coordinatorId); }
...
}
self.client.sendGroupCoordinatorRequest(self.options.groupId, callback);
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
...
setupProtocols = function (protocols) { if (!Array.isArray(protocols)) { protocols = [protocols]; } this.protocols = protocols.map(function (protocol) { if (typeof protocol === 'string') { if (!(protocol in builtInProtocols)) { throw new Error('Unknown built in assignment protocol ' + protocol); } protocol = _.assign({}, builtInProtocols[protocol]); } else { checkProtocol(protocol); } protocol.subscription = this.topics; return protocol; }, this); }
...
assert(Array.isArray(topics), 'Array of topics is required');
this.topics = topics;
this.recovery = new ConsumerGroupRecovery(this);
this.setupProtocols(this.options.protocol);
if (this.options.connectOnReady && !this.options.migrateHLC) {
this.client.once('ready', this.connect.bind(this));
}
if (this.options.migrateHLC) {
const ConsumerGroupMigrator = require('./consumerGroupMigrator');
...
startHeartbeats = function () { assert(this.options.sessionTimeout > 0); assert(this.ready, 'consumerGroup is not ready'); const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3)); logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs); this.stopHeartbeats(); let heartbeat = this.sendHeartbeat(); this.heartbeatInterval = setInterval(() => { // only send another heartbeat if we got a response from the last one if (heartbeat.verifyResolved()) { heartbeat = this.sendHeartbeat(); } }, heartbeatIntervalMs); }
...
self.recovery.clearError();
logger.debug('generationId', self.generationId);
if (startFetch) {
self.fetch();
}
self.startHeartbeats();
self.emit('connect');
self.emit('rebalanced');
});
};
ConsumerGroup.prototype.scheduleReconnect = function (timeout) {
assert(timeout);
...
stopHeartbeats = function () { this.heartbeatInterval && clearInterval(this.heartbeatInterval); }
...
ConsumerGroup.prototype.startHeartbeats = function () {
assert(this.options.sessionTimeout > 0);
assert(this.ready, 'consumerGroup is not ready');
const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3));
logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs);
this.stopHeartbeats();
let heartbeat = this.sendHeartbeat();
this.heartbeatInterval = setInterval(() => {
// only send another heartbeat if we got a response from the last one
if (heartbeat.verifyResolved()) {
heartbeat = this.sendHeartbeat();
...
CustomPartitioner = function (partitioner) { this.getPartition = partitioner; }
n/a
super_ = function () {}
n/a
CyclicPartitioner = function () { this.c = 0; }
n/a
super_ = function () {}
n/a
getPartition = function (partitions) { if (_.isEmpty(partitions)) return 0; return partitions[ this.c++ % partitions.length ]; }
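The partitioner can also be user-supplied via CustomPartitioner above; a sketch (the key-hashing scheme is illustrative, and `partitionerType: 4` selects the custom partitioner per PARTITIONER_TYPES):
``` js
// getPartition receives the topic's partition ids and the message key
var producer = new kafka.HighLevelProducer(client, { partitionerType: 4 }, function (partitions, key) {
  key = String(key || '');
  var sum = 0;
  for (var i = 0; i < key.length; i++) sum += key.charCodeAt(i);
  return partitions[sum % partitions.length]; // stable partition per key
});
```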
...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...
DefaultPartitioner = function () {}
n/a
super_ = function () {}
n/a
getPartition = function (partitions) { if (partitions && _.isArray(partitions) && partitions.length > 0) { return partitions[0]; } else { return 0; } }
...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...
FailedToRebalanceConsumerError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
...
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString()));
} else {
var topicNames = self.topicPayloads.map(function (p) {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
if (err) {
...
function Error() { [native code] }
n/a
FailedToRegisterConsumerError = function (message, nested) { NestedError.call(this, message, nested); this.message = message; }
...
} else {
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
}
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString(), error));
}
});
}
}, 20000);
function fetchAndUpdateOffsets (cb) {
self.fetchOffset(self.topicPayloads, function (err, topics) {
...
super_ = function (message, nested) { this.nested = nested; if (typeof message !== 'undefined') { Object.defineProperty(this, 'message', { value: message, writable: true, enumerable: false, configurable: true }); } Error.captureStackTrace(this, this.constructor); var oldStackDescriptor = Object.getOwnPropertyDescriptor(this, 'stack'); var stackDescriptor = buildStackDescriptor(oldStackDescriptor, nested); Object.defineProperty(this, 'stack', stackDescriptor); }
n/a
GroupCoordinatorNotAvailableError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
GroupLoadInProgressError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
HeartbeatTimeoutError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
HighLevelConsumer = function (client, topics, options) { if (!topics) { throw new Error('Must have payloads'); } this.fetchCount = 0; this.client = client; this.options = _.defaults((options || {}), DEFAULTS); this.initialised = false; this.ready = false; this.closing = false; this.paused = this.options.paused; this.rebalancing = false; this.pendingRebalances = 0; this.committing = false; this.needToCommit = false; this.id = this.options.id || this.options.groupId + '_' + uuid.v4(); this.payloads = this.buildPayloads(topics); this.topicPayloads = this.buildTopicPayloads(topics); this.connect(); if (this.options.groupId) { validateConfig('options.groupId', this.options.groupId); } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
_releasePartitions = function (topicPayloads, callback) { var self = this; async.each(topicPayloads, function (tp, cbb) { if (tp.partition !== undefined) { async.series([ function (delcbb) { self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { if (err) { // Partition doesn't exist simply carry on cbb(); } else delcbb(); }); }, function (delcbb) { self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, delcbb); }, function (delcbb) { self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { if (err) { delcbb(); } else { delcbb('Partition should not exist'); } }); }], cbb); } else { cbb(); } }, callback); }
...
}
});
},
// Release current partitions
function (callback) {
logger.debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id);
self._releasePartitions(oldTopicPayloads, callback);
},
// Rebalance
function (callback) {
logger.debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id);
logger.debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap);
for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) {
...
addTopics = function (topics, cb) { var self = this; if (!this.ready) { setTimeout(function () { self.addTopics(topics, cb); }, 100); return; } this.client.addTopics( topics, function (err, added) { if (err) return cb && cb(err, added); var payloads = self.buildPayloads(topics); // update offset of topics that will be added self.fetchOffset(payloads, function (err, offsets) { if (err) return cb(err); payloads.forEach(function (p) { var offset = offsets[p.topic][p.partition]; if (offset === -1) offset = 0; p.offset = offset; self.topicPayloads.push(p); }); // TODO: rebalance consumer cb && cb(null, added); }); } ); }
...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
```
or
``` js
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...
function autoCommit(force, cb) { if (arguments.length === 1) { cb = force; force = false; } if (!force) { if (this.committing) return cb(null, 'Offset committing'); if (!this.needToCommit) return cb(null, 'Commit not needed'); } this.needToCommit = false; this.committing = true; setTimeout(function () { this.committing = false; }.bind(this), this.options.autoCommitIntervalMs); var commits = this.topicPayloads.filter(function (p) { return p.offset !== -1; }); if (commits.length) { this.sendOffsetCommitRequest(commits, cb); } else { cb(null, 'Nothing to be committed'); } }
...
if (offset === -1) offset = 0;
if (!initing) p.offset = offset + 1;
else p.offset = offset;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
};
function autoCommit (force, cb) {
if (arguments.length === 1) {
...
buildPayloads = function (payloads) { var self = this; return payloads.map(function (p) { if (typeof p !== 'object') p = { topic: p }; p.partition = p.partition || 0; p.offset = p.offset || 0; p.maxBytes = self.options.fetchMaxBytes; p.metadata = 'm'; // metadata can be arbitrary return p; }); }
...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...
buildTopicPayloads = function (topics) { return topics.map(function (j) { var k = { topic: j.topic }; return k; }); }
...
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.committing = false;
this.needToCommit = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
this.connect();
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
};
util.inherits(HighLevelConsumer, events.EventEmitter);
...
close = function (force, cb) { var self = this; this.ready = false; this.closing = true; clearInterval(this.checkPartitionOwnershipInterval); if (typeof force === 'function') { cb = force; force = false; } async.series([ function (callback) { self.leaveGroup(callback); }, function (callback) { if (force) { async.series([ function (callback) { self.commit(true, callback); }, function (callback) { self.client.close(callback); } ], callback); return; } self.client.close(callback); } ], cb); }
...
### close(force, cb)
* `force`: **Boolean**, if set to `true`, the consumer commits the current offset before closing; default `false`
* `cb`: **Function**, the callback
Example:
```js
consumer.close(true, cb);
consumer.close(cb); // force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...
function autoCommit(force, cb) { if (arguments.length === 1) { cb = force; force = false; } if (!force) { if (this.committing) return cb(null, 'Offset committing'); if (!this.needToCommit) return cb(null, 'Commit not needed'); } this.needToCommit = false; this.committing = true; setTimeout(function () { this.committing = false; }.bind(this), this.options.autoCommitIntervalMs); var commits = this.topicPayloads.filter(function (p) { return p.offset !== -1; }); if (commits.length) { this.sendOffsetCommitRequest(commits, cb); } else { cb(null, 'Nothing to be committed'); } }
...
Commit the current topic offsets manually; this method should be called when a consumer leaves.
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function(err, data) {
});
```
### setOffset(topic, partition, offset)
Set the offset for the given topic partition
* `topic`: **String**
...
connect = function () { var self = this; // Client alreadyexists if (this.client.ready) { this.init(); } this.client.on('ready', function () { if (!self.initialised) self.init(); // Check the topics exist and create a watcher on them var topics = self.payloads.map(function (p) { return p.topic; }); self.client.topicExists(topics, function (err) { if (err) { return self.emit('error', err); } self.initialised = true; }); }); function checkPartitionOwnership (callback) { async.each(self.topicPayloads, function (tp, cbb) { if (tp.partition !== undefined) { self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { if (err) { cbb(err); } else { cbb(); } }); } else { cbb(); } }, callback); } // Check partition ownership and registration this.checkPartitionOwnershipInterval = setInterval(function () { if (!self.rebalancing) { async.parallel([ checkPartitionOwnership, function (callback) { self.client.zk.isConsumerRegistered(self.options.groupId, self.id, function (error, registered) { if (error) { return callback(error); } if (registered) { callback(); } else { callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId))); } }); } ], function (error) { if (error) { self.emit('error', new errors.FailedToRegisterConsumerError(error.toString(), error)); } }); } }, 20000); function fetchAndUpdateOffsets (cb) { self.fetchOffset(self.topicPayloads, function (err, topics) { if (err) { return cb(err); } self.ready = true; self.updateOffsets(topics, true); return cb(); }); } function rebalance () { logger.debug('rebalance() %s is rebalancing: %s ready: %s', self.id, self.rebalancing, self.ready); if (!self.rebalancing && !self.closing) { deregister(); self.emit('rebalancing'); self.rebalancing = true; logger.debug('HighLevelConsumer rebalance retry config: %s', JSON.stringify(self.options.rebalanceRetry)); var oldTopicPayloads = self.topicPayloads; var operation = retry.operation(self.options.rebalanceRetry); operation.attempt(function (currentAttempt) { self.rebalanceAttempt(oldTopicPayloads, function (err) { if (operation.retry(err)) { return; } if (err) { self.rebalancing = false; return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString())); } else { var topicNames = self.topicPayloads.map(function (p) { return p.topic; }); self.client.refreshMetadata(topicNames, function (err) { register(); if (err) { self.rebalancing = false; self.emit('error', err); return; } if (self.topicPayloads.length) { fetchAndUpdateOffsets(function (err) { self.rebalancing = false; if (err) { self.emit('error', new errors.FailedToRebalanceConsumerError(err.message)); return; } self.fetch(); self.emit('rebalanced'); }); } else { // was not assigned any partitions during rebalance self.rebalancing = false; self.emit('rebalanced'); } }); } }); }); } } // Wait for the consumer to be ready this.on('registered', rebalance); function register (fn) { logger.debug('Registered listeners %s', self.id); self.client.zk.on('consumersChanged', fn ...
...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emitter...
var self = this;
...
fetch = function () { if (!this.ready || this.rebalancing || this.paused) { return; } this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages ); }
...
Example:
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...
fetchOffset = function (payloads, cb) { logger.debug('in fetchOffset %s payloads: %j', this.id, payloads); this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb); }
...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...
getTopicPayloads = function () { if (!this.rebalancing) return this.topicPayloads; return null; }
n/a
init = function () { var self = this; if (!self.topicPayloads.length) { return; } self.registerConsumer(function (err) { if (err) { return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString())); } // Close the return self.emit('registered'); }); }
...
});
};
Consumer.prototype.connect = function () {
var self = this;
// Client already exists
this.ready = this.client.ready;
if (this.ready) this.init();
this.client.on('ready', function () {
logger.debug('consumer ready');
if (!self.ready) self.init();
self.ready = true;
});
...
leaveGroup = function (cb) { var self = this; async.parallel([ function (callback) { if (self.topicPayloads.length) { self._releasePartitions(self.topicPayloads, callback); } else { callback(null); } }, function (callback) { self.client.zk.unregisterConsumer(self.options.groupId, self.id, callback); } ], cb); }
...
if (force) {
self.commit(true, callback);
return;
}
callback(null);
},
function (callback) {
self.leaveGroup(function (error) {
if (error) {
logger.error('Leave group failed with', error);
}
callback(null);
});
},
function (callback) {
...
offsetRequest = function (payloads, cb) { this.client.sendOffsetRequest(payloads, cb); }
...
});
this.on('offsetOutOfRange', function (topic) {
self.pause();
topic.maxNum = self.options.maxNumSegments;
topic.metadata = 'm';
topic.time = Date.now();
self.offsetRequest([topic], function (err, offsets) {
if (err) {
self.emit('error', new errors.InvalidConsumerOffsetError(self));
} else {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
// set minimal offset
self.setOffset(topic.topic, topic.partition, min);
self.resume();
...
pause = function () { this.paused = true; }
...
} else if (!self.connecting) {
self.fetch();
}
});
}, 200);
this.client.on('brokersChanged', function () {
self.pause();
recoverFromBrokerChange();
});
this.client.on('reconnect', function (lastError) {
self.fetch();
});
...
rebalanceAttempt = function (oldTopicPayloads, cb) { var self = this; // Do the rebalance..... var consumerPerTopicMap; var newTopicPayloads = []; logger.debug('HighLevelConsumer %s is attempting to rebalance', self.id); async.series([ // Stop fetching data and commit offsets function (callback) { logger.debug('HighLevelConsumer %s stopping data read during rebalance', self.id); self.stop(function () { callback(); }); }, // Assemble the data function (callback) { logger.debug('HighLevelConsumer %s assembling data for rebalance', self.id); self.client.zk.getConsumersPerTopic(self.options.groupId, function (err, obj) { if (err) { callback(err); } else { consumerPerTopicMap = obj; callback(); } }); }, // Release current partitions function (callback) { logger.debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id); self._releasePartitions(oldTopicPayloads, callback); }, // Rebalance function (callback) { logger.debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id); logger.debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap); for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) { if (!consumerPerTopicMap.consumerTopicMap[self.id].hasOwnProperty(topic)) { continue; } var topicToAdd = consumerPerTopicMap.consumerTopicMap[self.id][topic]; var numberOfConsumers = consumerPerTopicMap.topicConsumerMap[topicToAdd].length; var numberOfPartition = consumerPerTopicMap.topicPartitionMap[topicToAdd].length; var partitionsPerConsumer = Math.floor(numberOfPartition / numberOfConsumers); var extraPartitions = numberOfPartition % numberOfConsumers; var currentConsumerIndex; for (var index in consumerPerTopicMap.topicConsumerMap[topicToAdd]) { if (!consumerPerTopicMap.topicConsumerMap[topicToAdd].hasOwnProperty(index)) { continue; } if (consumerPerTopicMap.topicConsumerMap[topicToAdd][index] === self.id) { currentConsumerIndex = parseInt(index); break; } } var extraBit = currentConsumerIndex; if (currentConsumerIndex > extraPartitions) extraBit = extraPartitions; var startPart = partitionsPerConsumer * currentConsumerIndex + extraBit; var extraNParts = 1; if (currentConsumerIndex + 1 > extraPartitions) extraNParts = 0; var nParts = partitionsPerConsumer + extraNParts; for (var i = startPart; i < startPart + nParts; i++) { newTopicPayloads.push({ topic: topicToAdd, partition: consumerPerTopicMap.topicPartitionMap[topicToAdd][i], offset: 0, maxBytes: self.options.fetchMaxBytes, metadata: 'm' }); } } logger.debug('newTopicPayloads %j', newTopicPayloads); callback(); }, // Update ZK with new ownership function (callback) { if (newTopicPayloads.length) { logger.debug('HighLevelConsumer %s gaining ownership of partitions during rebalance', self.id); async.eachSeries(newTopicPayloads, function (tp, cbb) { if (tp.partition !== undefined) { async.series([ function (addcbb) { self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { if (err) { // Partition doesn't exist simply carry on addcbb(); } else cbb(); // Partition exists simply carry on }); }, function (addcbb) { self.client.zk.addPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) { if (err) { addcbb(err); } else addcbb(); ...
...
self.rebalancing = true;
logger.debug('HighLevelConsumer rebalance retry config: %s', JSON.stringify(self.options.rebalanceRetry));
var oldTopicPayloads = self.topicPayloads;
var operation = retry.operation(self.options.rebalanceRetry);
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString()));
} else {
...
registerConsumer = function (cb) { var self = this; var groupId = this.options.groupId; this.client.zk.registerConsumer(groupId, this.id, this.payloads, function (err) { if (err) return cb(err); self.client.zk.listConsumers(self.options.groupId); var topics = self.topicPayloads.reduce(function (ret, topicPayload) { if (ret.indexOf(topicPayload.topic) === -1) { ret.push(topicPayload.topic); } return ret; }, []); topics.forEach(function (topic) { self.client.zk.listPartitions(topic); }); cb(); }); }
...
this.client.on('zkReconnect', function () {
logger.debug('zookeeper reconnect for %s', self.id);
attachZookeeperErrorListener();
// clean up what's leftover
self.leaveGroup(function () {
// rejoin the group
self.registerConsumer(function (error) {
if (error) {
return self.emit('error', new errors.FailedToRegisterConsumerError('Failed to register consumer on zkReconnect', error));
}
self.emit('registered');
});
});
});
...
removeTopics = function (topics, cb) { topics = typeof topics === 'string' ? [topics] : topics; this.payloads = this.payloads.filter(function (p) { return !~topics.indexOf(p.topic); }); this.client.removeTopicMetadata(topics, cb); }
...
### removeTopics(topics, cb)
* `topics`: **Array**, array of topics to remove
* `cb`: **Function**, the callback
Example:
``` js
consumer.removeTopics(['t1', 't2'], function (err, removed) {
});
```
### commit(cb)
Commit the current topic offsets manually; this method should be called when a consumer leaves.
* `cb`: **Function**, the callback
...
resume = function () { this.paused = false; this.fetch(); }
...
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' is emitted when a message fetch request completes
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
...
sendOffsetCommitRequest = function (commits, cb) { this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb); }
...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...
setOffset = function (topic, partition, offset) { this.topicPayloads.every(function (p) { if (p.topic === topic && p.partition == partition) { // eslint-disable-line eqeqeq p.offset = offset; return false; } return true; }); }
...
* `partition`: **Number**
* `offset`: **Number**
Example:
``` js
consumer.setOffset('topic', 0, 0);
```
### pause()
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops
the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).
### resume()
Resume the consumer. Resumes the fetch loop.
...
stop = function (cb) { if (!this.options.autoCommit) return cb && cb(); this.commit(true, function (err) { cb && cb(err); }); }
...
var newTopicPayloads = [];
logger.debug('HighLevelConsumer %s is attempting to rebalance', self.id);
async.series([
// Stop fetching data and commit offsets
function (callback) {
logger.debug('HighLevelConsumer %s stopping data read during rebalance', self.id);
self.stop(function () {
callback();
});
},
// Assemble the data
function (callback) {
logger.debug('HighLevelConsumer %s assembling data for rebalance', self.id);
...
updateOffsets = function (topics, initing) { this.topicPayloads.forEach(p => { if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined) { var offset = topics[p.topic][p.partition]; if (offset === -1) offset = 0; if (!initing) p.offset = offset + 1; else p.offset = offset; this.needToCommit = true; } }); if (this.options.autoCommit && !initing) { this.autoCommit(false, function (err) { err && logger.debug('auto commit offset', err); }); } }
...
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emitted when a message fetch request completes
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
setImmediate(function () {
self.fetch();
});
}
});
...
function HighLevelProducer(client, options, customPartitioner) { BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner); }
n/a
function BaseProducer(client, options, defaultPartitionerType, customPartitioner) { options = options || {}; this.ready = false; this.client = client; this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks; this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs; if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) { throw new Error('Partitioner Type must be custom if providing a customPartitioner.'); } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) { throw new Error('No customer partitioner defined'); } var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType]; // eslint-disable-next-line this.partitioner = new partitionerType(customPartitioner); this.connect(); }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
buildPayloads = function (payloads, topicMetadata) { const topicPartitionRequests = Object.create(null); payloads.forEach((p) => { p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition '), p.key); p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0; let messages = _.isArray(p.messages) ? p.messages : [p.messages]; messages = messages.map(function (message) { if (message instanceof KeyedMessage) { return message; } return new Message(0, 0, '', message); }); let key = p.topic + p.partition; let request = topicPartitionRequests[key]; if (request == null) { topicPartitionRequests[key] = new ProduceRequest(p.topic, p.partition, messages, p.attributes); } else { assert(request.attributes === p.attributes); Array.prototype.push.apply(request.messages, messages); } }); return _.values(topicPartitionRequests); }
...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...
close = function (cb) { this.client.close(cb); }
...
### close(force, cb)
* `force`: **Boolean**, if set to `true`, it forces the consumer to commit the current offset before closing, default `false`
* `cb`: **Function**, the callback
Example
```js
consumer.close(true, cb);
consumer.close(cb); // force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
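As a starting point for migration, a minimal `ConsumerGroup` sketch (the connection string, group id, and topic name below are illustrative placeholders):
``` js
var ConsumerGroup = require('kafka-node').ConsumerGroup;
var options = {
  host: 'localhost:2181',   // zookeeper connection string
  groupId: 'ExampleGroup',
  sessionTimeout: 15000,
  protocol: ['roundrobin'], // built-in assignment protocol ('roundrobin' or 'range')
  fromOffset: 'earliest'    // or 'latest' / 'none'
};
var consumerGroup = new ConsumerGroup(options, ['topic1']);
consumerGroup.on('message', function (message) {
  console.log(message);
});
consumerGroup.on('error', function (err) {
  console.error(err);
});
```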
### HighLevelConsumer(client, payloads, options)
...
connect = function () { // emiter... var self = this; this.ready = this.client.ready; if (this.ready) self.emit('ready'); this.client.on('ready', function () { if (!self.ready) { self.ready = true; self.emit('ready'); } }); this.client.on('brokersChanged', function () { let topics = Object.keys(this.topicMetadata); this.refreshMetadata(topics, function (error) { if (error) { self.emit('error', error); } }); }); this.client.on('error', function (err) { self.emit('error', err); }); this.client.on('close', function () {}); }
...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emitter...
var self = this;
...
createTopics = function (topics, async, cb) { if (!this.ready) { return cb(new Error('Producer not ready!')); } this.client.createTopics(topics, async, cb); }
...
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t', 't1'], false, function (err, data) {
  console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {}); // Simply omit 2nd arg
```
...
send = function (payloads, cb) { var client = this.client; var requireAcks = this.requireAcks; var ackTimeoutMs = this.ackTimeoutMs; client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb); }
...
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
  { topic: 'topic1', messages: 'hi', partition: 0 },
  { topic: 'topic2', messages: ['hello', 'world', km] }
];
producer.on('ready', function () {
  producer.send(payloads, function (err, data) {
    console.log(data);
  });
});
producer.on('error', function (err) {});
```
> ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute; otherwise you may lose messages!
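For example, a sketch reusing the `producer` from above: put all messages for a given topic/partition into a single payload entry rather than several entries targeting the same partition.
``` js
// one payload entry per topic/partition, with messages batched as an array
var batched = [
  { topic: 'topic1', partition: 0, messages: ['one', 'two', 'three'] }
];
producer.send(batched, function (err, data) {
  if (err) console.error(err);
});
```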
...
IllegalGenerationError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
InvalidConfigError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
InvalidConsumerOffsetError = function (message, nested) { NestedError.apply(this, arguments); }
...
this.client.on('reconnect', function (lastError) {
self.fetch();
});
this.on('offsetOutOfRange', topic => {
this.pause();
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
...
super_ = function (message, nested) { this.nested = nested; if (typeof message !== 'undefined') { Object.defineProperty(this, 'message', { value: message, writable: true, enumerable: false, configurable: true }); } Error.captureStackTrace(this, this.constructor); var oldStackDescriptor = Object.getOwnPropertyDescriptor(this, 'stack'); var stackDescriptor = buildStackDescriptor(oldStackDescriptor, nested); Object.defineProperty(this, 'stack', stackDescriptor); }
n/a
KafkaBuffer = function (batchSize, batchAge) { this._batch_size = batchSize; this._batch_age = batchAge; this._batch_age_timer = null; this._buffer = null; }
n/a
_setupTimer = function (callback) { var self = this; if (this._batch_age_timer != null) { clearTimeout(this._batch_age_timer); } this._batch_age_timer = setTimeout(function () { if (self._buffer && (self._buffer.length > 0)) { callback(); } }, this._batch_age); }
...
}
if (typeof callback !== 'undefined' && callback != null) {
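// flush immediately when batching is unconfigured or the buffer exceeds the batch size; otherwise (re)arm the age timer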
if (this._batch_size == null || this._batch_age == null ||
(this._buffer && (this._buffer.length > this._batch_size))) {
callback();
} else {
this._setupTimer(callback);
}
}
};
KafkaBuffer.prototype._setupTimer = function (callback) {
var self = this;
...
addChunk = function (buffer, callback) { if (this._buffer == null) { this._buffer = new Buffer(buffer); } else { this._buffer = Buffer.concat([this._buffer, buffer]); } if (typeof callback !== 'undefined' && callback != null) { if (this._batch_size == null || this._batch_age == null || (this._buffer && (this._buffer.length > this._batch_size))) { callback(); } else { this._setupTimer(callback); } } }
...
this.noAckBatchAge = options ? options.noAckBatchAge : null;
this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge);
};
util.inherits(BrokerTransform, Transform);
BrokerTransform.prototype._transform = function (chunk, enc, done) {
this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this));
done();
};
BrokerTransform.prototype._transformNext = function () {
this.push(this._KafkaBuffer.getBatch());
this._KafkaBuffer.truncateBatch();
};
...
getBatch = function () { return this._buffer; }
...
BrokerTransform.prototype._transform = function (chunk, enc, done) {
this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this));
done();
};
BrokerTransform.prototype._transformNext = function () {
this.push(this._KafkaBuffer.getBatch());
this._KafkaBuffer.truncateBatch();
};
module.exports = BrokerTransform;
...
truncateBatch = function () { this._buffer = null; }
...
BrokerTransform.prototype._transform = function (chunk, enc, done) {
this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this));
done();
};
BrokerTransform.prototype._transformNext = function () {
this.push(this._KafkaBuffer.getBatch());
this._KafkaBuffer.truncateBatch();
};
module.exports = BrokerTransform;
...
KeyedPartitioner = function () {}
n/a
super_ = function () {}
n/a
getPartition = function (partitions, key) { key = key || ''; var index = this.hashCode(key) % partitions.length; return partitions[index]; }
...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...
hashCode = function (string) { var hash = 0; var length = string.length; for (var i = 0; i < length; i++) { hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff; } return (hash === 0) ? 1 : hash; }
...
return (hash === 0) ? 1 : hash;
};
KeyedPartitioner.prototype.getPartition = function (partitions, key) {
key = key || '';
var index = this.hashCode(key) % partitions.length;
return partitions[index];
};
var CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
};
util.inherits(CustomPartitioner, Partitioner);
...
NotCoordinatorForGroupError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
Offset = function (client) { var self = this; this.client = client; this.ready = this.client.ready; this.client.on('ready', function () { self.ready = true; self.emit('ready'); }); this.client.once('connect', function () { self.emit('connect'); }); this.client.on('error', function (err) { self.emit('error', err); }); }
...
* `cb`: *Function*, the callback
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
  { topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
  // data
  // { 't': { '0': [999] } }
});
```
...
function EventEmitter() { EventEmitter.init.call(this); }
n/a
buildPayloads = function (payloads) { return payloads.map(function (p) { p.partition = p.partition || 0; p.time = p.time || Date.now(); p.maxNum = p.maxNum || 1; p.metadata = 'm'; // metadata can be arbitrary return p; }); }
...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...
commit = function (groupId, payloads, cb) { if (!this.ready) { this.once('ready', () => this.commit(groupId, payloads, cb)); return; } this.client.sendOffsetCommitRequest(groupId, this.buildPayloads(payloads), cb); }
...
Commit offsets of the current topics manually; this method should be called when a consumer leaves
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function (err, data) {
});
```
### setOffset(topic, partition, offset)
Set offset of the given topic
* `topic`: **String**
...
fetch = function (payloads, cb) { if (!this.ready) { this.once('ready', () => this.fetch(payloads, cb)); return; } this.client.sendOffsetRequest(this.buildPayloads(payloads), cb); }
...
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
  { topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
  // data
  // { 't': { '0': [999] } }
});
```
...
fetchCommits = function (groupId, payloads, cb) { if (!this.ready) { this.once('ready', () => this.fetchCommits(groupId, payloads, cb)); return; } this.client.sendOffsetFetchRequest(groupId, this.buildPayloads(payloads), cb); }
...
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetchCommits('groupId', [
  { topic: 't', partition: 0 }
], function (err, data) {
});
```
### fetchLatestOffsets(topics, cb)
...
fetchEarliestOffsets = function (topics, cb) { fetchOffsets(this, topics, cb, -2); }
...
### fetchEarliestOffsets(topics, cb)
Example
```js
var partition = 0;
var topic = 't';
offset.fetchEarliestOffsets([topic], function (error, offsets) {
  if (error) return handleError(error);
  console.log(offsets[topic][partition]);
});
```
# Troubleshooting / FAQ
...
fetchLatestOffsets = function (topics, cb) { fetchOffsets(this, topics, cb, -1); }
...
### fetchLatestOffsets(topics, cb)
Example
```js
var partition = 0;
var topic = 't';
offset.fetchLatestOffsets([topic], function (error, offsets) {
  if (error) return handleError(error);
  console.log(offsets[topic][partition]);
});
```
### fetchEarliestOffsets(topics, cb)
...
function Producer(client, options, customPartitioner) { BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default, customPartitioner); }
n/a
function BaseProducer(client, options, defaultPartitionerType, customPartitioner) { options = options || {}; this.ready = false; this.client = client; this.requireAcks = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks; this.ackTimeoutMs = options.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : options.ackTimeoutMs; if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) { throw new Error('Partitioner Type must be custom if providing a customPartitioner.'); } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) { throw new Error('No customer partitioner defined'); } var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType]; // eslint-disable-next-line this.partitioner = new partitionerType(customPartitioner); this.connect(); }
n/a
RandomPartitioner = function () {}
n/a
super_ = function () {}
n/a
getPartition = function (partitions) { return partitions[Math.floor(Math.random() * partitions.length)]; }
...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...
RebalanceInProgressError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
TopicsNotExistError = function (topics) { Error.captureStackTrace(this, this); this.topics = topics; this.message = 'The topic(s) ' + topics.toString() + ' do not exist'; }
...
*/
Client.prototype.topicExists = function (topics, cb) {
var notExistsTopics = [];
var self = this;
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
...
function Error() { [native code] }
n/a
UnknownMemberIdError = function (message) { Error.captureStackTrace(this, this); this.message = message; }
n/a
function Error() { [native code] }
n/a
function ConsumerGroupMigrator(consumerGroup) { EventEmitter.call(this); assert(consumerGroup); const self = this; this.consumerGroup = consumerGroup; this.client = consumerGroup.client; var verified = 0; if (consumerGroup.options.migrateRolling) { this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10}); this.zk.on('connected', function () { self.filterByExistingZkTopics(function (error, topics) { if (error) { return self.emit('error', error); } if (topics.length) { self.checkForOwnersAndListenForChange(topics); } else { logger.debug('No HLC topics exist in zookeeper.'); self.connectConsumerGroup(); } }); }); this.on('noOwnersForTopics', function (topics) { logger.debug('No owners for topics %s reported.', topics); if (++verified <= NUMER_OF_TIMES_TO_VERIFY) { logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified, NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS); setTimeout(function () { self.checkForOwners(topics); }, VERIFY_WAIT_TIME_MS); } else { self.connectConsumerGroup(); } }); this.on('topicOwnerChange', _.debounce(function (topics) { verified = 0; self.checkForOwnersAndListenForChange(topics); }, 250)); this.zk.connect(); } else { this.connectConsumerGroup(); } }
n/a
function EventEmitter() { EventEmitter.init.call(this); }
n/a
checkForOwners = function (topics, listenForChange) { const self = this; const path = '/consumers/' + this.consumerGroup.options.groupId + '/owners/'; var ownedPartitions = 0; function topicWatcher (event) { self.emit('topicOwnerChange', topics); } async.each(topics, function (topic, callback) { const args = [path + topic]; if (listenForChange) { logger.debug('%s listening for changes in topic %s', self.client.clientId, topic); args.push(topicWatcher); } args.push(function (error, children, stats) { if (error) { return callback(error); } ownedPartitions += children.length; callback(null); }); self.zk.getChildren.apply(self.zk, args); }, function (error) { if (error) { return self.emit('error', error); } if (ownedPartitions === 0) { self.emit('noOwnersForTopics', topics); } else { logger.debug('%s %d partitions are owned by old HLC... waiting...', self.client.clientId, ownedPartitions); } } ); }
...
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified, NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS);
setTimeout(function () {
self.checkForOwners(topics);
}, VERIFY_WAIT_TIME_MS);
} else {
self.connectConsumerGroup();
}
});
this.on('topicOwnerChange', _.debounce(function (topics) {
...
checkForOwnersAndListenForChange = function (topics) { this.checkForOwners(topics, true); }
...
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
...
connectConsumerGroup = function () { logger.debug('%s connecting consumer group', this.client.clientId); const self = this; if (this.client.ready) { this.consumerGroup.connect(); } else { this.client.once('ready', function () { self.consumerGroup.connect(); }); } this.zk && this.zk.close(); }
...
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
...
filterByExistingZkTopics = function (callback) { const self = this; const path = '/consumers/' + this.consumerGroup.options.groupId + '/owners/'; async.filter(this.consumerGroup.topics, function (topic, cb) { const topicPath = path + topic; logger.debug('%s checking zk path %s', self.client.clientId, topicPath); self.zk.exists(topicPath, function (error, stat) { if (error) { return callback(error); } cb(stat); }); }, function (result) { callback(null, result); }); }
...
this.consumerGroup = consumerGroup;
this.client = consumerGroup.client;
var verified = 0;
if (consumerGroup.options.migrateRolling) {
this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10});
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
...
getOffset = function (tp, defaultOffset) { const offset = _.get(this.offsets, [tp.topic, tp.partition], defaultOffset); if (offset === -1) { return defaultOffset; } return offset; }
...
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error));
return;
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
...
saveHighLevelConsumerOffsets = function (topicPartitionList, callback) { const self = this; this.client.sendOffsetFetchRequest(this.consumerGroup.options.groupId, topicPartitionList, function (error, results) { logger.debug('sendOffsetFetchRequest response:', results, error); if (error) { return callback(error); } self.offsets = results; callback(null); }); }
...
if (self.options.fromOffset === 'none') {
return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'`));
}
async.parallel([
function (callback) {
if (self.migrator) {
return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback);
}
callback(null);
},
function (callback) {
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
...
function ConsumerGroupRecovery(consumerGroup) { this.consumerGroup = consumerGroup; this.options = consumerGroup.options; }
n/a
clearError = function () { this.lastError = null; }
...
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
self.ready = true;
self.recovery.clearError();
logger.debug('generationId', self.generationId);
if (startFetch) {
self.fetch();
}
self.startHeartbeats();
...
getRetryTimeout = function (error) { assert(error); if (!this._timeouts) { this._timeouts = retry.timeouts({ retries: this.options.retries, factor: this.options.retryFactor, minTimeout: this.options.retryMinTimeout }); } if (this._retryIndex == null || this.lastError == null || error.errorCode !== this.lastError.errorCode) { this._retryIndex = 0; } var index = this._retryIndex++; if (index >= this._timeouts.length) { return false; } return this._timeouts[index]; }
...
recoverableItem.handler && recoverableItem.handler.call(this.consumerGroup, error);
return true;
}
return false;
}, this);
if (retry) {
retryTimeout = this.getRetryTimeout(error);
}
if (retry && retryTimeout) {
logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error);
this.consumerGroup.scheduleReconnect(retryTimeout);
} else {
this.consumerGroup.emit('error', error);
...
tryToRecoverFrom = function (error, source) { this.consumerGroup.ready = false; this.consumerGroup.stopHeartbeats(); var retryTimeout = false; var retry = recoverableErrors.some(function (recoverableItem) { if (isErrorInstanceOf(error, recoverableItem.errors)) { recoverableItem.handler && recoverableItem.handler.call(this.consumerGroup, error); return true; } return false; }, this); if (retry) { retryTimeout = this.getRetryTimeout(error); } if (retry && retryTimeout) { logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error); this.consumerGroup.scheduleReconnect(retryTimeout); } else { this.consumerGroup.emit('error', error); } this.lastError = error; }
...
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
self.ready = true;
self.recovery.clearError();
logger.debug('generationId', self.generationId);
...
function setLoggerProvider(provider) { loggerProvider = provider; }
...
### How do I set a logger provider?
For performance reasons, initialization of the `kafka-node` module creates all necessary loggers. This means that custom logger
providers need to be set *before requiring the `kafka-node` module*. The following example shows how this can be done:
```javascript
// first configure the logger provider
const kafkaLogging = require('kafka-node/logging');
kafkaLogging.setLoggerProvider(consoleLoggerProvider);
// then require kafka-node and continue as normal
const kafka = require('kafka-node');
```
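A logger provider is a function that takes the name of a logger and returns a logger implementation. A minimal sketch of the `consoleLoggerProvider` used above, backed by the console:
```javascript
function consoleLoggerProvider (name) {
  // this logger provider ignores the name and logs everything to the console
  return {
    debug: console.log.bind(console),
    info: console.log.bind(console),
    warn: console.warn.bind(console),
    error: console.error.bind(console)
  };
}
```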
# Running Tests
...
CustomPartitioner = function (partitioner) { this.getPartition = partitioner; }
n/a
CyclicPartitioner = function () { this.c = 0; }
n/a
DefaultPartitioner = function () {}
n/a
KeyedPartitioner = function () {}
n/a
RandomPartitioner = function () {}
n/a
function decodeFetchResponse(cb, maxTickMessages) { return function (resp) { return _decodeFetchResponse(resp, cb, maxTickMessages); }; }
...
broker.socket.end();
});
};
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
return self.emit('brokersChanged');
}
...
function decodeGroupCoordinatorResponse(resp) { var result; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word16bs('errorCode') .word32bs('coordinatorId') .word16bs('coordinatorHost') .tap(function (vars) { this.buffer('coordinatorHost', vars.coordinatorHost); vars.coordinatorHost = vars.coordinatorHost.toString(); }) .word32bs('coordinatorPort') .tap(function (vars) { if (vars.errorCode !== 0) { result = createGroupError(vars.errorCode); return; } result = { coordinatorHost: vars.coordinatorHost, coordinatorPort: vars.coordinatorPort, coordinatorId: vars.coordinatorId }; }); return result; }
n/a
function decodeGroupHeartbeat(resp) { var result = null; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word16bs('errorCode') .tap(function (vars) { result = createGroupError(vars.errorCode); }); return result; }
n/a
function decodeJoinGroupResponse(resp) { var result = { members: [] }; var error; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word16bs('errorCode') .tap(function (vars) { error = createGroupError(vars.errorCode); }) .word32bs('generationId') .tap(function (vars) { result.generationId = vars.generationId; }) .word16bs('groupProtocol') .tap(function (vars) { this.buffer('groupProtocol', vars.groupProtocol); result.groupProtocol = vars.groupProtocol = vars.groupProtocol.toString(); }) .word16bs('leaderId') .tap(function (vars) { this.buffer('leaderId', vars.leaderId); result.leaderId = vars.leaderId = vars.leaderId.toString(); }) .word16bs('memberId') .tap(function (vars) { this.buffer('memberId', vars.memberId); result.memberId = vars.memberId = vars.memberId.toString(); }) .word32bs('memberNum') .loop(function (end, vars) { if (error) { return end(); } if (vars.memberNum-- === 0) return end(); var memberMetadata; this .word16bs('groupMemberId').tap(function (vars) { this.buffer('groupMemberId', vars.groupMemberId); vars.memberId = vars.groupMemberId.toString(); }) .word32bs('memberMetadata').tap(function (vars) { if (vars.memberMetadata > -1) { this.buffer('memberMetadata', vars.memberMetadata); memberMetadata = decodeGroupData(this.vars.memberMetadata); memberMetadata.id = vars.memberId; result.members.push(memberMetadata); } }); }); return error || result; }
n/a
function decodeLeaveGroupResponse(resp) { var error = null; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word16bs('errorCode') .tap(function (vars) { error = createGroupError(vars.errorCode); }); return error; }
n/a
function decodeMetadataResponse(resp) { var brokers = {}; var out = {}; var topics = {}; var errors = []; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('brokerNum') .loop(decodeBrokers) .word32bs('topicNum') .loop(_decodeTopics); function decodeBrokers (end, vars) { if (vars.brokerNum-- === 0) return end(); this.word32bs('nodeId') .word16bs('host') .tap(function (vars) { this.buffer('host', vars.host); vars.host = vars.host.toString(); }) .word32bs('port') .tap(function (vars) { brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port }; }); } function _decodeTopics (end, vars) { if (vars.topicNum-- === 0) return end(); this.word16bs('topicError') .word16bs('topic') .tap(function (vars) { this.buffer('topic', vars.topic); vars.topic = vars.topic.toString(); }) .word32bs('partitionNum') .tap(function (vars) { if (vars.topicError !== 0) { return errors.push(ERROR_CODE[vars.topicError]); } this.loop(decodePartitions); }); } function decodePartitions (end, vars) { if (vars.partitionNum-- === 0) return end(); topics[vars.topic] = topics[vars.topic] || {}; this.word16bs('errorCode') .word32bs('partition') .word32bs('leader') .word32bs('replicasNum') .tap(function (vars) { var buffer = this.buffer('replicas', vars.replicasNum * 4).vars.replicas; this.vars.replicas = bufferToArray(vars.replicasNum, buffer); }) .word32bs('isrNum') .tap(function (vars) { var buffer = this.buffer('isr', vars.isrNum * 4).vars.isr; this.vars.isr = bufferToArray(vars.isrNum, buffer); if (vars.errorCode === 0 || vars.errorCode === 9) { topics[vars.topic][vars.partition] = new PartitionMetadata(vars.topic, vars.partition, vars.leader, vars.replicas, vars.isr); } else { errors.push(ERROR_CODE[vars.errorCode]); } }); } if (!_.isEmpty(errors)) out.error = errors; out.metadata = topics; return [brokers, out]; }
n/a
function decodeOffsetCommitResponse(resp) { var topics = {}; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('topicNum') .loop(decodeTopics(decodePartitions)); function decodePartitions (end, vars) { if (--vars.partitionNum === 0) end(); topics[vars.topic] = topics[vars.topic] || {}; this.word32bs('partition') .word16bs('errorcode') .tap(function (vars) { topics[vars.topic]['partition'] = vars.partition; topics[vars.topic]['errorCode'] = vars.errorcode; }); } return topics; }
n/a
function decodeOffsetFetchResponse(resp) { var topics = {}; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('topicNum') .loop(decodeTopics(decodePartitions)); function decodePartitions (end, vars) { if (--vars.partitionNum === 0) end(); topics[vars.topic] = topics[vars.topic] || {}; this.word32bs('partition') .word64bs('offset') .word16bs('metadata') .tap(function (vars) { if (vars.metadata === -1) { return; } this.buffer('metadata', vars.metadata); }) .word16bs('errorCode') .tap(function (vars) { topics[vars.topic][vars.partition] = vars.errorCode === 0 ? vars.offset : -1; }); } return topics; }
n/a
function decodeOffsetFetchV1Response(resp) { var topics = {}; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('topicNum') .loop(decodeTopics(decodePartitions)); function decodePartitions (end, vars) { if (--vars.partitionNum === 0) end(); topics[vars.topic] = topics[vars.topic] || {}; this.word32bs('partition') .word64bs('offset') .word16bs('metadata') .tap(function (vars) { if (vars.metadata === -1) { return; } this.buffer('metadata', vars.metadata); }) .word16bs('errorCode') .tap(function (vars) { if (vars.metadata.length === 0 && vars.offset === 0) { topics[vars.topic][vars.partition] = -1; } else { topics[vars.topic][vars.partition] = vars.errorCode === 0 ? vars.offset : -1; } }); } return topics; }
n/a
function decodeOffsetResponse(resp) { var topics = {}; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('topicNum') .loop(decodeTopics(decodePartitions)); function decodePartitions (end, vars) { if (--vars.partitionNum === 0) end(); topics[vars.topic] = topics[vars.topic] || {}; this.word32bs('partition') .word16bs('errorCode') .word32bs('offsetNum') .loop(decodeOffsets); } function decodeOffsets (end, vars) { if (--vars.offsetNum <= 0) end(); topics[vars.topic][vars.partition] = topics[vars.topic][vars.partition] || []; this.word64bs('offset') .tap(function (vars) { if (vars.offset != null) topics[vars.topic][vars.partition].push(vars.offset); }); } return topics; }
n/a
function decodeProduceResponse(resp) { var topics = {}; var error; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word32bs('topicNum') .loop(decodeTopics(decodePartitions)); function decodePartitions (end, vars) { if (--vars.partitionNum === 0) end(); topics[vars.topic] = topics[vars.topic] || {}; this.word32bs('partition') .word16bs('errorCode') .word64bs('offset') .tap(function (vars) { if (vars.errorCode) { error = new Error(ERROR_CODE[vars.errorCode]); } else { topics[vars.topic][vars.partition] = vars.offset; } }); } return error || topics; }
n/a
function decodeSyncGroupResponse(resp) { var result; Binary.parse(resp) .word32bs('size') .word32bs('correlationId') .word16bs('errorCode') .tap(function (vars) { result = createGroupError(vars.errorCode); }) .word32bs('memberAssignment') .tap(function (vars) { if (result) { return; } this.buffer('memberAssignment', vars.memberAssignment); result = decodeMemberAssignment(vars.memberAssignment); }); return result; }
n/a
function encodeFetchRequest(maxWaitMs, minBytes) { return function encodeFetchRequest (clientId, correlationId, payloads) { return _encodeFetchRequest(clientId, correlationId, payloads, maxWaitMs, minBytes); }; }
...
broker.socket.closing = true;
broker.socket.end();
});
};
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
return self.emit('brokersChanged');
}
...
function encodeGroupCoordinatorRequest(clientId, correlationId, groupId) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.groupCoordinator); request.Int16BE(groupId.length).string(groupId); return encodeRequestWithLength(request.make()); }
n/a
function encodeGroupHeartbeat(clientId, correlationId, groupId, generationId, memberId) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.heartbeat); request .Int16BE(groupId.length).string(groupId) .Int32BE(generationId) .Int16BE(memberId.length).string(memberId); return encodeRequestWithLength(request.make()); }
n/a
function encodeJoinGroupRequest(clientId, correlationId, groupId, memberId, sessionTimeout, groupProtocols) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.joinGroup); request .Int16BE(groupId.length).string(groupId) .Int32BE(sessionTimeout) .Int16BE(memberId.length).string(memberId) .Int16BE(GROUPS_PROTOCOL_TYPE.length).string(GROUPS_PROTOCOL_TYPE) .Int32BE(groupProtocols.length); groupProtocols.forEach(encodeGroupProtocol.bind(request)); return encodeRequestWithLength(request.make()); }
n/a
function encodeLeaveGroupRequest(clientId, correlationId, groupId, memberId) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.leaveGroup); request .Int16BE(groupId.length).string(groupId) .Int16BE(memberId.length).string(memberId); return encodeRequestWithLength(request.make()); }
n/a
function encodeMessageSet(messageSet) { var buffer = new Buffermaker(); messageSet.forEach(function (message) { var msg = encodeMessage(message); buffer.Int64BE(0) .Int32BE(msg.length) .string(msg); }); return buffer.make(); }
n/a
function encodeMetadataRequest(clientId, correlationId, topics) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata); request.Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length) .string(topic); }); return encodeRequestWithLength(request.make()); }
...
* containing the topic name, partition, leader number, replica count, and in sync replicas per partition.
*
* @param {Array} topics An array of topics to load the metadata for
* @param {Client~loadMetadataForTopicsCallback} cb Function to call once all metadata is loaded
*/
Client.prototype.loadMetadataForTopics = function (topics, cb) {
var correlationId = this.nextId();
var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics);
var broker = this.brokerForLeader();
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]);
...
function encodeOffsetCommitRequest(group) { return function (clientId, correlationId, payloads) { return _encodeOffsetCommitRequest(clientId, correlationId, group, payloads); }; }
...
payload.messages = [ new Message(0, attributes, '', message) ];
cb();
});
}
};
Client.prototype.sendOffsetCommitRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetCommitRequest(group);
var decoder = protocol.decodeOffsetCommitResponse;
this.send(payloads, encoder, decoder, cb);
};
Client.prototype.sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) {
var encoder = protocol.encodeOffsetCommitV2Request;
var decoder = protocol.decodeOffsetCommitResponse;
...
function encodeOffsetCommitV2Request(clientId, correlationId, group, generationId, memberId, payloads) { payloads = groupByTopic(payloads); var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offsetCommit, 2); var topics = Object.keys(payloads); request.Int16BE(group.length).string(group) .Int32BE(generationId) .Int16BE(memberId.length).string(memberId) .Int64BE(-1) .Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length) .string(topic); var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; }); request.Int32BE(partitions.length); partitions.forEach(function (p) { request.Int32BE(p.partition) .Int64BE(p.offset) .Int16BE(p.metadata.length) .string(p.metadata); }); }); return encodeRequestWithLength(request.make()); }
n/a
function encodeOffsetFetchRequest(group) { return function (clientId, correlationId, payloads) { return _encodeOffsetFetchRequest(clientId, correlationId, group, payloads); }; }
...
Client.prototype.sendOffsetFetchV1Request = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchV1Request;
var decoder = protocol.decodeOffsetFetchV1Response;
this.sendGroupRequest(encoder, decoder, arguments);
};
Client.prototype.sendOffsetFetchRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchRequest(group);
var decoder = protocol.decodeOffsetFetchResponse;
this.send(payloads, encoder, decoder, cb);
};
Client.prototype.sendOffsetRequest = function (payloads, cb) {
var encoder = protocol.encodeOffsetRequest;
var decoder = protocol.decodeOffsetResponse;
...
function encodeOffsetFetchV1Request(clientId, correlationId, group, payloads) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offsetFetch, 1); var topics = Object.keys(payloads); request.Int16BE(group.length) .string(group) .Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length).string(topic) .Int32BE(payloads[topic].length); payloads[topic].forEach(function (p) { request.Int32BE(p); }); }); return encodeRequestWithLength(request.make()); }
n/a
function encodeOffsetRequest(clientId, correlationId, payloads) { payloads = groupByTopic(payloads); var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offset); var topics = Object.keys(payloads); request.Int32BE(REPLICA_ID) .Int32BE(topics.length); topics.forEach(function (topic) { request.Int16BE(topic.length) .string(topic); var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; }); request.Int32BE(partitions.length); partitions.forEach(function (p) { request.Int32BE(p.partition) .Int64BE(p.time) .Int32BE(p.maxNum); }); }); return encodeRequestWithLength(request.make()); }
n/a
function encodeProduceRequest(requireAcks, ackTimeoutMs) { return function (clientId, correlationId, payloads) { return _encodeProduceRequest(clientId, correlationId, payloads, requireAcks, ackTimeoutMs); }; }
...
Array.prototype.unshift.call(arguments, 'error');
consumer.emit.apply(consumer, arguments);
}
});
};
Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
var decoder = protocol.decodeProduceResponse;
var self = this;
decoder.requireAcks = requireAcks;
async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
...
function encodeSyncGroupRequest(clientId, correlationId, groupId, generationId, memberId, groupAssignment) { var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.syncGroup); request .Int16BE(groupId.length).string(groupId) .Int32BE(generationId) .Int16BE(memberId.length).string(memberId); if (groupAssignment && groupAssignment.length) { request.Int32BE(groupAssignment.length); groupAssignment.forEach(function (assignment) { request.Int16BE(assignment.memberId.length).string(assignment.memberId) .string(_encodeMemberAssignment(assignment)); }); } else { request.Int32BE(0); } return encodeRequestWithLength(request.make()); }
n/a
FetchRequest = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
FetchResponse = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
function KeyedMessage(key, value) { exports.Message.call(this, 0, 0, key, value); }
n/a
Message = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
OffsetCommitRequest = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
OffsetCommitResponse = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
PartitionMetadata = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
ProduceRequest = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
Request = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
TopicAndPartition = function () { for (var i = 0; i < args.length; i++) { this[args[i]] = arguments[i]; } }
n/a
function assignRange(topicPartition, groupMembers, callback) { logger.debug('topicPartition: %j', topicPartition); var assignment = _(groupMembers).map('id').reduce(function (obj, id) { obj[id] = []; return obj; }, {}); const topicMemberMap = topicToMemberMap(groupMembers); for (var topic in topicMemberMap) { if (!topicMemberMap.hasOwnProperty(topic)) { continue; } logger.debug('For topic %s', topic); topicMemberMap[topic].sort(); logger.debug(' members: ', topicMemberMap[topic]); var numberOfPartitionsForTopic = topicPartition[topic].length; logger.debug(' numberOfPartitionsForTopic', numberOfPartitionsForTopic); var numberOfMembersForTopic = topicMemberMap[topic].length; logger.debug(' numberOfMembersForTopic', numberOfMembersForTopic); var numberPartitionsPerMember = Math.floor(numberOfPartitionsForTopic / numberOfMembersForTopic); logger.debug(' numberPartitionsPerMember', numberPartitionsPerMember); var membersWithExtraPartition = numberOfPartitionsForTopic % numberOfMembersForTopic; var topicPartitionList = createTopicPartitionArray(topic, numberOfPartitionsForTopic); for (var i = 0, n = numberOfMembersForTopic; i < n; i++) { var start = numberPartitionsPerMember * i + Math.min(i, membersWithExtraPartition); var length = numberPartitionsPerMember + (i + 1 > membersWithExtraPartition ? 0 : 1); var assignedTopicPartitions = assignment[topicMemberMap[topic][i]]; assignedTopicPartitions.push.apply(assignedTopicPartitions, topicPartitionList.slice(start, start + length)); } } logger.debug(assignment); callback(null, convertToAssignmentList(assignment, VERSION)); }
...
}
this.protocols = protocols.map(function (protocol) {
if (typeof protocol === 'string') {
if (!(protocol in builtInProtocols)) {
throw new Error('Unknown built in assignment protocol ' + protocol);
}
protocol = _.assign({}, builtInProtocols[protocol]);
} else {
checkProtocol(protocol);
}
protocol.subscription = this.topics;
return protocol;
}, this);
...
function assignRoundRobin(topicPartition, groupMembers, callback) { logger.debug('topicPartition: %j', topicPartition); logger.debug('groupMembers: %j', groupMembers); var _members = _(groupMembers).map('id'); var members = _members.value().sort(); logger.debug('members', members); var assignment = _members.reduce(function (obj, id) { obj[id] = []; return obj; }, {}); var subscriberMap = groupMembers.reduce(function (subscribers, member) { subscribers[member.id] = member.subscription; return subscribers; }, {}); logger.debug('subscribers', subscriberMap); // layout topic/partitions pairs into a list var topicPartitionList = _(topicPartition).map(function (partitions, topic) { return partitions.map(function (partition) { return { topic: topic, partition: partition }; }); }).flatten().value(); logger.debug('round robin on topic partition pairs: ', topicPartitionList); var assigner = cycle(members); topicPartitionList.forEach(function (tp) { var topic = tp.topic; while (!_.includes(subscriberMap[assigner.peek()], topic)) { assigner.next(); } assignment[assigner.next()].push(tp); }); var ret = _.map(assignment, function (value, key) { var ret = {}; ret.memberId = key; ret.topicPartitions = groupPartitionsByTopic(value); ret.version = VERSION; return ret; }); callback(null, ret); }
...
}
this.protocols = protocols.map(function (protocol) {
if (typeof protocol === 'string') {
if (!(protocol in builtInProtocols)) {
throw new Error('Unknown built in assignment protocol ' + protocol);
}
protocol = _.assign({}, builtInProtocols[protocol]);
} else {
checkProtocol(protocol);
}
protocol.subscription = this.topics;
return protocol;
}, this);
...
function decodeSnappy(buffer, cb) { if (isChunked(buffer)) { var pos = 16; var max = buffer.length; var encoded = []; var size; while (pos < max) { size = buffer.readUInt32BE(pos); pos += 4; encoded.push(buffer.slice(pos, pos + size)); pos += size; } return async.mapSeries(encoded, snappy.uncompress, function (err, decodedChunks) { if (err) return cb(err); return cb(null, Buffer.concat(decodedChunks)); } ); } return snappy.uncompress(buffer, cb); }
...
value: vars.value,
offset: vars.offset,
partition: partition,
highWaterOffset: highWaterOffset,
key: vars.key
});
}
codec.decode(vars.value, function (error, inlineMessageSet) {
if (error) return; // Not sure where to report this
decodeMessageSet(topic, partition, inlineMessageSet, cb, maxTickMessages);
});
}
});
// Defensive code around potential denial of service
if (maxTickMessages && messageCount > maxTickMessages) break;
...
encode = function (input, callback) { if (!(typeof (input) === 'string' || Buffer.isBuffer(input))) { return callback(new Error('input must be a String or a Buffer')); } binding.compress(input, callback); }
...
function buildRequest (payload, cb) {
var attributes = payload.attributes;
var codec = getCodec(attributes);
if (!codec) return cb();
var innerSet = encodeMessageSet(payload.messages);
codec.encode(innerSet, function (err, message) {
if (err) return cb(err);
payload.messages = [ new Message(0, attributes, '', message) ];
cb();
});
}
};
...
function createTopicPartitionList(topicPartitions) { var tpList = []; for (var topic in topicPartitions) { if (!topicPartitions.hasOwnProperty(topic)) { continue; } topicPartitions[topic].forEach(function (partition) { tpList.push({ topic: topic, partition: partition }); }); } return tpList; }
n/a
function groupPartitionsByTopic(topicPartitions) { assert(Array.isArray(topicPartitions)); return topicPartitions.reduce(function (result, tp) { if (!(tp.topic in result)) { result[tp.topic] = [tp.partition]; } else { result[tp.topic].push(tp.partition); } return result; }, {}); }
n/a
function validateConfig(property, value) { if (!legalChars.test(value)) { throw new InvalidConfigError([property, value, "is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'"].join(' ')); } }
...
this.paused = this.options.paused;
this.id = nextId();
this.payloads = this.buildPayloads(topics);
this.connect();
this.encoding = this.options.encoding;
if (this.options.groupId) {
utils.validateConfig('options.groupId', this.options.groupId);
}
};
util.inherits(Consumer, events.EventEmitter);
Consumer.prototype.buildPayloads = function (payloads) {
var self = this;
return payloads.map(function (p) {
...
function validateTopicNames(topics) { // Rewriting same validations done by Apache Kafka team for topics // iterating over topics topics.some(function (topic) { if (topic.length <= 0) { throw new InvalidConfigError('topic name is illegal, cannot be empty'); } if (topic === '.' || topic === '..') { throw new InvalidConfigError('topic name cannot be . or ..'); } if (topic.length > allowedTopicLength) { throw new InvalidConfigError(`topic name is illegal, cannot be longer than ${allowedTopicLength} characters`); } if (!legalChars.test(topic)) { throw new InvalidConfigError(`topic name ${topic} is illegal, contains a character other than ASCII alphanumerics .,_ and -`); } }); return true; }
n/a
function validateTopics(topics) { if (topics.some(function (topic) { if ('partition' in topic) { return typeof topic.partition !== 'number'; } return false; })) { throw new InvalidConfigError('Offset must be a number and can not contain characters'); } }
...
})();
var Consumer = function (client, topics, options) {
if (!topics) {
throw new Error('Must have payloads');
}
utils.validateTopics(topics);
this.fetchCount = 0;
this.client = client;
this.options = _.defaults((options || {}), DEFAULTS);
this.ready = false;
this.paused = this.options.paused;
this.id = nextId();
...
Zookeeper = function (connectionString, options) { this.client = zookeeper.createClient(connectionString, options); var that = this; this.client.on('connected', function () { that.listBrokers(); }); this.client.on('disconnected', function () { that.emit('disconnected'); }); this.client.connect(); }
n/a
ZookeeperConsumerMappings = function () { this.consumerTopicMap = {}; this.topicConsumerMap = {}; this.topicPartitionMap = {}; }
n/a