/**
 * Error constructor carrying a caller-supplied message. Within SOURCE it is
 * created as `new errors.BrokerNotAvailableError('Broker not available')`
 * when a broker or its socket is missing, errored or destroyed.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
BrokerNotAvailableError = function (message) {
// Attach a stack trace to this instance.
// NOTE(review): the second argument is normally a constructor function used
// to trim frames from the trace; the instance itself is passed here - confirm.
Error.captureStackTrace(this, this);
this.message = message;
}...
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
...BrokerReadable = function (options) {
// Constructor: runs the Readable constructor on this instance, forwarding
// `options` unchanged. No other state is set up here.
Readable.call(this, options);
}n/a
/**
 * Constructor: runs the Transform constructor on this instance and creates a
 * KafkaBuffer configured with the no-ack batch settings.
 *
 * @param {Object} [options] - Forwarded to Transform; `noAckBatchSize` and
 *   `noAckBatchAge` are read from it when it is provided.
 */
BrokerTransform = function (options) {
Transform.call(this, options);
// Both batch settings fall back to null when no options object is given.
this.noAckBatchSize = options ? options.noAckBatchSize : null;
this.noAckBatchAge = options ? options.noAckBatchAge : null;
this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge);
}n/a
/**
 * Wraps a broker socket and adds a batching write path next to it.
 *
 * A BrokerReadable is piped into a BrokerTransform; whatever the transform
 * makes readable is written to the socket. The readable end is exposed as
 * `readableSocket` so callers can push buffers into that pipeline.
 *
 * @param {Object} socket - Stored as `this.socket`; must provide `write`.
 * @param {Object} [noAckBatchOptions] - Passed to the BrokerTransform constructor.
 */
BrokerWrapper = function (socket, noAckBatchOptions) {
this.socket = socket;
var self = this;
var readable = new BrokerReadable();
var transform = new BrokerTransform(noAckBatchOptions);
readable.pipe(transform);
transform.on('readable', function () {
var bulkMessage = null;
// Drain everything currently available; read() returns a falsy value when empty.
while (bulkMessage = transform.read()) { // eslint-disable-line no-cond-assign
self.socket.write(bulkMessage);
}
});
this.readableSocket = readable;
}n/a
/**
 * Client constructor. Stores the connection settings, initialises the
 * per-client maps and counters, then calls `this.connect()`.
 * May be called without `new`; in that case it constructs and returns a
 * new Client with the same arguments.
 *
 * @param {string} [connectionString] - Defaults to 'localhost:2181/'.
 * @param {string} [clientId] - Defaults to 'kafka-node-client'; when provided
 *   it is first passed to validateConfig.
 * @param {Object} [zkOptions] - Stored as `this.zkOptions`.
 * @param {Object} [noAckBatchOptions] - Stored as `this.noAckBatchOptions`.
 * @param {Object} [sslOptions] - Stored as `this.sslOptions`; `this.ssl` is
 *   true whenever this value is truthy.
 */
Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) {
if (this instanceof Client === false) {
return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions);
}
this.sslOptions = sslOptions;
this.ssl = !!sslOptions;
if (clientId) {
validateConfig('clientId', clientId);
}
this.connectionString = connectionString || 'localhost:2181/';
this.clientId = clientId || 'kafka-node-client';
this.zkOptions = zkOptions;
this.noAckBatchOptions = noAckBatchOptions;
// Broker wrappers keyed by address, kept separately for regular and
// long-polling use (see the `longpolling` flag used when looking them up).
this.brokers = {};
this.longpollingBrokers = {};
this.topicMetadata = {};
this.topicPartitions = {};
this.correlationId = 0;
this._socketId = 0;
this.cbqueue = {};
this.brokerMetadata = {};
this.ready = false;
this.connect();
}...
partitionerType: 2
}
```
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
```
### Events
- `ready`: this event is emitted when the producer is ready to send messages.
- `error`: this error event is propagated from the internal client; the producer should always listen for it.
...Consumer = function (client, topics, options) {
// Consumer constructor.
// - client: stored as this.client.
// - topics: required; checked with utils.validateTopics and turned into
//   payloads via this.buildPayloads.
// - options: optional; missing keys are filled in from DEFAULTS.
// Throws Error('Must have payloads') when topics is falsy.
if (!topics) {
throw new Error('Must have payloads');
}
utils.validateTopics(topics);
this.fetchCount = 0;
this.client = client;
this.options = _.defaults((options || {}), DEFAULTS);
this.ready = false;
this.paused = this.options.paused;
this.id = nextId();
this.payloads = this.buildPayloads(topics);
this.connect();
this.encoding = this.options.encoding;
// NOTE(review): groupId is validated only after connect() has been called.
if (this.options.groupId) {
utils.validateConfig('options.groupId', this.options.groupId);
}
}n/a
/**
 * ConsumerGroup constructor.
 *
 * Builds its own Client from `memberOptions` (host, id, zk, batch, ssl),
 * validates the offset-related options, and wires up the event handlers that
 * keep the consumer fetching across broker changes, reconnects and
 * out-of-range offsets.
 *
 * @param {Object} [memberOptions] - Consumer options; missing keys are filled
 *   in from DEFAULTS. `ssl: true` is normalised to an empty options object.
 * @param {string|string[]} topics - A topic name or an array of topic names.
 * @throws {Error} When `fromOffset` or `outOfRangeOffset` is not a key of
 *   ACCEPTED_FROM_OFFSET.
 * @throws {AssertionError} When `topics` is neither a string nor an array.
 */
function ConsumerGroup(memberOptions, topics) {
  EventEmitter.call(this);
  const self = this;

  // Normalise once so the later memberOptions.* reads cannot throw when the
  // caller omits the options object (the defaults line already tolerated it).
  memberOptions = memberOptions || {};
  this.options = _.defaults(memberOptions, DEFAULTS);

  if (!this.options.heartbeatInterval) {
    this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3);
  }

  if (memberOptions.ssl === true) {
    memberOptions.ssl = {};
  }

  if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) {
    throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`);
  }

  if (!(this.options.outOfRangeOffset in ACCEPTED_FROM_OFFSET)) {
    throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`);
  }

  this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk,
    memberOptions.batch, memberOptions.ssl);

  if (_.isString(topics)) {
    topics = [topics];
  }

  assert(Array.isArray(topics), 'Array of topics is required');

  this.topics = topics;

  this.recovery = new ConsumerGroupRecovery(this);

  this.setupProtocols(this.options.protocol);

  // When migrating from a HighLevelConsumer the migrator decides when to
  // connect, so the automatic connect-on-ready is skipped in that case.
  if (this.options.connectOnReady && !this.options.migrateHLC) {
    this.client.once('ready', this.connect.bind(this));
  }

  if (this.options.migrateHLC) {
    const ConsumerGroupMigrator = require('./consumerGroupMigrator');
    this.migrator = new ConsumerGroupMigrator(this);
    this.migrator.on('error', function (error) {
      self.emit('error', error);
    });
  }

  this.client.on('error', function (err) {
    logger.error('Error from %s', self.client.clientId, err);
    self.emit('error', err);
  });

  // Debounced so a burst of brokersChanged events triggers one refresh.
  const recoverFromBrokerChange = _.debounce(function () {
    logger.debug('brokersChanged refreshing metadata');
    self.client.refreshMetadata(self.topics, function (error) {
      if (error) {
        // Emit on the 'error' channel; emitting the error object itself as the
        // event name would never reach 'error' listeners.
        self.emit('error', error);
        return;
      }
      self.paused = false;
      if (!self.ready && !self.connecting) {
        if (self.reconnectTimer) {
          // brokers changed so bypass backoff retry and reconnect now
          clearTimeout(self.reconnectTimer);
          self.reconnectTimer = null;
        }
        self.connect();
      } else if (!self.connecting) {
        self.fetch();
      }
    });
  }, 200);

  this.client.on('brokersChanged', function () {
    self.pause();
    recoverFromBrokerChange();
  });

  this.client.on('reconnect', function (lastError) {
    self.fetch();
  });

  this.on('offsetOutOfRange', topic => {
    this.pause();
    if (this.options.outOfRangeOffset === 'none') {
      this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic.partition}`));
      return;
    }

    // Look up a replacement offset according to the configured policy.
    topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];

    this.getOffset().fetch([topic], (error, result) => {
      if (error) {
        this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error));
        return;
      }
      const offset = _.head(result[topic.topic][topic.partition]);
      const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;

      logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);

      this.setOffset(topic.topic, topic.partition, offset);
      this.resume();
    });
  });

  // 'done' will be emit when a message fetch request complete
  this.on('done', function (topics) {
    self.updateOffsets(topics);
    if (!self.paused) {
      setImmediate(function () {
        self.fetch();
      });
    }
  });

  if (this.options.groupId) {
    validateConfig('options.groupId', this.options.groupId);
  }

  this.isLeader = false;
  this.coordinatorId = null;
  this.generationId = null;
  this.ready = false;
  this.topicPayloads = [];
}n/a
/**
 * Partitioner whose `getPartition` method is supplied by the caller.
 *
 * @param {Function} partitioner - Stored as `this.getPartition`.
 */
CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
}n/a
/**
 * Partitioner constructor that starts its counter `c` at 0.
 */
CyclicPartitioner = function () {
this.c = 0;
}n/a
/** Partitioner constructor with no instance state. */
DefaultPartitioner = function () {}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
FailedToRebalanceConsumerError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}...
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation
.mainError().toString()));
} else {
var topicNames = self.topicPayloads.map(function (p) {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
if (err) {
...FailedToRegisterConsumerError = function (message, nested) {
// Error constructor that wraps another error: both arguments are handed to
// NestedError, then `message` is (re)assigned on this instance.
NestedError.call(this, message, nested);
this.message = message;
}...
} else {
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
}
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString
(), error));
}
});
}
}, 20000);
function fetchAndUpdateOffsets (cb) {
self.fetchOffset(self.topicPayloads, function (err, topics) {
...GroupCoordinatorNotAvailableError = function (message) {
// Error constructor: attaches a stack trace to this instance and stores the
// caller-supplied message.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
GroupLoadInProgressError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
HeartbeatTimeoutError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * HighLevelConsumer constructor. Initialises state flags, derives an id,
 * builds payloads from `topics` and calls `this.connect()`.
 *
 * @param {Object} client - Stored as `this.client`.
 * @param {*} topics - Required; passed to buildPayloads and buildTopicPayloads.
 * @param {Object} [options] - Missing keys are filled in from DEFAULTS.
 * @throws {Error} 'Must have payloads' when `topics` is falsy.
 */
HighLevelConsumer = function (client, topics, options) {
if (!topics) {
throw new Error('Must have payloads');
}
this.fetchCount = 0;
this.client = client;
this.options = _.defaults((options || {}), DEFAULTS);
this.initialised = false;
this.ready = false;
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.committing = false;
this.needToCommit = false;
// Use the configured id, otherwise "<groupId>_<uuid v4>".
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
this.connect();
// NOTE(review): groupId is validated only after connect() has been called.
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
}n/a
/**
 * Producer constructor that delegates to BaseProducer, using the `cyclic`
 * partitioner type as the default.
 *
 * @param {Object} client - Forwarded to BaseProducer.
 * @param {Object} [options] - Forwarded to BaseProducer.
 * @param {*} [customPartitioner] - Forwarded to BaseProducer.
 */
function HighLevelProducer(client, options, customPartitioner) {
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner);
}n/a
/**
 * Shared producer constructor: resolves ack settings and the partitioner,
 * then calls `this.connect()`.
 *
 * @param {Object} client - Stored as `this.client`.
 * @param {Object} [options] - `requireAcks`, `ackTimeoutMs` and
 *   `partitionerType` are read; the first two fall back to DEFAULTS only
 *   when they are `undefined`.
 * @param {*} defaultPartitionerType - Key into PARTITIONER_MAP used when
 *   `options.partitionerType` does not select an entry.
 * @param {*} [customPartitioner] - Passed to the partitioner constructor.
 * @throws {Error} When `customPartitioner` is given without the custom
 *   partitioner type, or the custom type is selected without one.
 */
function BaseProducer(client, options, defaultPartitionerType, customPartitioner) {
options = options || {};
this.ready = false;
this.client = client;
this.requireAcks = options.requireAcks === undefined
? DEFAULTS.requireAcks
: options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;
if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
} else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
// NOTE(review): the message says "customer"; "custom" was probably intended.
throw new Error('No customer partitioner defined');
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
IllegalGenerationError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
InvalidConfigError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Error constructor that forwards every argument it receives to NestedError.
 *
 * @param {string} message - Error text.
 * @param {Error} [nested] - Underlying error, when there is one.
 */
InvalidConsumerOffsetError = function (message, nested) {
NestedError.apply(this, arguments);
}...
this.client.on('reconnect', function (lastError) {
self.fetch();
});
this.on('offsetOutOfRange', topic => {
this.pause();
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range
for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
...KafkaBuffer = function (batchSize, batchAge) {
// Constructor: stores the two batch settings and starts with no pending
// timer and no buffered data.
this._batch_size = batchSize;
this._batch_age = batchAge;
this._batch_age_timer = null;
this._buffer = null;
}n/a
/**
 * Message constructor that takes only a key and a value.
 *
 * @param {*} key - Forwarded to exports.Message.
 * @param {*} value - Forwarded to exports.Message.
 */
function KeyedMessage(key, value) {
// The two leading zeros fill the first two positional parameters of
// exports.Message; their meaning is not visible from here.
exports.Message.call(this, 0, 0, key, value);
}n/a
/** Partitioner constructor with no instance state. */
KeyedPartitioner = function () {}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
NotCoordinatorForGroupError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Offset constructor. Keeps a reference to the client and re-emits the
 * client's `ready`, `connect` and `error` events on this instance.
 *
 * @param {Object} client - Event-emitting client; its current `ready` flag
 *   is copied at construction time.
 */
Offset = function (client) {
var self = this;
this.client = client;
this.ready = this.client.ready;
this.client.on('ready', function () {
self.ready = true;
self.emit('ready');
});
// Only the first 'connect' from the client is forwarded.
this.client.once('connect', function () {
self.emit('connect');
});
this.client.on('error', function (err) {
self.emit('error', err);
});
}...
* `cb`: *Function*, the callback
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...function Producer(client, options, customPartitioner) {
// Producer constructor: delegates to BaseProducer, using the `default`
// partitioner type as the default.
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.default, customPartitioner);
}n/a
/** Partitioner constructor with no instance state. */
RandomPartitioner = function () {}n/a
/**
 * Error constructor carrying a caller-supplied message.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
RebalanceInProgressError = function (message) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * Error constructor for topics that were not found.
 *
 * @param {*} topics - Stored as `this.topics`; its `toString()` result is
 *   embedded in the message.
 */
TopicsNotExistError = function (topics) {
// Attach a stack trace to this instance.
Error.captureStackTrace(this, this);
this.topics = topics;
this.message = 'The topic(s) ' + topics.toString() + ' do not exist';
}...
*/
Client.prototype.topicExists = function (topics, cb) {
var notExistsTopics = [];
var self = this;
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics
));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
...UnknownMemberIdError = function (message) {
// Error constructor: attaches a stack trace to this instance and stores the
// caller-supplied message.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
/**
 * ConsumerGroupMigrator constructor.
 *
 * With `options.migrateRolling` set on the consumer group, it opens its own
 * zookeeper client and waits until no owners are reported for the topics
 * (re-checked several times) before calling connectConsumerGroup(). Without
 * it, connectConsumerGroup() is called straight away.
 *
 * @param {Object} consumerGroup - Required; its `client` and `options` are read.
 */
function ConsumerGroupMigrator(consumerGroup) {
EventEmitter.call(this);
assert(consumerGroup);
const self = this;
this.consumerGroup = consumerGroup;
this.client = consumerGroup.client;
// Number of consecutive "no owners" reports seen so far.
var verified = 0;
if (consumerGroup.options.migrateRolling) {
this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10});
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
// Re-check after a delay until the verification count is exceeded, then connect.
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified,
NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS);
setTimeout(function () {
self.checkForOwners(topics);
}, VERIFY_WAIT_TIME_MS);
} else {
self.connectConsumerGroup();
}
});
// An ownership change restarts the verification count; debounced by 250 ms.
this.on('topicOwnerChange', _.debounce(function (topics) {
verified = 0;
self.checkForOwnersAndListenForChange(topics);
}, 250));
this.zk.connect();
} else {
this.connectConsumerGroup();
}
}n/a
/**
 * ConsumerGroupRecovery constructor.
 *
 * @param {Object} consumerGroup - Stored as `this.consumerGroup`; its
 *   `options` object is also referenced as `this.options`.
 */
function ConsumerGroupRecovery(consumerGroup) {
this.consumerGroup = consumerGroup;
this.options = consumerGroup.options;
}n/a
/**
 * Error constructor carrying a caller-supplied message. Within SOURCE it is
 * created as `new errors.BrokerNotAvailableError('Broker not available')`
 * when a broker or its socket is missing, errored or destroyed.
 *
 * @param {string} message - Text stored on the instance as `message`.
 */
BrokerNotAvailableError = function (message) {
// Attach a stack trace to this instance.
// NOTE(review): the second argument is normally a constructor function used
// to trim frames from the trace; the instance itself is passed here - confirm.
Error.captureStackTrace(this, this);
this.message = message;
}...
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
...function Error() { [native code] }n/a
/**
 * Constructor: runs the Readable constructor on this instance, forwarding
 * `options` unchanged. No other state is set up here.
 *
 * @param {Object} [options] - Passed through to Readable.
 */
BrokerReadable = function (options) {
Readable.call(this, options);
}n/a
/**
 * Readable stream constructor. May be called without `new`; it then
 * constructs and returns a new Readable.
 *
 * @param {Object} [options] - Passed to ReadableState; when `options.read`
 *   is a function it is installed as `this._read`.
 */
function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
this._readableState = new ReadableState(options, this);
// legacy
this.readable = true;
if (options && typeof options.read === 'function')
this._read = options.read;
Stream.call(this);
}n/a
/** Read hook with an empty body; `size` is ignored. */
_read = function (size) {}n/a
/**
 * Constructor: runs the Transform constructor on this instance and creates a
 * KafkaBuffer configured with the no-ack batch settings.
 *
 * @param {Object} [options] - Forwarded to Transform; `noAckBatchSize` and
 *   `noAckBatchAge` are read from it when it is provided.
 */
BrokerTransform = function (options) {
Transform.call(this, options);
// Both batch settings fall back to null when no options object is given.
this.noAckBatchSize = options ? options.noAckBatchSize : null;
this.noAckBatchAge = options ? options.noAckBatchAge : null;
this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge);
}n/a
/**
 * Transform stream constructor. May be called without `new`; it then
 * constructs and returns a new Transform.
 *
 * @param {Object} [options] - Passed to Duplex; `options.transform` and
 *   `options.flush`, when they are functions, are installed as
 *   `this._transform` and `this._flush`.
 */
function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);
Duplex.call(this, options);
this._transformState = new TransformState(this);
var stream = this;
// start out asking for a readable event once data is transformed.
this._readableState.needReadable = true;
// we have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;
if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
if (typeof options.flush === 'function')
this._flush = options.flush;
}
// When the writable side finishes, then flush out anything remaining.
// `done` is not defined in this block; it is expected from the enclosing module.
this.once('prefinish', function() {
if (typeof this._flush === 'function')
this._flush(function(er) {
done(stream, er);
});
else
done(stream);
});
}n/a
/**
 * Transform hook: hands the chunk to the KafkaBuffer together with a bound
 * `_transformNext` callback, then signals completion immediately.
 *
 * @param {*} chunk - Data passed to `this._KafkaBuffer.addChunk`.
 * @param {string} enc - Unused.
 * @param {Function} done - Called with no arguments once the chunk is queued.
 */
_transform = function (chunk, enc, done) {
this._KafkaBuffer.addChunk(chunk, this._transformNext.bind(this));
done();
}n/a
/**
 * Pushes the KafkaBuffer's current batch downstream, then asks the buffer to
 * truncate that batch.
 */
_transformNext = function () {
this.push(this._KafkaBuffer.getBatch());
this._KafkaBuffer.truncateBatch();
}n/a
/**
 * Wraps a broker socket and adds a batching write path next to it.
 *
 * A BrokerReadable is piped into a BrokerTransform; whatever the transform
 * makes readable is written to the socket. The readable end is exposed as
 * `readableSocket` so callers can push buffers into that pipeline.
 *
 * @param {Object} socket - Stored as `this.socket`; must provide `write`.
 * @param {Object} [noAckBatchOptions] - Passed to the BrokerTransform constructor.
 */
BrokerWrapper = function (socket, noAckBatchOptions) {
this.socket = socket;
var self = this;
var readable = new BrokerReadable();
var transform = new BrokerTransform(noAckBatchOptions);
readable.pipe(transform);
transform.on('readable', function () {
var bulkMessage = null;
// Drain everything currently available; read() returns a falsy value when empty.
while (bulkMessage = transform.read()) { // eslint-disable-line no-cond-assign
self.socket.write(bulkMessage);
}
});
this.readableSocket = readable;
}n/a
/**
 * Writes a buffer directly to the wrapped socket.
 *
 * @param {*} buffer - Data passed to `this.socket.write`.
 */
write = function (buffer) {
this.socket.write(buffer);
}...
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
this.sendGroupRequest(protocol.encodeGroupCoordinatorRequest, protocol.decodeGroupCoordinatorResponse, arguments);
};
Client.prototype.sendJoinGroupRequest = function (groupId, memberId, sessionTimeout, groupProtocol, cb) {
...writeAsync = function (buffer) {
// Pushes the buffer onto this.readableSocket instead of writing to the
// socket directly.
this.readableSocket.push(buffer);
}...
if (longpolling) {
if (broker.socket.waiting) continue;
broker.socket.waiting = true;
}
if (decoder.requireAcks === 0) {
broker.writeAsync(request);
cb(null, { result: 'no ack' });
} else {
this.queueCallback(broker.socket, correlationId, [decoder, cb]);
broker.write(request);
}
}
};
...Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) {
// Client constructor. Stores the connection settings, initialises the
// per-client maps and counters, then calls this.connect().
// May be called without `new`; it then constructs and returns a new Client.
if (this instanceof Client === false) {
return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions);
}
this.sslOptions = sslOptions;
// ssl is enabled whenever sslOptions is truthy.
this.ssl = !!sslOptions;
if (clientId) {
validateConfig('clientId', clientId);
}
this.connectionString = connectionString || 'localhost:2181/';
this.clientId = clientId || 'kafka-node-client';
this.zkOptions = zkOptions;
this.noAckBatchOptions = noAckBatchOptions;
this.brokers = {};
this.longpollingBrokers = {};
this.topicMetadata = {};
this.topicPartitions = {};
this.correlationId = 0;
this._socketId = 0;
this.cbqueue = {};
this.brokerMetadata = {};
this.ready = false;
this.connect();
}...
partitionerType: 2
}
```
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
```
### Events
- `ready`: this event is emitted when the producer is ready to send messages.
- `error`: this error event is propagated from the internal client; the producer should always listen for it.
...function EventEmitter() {
// Constructor: delegates all setup to EventEmitter.init, run on this instance.
EventEmitter.init.call(this);
}n/a
/**
 * Checks that the topics exist, loads their metadata and merges it into the
 * client's metadata.
 *
 * @param {Array} topics - Topics to add.
 * @param {Function} cb - Called with an error from either step, or with
 *   `(null, topics)` on success.
 */
addTopics = function (topics, cb) {
var self = this;
this.topicExists(topics, function (err) {
if (err) return cb(err);
self.loadMetadataForTopics(topics, function (err, resp) {
if (err) return cb(err);
self.updateMetadatas(resp);
cb(null, topics);
});
});
}...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset; otherwise it will fetch messages
from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...brokerForLeader = function (leader, longpolling) {
// Returns the broker wrapper for the given leader id, creating it through
// setupBroker when it does not exist yet. Returns undefined when no broker
// can be determined.
// - leader: broker id; when undefined a fallback is chosen (see below).
// - longpolling: selects which broker map getBrokers() returns.
var addr;
var brokers = this.getBrokers(longpolling);
// If leader is not given, choose the first broker as leader
if (typeof leader === 'undefined') {
if (!_.isEmpty(brokers)) {
addr = Object.keys(brokers)[0];
return brokers[addr];
} else if (!_.isEmpty(this.brokerMetadata)) {
// No broker set up yet: fall back to the first id in the broker metadata.
leader = Object.keys(this.brokerMetadata)[0];
} else {
return;
}
}
var broker = _.find(this.brokerProfiles, {id: leader});
if (!broker) {
return;
}
addr = broker.host + ':' + broker.port;
return brokers[addr] || this.setupBroker(broker.host, broker.port, longpolling, brokers);
}...
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
var correlationId = this.nextId();
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
...checkMetadatas = function (payloads) {
// Splits payloads into two lists: those whose topic/partition already has
// metadata (index 0) and those that do not (index 1).
// With no topic metadata at all, every payload goes into the second list.
if (_.isEmpty(this.topicMetadata)) return [ [], payloads ];
// out: [ [metadata exists], [metadata not exists] ]
var out = [ [], [] ];
payloads.forEach(function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p);
else out[1].push(p);
}.bind(this));
return out;
}...
}
};
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;
var _payloads = payloads;
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
this.loadMetadataForTopics(topicNames, function (err, resp) {
...clearCallbackQueue = function (socket, error) {
// Drops every queued callback registered for the given socket.
// For non-longpolling sockets each queued callback is first invoked with
// `error`; for longpolling sockets the queue is deleted without calling them.
var socketId = socket.socketId;
var longpolling = socket.longpolling;
if (!this.cbqueue.hasOwnProperty(socketId)) {
return;
}
var queue = this.cbqueue[socketId];
if (!longpolling) {
Object.keys(queue).forEach(function (key) {
// Each queue entry is a [decoder, callback] pair; only the callback is used.
var handlers = queue[key];
var cb = handlers[1];
cb(error);
});
}
delete this.cbqueue[socketId];
}...
socket.on('error', function (err) {
this.error = err;
self.emit('error', err);
});
socket.on('close', function (hadError) {
self.emit('close', this);
if (hadError && this.error) {
self.clearCallbackQueue(this, this.error);
} else {
self.clearCallbackQueue(this, new errors.BrokerNotAvailableError('Broker not available'));
}
retry(this);
});
socket.on('end', function () {
retry(this);
...close = function (cb) {
// Closes both broker maps and the zookeeper client, then invokes the
// optional callback synchronously with no arguments.
this.closeBrokers(this.brokers);
this.closeBrokers(this.longpollingBrokers);
this.zk.close();
cb && cb();
}...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); //force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...closeBrokers = function (brokers) {
// Ends the socket of every broker in the given map. The `closing` flag is
// set first; the socket retry logic in createBroker checks it and skips
// reconnecting.
_.each(brokers, function (broker) {
broker.socket.closing = true;
broker.socket.end();
});
}...
self.brokerProfiles[addr] = brokerProfile;
self.brokerProfiles[addr].id = key;
});
};
Client.prototype.close = function (cb) {
this.closeBrokers(this.brokers);
this.closeBrokers(this.longpollingBrokers);
this.zk.close();
cb && cb();
};
Client.prototype.closeBrokers = function (brokers) {
_.each(brokers, function (broker) {
...connect = function () {
// Creates the Zookeeper client for this.connectionString and wires its
// events to this instance:
// - 'init' (once): store broker metadata, set up one broker, emit 'ready'.
// - 'brokersChanged': refresh broker state, emit 'brokersChanged' after a delay.
// - 'disconnected' (once): close, reconnect and emit 'zkReconnect'.
// - 'error': re-emitted as 'error'.
var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions);
var self = this;
zk.once('init', function (brokers) {
try {
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker: returning true at index 0 stops some().
return !index;
});
self.emit('ready');
} catch (error) {
// Setup failed: undo the ready flag and report the failure.
self.ready = false;
self.emit('error', error);
}
});
zk.on('brokersChanged', function (brokerMetadata) {
try {
self.brokerMetadata = brokerMetadata;
logger.debug('brokersChanged', brokerMetadata);
self.setupBrokerProfiles(brokerMetadata);
self.refreshBrokers();
// Emit after 3 seconds
setTimeout(function () {
self.emit('brokersChanged');
}, 3000);
} catch (error) {
self.emit('error', error);
}
});
zk.once('disconnected', function () {
// Skip when the zookeeper client was closed deliberately.
if (!zk.closed) {
zk.close();
self.connect();
self.emit('zkReconnect');
}
});
zk.on('error', function (err) {
self.emit('error', err);
});
}...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
...createBroker = function (host, port, longpolling) {
var self = this;
var socket;
if (self.ssl) {
socket = tls.connect(port, host, self.sslOptions);
} else {
socket = net.createConnection(port, host);
}
socket.addr = host + ':' + port;
socket.host = host;
socket.port = port;
socket.socketId = this.nextSocketId();
if (longpolling) socket.longpolling = true;
socket.on('connect', function () {
var lastError = this.error;
this.error = null;
if (lastError) {
this.waiting = false;
self.emit('reconnect');
} else {
self.emit('connect');
}
});
socket.on('error', function (err) {
this.error = err;
self.emit('error', err);
});
socket.on('close', function (hadError) {
self.emit('close', this);
if (hadError && this.error) {
self.clearCallbackQueue(this, this.error);
} else {
self.clearCallbackQueue(this, new errors.BrokerNotAvailableError('Broker not available'));
}
retry(this);
});
socket.on('end', function () {
retry(this);
});
socket.buffer = new Buffer([]);
socket.on('data', function (data) {
this.buffer = Buffer.concat([this.buffer, data]);
self.handleReceivedData(this);
});
socket.setKeepAlive(true, 60000);
function retry (s) {
if (s.retrying || s.closing) return;
s.retrying = true;
s.retryTimer = setTimeout(function () {
if (s.closing) return;
self.reconnectBroker(s);
}, 1000);
}
return new BrokerWrapper(socket, this.noAckBatchOptions);
}...
Client.prototype.getBrokers = function (longpolling) {
return longpolling ? this.longpollingBrokers : this.brokers;
};
Client.prototype.setupBroker = function (host, port, longpolling, brokers) {
var brokerKey = host + ':' + port;
brokers[brokerKey] = this.createBroker(host, port, longpolling);
return brokers[brokerKey];
};
Client.prototype.createBroker = function (host, port, longpolling) {
var self = this;
var socket;
if (self.ssl) {
...createTopics = function (topics, isAsync, cb) {
// Requests metadata for the given topics and retries until every requested
// topic is reported back, or until the retry operation declines another
// attempt.
// - topics: a topic name or an array of names.
// - isAsync: optional; when a function is passed here and cb is omitted,
//   that function becomes cb and isAsync is treated as true.
// - cb: called with (null, kafkaTopics) on success or with an error.
// Validation errors are passed to cb when isAsync is truthy, thrown otherwise.
topics = typeof topics === 'string' ? [topics] : topics;
if (typeof isAsync === 'function' && typeof cb === 'undefined') {
cb = isAsync;
isAsync = true;
}
try {
validateKafkaTopics(topics);
} catch (e) {
if (isAsync) return cb(e);
throw e;
}
// cb may be reached from several paths below; make sure it fires only once.
cb = _.once(cb);
const getTopicsFromKafka = (topics, callback) => {
this.loadMetadataForTopics(topics, function (error, resp) {
if (error) {
return callback(error);
}
callback(null, Object.keys(resp[1].metadata));
});
};
const operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 });
operation.attempt(currentAttempt => {
logger.debug('create topics currentAttempt', currentAttempt);
getTopicsFromKafka(topics, function (error, kafkaTopics) {
if (error) {
if (operation.retry(error)) {
return;
}
// NOTE(review): when retry declines, execution continues below with
// kafkaTopics undefined - confirm this fall-through is intended.
}
logger.debug('kafka reported topics', kafkaTopics);
const left = _.difference(topics, kafkaTopics);
if (left.length === 0) {
logger.debug(`Topics created ${kafkaTopics}`);
return cb(null, kafkaTopics);
}
logger.debug(`Topics left ${left.join(', ')}`);
if (!operation.retry(new Error(`Topics not created ${left}`))) {
cb(operation.mainError());
}
});
});
// With isAsync falsy, cb fires immediately with null; because of _.once the
// later outcome of the retry loop is then not delivered to cb.
if (!isAsync) {
cb(null);
}
}...
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
```
...getBrokers = function (longpolling) {
// Returns the long-polling broker map when `longpolling` is truthy,
// otherwise the regular broker map.
return longpolling ? this.longpollingBrokers : this.brokers;
}...
Client.prototype.leaderByPartition = function (topic, partition) {
var topicMetadata = this.topicMetadata;
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition].leader;
};
Client.prototype.brokerForLeader = function (leader, longpolling) {
var addr;
var brokers = this.getBrokers(longpolling);
// If leader is not give, choose the first broker as leader
if (typeof leader === 'undefined') {
if (!_.isEmpty(brokers)) {
addr = Object.keys(brokers)[0];
return brokers[addr];
} else if (!_.isEmpty(this.brokerMetadata)) {
leader = Object.keys(this.brokerMetadata)[0];
...handleReceivedData = function (socket) {
// Tries to consume one complete response from socket.buffer and dispatch it
// to the callback queued under its correlation id. If more bytes remain
// afterwards, another pass is scheduled with setImmediate.
// Reads two 32-bit words, named `size` and `correlationId`, from the front
// of the buffer.
var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars;
// presumably the size field does not count its own 4 bytes - TODO confirm
var size = vars.size + 4;
var correlationId = vars.correlationId;
if (socket.buffer.length >= size) {
var resp = socket.buffer.slice(0, size);
var handlers = this.unqueueCallback(socket, correlationId);
// NOTE(review): returning here leaves the unmatched response at the front
// of socket.buffer - confirm this is intended.
if (!handlers) return;
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
// A decoder may return an Error instance; route it to the error argument.
(result instanceof Error)
? cb.call(this, result)
: cb.call(this, null, result);
socket.buffer = socket.buffer.slice(size);
if (socket.longpolling) socket.waiting = false;
} else { return; }
if (socket.buffer.length) {
setImmediate(function () { this.handleReceivedData(socket); }.bind(this));
}
}...
});
socket.on('end', function () {
retry(this);
});
socket.buffer = new Buffer([]);
socket.on('data', function (data) {
this.buffer = Buffer.concat([this.buffer, data]);
self.handleReceivedData(this);
});
socket.setKeepAlive(true, 60000);
function retry (s) {
if (s.retrying || s.closing) return;
s.retrying = true;
s.retryTimer = setTimeout(function () {
...hasMetadata = function (topic, partition) {
// Truthy when a leader is known for topic/partition and that leader has an
// entry in brokerMetadata. Returns false when no leader is known; otherwise
// the brokerMetadata entry itself (which may be undefined), not a boolean.
var brokerMetadata = this.brokerMetadata;
var leader = this.leaderByPartition(topic, partition);
return (leader !== undefined) && brokerMetadata[leader];
}...
};
Client.prototype.checkMetadatas = function (payloads) {
if (_.isEmpty(this.topicMetadata)) return [ [], payloads ];
// out: [ [metadata exists], [metadata not exists] ]
var out = [ [], [] ];
payloads.forEach(function (p) {
if (this.hasMetadata(p.topic, p.partition)) out[0].push(p);
else out[1].push(p);
}.bind(this));
return out;
};
Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata;
...leaderByPartition = function (topic, partition) {
// Looks up topicMetadata[topic][partition].leader, yielding the first falsy
// value met along the way (typically undefined) when a level is missing.
var topicMetadata = this.topicMetadata;
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition].leader;
}...
else out[1].push(p);
}.bind(this));
return out;
};
Client.prototype.hasMetadata = function (topic, partition) {
var brokerMetadata = this.brokerMetadata;
var leader = this.leaderByPartition(topic, partition);
return (leader !== undefined) && brokerMetadata[leader];
};
Client.prototype.updateMetadatas = function (metadatas) {
// _.extend(this.brokerMetadata, metadatas[0])
_.extend(this.topicMetadata, metadatas[1].metadata);
...loadMetadataForTopics = function (topics, cb) {
// Sends a metadata request for `topics` to the broker chosen by
// brokerForLeader() and queues cb to receive the decoded response.
// cb is called right away with a BrokerNotAvailableError when no usable
// broker socket exists.
var correlationId = this.nextId();
var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics);
// No leader is passed, so brokerForLeader applies its own fallback choice.
var broker = this.brokerForLeader();
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]);
broker.write(request);
}...
## How do I get a list of all topics?
Call `client.loadMetadataForTopics` with a blank topic array to get the entire list of available topics (and available brokers).
```js
client.once('connect', function () {
client.loadMetadataForTopics([], function (error, results) {
if (error) {
return console.error(error);
}
console.log('%j', _.get(results, '1.metadata'));
});
});
```
...nextId = function () {
if (this.correlationId >= MAX_INT32) {
this.correlationId = 0;
}
return this.correlationId++;
}...
var decoder = protocol.decodeOffsetResponse;
this.send(payloads, encoder, decoder, cb);
};
Client.prototype.sendGroupRequest = function (encode, decode, requestArgs) {
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
var correlationId = this.nextId();
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
...nextSocketId = function () {
return this._socketId++;
}...
socket = tls.connect(port, host, self.sslOptions);
} else {
socket = net.createConnection(port, host);
}
socket.addr = host + ':' + port;
socket.host = host;
socket.port = port;
socket.socketId = this.nextSocketId();
if (longpolling) socket.longpolling = true;
socket.on('connect', function () {
var lastError = this.error;
this.error = null;
if (lastError) {
this.waiting = false;
...payloadsByLeader = function (payloads) {
// Group payloads into a plain object keyed by the value leaderByPartition
// returns for each payload's topic/partition. Each value is an array of payloads.
// NOTE(review): a payload with no known leader is stored under the stringified
// falsy result (e.g. "undefined") — callers should check metadata first.
return payloads.reduce(function (out, p) {
var leader = this.leaderByPartition(p.topic, p.partition);
out[leader] = out[leader] || [];
out[leader].push(p);
return out;
}.bind(this), {});
}...
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});
}
};
Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
var longpolling = encoder.name === 'encodeFetchRequest';
payloads = this.payloadsByLeader(payloads);
if (!longpolling) {
cb = wrap(payloads, cb);
}
for (var leader in payloads) {
if (!payloads.hasOwnProperty(leader)) {
continue;
}
...queueCallback = function (socket, id, data) {
// Store `data` in this.cbqueue under socket.socketId and then `id`,
// creating the per-socket map on first use. An existing entry with the
// same id on the same socket is overwritten.
var socketId = socket.socketId;
var queue;
if (this.cbqueue.hasOwnProperty(socketId)) {
queue = this.cbqueue[socketId];
} else {
queue = {};
this.cbqueue[socketId] = queue;
}
queue[id] = data;
}...
var request = encode.apply(null, requestArgs);
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
};
/**
 * Issue a GroupCoordinator request through `sendGroupRequest`.
 *
 * The full `arguments` object is forwarded, so the trailing callback is
 * picked up by `sendGroupRequest`.
 *
 * @param {string} groupId
 * @param {Function} cb
 */
Client.prototype.sendGroupCoordinatorRequest = function (groupId, cb) {
  var encode = protocol.encodeGroupCoordinatorRequest;
  var decode = protocol.decodeGroupCoordinatorResponse;
  this.sendGroupRequest(encode, decode, arguments);
};
...reconnectBroker = function (oldSocket) {
// Create a replacement broker connection for `oldSocket`, reusing its
// host, port and longpolling flag.
oldSocket.retrying = false;
// Only sockets that recorded an error are destroyed here.
if (oldSocket.error) {
oldSocket.destroy();
}
var brokers = this.getBrokers(oldSocket.longpolling);
var newBroker = this.setupBroker(oldSocket.host, oldSocket.port, oldSocket.longpolling, brokers);
// Copy the previous error onto the new socket so the next 'connect'
// handler can see it as the last error.
newBroker.socket.error = oldSocket.error;
}...
socket.setKeepAlive(true, 60000);
function retry (s) {
if (s.retrying || s.closing) return;
s.retrying = true;
s.retryTimer = setTimeout(function () {
if (s.closing) return;
self.reconnectBroker(s);
}, 1000);
}
return new BrokerWrapper(socket, this.noAckBatchOptions);
};
Client.prototype.reconnectBroker = function (oldSocket) {
oldSocket.retrying = false;
...refreshBrokers = function () {
// Close and forget every broker connection whose key is no longer present
// in this.brokerProfiles. Applied to both the regular and the longpolling maps.
var self = this;
var validBrokers = Object.keys(this.brokerProfiles);
// Removes stale entries from `brokers` in place and passes the removed
// broker objects to closeBrokers.
function closeDeadBrokers (brokers) {
var deadBrokerKeys = _.difference(Object.keys(brokers), validBrokers);
if (deadBrokerKeys.length) {
self.closeBrokers(deadBrokerKeys.map(function (key) {
var broker = brokers[key];
delete brokers[key];
return broker;
}));
}
}
closeDeadBrokers(this.brokers);
closeDeadBrokers(this.longpollingBrokers);
}...
}
});
zk.on('brokersChanged', function (brokerMetadata) {
try {
self.brokerMetadata = brokerMetadata;
logger.debug('brokersChanged', brokerMetadata);
self.setupBrokerProfiles(brokerMetadata);
self.refreshBrokers();
// Emit after a 3 seconds
setTimeout(function () {
self.emit('brokersChanged');
}, 3000);
} catch (error) {
self.emit('error', error);
}
...refreshMetadata = function (topicNames, cb) {
// Reload metadata for `topicNames`, retrying on failure, and merge the
// result into the client via updateMetadatas. `cb` receives the final
// error, or nothing on success.
var self = this;
// Nothing to refresh: complete immediately.
if (!topicNames.length) return cb();
attemptRequestMetadata(topicNames, cb);
function attemptRequestMetadata (topics, cb) {
// Retry schedule is driven by the `retry` module with timeouts between 200 and 1000 ms.
var operation = retry.operation({ minTimeout: 200, maxTimeout: 1000 });
operation.attempt(function (currentAttempt) {
logger.debug('refresh metadata currentAttempt', currentAttempt);
self.loadMetadataForTopics(topics, function (err, resp) {
// A transport error takes precedence; otherwise use the error carried in the response.
err = err || resp[1].error;
// operation.retry returns true when another attempt has been scheduled.
if (operation.retry(err)) {
return;
}
if (err) {
logger.debug('refresh metadata error', err.message);
return cb(err);
}
self.updateMetadatas(resp);
cb();
});
});
}
}...
Error:
```
BrokerNotAvailableError: Could not find the leader
```
Call `client.refreshMetadata()` before sending the first message. See issue [#354](https://github.com/SOHU-Co/kafka-node/issues/354).
## How do I debug an issue?
This module uses the [debug module](https://github.com/visionmedia/debug), so you can run the command below before starting your app.
```bash
...removeTopicMetadata = function (topics, cb) {
topics.forEach(function (t) {
if (this.topicMetadata[t]) delete this.topicMetadata[t];
}.bind(this));
cb(null, topics.length);
}...
/**
 * Stop consuming the given topics and drop their cached metadata.
 *
 * @param {string|Array<string>} topics A single topic name or a list of names.
 * @param {Function} cb Passed through to `client.removeTopicMetadata`.
 */
Consumer.prototype.removeTopics = function (topics, cb) {
  var topicList = topics;
  if (typeof topics === 'string') {
    topicList = [topics];
  }

  // Keep only the payloads whose topic is not being removed.
  this.payloads = this.payloads.filter(function (payload) {
    return topicList.indexOf(payload.topic) === -1;
  });

  this.client.removeTopicMetadata(topicList, cb);
};
Consumer.prototype.close = function (force, cb) {
this.ready = false;
if (typeof force === 'function') {
cb = force;
force = false;
...send = function (payloads, encoder, decoder, cb) {
// Send payloads to their partition leaders, loading topic metadata first
// for any payload whose leader is not yet known.
// NOTE(review): when `payloads` is empty neither branch below runs, so `cb`
// is never invoked — confirm callers do not rely on a callback in that case.
var self = this;
var _payloads = payloads;
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
// Fast path: every payload already has leader metadata.
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
this.loadMetadataForTopics(topicNames, function (err, resp) {
if (err) {
return cb(err);
}
// The metadata response can carry its own error alongside a successful transport.
var error = resp[1].error;
if (error) {
return cb(error);
}
self.updateMetadatas(resp);
// check payloads again
payloads = self.checkMetadatas(_payloads);
if (payloads[1].length) {
return cb(new errors.BrokerNotAvailableError('Could not find the leader'));
}
// payloads[1] is empty at this point, so this sends exactly payloads[0].
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});
}
}...
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
{ topic: 'topic1', messages: 'hi', partition: 0 },
{ topic: 'topic2', messages: ['hello', 'world', km] }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
```
> ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute, otherwise
you may lose messages!
...sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
// Issue a fetch for `payloads` and translate decoded results into events
// on `consumer` ('message', 'done', 'offsetOutOfRange', 'error').
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
// The decoder invokes this handler with (err, type, message) for each decoded item.
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
// Leadership/topic errors are surfaced on the client (not the consumer) as 'brokersChanged'.
return self.emit('brokersChanged');
}
return consumer.emit('error', err);
}
var encoding = consumer.options.encoding;
if (type === 'message') {
// Values are converted to strings unless the consumer asked for raw buffers.
if (encoding !== 'buffer' && message.value) {
message.value = message.value.toString(encoding);
}
consumer.emit('message', message);
} else {
// Any non-'message' type is reported as 'done' with the same payload.
consumer.emit('done', message);
}
}, maxTickMessages);
this.send(payloads, encoder, decoder, function (err) {
if (err) {
// Re-emit on the consumer as 'error', forwarding all callback arguments.
Array.prototype.unshift.call(arguments, 'error');
consumer.emit.apply(consumer, arguments);
}
});
}...
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
/**
 * Ask the client to fetch messages for this consumer's payloads.
 * Does nothing while the consumer is not ready or is paused.
 */
Consumer.prototype.fetch = function () {
  var canFetch = this.ready && !this.paused;
  if (canFetch) {
    var fetchOptions = this.options;
    this.client.sendFetchRequest(this, this.payloads, fetchOptions.fetchMaxWaitMs, fetchOptions.fetchMinBytes);
  }
};
/**
 * Fetch committed offsets for `payloads` under this consumer's group id.
 *
 * @param {Array<Object>} payloads
 * @param {Function} cb Passed through to `client.sendOffsetFetchRequest`.
 */
Consumer.prototype.fetchOffset = function (payloads, cb) {
  var groupId = this.options.groupId;
  this.client.sendOffsetFetchRequest(groupId, payloads, cb);
};
Consumer.prototype.addTopics = function (topics, cb, fromOffset) {
...sendGroupCoordinatorRequest = function (groupId, cb) {
this.sendGroupRequest(protocol.encodeGroupCoordinatorRequest, protocol.decodeGroupCoordinatorResponse, arguments);
}...
this.emit('rebalancing');
async.waterfall([
function (callback) {
if (self.client.coordinatorId) {
return callback(null, null);
}
self.client.sendGroupCoordinatorRequest(self.options.groupId, callback);
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
...sendGroupRequest = function (encode, decode, requestArgs) {
// Shared sender for group-related requests. `requestArgs` is the caller's
// argument list (callers pass their `arguments` object); its last entry is
// the callback and the rest are forwarded to `encode`.
requestArgs = _.values(requestArgs);
var cb = requestArgs.pop();
var correlationId = this.nextId();
// encode is called as encode(clientId, correlationId, ...remaining args).
requestArgs.unshift(this.clientId, correlationId);
var request = encode.apply(null, requestArgs);
// Group requests go to the broker looked up from this.coordinatorId.
var broker = this.brokerForLeader(this.coordinatorId);
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [decode, cb]);
broker.write(request);
}...
var decoder = protocol.decodeOffsetCommitResponse;
this.send(payloads, encoder, decoder, cb);
};
/**
 * Commit offsets on behalf of a group member via `sendGroupRequest`.
 * All call arguments (including the trailing callback) are forwarded as-is.
 *
 * @param {string} group
 * @param {number} generationId
 * @param {string} memberId
 * @param {Array<Object>} payloads
 * @param {Function} cb
 */
Client.prototype.sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) {
  this.sendGroupRequest(
    protocol.encodeOffsetCommitV2Request,
    protocol.decodeOffsetCommitResponse,
    arguments
  );
};
/**
 * Fetch committed offsets for a group via `sendGroupRequest`.
 * All call arguments (including the trailing callback) are forwarded as-is.
 *
 * @param {string} group
 * @param {Array<Object>} payloads
 * @param {Function} cb
 */
Client.prototype.sendOffsetFetchV1Request = function (group, payloads, cb) {
  this.sendGroupRequest(
    protocol.encodeOffsetFetchV1Request,
    protocol.decodeOffsetFetchV1Response,
    arguments
  );
};
...sendHeartbeatRequest = function (groupId, generationId, memberId, cb) {
this.sendGroupRequest(protocol.encodeGroupHeartbeat, protocol.decodeGroupHeartbeat, arguments);
}...
constructor (client, handler) {
this.client = client;
this.handler = handler;
this.pending = true;
}
send (groupId, generationId, memberId) {
this.client.sendHeartbeatRequest(groupId, generationId, memberId, (error) => {
if (this.canceled) {
logger.debug('heartbeat yielded after being canceled', error);
return;
}
this.pending = false;
this.handler(error);
});
...sendJoinGroupRequest = function (groupId, memberId, sessionTimeout, groupProtocol, cb) {
this.sendGroupRequest(protocol.encodeJoinGroupRequest, protocol.decodeJoinGroupResponse, arguments);
}...
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId
), self.options.sessionTimeout, self.protocols, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
...sendLeaveGroupRequest = function (groupId, memberId, cb) {
this.sendGroupRequest(protocol.encodeLeaveGroupRequest, protocol.decodeLeaveGroupResponse, arguments);
}...
};
/**
 * Stop heartbeating and, if this consumer currently holds a group
 * membership, tell the coordinator it is leaving.
 *
 * @param {Function} callback Called with the LeaveGroup error (if any), or
 *   with `null` when there was no membership to give up.
 */
ConsumerGroup.prototype.leaveGroup = function (callback) {
  var group = this;
  logger.debug('%s leaving group', group.client.clientId);
  group.stopHeartbeats();

  var isMember = group.generationId != null && group.memberId;
  if (!isMember) {
    callback(null);
    return;
  }

  group.client.sendLeaveGroupRequest(group.options.groupId, group.memberId, function (error) {
    // The generation is cleared regardless of the outcome.
    group.generationId = null;
    callback(error);
  });
};
...sendOffsetCommitRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetCommitRequest(group);
var decoder = protocol.decodeOffsetCommitResponse;
this.send(payloads, encoder, decoder, cb);
}...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) {
var encoder = protocol.encodeOffsetCommitV2Request;
var decoder = protocol.decodeOffsetCommitResponse;
this.sendGroupRequest(encoder, decoder, arguments);
}...
/**
 * Fetch committed offsets for `payloads` under this group's id.
 *
 * @param {*} payloads Forwarded unchanged to the client.
 * @param {Function} cb
 */
ConsumerGroup.prototype.fetchOffset = function (payloads, cb) {
  var groupId = this.options.groupId;
  this.client.sendOffsetFetchV1Request(groupId, payloads, cb);
};
/**
 * Commit offsets for this group member.
 *
 * Without both a truthy generation id and member id no request is sent and
 * `cb` is called with the message 'Nothing to be committed'.
 *
 * @param {Array<Object>} commits
 * @param {Function} cb
 */
ConsumerGroup.prototype.sendOffsetCommitRequest = function (commits, cb) {
  if (!this.generationId || !this.memberId) {
    cb(null, 'Nothing to be committed');
    return;
  }
  var groupId = this.options.groupId;
  this.client.sendOffsetCommitV2Request(groupId, this.generationId, this.memberId, commits, cb);
};
ConsumerGroup.prototype.close = function (force, cb) {
var self = this;
...sendOffsetFetchRequest = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchRequest(group);
var decoder = protocol.decodeOffsetFetchResponse;
this.send(payloads, encoder, decoder, cb);
}...
}
}
);
};
ConsumerGroupMigrator.prototype.saveHighLevelConsumerOffsets = function (topicPartitionList, callback) {
const self = this;
this.client.sendOffsetFetchRequest(this.consumerGroup.options.groupId, topicPartitionList
, function (error, results) {
logger.debug('sendOffsetFetchRequest response:', results, error);
if (error) {
return callback(error);
}
self.offsets = results;
callback(null);
});
...sendOffsetFetchV1Request = function (group, payloads, cb) {
var encoder = protocol.encodeOffsetFetchV1Request;
var decoder = protocol.decodeOffsetFetchV1Response;
this.sendGroupRequest(encoder, decoder, arguments);
}...
const heartbeat = new Heartbeat(this.client, heartbeatCallback);
heartbeat.send(this.options.groupId, this.generationId, this.memberId);
return heartbeat;
};
/**
 * Fetch committed offsets for `payloads` under this group's id.
 *
 * @param {*} payloads Forwarded unchanged to the client.
 * @param {Function} cb
 */
ConsumerGroup.prototype.fetchOffset = function (payloads, cb) {
  var groupId = this.options.groupId;
  this.client.sendOffsetFetchV1Request(groupId, payloads, cb);
};
ConsumerGroup.prototype.sendOffsetCommitRequest = function (commits, cb) {
if (this.generationId && this.memberId) {
this.client.sendOffsetCommitV2Request(this.options.groupId, this.generationId, this.memberId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
...sendOffsetRequest = function (payloads, cb) {
var encoder = protocol.encodeOffsetRequest;
var decoder = protocol.decodeOffsetResponse;
this.send(payloads, encoder, decoder, cb);
}...
/**
 * Fetch committed offsets for `payloads` under this consumer's group id,
 * logging the request at debug level first.
 *
 * @param {Array<Object>} payloads
 * @param {Function} cb
 */
HighLevelConsumer.prototype.fetchOffset = function (payloads, cb) {
  var consumerId = this.id;
  logger.debug('in fetchOffset %s payloads: %j', consumerId, payloads);

  var groupId = this.options.groupId;
  this.client.sendOffsetFetchRequest(groupId, payloads, cb);
};
/**
 * Forward an offset request to the client.
 *
 * @param {Array<Object>} payloads
 * @param {Function} cb
 */
HighLevelConsumer.prototype.offsetRequest = function (payloads, cb) {
  var kafkaClient = this.client;
  kafkaClient.sendOffsetRequest(payloads, cb);
};
/**
* Register a consumer against a group
*
* @param consumer to register
*
...sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
// Encode (optionally through a codec) and send produce payloads, reporting
// the outcome through `cb(err, result)`.
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
var decoder = protocol.decodeProduceResponse;
var self = this;
// NOTE(review): this sets a property on the shared protocol.decodeProduceResponse
// function, so the value persists across calls; sendToBroker reads it to
// decide whether to wait for a response.
decoder.requireAcks = requireAcks;
// Run buildRequest over every payload before anything is sent.
async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
self.send(payloads, encoder, decoder, function (err, result) {
if (err) {
// A stale-leader error additionally signals 'brokersChanged' on the client.
if (err.message === 'NotLeaderForPartition') {
self.emit('brokersChanged');
}
cb(err);
} else {
cb(null, result);
}
});
});
// Replaces payload.messages with a single wrapper Message holding the
// codec-encoded message set; payloads without a codec are left untouched.
function buildRequest (payload, cb) {
var attributes = payload.attributes;
var codec = getCodec(attributes);
if (!codec) return cb();
var innerSet = encodeMessageSet(payload.messages);
codec.encode(innerSet, function (err, message) {
if (err) return cb(err);
payload.messages = [ new Message(0, attributes, '', message) ];
cb();
});
}
}...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
  // Capture the ack settings up front, then turn the caller's payloads into
  // produce requests using the client's current topic metadata.
  var acks = this.requireAcks;
  var timeoutMs = this.ackTimeoutMs;
  var kafkaClient = this.client;
  var produceRequests = this.buildPayloads(payloads, kafkaClient.topicMetadata);

  kafkaClient.sendProduceRequest(produceRequests, acks, timeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic
], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...sendSyncGroupRequest = function (groupId, generationId, memberId, groupAssignment, cb) {
this.sendGroupRequest(protocol.encodeSyncGroupRequest, protocol.decodeSyncGroupResponse, arguments);
}...
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId
, groupAssignment, callback);
},
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
...sendToBroker = function (payloads, encoder, decoder, cb) {
// Group payloads by partition leader and write one encoded request per leader.
// Fetch requests are detected by the encoder's function name and use the
// longpolling broker lookup.
var longpolling = encoder.name === 'encodeFetchRequest';
payloads = this.payloadsByLeader(payloads);
if (!longpolling) {
// `wrap` is defined outside this listing — presumably it combines the
// per-leader responses into a single callback; confirm before relying on it.
cb = wrap(payloads, cb);
}
for (var leader in payloads) {
if (!payloads.hasOwnProperty(leader)) {
continue;
}
var correlationId = this.nextId();
var request = encoder(this.clientId, correlationId, payloads[leader]);
var broker = this.brokerForLeader(leader, longpolling);
// Returning here stops the loop, so requests for any remaining leaders
// are not sent in this call.
if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]);
}
if (longpolling) {
// Skip this leader while its socket is still marked as waiting;
// otherwise mark it before sending.
if (broker.socket.waiting) continue;
broker.socket.waiting = true;
}
if (decoder.requireAcks === 0) {
// No acknowledgement expected: nothing is queued for a response and
// the callback fires immediately.
broker.writeAsync(request);
cb(null, { result: 'no ack' });
} else {
this.queueCallback(broker.socket, correlationId, [decoder, cb]);
broker.write(request);
}
}
}...
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;
var _payloads = payloads;
// payloads: [ [metadata exists], [metadata not exists] ]
payloads = this.checkMetadatas(payloads);
if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;
}
if (payloads[1].length) {
var topicNames = payloads[1].map(function (p) { return p.topic; });
this.loadMetadataForTopics(topicNames, function (err, resp) {
if (err) {
return cb(err);
...setupBroker = function (host, port, longpolling, brokers) {
var brokerKey = host + ':' + port;
brokers[brokerKey] = this.createBroker(host, port, longpolling);
return brokers[brokerKey];
}...
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
});
self.emit('ready');
} catch (error) {
self.ready = false;
self.emit('error', error);
...setupBrokerProfiles = function (brokers) {
// Rebuild this.brokerProfiles as a prototype-less map from "host:port" to
// broker profile, tagging each profile with its key from `brokers` as `id`.
// Throws when a broker lists endpoints but none matches the client's protocol,
// or (via assert) when host or port is missing.
this.brokerProfiles = Object.create(null);
var self = this;
// Protocol strings carry a trailing colon to match url.parse(...).protocol.
var protocol = self.ssl ? 'ssl:' : 'plaintext:';
Object.keys(brokers).forEach(function (key) {
var brokerProfile = brokers[key];
var addr;
if (brokerProfile.endpoints && brokerProfile.endpoints.length) {
// Prefer the advertised endpoint whose protocol matches ssl/plaintext.
var endpoint = _.find(brokerProfile.endpoints, function (endpoint) {
return url.parse(endpoint).protocol === protocol;
});
if (endpoint == null) {
throw new Error(['No kafka endpoint found for broker: ', key, ' with protocol ', protocol].join(''));
}
var endpointUrl = url.parse(endpoint);
addr = endpointUrl.hostname + ':' + endpointUrl.port;
// The profile object passed in is updated in place with the endpoint's host and port.
brokerProfile.host = endpointUrl.hostname;
brokerProfile.port = endpointUrl.port;
} else {
addr = brokerProfile.host + ':' + brokerProfile.port;
}
assert(brokerProfile.host && brokerProfile.port, 'kafka host or port is empty');
self.brokerProfiles[addr] = brokerProfile;
self.brokerProfiles[addr].id = key;
});
}...
Client.prototype.connect = function () {
var zk = this.zk = new Zookeeper(this.connectionString, this.zkOptions);
var self = this;
zk.once('init', function (brokers) {
try {
self.ready = true;
self.brokerMetadata = brokers;
self.setupBrokerProfiles(brokers);
Object
.keys(self.brokerProfiles)
.some(function (key, index) {
var broker = self.brokerProfiles[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
// Only connect one broker
return !index;
...topicExists = function (topics, cb) {
// Check each topic through this.zk.topicExists. `cb` receives the first
// lookup error, a TopicsNotExistError listing every missing topic, or no
// arguments when all topics exist.
var notExistsTopics = [];
var self = this;
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics));
cb();
});
// Per-topic check; missing topics are collected rather than reported as errors
// so that all of them can be named in a single TopicsNotExistError.
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
cb();
});
}
}...
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
cb();
});
}
};
...unqueueCallback = function (socket, id) {
// Remove and return the entry stored by queueCallback for this socket and id.
// Returns null when nothing is queued for the socket or for the id.
var socketId = socket.socketId;
if (!this.cbqueue.hasOwnProperty(socketId)) {
return null;
}
var queue = this.cbqueue[socketId];
if (!queue.hasOwnProperty(id)) {
return null;
}
var result = queue[id];
// Clean up: drop the entry, and drop the per-socket map once it is empty.
delete queue[id];
if (!Object.keys(queue).length) {
delete this.cbqueue[socketId];
}
return result;
}...
Client.prototype.handleReceivedData = function (socket) {
var vars = Binary.parse(socket.buffer).word32bu('size').word32bu('correlationId').vars;
var size = vars.size + 4;
var correlationId = vars.correlationId;
if (socket.buffer.length >= size) {
var resp = socket.buffer.slice(0, size);
var handlers = this.unqueueCallback(socket, correlationId);
if (!handlers) return;
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
(result instanceof Error)
? cb.call(this, result)
...updateMetadatas = function (metadatas) {
// Merge the topic metadata from a metadata response (metadatas[1].metadata)
// into this.topicMetadata, then recompute this.topicPartitions for every
// cached topic as an array of integer partition ids.
// Broker metadata (metadatas[0]) is intentionally not merged here:
// _.extend(this.brokerMetadata, metadatas[0])
_.extend(this.topicMetadata, metadatas[1].metadata);
for (var topic in this.topicMetadata) {
if (!this.topicMetadata.hasOwnProperty(topic)) {
continue;
}
// Partition ids are the object's keys (strings), so parse them as base-10 integers.
this.topicPartitions[topic] = Object.keys(this.topicMetadata[topic]).map(function (val) {
return parseInt(val, 10);
});
}
}...
/**
 * Verify that the topics exist, load their metadata and cache it.
 *
 * @param {Array<string>} topics
 * @param {Function} cb Called with `(err)` on failure or `(null, topics)` on success.
 */
Client.prototype.addTopics = function (topics, cb) {
  var client = this;

  function onMetadataLoaded (loadError, metadata) {
    if (loadError) return cb(loadError);
    client.updateMetadatas(metadata);
    cb(null, topics);
  }

  function onTopicsVerified (existsError) {
    if (existsError) return cb(existsError);
    client.loadMetadataForTopics(topics, onMetadataLoaded);
  }

  this.topicExists(topics, onTopicsVerified);
};
Client.prototype.nextId = function () {
if (this.correlationId >= MAX_INT32) {
...Consumer = function (client, topics, options) {
if (!topics) {
throw new Error('Must have payloads');
}
utils.validateTopics(topics);
this.fetchCount = 0;
this.client = client;
this.options = _.defaults((options || {}), DEFAULTS);
this.ready = false;
this.paused = this.options.paused;
this.id = nextId();
this.payloads = this.buildPayloads(topics);
this.connect();
this.encoding = this.options.encoding;
if (this.options.groupId) {
utils.validateConfig('options.groupId', this.options.groupId);
}
}n/a
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
addTopics = function (topics, cb, fromOffset) {
fromOffset = !!fromOffset;
var self = this;
if (!this.ready) {
setTimeout(function () {
self.addTopics(topics, cb, fromOffset);
}
, 100);
return;
}
// The default is that the topics is a string array of topic names
var topicNames = topics;
// If the topics is actually an object and not string we assume it is an array of payloads
if (typeof topics[0] === 'object') {
topicNames = topics.map(function (p) { return p.topic; });
}
this.client.addTopics(
topicNames,
function (err, added) {
if (err) return cb && cb(err, added);
var payloads = self.buildPayloads(topics);
var reFetch = !self.payloads.length;
if (fromOffset) {
payloads.forEach(function (p) {
self.payloads.push(p);
});
if (reFetch) self.fetch();
cb && cb(null, added);
return;
}
// update offset of topics that will be added
self.fetchOffset(payloads, function (err, offsets) {
if (err) return cb(err);
payloads.forEach(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) offset = 0;
p.offset = offset;
self.payloads.push(p);
});
if (reFetch) self.fetch();
cb && cb(null, added);
});
}
);
}...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset, otherwise it will fetch messages
from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...function autoCommit(force, cb) {
// Commit the current offsets of this consumer's payloads (including paused
// ones). Called with a single argument, that argument is taken as the callback.
if (arguments.length === 1) {
cb = force;
force = false;
}
// Throttle: while a commit window is open, skip unless forced.
if (this.committing && !force) return cb(null, 'Offset committing');
this.committing = true;
// The window closes on a timer (autoCommitIntervalMs), not when the commit completes.
setTimeout(function () {
this.committing = false;
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
// Payloads whose offset is still 0 are not committed.
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}...
if (offset === -1) offset = 0;
if (!initing) p.offset = offset + 1;
else p.offset = offset;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
};
function autoCommit (force, cb) {
if (arguments.length === 1) {
...buildPayloads = function (payloads) {
// Normalize topic names / payload objects into fetch payloads.
// Non-object entries are wrapped as { topic: entry }; object entries are
// updated in place (the caller's objects are mutated).
var self = this;
return payloads.map(function (p) {
if (typeof p !== 'object') p = { topic: p };
// Missing (or otherwise falsy) partition and offset default to 0.
p.partition = p.partition || 0;
p.offset = p.offset || 0;
// maxBytes always comes from the consumer options, overriding any value on the payload.
p.maxBytes = self.options.fetchMaxBytes;
p.metadata = 'm'; // metadata can be arbitrary
return p;
});
}...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
  // Capture the ack settings up front, then turn the caller's payloads into
  // produce requests using the client's current topic metadata.
  var acks = this.requireAcks;
  var timeoutMs = this.ackTimeoutMs;
  var kafkaClient = this.client;
  var produceRequests = this.buildPayloads(payloads, kafkaClient.topicMetadata);

  kafkaClient.sendProduceRequest(produceRequests, acks, timeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic
], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...close = function (force, cb) {
// Stop the consumer and close its client. With a truthy `force`, offsets
// are committed first and the client is closed only if that commit succeeds.
// `force` may be omitted, in which case the first argument is the callback.
this.ready = false;
if (typeof force === 'function') {
cb = force;
force = false;
}
if (force) {
this.commit(force, function (err) {
if (err) {
// A failed commit is reported and the client is left open.
return cb(err);
}
this.client.close(cb);
}.bind(this));
} else {
this.client.close(cb);
}
}...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); //force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...function autoCommit(force, cb) {
if (arguments.length === 1) {
cb = force;
force = false;
}
if (this.committing && !force) return cb(null, 'Offset committing');
this.committing = true;
setTimeout(function () {
this.committing = false;
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}...
Commit the offsets of the current topics manually. This method should be called when a consumer leaves.
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function(err, data) {
});
```
### setOffset(topic, partition, offset)
Set offset of the given topic
* `topic`: **String**
...connect = function () {
// Wire this consumer to its client: initialise once the client is ready,
// forward client errors, refresh metadata when brokers change, and keep
// the fetch loop going after every completed fetch.
var self = this;
// The client may already be connected when the consumer is created.
this.ready = this.client.ready;
if (this.ready) this.init();
this.client.on('ready', function () {
logger.debug('consumer ready');
// Only initialise on the first 'ready'; later ones just keep the flag set.
if (!self.ready) self.init();
self.ready = true;
});
this.client.on('error', function (err) {
logger.error('client error %s', err.message);
self.emit('error', err);
});
this.client.on('close', function () {
logger.debug('connection closed');
});
this.client.on('brokersChanged', function () {
var topicNames = self.payloads.map(function (p) {
return p.topic;
});
// NOTE(review): `this` inside this listener is expected to be the client
// (the emitter), so refreshMetadata is the client's method — confirm.
this.refreshMetadata(topicNames, function (err) {
if (err) return self.emit('error', err);
self.fetch();
});
});
// 'done' is emitted when a message fetch request completes
this.on('done', function (topics) {
self.updateOffsets(topics);
// Schedule the next fetch on a later tick rather than calling it synchronously.
setImmediate(function () {
self.fetch();
});
});
}...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
...fetch = function () {
if (!this.ready || this.paused) return;
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);
}...
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...fetchOffset = function (payloads, cb) {
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
}...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...init = function () {
if (!this.payloads.length) {
return;
}
var self = this;
var topics = self.payloads.map(function (p) { return p.topic; });
self.client.topicExists(topics, function (err) {
if (err) {
return self.emit('error', err);
}
if (self.options.fromOffset) {
return self.fetch();
}
self.fetchOffset(self.payloads, function (err, topics) {
if (err) {
return self.emit('error', err);
}
self.updateOffsets(topics, true);
self.fetch();
});
});
}...
});
};
Consumer.prototype.connect = function () {
var self = this;
// Client already exists
this.ready = this.client.ready;
if (this.ready) this.init();
this.client.on('ready', function () {
logger.debug('consumer ready');
if (!self.ready) self.init();
self.ready = true;
});
...pause = function () {
// Only sets the paused flag; nothing else is touched here.
this.paused = true;
}...
} else if (!self.connecting) {
self.fetch();
}
});
}, 200);
this.client.on('brokersChanged', function () {
self.pause();
recoverFromBrokerChange();
});
this.client.on('reconnect', function (lastError) {
self.fetch();
});
...pauseTopics = function (topics) {
if (!this.pausedPayloads) this.pausedPayloads = [];
pauseOrResume(this.payloads, this.pausedPayloads, topics);
}...
### resume()
Resume the consumer. Resumes the fetch loop.
### pauseTopics(topics)
Pause the specified topics
```
consumer.pauseTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
```
### resumeTopics(topics)
Resume the specified topics
...removeTopics = function (topics, cb) {
topics = typeof topics === 'string' ? [topics] : topics;
this.payloads = this.payloads.filter(function (p) {
return !~topics.indexOf(p.topic);
});
this.client.removeTopicMetadata(topics, cb);
}...
### removeTopics(topics, cb)
* `topics`: **Array**, array of topics to remove
* `cb`: **Function**, the callback
Example:
``` js
consumer.removeTopics(['t1', 't2'], function (err, removed) {
});
```
### commit(cb)
Manually commit the offsets of the current topics. This method should be called when a consumer leaves.
* `cb`: **Function**, the callback
...resume = function () {
// Clear the paused flag, then call fetch() immediately.
this.paused = false;
this.fetch();
}...
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
...resumeTopics = function (topics) {
if (!this.pausedPayloads) this.pausedPayloads = [];
var reFetch = !this.payloads.length;
pauseOrResume(this.pausedPayloads, this.payloads, topics);
reFetch = reFetch && this.payloads.length;
if (reFetch) this.fetch();
}...
]);
```
### resumeTopics(topics)
Resume the specified topics
```
consumer.resumeTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
```
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
...setOffset = function (topic, partition, offset) {
this.payloads.every(function (p) {
if (p.topic === topic && p.partition == partition) { // eslint-disable-line eqeqeq
p.offset = offset;
return false;
}
return true;
});
}...
* `partition`: **Number**
* `offset`: **Number**
Example:
``` js
consumer.setOffset('topic', 0, 0);
```
### pause()
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops
the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).
### resume()
Resume the consumer. Resumes the fetch loop.
...updateOffsets = function (topics, initing) {
this.payloads.forEach(function (p) {
if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined) {
var offset = topics[p.topic][p.partition];
if (offset === -1) offset = 0;
if (!initing) p.offset = offset + 1;
else p.offset = offset;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
}...
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
setImmediate(function () {
self.fetch();
});
}
});
...function ConsumerGroup(memberOptions, topics) {
EventEmitter.call(this);
const self = this;
this.options = _.defaults((memberOptions || {}), DEFAULTS);
if (!this.options.heartbeatInterval) {
this.options.heartbeatInterval = Math.floor(this.options.sessionTimeout / 3);
}
if (memberOptions.ssl === true) {
memberOptions.ssl = {};
}
if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) {
throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`);
}
if (!(this.options.outOfRangeOffset in ACCEPTED_FROM_OFFSET)) {
throw new Error(`outOfRangeOffset ${this.options.outOfRangeOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join
(', ')}`);
}
this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk,
memberOptions.batch, memberOptions.ssl);
if (_.isString(topics)) {
topics = [topics];
}
assert(Array.isArray(topics), 'Array of topics is required');
this.topics = topics;
this.recovery = new ConsumerGroupRecovery(this);
this.setupProtocols(this.options.protocol);
if (this.options.connectOnReady && !this.options.migrateHLC) {
this.client.once('ready', this.connect.bind(this));
}
if (this.options.migrateHLC) {
const ConsumerGroupMigrator = require('./consumerGroupMigrator');
this.migrator = new ConsumerGroupMigrator(this);
this.migrator.on('error', function (error) {
self.emit('error', error);
});
}
this.client.on('error', function (err) {
logger.error('Error from %s', self.client.clientId, err);
self.emit('error', err);
});
// Debounced (200 ms) recovery after a 'brokersChanged' event: refresh metadata,
// un-pause, then either reconnect (when not ready) or resume fetching.
const recoverFromBrokerChange = _.debounce(function () {
  logger.debug('brokersChanged refreshing metadata');
  self.client.refreshMetadata(self.topics, function (error) {
    if (error) {
      // Emit on the 'error' event. Passing the error object as the event name
      // meant 'error' listeners were never notified of a failed refresh.
      self.emit('error', error);
      return;
    }
    self.paused = false;
    if (!self.ready && !self.connecting) {
      if (self.reconnectTimer) {
        // brokers changed so bypass backoff retry and reconnect now
        clearTimeout(self.reconnectTimer);
        self.reconnectTimer = null;
      }
      self.connect();
    } else if (!self.connecting) {
      self.fetch();
    }
  });
}, 200);
this.client.on('brokersChanged', function () {
self.pause();
recoverFromBrokerChange();
});
this.client.on('reconnect', function (lastError) {
self.fetch();
});
this.on('offsetOutOfRange', topic => {
this.pause();
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition ${topic
.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`, error
));
return;
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
setImmediate(function () {
self.fetch();
});
}
});
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
this.isLeader = false;
this.coordinatorId = null;
this.generationId = null;
this.ready = false;
this.topicPayloads = [];
}n/a
super_ = function (client, topics, options) {
// Constructor body: stores the client, merges options with DEFAULTS, initialises
// state flags, builds the payload lists and then calls connect().
// Throws an Error when `topics` is falsy.
if (!topics) {
throw new Error('Must have payloads');
}
this.fetchCount = 0;
this.client = client;
// Missing option keys are filled in from DEFAULTS (on the passed object, or a new one).
this.options = _.defaults((options || {}), DEFAULTS);
this.initialised = false;
this.ready = false;
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.committing = false;
this.needToCommit = false;
// Use options.id when provided, otherwise "<groupId>_<v4 uuid>".
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
this.connect();
// NOTE(review): groupId is validated only after connect() has been called — confirm this ordering is intended.
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
}n/a
assignPartitions = function (protocol, groupMembers, callback) {
logger.debug('Assigning Partitions to members', groupMembers);
logger.debug('Using group protocol', protocol);
protocol = _.find(this.protocols, {name: protocol});
var self = this;
var topics = _(groupMembers).map('subscription').flatten().uniq().value();
async.waterfall([
function (callback) {
logger.debug('loadingMetadata for topics:', topics);
self.client.loadMetadataForTopics(topics, callback);
},
function (metadataResponse, callback) {
var metadata = mapTopicToPartitions(metadataResponse[1].metadata);
logger.debug('mapTopicToPartitions', metadata);
protocol.assign(metadata, groupMembers, callback);
}
], callback);
}...
this.isLeader = (joinGroupResponse.leaderId === joinGroupResponse.memberId);
this.generationId = joinGroupResponse.generationId;
this.memberId = joinGroupResponse.memberId;
var groupAssignment;
if (this.isLeader) {
// assign partitions
return this.assignPartitions(joinGroupResponse.groupProtocol, joinGroupResponse.members
, callback);
}
callback(null, groupAssignment);
};
ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) {
var self = this;
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
...close = function (force, cb) {
var self = this;
this.ready = false;
this.stopHeartbeats();
if (typeof force === 'function') {
cb = force;
force = false;
}
async.series([
function (callback) {
if (force) {
self.commit(true, callback);
return;
}
callback(null);
},
function (callback) {
self.leaveGroup(function (error) {
if (error) {
logger.error('Leave group failed with', error);
}
callback(null);
});
},
function (callback) {
self.client.close(callback);
}
], cb);
}...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); //force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...connect = function () {
// Runs the join sequence as a waterfall: find coordinator -> join group ->
// handle join response -> sync group -> handle sync response.
// A call made while a previous connect is still in flight is ignored.
if (this.connecting) {
logger.warn('Connect ignored. Currently connecting.');
return;
}
logger.debug('Connecting %s', this.client.clientId);
var self = this;
this.connecting = true;
this.emit('rebalancing');
async.waterfall([
function (callback) {
// Skip the coordinator lookup when the client already has a coordinator id.
if (self.client.coordinatorId) {
return callback(null, null);
}
self.client.sendGroupCoordinatorRequest(self.options.groupId, callback);
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
// coordinatorInfo is null when the lookup above was skipped.
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols
, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
// Both flags are reset on success and on failure.
self.connecting = false;
self.rebalancing = false;
if (error) {
// Errors are handed to the recovery helper instead of being emitted here.
return self.recovery.tryToRecoverFrom(error, 'connect');
}
self.ready = true;
self.recovery.clearError();
logger.debug('generationId', self.generationId);
// startFetch is the value handleSyncGroup passed to its callback.
if (startFetch) {
self.fetch();
}
self.startHeartbeats();
self.emit('connect');
self.emit('rebalanced');
});
}...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
...fetchOffset = function (payloads, cb) {
// Delegates to the client's V1 offset-fetch request using the group id from options.
this.client.sendOffsetFetchV1Request(this.options.groupId, payloads, cb);
}...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...getDefaultOffset = function (tp, defaultOffset) {
// Look up this.defaultOffsets[tp.topic][tp.partition]; fall back to defaultOffset when that path is missing.
return _.get(this.defaultOffsets, [tp.topic, tp.partition], defaultOffset);
}...
callback(null, offsets);
}
},
function (offsets, callback) {
self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) { // -1 means no offset was saved for this topic/partition combo
offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0;
if (self.migrator) {
offset = self.migrator.getOffset(p, offset);
}
}
p.offset = offset;
return p;
});
...getOffset = function () {
if (this.offset) {
return this.offset;
}
this.offset = new Offset(this.client);
// we can ignore this since we are already forwarding error event emitted from client
this.offset.on('error', _.noop);
return this.offset;
}...
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition
${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`,
error));
return;
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
...handleJoinGroup = function (joinGroupResponse, callback) {
logger.debug('joinGroupResponse %j from %s', joinGroupResponse, this.client.clientId);
this.isLeader = (joinGroupResponse.leaderId === joinGroupResponse.memberId);
this.generationId = joinGroupResponse.generationId;
this.memberId = joinGroupResponse.memberId;
var groupAssignment;
if (this.isLeader) {
// assign partitions
return this.assignPartitions(joinGroupResponse.groupProtocol, joinGroupResponse.members, callback);
}
callback(null, groupAssignment);
}...
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols
, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
...handleSyncGroup = function (syncGroupResponse, callback) {
// Turns the sync response into this.topicPayloads with starting offsets.
// Calls back with `true` when partitions were assigned, `false` when none were.
logger.debug('SyncGroup Response');
var self = this;
var ownedTopics = Object.keys(syncGroupResponse.partitions);
if (ownedTopics.length) {
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
// Step 1: fetch the offsets stored for the assigned partitions.
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
// Step 2: when any partition has no stored offset (-1), prepare fallback offsets.
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
if (noOffset) {
logger.debug('No saved offsets');
// fromOffset 'none' makes a missing offset an error.
if (self.options.fromOffset === 'none') {
return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group
'${self.options.groupId}'`));
}
// Migrator offsets and default offsets are loaded in parallel.
async.parallel([
function (callback) {
if (self.migrator) {
return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback);
}
callback(null);
},
function (callback) {
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
callback(null);
}
], function (error) {
if (error) {
return callback(error);
}
logger.debug('%s defaultOffset Response for %s: %j', self.client.clientId, self.options.fromOffset, self.defaultOffsets
);
callback(null, offsets);
});
} else {
logger.debug('Has saved offsets');
callback(null, offsets);
}
},
// Step 3: build the payloads, replacing -1 with the default and then the migrator offset when available.
function (offsets, callback) {
self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) { // -1 means no offset was saved for this topic/partition combo
offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0;
if (self.migrator) {
offset = self.migrator.getOffset(p, offset);
}
}
p.offset = offset;
return p;
});
callback(null, true);
}
], callback);
} else { // no partitions assigned
callback(null, false);
}
}...
function (groupAssignment, callback) {
logger.debug('SyncGroup Request from %s', self.memberId);
self.client.sendSyncGroupRequest(self.options.groupId, self.generationId, self.memberId, groupAssignment, callback);
},
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
...leaveGroup = function (callback) {
logger.debug('%s leaving group', this.client.clientId);
var self = this;
this.stopHeartbeats();
if (self.generationId != null && self.memberId) {
this.client.sendLeaveGroupRequest(this.options.groupId, this.memberId, function (error) {
self.generationId = null;
callback(error);
});
} else {
callback(null);
}
}...
if (force) {
self.commit(true, callback);
return;
}
callback(null);
},
function (callback) {
self.leaveGroup(function (error) {
if (error) {
logger.error('Leave group failed with', error);
}
callback(null);
});
},
function (callback) {
...saveDefaultOffsets = function (topicPartitionList, callback) {
// Fetches offsets for the given topic/partitions at the time mapped from options.fromOffset
// and stores them in this.defaultOffsets. Calls back with an error or with null.
var self = this;
// Work on a deep copy so the caller's list does not get a `time` property.
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset];
return tp;
});
self.getOffset().fetch(offsetPayload, function (error, result) {
if (error) {
return callback(error);
}
// Keep only the first element of each partition's offsets.
self.defaultOffsets = _.mapValues(result, function (partitionOffsets) {
return _.mapValues(partitionOffsets, _.head);
});
callback(null);
});
}...
if (self.migrator) {
return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback);
}
callback(null);
},
function (callback) {
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
callback(null);
}
], function (error) {
if (error) {
return callback(error);
}
...scheduleReconnect = function (timeout) {
assert(timeout);
this.rebalancing = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
var self = this;
this.reconnectTimer = setTimeout(function () {
self.reconnectTimer = null;
self.connect();
}, timeout);
}...
if (retry) {
retryTimeout = this.getRetryTimeout(error);
}
if (retry && retryTimeout) {
logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error
);
this.consumerGroup.scheduleReconnect(retryTimeout);
} else {
this.consumerGroup.emit('error', error);
}
this.lastError = error;
};
ConsumerGroupRecovery.prototype.clearError = function () {
...sendHeartbeat = function () {
assert(this.memberId, 'invalid memberId');
assert(this.generationId >= 0, 'invalid generationId');
// logger.debug('%s ❤️ ->', this.client.clientId);
var self = this;
function heartbeatCallback (error) {
if (error) {
logger.warn('%s Heartbeat error:', self.client.clientId, error);
self.recovery.tryToRecoverFrom(error, 'heartbeat');
}
// logger.debug('%s 💚 <-', self.client.clientId, error);
}
const heartbeat = new Heartbeat(this.client, heartbeatCallback);
heartbeat.send(this.options.groupId, this.generationId, this.memberId);
return heartbeat;
}...
assert(this.ready, 'consumerGroup is not ready');
const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3));
logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs);
this.stopHeartbeats();
let heartbeat = this.sendHeartbeat();
this.heartbeatInterval = setInterval(() => {
// only send another heartbeat if we got a response from the last one
if (heartbeat.verifyResolved()) {
heartbeat = this.sendHeartbeat();
}
}, heartbeatIntervalMs);
...sendOffsetCommitRequest = function (commits, cb) {
if (this.generationId && this.memberId) {
this.client.sendOffsetCommitV2Request(this.options.groupId, this.generationId, this.memberId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...setCoordinatorId = function (coordinatorId) {
// Stored on the client as a string.
this.client.coordinatorId = String(coordinatorId);
}...
}
self.client.sendGroupCoordinatorRequest(self.options.groupId, callback);
},
function (coordinatorInfo, callback) {
logger.debug('GroupCoordinator Response:', coordinatorInfo);
if (coordinatorInfo) {
self.setCoordinatorId(coordinatorInfo.coordinatorId);
}
self.client.sendJoinGroupRequest(self.options.groupId, emptyStrIfNull(self.memberId), self.options.sessionTimeout, self.protocols
, callback);
},
function (joinGroupResponse, callback) {
self.handleJoinGroup(joinGroupResponse, callback);
},
...setupProtocols = function (protocols) {
if (!Array.isArray(protocols)) {
protocols = [protocols];
}
this.protocols = protocols.map(function (protocol) {
if (typeof protocol === 'string') {
if (!(protocol in builtInProtocols)) {
throw new Error('Unknown built in assignment protocol ' + protocol);
}
protocol = _.assign({}, builtInProtocols[protocol]);
} else {
checkProtocol(protocol);
}
protocol.subscription = this.topics;
return protocol;
}, this);
}...
assert(Array.isArray(topics), 'Array of topics is required');
this.topics = topics;
this.recovery = new ConsumerGroupRecovery(this);
this.setupProtocols(this.options.protocol);
if (this.options.connectOnReady && !this.options.migrateHLC) {
this.client.once('ready', this.connect.bind(this));
}
if (this.options.migrateHLC) {
const ConsumerGroupMigrator = require('./consumerGroupMigrator');
...startHeartbeats = function () {
// Sends one heartbeat immediately, then keeps sending on a fixed interval.
// Asserts that sessionTimeout is positive and that the group is ready.
assert(this.options.sessionTimeout > 0);
assert(this.ready, 'consumerGroup is not ready');
// Interval defaults to a third of the session timeout when heartbeatInterval is not set.
const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3));
logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs);
// Clear any interval that is already running before starting a new one.
this.stopHeartbeats();
let heartbeat = this.sendHeartbeat();
this.heartbeatInterval = setInterval(() => {
// only send another heartbeat if we got a response from the last one
if (heartbeat.verifyResolved()) {
heartbeat = this.sendHeartbeat();
}
}, heartbeatIntervalMs);
}...
self.recovery.clearError();
logger.debug('generationId', self.generationId);
if (startFetch) {
self.fetch();
}
self.startHeartbeats();
self.emit('connect');
self.emit('rebalanced');
});
};
ConsumerGroup.prototype.scheduleReconnect = function (timeout) {
assert(timeout);
...stopHeartbeats = function () {
this.heartbeatInterval && clearInterval(this.heartbeatInterval);
}...
ConsumerGroup.prototype.startHeartbeats = function () {
assert(this.options.sessionTimeout > 0);
assert(this.ready, 'consumerGroup is not ready');
const heartbeatIntervalMs = this.options.heartbeatInterval || (Math.floor(this.options.sessionTimeout / 3));
logger.debug('%s started heartbeats at every %d ms', this.client.clientId, heartbeatIntervalMs);
this.stopHeartbeats();
let heartbeat = this.sendHeartbeat();
this.heartbeatInterval = setInterval(() => {
// only send another heartbeat if we got a response from the last one
if (heartbeat.verifyResolved()) {
heartbeat = this.sendHeartbeat();
...CustomPartitioner = function (partitioner) {
// The supplied function is installed as this instance's getPartition.
this.getPartition = partitioner;
}n/a
super_ = function () {}n/a
CyclicPartitioner = function () {
// Counter starts at zero.
this.c = 0;
}n/a
super_ = function () {}n/a
getPartition = function (partitions) {
if (_.isEmpty(partitions)) return 0;
return partitions[ this.c++ % partitions.length ];
}...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition
(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...DefaultPartitioner = function () {}n/a
super_ = function () {}n/a
getPartition = function (partitions) {
if (partitions && _.isArray(partitions) && partitions.length > 0) {
return partitions[0];
} else {
return 0;
}
}...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition
(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...FailedToRebalanceConsumerError = function (message) {
Error.captureStackTrace(this, this);
this.message = message;
}...
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation
.mainError().toString()));
} else {
var topicNames = self.topicPayloads.map(function (p) {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
if (err) {
...function Error() { [native code] }n/a
FailedToRegisterConsumerError = function (message, nested) {
// Delegates to NestedError with the message and the nested (causing) error.
NestedError.call(this, message, nested);
// The message is also assigned directly on the instance.
this.message = message;
}...
} else {
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
}
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString
(), error));
}
});
}
}, 20000);
function fetchAndUpdateOffsets (cb) {
self.fetchOffset(self.topicPayloads, function (err, topics) {
...super_ = function (message, nested) {
// Error constructor that keeps a reference to a nested error and rebuilds the
// `stack` property descriptor from the original one plus the nested error.
this.nested = nested;
// Define `message` as a non-enumerable, writable property, but only when one was given.
if (typeof message !== 'undefined') {
Object.defineProperty(this, 'message', {
value: message,
writable: true,
enumerable: false,
configurable: true
});
}
Error.captureStackTrace(this, this.constructor);
// Replace the captured stack descriptor with one built by buildStackDescriptor.
var oldStackDescriptor = Object.getOwnPropertyDescriptor(this, 'stack');
var stackDescriptor = buildStackDescriptor(oldStackDescriptor, nested);
Object.defineProperty(this, 'stack', stackDescriptor);
}n/a
GroupCoordinatorNotAvailableError = function (message) {
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
GroupLoadInProgressError = function (message) {
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
HeartbeatTimeoutError = function (message) {
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
HighLevelConsumer = function (client, topics, options) {
// Constructor: stores the client, merges options with DEFAULTS, initialises
// state flags, builds the payload lists and then calls connect().
// Throws an Error when `topics` is falsy.
if (!topics) {
throw new Error('Must have payloads');
}
this.fetchCount = 0;
this.client = client;
// Missing option keys are filled in from DEFAULTS (on the passed object, or a new one).
this.options = _.defaults((options || {}), DEFAULTS);
this.initialised = false;
this.ready = false;
this.closing = false;
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.committing = false;
this.needToCommit = false;
// Use options.id when provided, otherwise "<groupId>_<v4 uuid>".
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
this.connect();
// NOTE(review): groupId is validated only after connect() has been called — confirm this ordering is intended.
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
}n/a
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
_releasePartitions = function (topicPayloads, callback) {
// For every payload that has a partition: check ownership, delete the ownership
// entry, then check again that it is gone. `callback` fires once all payloads are done.
var self = this;
async.each(topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (delcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
// (finishes this payload via cbb directly; the remaining series steps are not run)
cbb();
} else delcbb();
});
},
function (delcbb) {
self.client.zk.deletePartitionOwnership(self.options.groupId, tp.topic, tp.partition, delcbb);
},
function (delcbb) {
// After deletion the ownership check is expected to fail; success here is reported as an error.
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
delcbb();
} else {
delcbb('Partition should not exist');
}
});
}],
cbb);
} else {
// Payloads without a partition need no release.
cbb();
}
}, callback);
}...
}
});
},
// Release current partitions
function (callback) {
logger.debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id);
self._releasePartitions(oldTopicPayloads, callback);
},
// Rebalance
function (callback) {
logger.debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id);
logger.debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap);
for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) {
...addTopics = function (topics, cb) {
var self = this;
if (!this.ready) {
setTimeout(function () {
self.addTopics(topics, cb);
}, 100);
return;
}
this.client.addTopics(
topics,
function (err, added) {
if (err) return cb && cb(err, added);
var payloads = self.buildPayloads(topics);
// update offset of topics that will be added
self.fetchOffset(payloads, function (err, offsets) {
if (err) return cb(err);
payloads.forEach(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) offset = 0;
p.offset = offset;
self.topicPayloads.push(p);
});
// TODO: rebalance consumer
cb && cb(null, added);
});
}
);
}...
* `topics`: **Array**, array of topics to add
* `cb`: **Function**, the callback
* `fromOffset`: **Boolean**, if true, the consumer will fetch messages from the specified offset; otherwise it will fetch messages
from the last committed offset of the topic.
Example:
``` js
consumer.addTopics(['t1', 't2'], function (err, added) {
});
or
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {
}, true);
```
...function autoCommit(force, cb) {
if (arguments.length === 1) {
cb = force;
force = false;
}
if (!force) {
if (this.committing) return cb(null, 'Offset committing');
if (!this.needToCommit) return cb(null, 'Commit not needed');
}
this.needToCommit = false;
this.committing = true;
setTimeout(function () {
this.committing = false;
}.bind(this), this.options.autoCommitIntervalMs);
var commits = this.topicPayloads.filter(function (p) { return p.offset !== -1; });
if (commits.length) {
this.sendOffsetCommitRequest(commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}...
if (offset === -1) offset = 0;
if (!initing) p.offset = offset + 1;
else p.offset = offset;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
};
function autoCommit (force, cb) {
if (arguments.length === 1) {
...buildPayloads = function (payloads) {
var self = this;
return payloads.map(function (p) {
if (typeof p !== 'object') p = { topic: p };
p.partition = p.partition || 0;
p.offset = p.offset || 0;
p.maxBytes = self.options.fetchMaxBytes;
p.metadata = 'm'; // metadata can be arbitrary
return p;
});
}...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
  // Builds produce requests from `payloads` using the client's topic metadata
  // and hands them to the client together with this producer's ack settings.
  var target = this.client;
  var acks = this.requireAcks;
  var timeoutMs = this.ackTimeoutMs;
  var requests = this.buildPayloads(payloads, target.topicMetadata);
  target.sendProduceRequest(requests, acks, timeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic
], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...buildTopicPayloads = function (topics) {
return topics.map(function (j) {
var k = { topic: j.topic };
return k;
});
}...
this.paused = this.options.paused;
this.rebalancing = false;
this.pendingRebalances = 0;
this.committing = false;
this.needToCommit = false;
this.id = this.options.id || this.options.groupId + '_' + uuid.v4();
this.payloads = this.buildPayloads(topics);
this.topicPayloads = this.buildTopicPayloads(topics);
this.connect();
if (this.options.groupId) {
validateConfig('options.groupId', this.options.groupId);
}
};
util.inherits(HighLevelConsumer, events.EventEmitter);
...close = function (force, cb) {
// Shuts the consumer down: marks it not ready / closing, stops the periodic
// ownership check, leaves the group, and then closes the client. When `force`
// is truthy, commit(true, ...) runs after leaving the group and before the
// client is closed.
// `force` may be omitted: close(cb) is treated as close(false, cb).
var self = this;
this.ready = false;
this.closing = true;
clearInterval(this.checkPartitionOwnershipInterval);
if (typeof force === 'function') {
cb = force;
force = false;
}
async.series([
function (callback) {
self.leaveGroup(callback);
},
function (callback) {
if (force) {
// Forced close: commit first, then close the client, strictly in order.
async.series([
function (callback) {
self.commit(true, callback);
},
function (callback) {
self.client.close(callback);
}
], callback);
return;
}
self.client.close(callback);
}
], cb);
}...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); //force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...function autoCommit(force, cb) {
if (arguments.length === 1) {
cb = force;
force = false;
}
if (!force) {
if (this.committing) return cb(null, 'Offset committing');
if (!this.needToCommit) return cb(null, 'Commit not needed');
}
this.needToCommit = false;
this.committing = true;
setTimeout(function () {
this.committing = false;
}.bind(this), this.options.autoCommitIntervalMs);
var commits = this.topicPayloads.filter(function (p) { return p.offset !== -1; });
if (commits.length) {
this.sendOffsetCommitRequest(commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}...
Commit the offsets of the current topics manually. This method should be called when a consumer leaves.
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function(err, data) {
});
```
### setOffset(topic, partition, offset)
Set offset of the given topic
* `topic`: **String**
...connect = function () {
var self = this;
// Client alreadyexists
if (this.client.ready) {
this.init();
}
this.client.on('ready', function () {
if (!self.initialised) self.init();
// Check the topics exist and create a watcher on them
var topics = self.payloads.map(function (p) {
return p.topic;
});
self.client.topicExists(topics, function (err) {
if (err) {
return self.emit('error', err);
}
self.initialised = true;
});
});
function checkPartitionOwnership (callback) {
async.each(self.topicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
cbb(err);
} else {
cbb();
}
});
} else {
cbb();
}
}, callback);
}
// Check partition ownership and registration
this.checkPartitionOwnershipInterval = setInterval(function () {
if (!self.rebalancing) {
async.parallel([
checkPartitionOwnership,
function (callback) {
self.client.zk.isConsumerRegistered(self.options.groupId, self.id, function (error, registered) {
if (error) {
return callback(error);
}
if (registered) {
callback();
} else {
callback(new Error(util.format('Consumer %s is not registered in group %s', self.id, self.options.groupId)));
}
});
}
], function (error) {
if (error) {
self.emit('error', new errors.FailedToRegisterConsumerError(error.toString(), error));
}
});
}
}, 20000);
function fetchAndUpdateOffsets (cb) {
self.fetchOffset(self.topicPayloads, function (err, topics) {
if (err) {
return cb(err);
}
self.ready = true;
self.updateOffsets(topics, true);
return cb();
});
}
function rebalance () {
logger.debug('rebalance() %s is rebalancing: %s ready: %s', self.id, self.rebalancing, self.ready);
if (!self.rebalancing && !self.closing) {
deregister();
self.emit('rebalancing');
self.rebalancing = true;
logger.debug('HighLevelConsumer rebalance retry config: %s', JSON.stringify(self.options.rebalanceRetry));
var oldTopicPayloads = self.topicPayloads;
var operation = retry.operation(self.options.rebalanceRetry);
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString()));
} else {
var topicNames = self.topicPayloads.map(function (p) {
return p.topic;
});
self.client.refreshMetadata(topicNames, function (err) {
register();
if (err) {
self.rebalancing = false;
self.emit('error', err);
return;
}
if (self.topicPayloads.length) {
fetchAndUpdateOffsets(function (err) {
self.rebalancing = false;
if (err) {
self.emit('error', new errors.FailedToRebalanceConsumerError(err.message));
return;
}
self.fetch();
self.emit('rebalanced');
});
} else { // was not assigned any partitions during rebalance
self.rebalancing = false;
self.emit('rebalanced');
}
});
}
});
});
}
}
// Wait for the consumer to be ready
this.on('registered', rebalance);
function register (fn) {
logger.debug('Registered listeners %s', self.id);
self.client.zk.on('consumersChanged', fn ......
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
...fetch = function () {
if (!this.ready || this.rebalancing || this.paused) {
return;
}
this.client.sendFetchRequest(this, this.topicPayloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes, this.options.maxTickMessages
);
}...
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...fetchOffset = function (payloads, cb) {
// Logs the request, then asks the client for the offsets stored for this
// consumer's group (options.groupId) for the given payloads; `cb` is passed
// straight through to the client.
logger.debug('in fetchOffset %s payloads: %j', this.id, payloads);
this.client.sendOffsetFetchRequest(this.options.groupId, payloads, cb);
}...
logger.debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);
const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;
async.waterfall([
function (callback) {
self.fetchOffset(syncGroupResponse.partitions, callback);
},
function (offsets, callback) {
logger.debug('%s fetchOffset Response: %j', self.client.clientId, offsets);
var noOffset = topicPartitionList.some(function (tp) {
return offsets[tp.topic][tp.partition] === -1;
});
...getTopicPayloads = function () {
if (!this.rebalancing) return this.topicPayloads;
return null;
}n/a
init = function () {
var self = this;
if (!self.topicPayloads.length) {
return;
}
self.registerConsumer(function (err) {
if (err) {
return self.emit('error', new errors.FailedToRegisterConsumerError(err.toString()));
}
// Close the
return self.emit('registered');
});
}...
});
};
Consumer.prototype.connect = function () {
var self = this;
// Client already exists
this.ready = this.client.ready;
if (this.ready) this.init();
this.client.on('ready', function () {
logger.debug('consumer ready');
if (!self.ready) self.init();
self.ready = true;
});
...leaveGroup = function (cb) {
var self = this;
async.parallel([
function (callback) {
if (self.topicPayloads.length) {
self._releasePartitions(self.topicPayloads, callback);
} else {
callback(null);
}
},
function (callback) {
self.client.zk.unregisterConsumer(self.options.groupId, self.id, callback);
}
], cb);
}...
if (force) {
self.commit(true, callback);
return;
}
callback(null);
},
function (callback) {
self.leaveGroup(function (error) {
if (error) {
logger.error('Leave group failed with', error);
}
callback(null);
});
},
function (callback) {
...offsetRequest = function (payloads, cb) {
// Thin pass-through: forwards the payloads and callback to the client's
// sendOffsetRequest unchanged.
this.client.sendOffsetRequest(payloads, cb);
}...
});
this.on('offsetOutOfRange', function (topic) {
self.pause();
topic.maxNum = self.options.maxNumSegments;
topic.metadata = 'm';
topic.time = Date.now();
self.offsetRequest([topic], function (err, offsets) {
if (err) {
self.emit('error', new errors.InvalidConsumerOffsetError(self));
} else {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
// set minimal offset
self.setOffset(topic.topic, topic.partition, min);
self.resume();
...pause = function () {
// Only sets the `paused` flag; nothing is cancelled here.
this.paused = true;
}...
} else if (!self.connecting) {
self.fetch();
}
});
}, 200);
this.client.on('brokersChanged', function () {
self.pause();
recoverFromBrokerChange();
});
this.client.on('reconnect', function (lastError) {
self.fetch();
});
...rebalanceAttempt = function (oldTopicPayloads, cb) {
var self = this;
// Do the rebalance.....
var consumerPerTopicMap;
var newTopicPayloads = [];
logger.debug('HighLevelConsumer %s is attempting to rebalance', self.id);
async.series([
// Stop fetching data and commit offsets
function (callback) {
logger.debug('HighLevelConsumer %s stopping data read during rebalance', self.id);
self.stop(function () {
callback();
});
},
// Assemble the data
function (callback) {
logger.debug('HighLevelConsumer %s assembling data for rebalance', self.id);
self.client.zk.getConsumersPerTopic(self.options.groupId, function (err, obj) {
if (err) {
callback(err);
} else {
consumerPerTopicMap = obj;
callback();
}
});
},
// Release current partitions
function (callback) {
logger.debug('HighLevelConsumer %s releasing current partitions during rebalance', self.id);
self._releasePartitions(oldTopicPayloads, callback);
},
// Rebalance
function (callback) {
logger.debug('HighLevelConsumer %s determining the partitions to own during rebalance', self.id);
logger.debug('consumerPerTopicMap.consumerTopicMap %j', consumerPerTopicMap.consumerTopicMap);
for (var topic in consumerPerTopicMap.consumerTopicMap[self.id]) {
if (!consumerPerTopicMap.consumerTopicMap[self.id].hasOwnProperty(topic)) {
continue;
}
var topicToAdd = consumerPerTopicMap.consumerTopicMap[self.id][topic];
var numberOfConsumers = consumerPerTopicMap.topicConsumerMap[topicToAdd].length;
var numberOfPartition = consumerPerTopicMap.topicPartitionMap[topicToAdd].length;
var partitionsPerConsumer = Math.floor(numberOfPartition / numberOfConsumers);
var extraPartitions = numberOfPartition % numberOfConsumers;
var currentConsumerIndex;
for (var index in consumerPerTopicMap.topicConsumerMap[topicToAdd]) {
if (!consumerPerTopicMap.topicConsumerMap[topicToAdd].hasOwnProperty(index)) {
continue;
}
if (consumerPerTopicMap.topicConsumerMap[topicToAdd][index] === self.id) {
currentConsumerIndex = parseInt(index);
break;
}
}
var extraBit = currentConsumerIndex;
if (currentConsumerIndex > extraPartitions) extraBit = extraPartitions;
var startPart = partitionsPerConsumer * currentConsumerIndex + extraBit;
var extraNParts = 1;
if (currentConsumerIndex + 1 > extraPartitions) extraNParts = 0;
var nParts = partitionsPerConsumer + extraNParts;
for (var i = startPart; i < startPart + nParts; i++) {
newTopicPayloads.push({
topic: topicToAdd,
partition: consumerPerTopicMap.topicPartitionMap[topicToAdd][i],
offset: 0,
maxBytes: self.options.fetchMaxBytes,
metadata: 'm'
});
}
}
logger.debug('newTopicPayloads %j', newTopicPayloads);
callback();
},
// Update ZK with new ownership
function (callback) {
if (newTopicPayloads.length) {
logger.debug('HighLevelConsumer %s gaining ownership of partitions during rebalance', self.id);
async.eachSeries(newTopicPayloads, function (tp, cbb) {
if (tp.partition !== undefined) {
async.series([
function (addcbb) {
self.client.zk.checkPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
// Partition doesn't exist simply carry on
addcbb();
} else cbb(); // Partition exists simply carry on
});
},
function (addcbb) {
self.client.zk.addPartitionOwnership(self.id, self.options.groupId, tp.topic, tp.partition, function (err) {
if (err) {
addcbb(err);
} else addcbb(); ......
self.rebalancing = true;
logger.debug('HighLevelConsumer rebalance retry config: %s', JSON.stringify(self.options.rebalanceRetry));
var oldTopicPayloads = self.topicPayloads;
var operation = retry.operation(self.options.rebalanceRetry);
operation.attempt(function (currentAttempt) {
self.rebalanceAttempt(oldTopicPayloads, function (err) {
if (operation.retry(err)) {
return;
}
if (err) {
self.rebalancing = false;
return self.emit('error', new errors.FailedToRebalanceConsumerError(operation.mainError().toString()));
} else {
...registerConsumer = function (cb) {
var self = this;
var groupId = this.options.groupId;
this.client.zk.registerConsumer(groupId, this.id, this.payloads, function (err) {
if (err) return cb(err);
self.client.zk.listConsumers(self.options.groupId);
var topics = self.topicPayloads.reduce(function (ret, topicPayload) {
if (ret.indexOf(topicPayload.topic) === -1) {
ret.push(topicPayload.topic);
}
return ret;
}, []);
topics.forEach(function (topic) {
self.client.zk.listPartitions(topic);
});
cb();
});
}...
this.client.on('zkReconnect', function () {
logger.debug('zookeeper reconnect for %s', self.id);
attachZookeeperErrorListener();
// clean up what's leftover
self.leaveGroup(function () {
// rejoin the group
self.registerConsumer(function (error) {
if (error) {
return self.emit('error', new errors.FailedToRegisterConsumerError('Failed to register consumer on zkReconnect
', error));
}
self.emit('registered');
});
});
});
...removeTopics = function (topics, cb) {
topics = typeof topics === 'string' ? [topics] : topics;
this.payloads = this.payloads.filter(function (p) {
return !~topics.indexOf(p.topic);
});
this.client.removeTopicMetadata(topics, cb);
}...
### removeTopics(topics, cb)
* `topics`: **Array**, array of topics to remove
* `cb`: **Function**, the callback
Example:
``` js
consumer.removeTopics(['t1', 't2'], function (err, removed) {
});
```
### commit(cb)
Commit the offsets of the current topics manually. This method should be called when a consumer leaves.
* `cb`: **Function**, the callback
...resume = function () {
// Clears the `paused` flag and immediately triggers a fetch.
this.paused = false;
this.fetch();
}...
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
logger.debug('replacing %s-%s stale offset of %d with %d', topic.topic, topic.partition, oldOffset, offset);
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
...sendOffsetCommitRequest = function (commits, cb) {
// Forwards the commits to the client under this consumer's group id
// (options.groupId); `cb` is passed through unchanged.
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
}...
}.bind(this), this.options.autoCommitIntervalMs);
var payloads = this.payloads;
if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);
var commits = payloads.filter(function (p) { return p.offset !== 0; });
if (commits.length) {
this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
} else {
cb(null, 'Nothing to be committed');
}
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
Consumer.prototype.fetch = function () {
...setOffset = function (topic, partition, offset) {
this.topicPayloads.every(function (p) {
if (p.topic === topic && p.partition == partition) { // eslint-disable-line eqeqeq
p.offset = offset;
return false;
}
return true;
});
}...
* `partition`: **Number**
* `offset`: **Number**
Example:
``` js
consumer.setOffset('topic', 0, 0);
```
### pause()
Pause the consumer. ***Calling `pause` does not automatically stop messages from being emitted.*** This is because pause just stops
the kafka consumer fetch loop. Each iteration of the fetch loop can obtain a batch of messages (limited by `fetchMaxBytes`).
### resume()
Resume the consumer. Resumes the fetch loop.
...stop = function (cb) {
if (!this.options.autoCommit) return cb && cb();
this.commit(true, function (err) {
cb && cb(err);
});
}...
var newTopicPayloads = [];
logger.debug('HighLevelConsumer %s is attempting to rebalance', self.id);
async.series([
// Stop fetching data and commit offsets
function (callback) {
logger.debug('HighLevelConsumer %s stopping data read during rebalance', self.id);
self.stop(function () {
callback();
});
},
// Assemble the data
function (callback) {
logger.debug('HighLevelConsumer %s assembling data for rebalance', self.id);
...updateOffsets = function (topics, initing) {
this.topicPayloads.forEach(p => {
if (!_.isEmpty(topics[p.topic]) && topics[p.topic][p.partition] !== undefined) {
var offset = topics[p.topic][p.partition];
if (offset === -1) offset = 0;
if (!initing) p.offset = offset + 1;
else p.offset = offset;
this.needToCommit = true;
}
});
if (this.options.autoCommit && !initing) {
this.autoCommit(false, function (err) {
err && logger.debug('auto commit offset', err);
});
}
}...
this.setOffset(topic.topic, topic.partition, offset);
this.resume();
});
});
// 'done' will be emit when a message fetch request complete
this.on('done', function (topics) {
self.updateOffsets(topics);
if (!self.paused) {
setImmediate(function () {
self.fetch();
});
}
});
...function HighLevelProducer(client, options, customPartitioner) {
// Delegates construction to BaseProducer, supplying the cyclic partitioner
// type as the default-partitioner argument.
BaseProducer.call(this, client, options, BaseProducer.PARTITIONER_TYPES.cyclic, customPartitioner);
}n/a
function BaseProducer(client, options, defaultPartitionerType, customPartitioner) {
  // Sets up producer state (client, ack settings, partitioner) and connects.
  // Throws when a custom partitioner and the 'custom' partitioner type are not
  // supplied together.
  var settings = options || {};
  this.ready = false;
  this.client = client;
  this.requireAcks = settings.requireAcks === undefined ? DEFAULTS.requireAcks : settings.requireAcks;
  this.ackTimeoutMs = settings.ackTimeoutMs === undefined ? DEFAULTS.ackTimeoutMs : settings.ackTimeoutMs;

  var hasCustomPartitioner = customPartitioner !== undefined;
  var wantsCustomType = settings.partitionerType === PARTITIONER_TYPES.custom;
  if (hasCustomPartitioner && !wantsCustomType) {
    throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
  }
  if (!hasCustomPartitioner && wantsCustomType) {
    throw new Error('No customer partitioner defined');
  }

  // Unknown or missing partitionerType falls back to the caller-supplied default.
  var partitionerType = PARTITIONER_MAP[settings.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
  // eslint-disable-next-line
  this.partitioner = new partitionerType(customPartitioner);
  this.connect();
}n/a
function EventEmitter() {
// Delegates all instance setup to EventEmitter.init, called on this instance.
EventEmitter.init.call(this);
}n/a
buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic], 'partition
'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
return new Message(0, 0, '', message);
});
let key = p.topic + p.partition;
let request = topicPartitionRequests[key];
if (request == null) {
topicPartitionRequests[key] = new ProduceRequest(p.topic, p.partition, messages, p.attributes);
} else {
assert(request.attributes === p.attributes);
Array.prototype.push.apply(request.messages, messages);
}
});
return _.values(topicPartitionRequests);
}...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
  // Builds produce requests from `payloads` using the client's topic metadata
  // and hands them to the client together with this producer's ack settings.
  var target = this.client;
  var acks = this.requireAcks;
  var timeoutMs = this.ackTimeoutMs;
  var requests = this.buildPayloads(payloads, target.topicMetadata);
  target.sendProduceRequest(requests, acks, timeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic
], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
...close = function (cb) {
// Closes the underlying client, passing `cb` straight through.
this.client.close(cb);
}...
### close(force, cb)
* `force`: **Boolean**, if set to true, it forces the consumer to commit the current offset before closing, default `false`
Example
```js
consumer.close(true, cb);
consumer.close(cb); //force is disabled
```
## HighLevelConsumer
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please
use the ConsumerGroup instead.***
### HighLevelConsumer(client, payloads, options)
...connect = function () {
// Wires this producer to its client's events: mirrors readiness, refreshes
// metadata when the broker set changes, and re-emits client errors.
var self = this;
this.ready = this.client.ready;
// Client was already ready when the producer was created: announce it now.
if (this.ready) self.emit('ready');
this.client.on('ready', function () {
// Emit 'ready' only once, on the first transition to ready.
if (!self.ready) {
self.ready = true;
self.emit('ready');
}
});
this.client.on('brokersChanged', function () {
// NOTE(review): `this` in this regular-function listener is presumably the
// client (Node invokes such listeners with the emitter as `this`), so
// topicMetadata/refreshMetadata are the client's, not the producer's.
let topics = Object.keys(this.topicMetadata);
this.refreshMetadata(topics, function (error) {
if (error) {
self.emit('error', error);
}
});
});
this.client.on('error', function (err) {
self.emit('error', err);
});
// Intentionally empty 'close' listener.
this.client.on('close', function () {});
}...
}
var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
// eslint-disable-next-line
this.partitioner = new partitionerType(customPartitioner);
this.connect();
}
util.inherits(BaseProducer, events.EventEmitter);
BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
...createTopics = function (topics, async, cb) {
if (!this.ready) {
return cb(new Error('Producer not ready!'));
}
this.client.createTopics(topics, async, cb);
}...
``` js
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
```
...send = function (payloads, cb) {
var client = this.client;
var requireAcks = this.requireAcks;
var ackTimeoutMs = this.ackTimeoutMs;
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
}...
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
{ topic: 'topic1', messages: 'hi', partition: 0 },
{ topic: 'topic2', messages: ['hello', 'world', km] }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
```
> ⚠️**WARNING**: Batch multiple messages of the same topic/partition together as an array on the `messages` attribute otherwise
you may lose messages!
...IllegalGenerationError = function (message) {
// Error constructor: captures a stack trace onto the new instance, then
// stores `message` on it.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
InvalidConfigError = function (message) {
// Error constructor: captures a stack trace onto the new instance, then
// stores `message` on it.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
InvalidConsumerOffsetError = function (message, nested) {
// Delegates construction to NestedError, forwarding every argument received.
NestedError.apply(this, arguments);
}...
this.client.on('reconnect', function (lastError) {
self.fetch();
});
this.on('offsetOutOfRange', topic => {
this.pause();
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range
for topic "${topic.topic}" partition ${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
...super_ = function (message, nested) {
// Error constructor that keeps a reference to a nested (causing) error and
// replaces the instance's `stack` descriptor with one built from the original
// descriptor and `nested`.
this.nested = nested;
// Define `message` as a non-enumerable own property, and only when one was
// actually supplied.
if (typeof message !== 'undefined') {
Object.defineProperty(this, 'message', {
value: message,
writable: true,
enumerable: false,
configurable: true
});
}
Error.captureStackTrace(this, this.constructor);
// NOTE(review): buildStackDescriptor is defined elsewhere; presumably it
// appends the nested error's stack to this one - confirm against the helper.
var oldStackDescriptor = Object.getOwnPropertyDescriptor(this, 'stack');
var stackDescriptor = buildStackDescriptor(oldStackDescriptor, nested);
Object.defineProperty(this, 'stack', stackDescriptor);
}n/a
KafkaBuffer = function (batchSize, batchAge) {
// Holds the batching thresholds (size and age) plus the state they govern:
// the age timer handle and the accumulated buffer, both initially null.
this._batch_size = batchSize;
this._batch_age = batchAge;
this._batch_age_timer = null;
this._buffer = null;
}n/a
_setupTimer = function (callback) {
var self = this;
if (this._batch_age_timer != null) {
clearTimeout(this._batch_age_timer);
}
this._batch_age_timer = setTimeout(function () {
if (self._buffer && (self._buffer.length > 0)) {
callback();
}
}, this._batch_age);
}...
}
if (typeof callback !== 'undefined' && callback != null) {
if (this._batch_size == null || this._batch_age == null ||
(this._buffer && (this._buffer.length > this._batch_size))) {
callback();
} else {
this._setupTimer(callback);
}
}
};
KafkaBuffer.prototype._setupTimer = function (callback) {
var self = this;
...addChunk = function (buffer, callback) {
if (this._buffer == null) {
this._buffer = new Buffer(buffer);
} else {
this._buffer = Buffer.concat([this._buffer, buffer]);
}
if (typeof callback !== 'undefined' && callback != null) {
if (this._batch_size == null || this._batch_age == null ||
(this._buffer && (this._buffer.length > this._batch_size))) {
callback();
} else {
this._setupTimer(callback);
}
}
}...
this.noAckBatchAge = options ? options.noAckBatchAge : null;
this._KafkaBuffer = new KafkaBuffer(this.noAckBatchSize, this.noAckBatchAge);
};
util.inherits(BrokerTransform, Transform);
BrokerTransform.prototype._transform = function (chunk, enc, done) {
  // Buffers the chunk; the buffer decides when the flush callback runs.
  // `done` is signalled right after the chunk is handed over.
  var flush = this._transformNext.bind(this);
  this._KafkaBuffer.addChunk(chunk, flush);
  done();
};
BrokerTransform.prototype._transformNext = function () {
  // Emits the accumulated batch downstream, then resets the buffer.
  var batch = this._KafkaBuffer.getBatch();
  this.push(batch);
  this._KafkaBuffer.truncateBatch();
};
...getBatch = function () {
// Returns the accumulated buffer as-is (null when nothing is buffered).
return this._buffer;
}...
BrokerTransform.prototype._transform = function (chunk, enc, done) {
  // Buffers the chunk; the buffer decides when the flush callback runs.
  // `done` is signalled right after the chunk is handed over.
  var flush = this._transformNext.bind(this);
  this._KafkaBuffer.addChunk(chunk, flush);
  done();
};
BrokerTransform.prototype._transformNext = function () {
  // Emits the accumulated batch downstream, then resets the buffer.
  var batch = this._KafkaBuffer.getBatch();
  this.push(batch);
  this._KafkaBuffer.truncateBatch();
};
module.exports = BrokerTransform;
...truncateBatch = function () {
// Discards the accumulated buffer by resetting it to null.
this._buffer = null;
}...
BrokerTransform.prototype._transform = function (chunk, enc, done) {
  // Buffers the chunk; the buffer decides when the flush callback runs.
  // `done` is signalled right after the chunk is handed over.
  var flush = this._transformNext.bind(this);
  this._KafkaBuffer.addChunk(chunk, flush);
  done();
};
BrokerTransform.prototype._transformNext = function () {
  // Emits the accumulated batch downstream, then resets the buffer.
  var batch = this._KafkaBuffer.getBatch();
  this.push(batch);
  this._KafkaBuffer.truncateBatch();
};
module.exports = BrokerTransform;
...KeyedPartitioner = function () {}n/a
super_ = function () {}n/a
getPartition = function (partitions, key) {
key = key || '';
var index = this.hashCode(key) % partitions.length;
return partitions[index];
}...
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition
(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
...hashCode = function (string) {
var hash = 0;
var length = string.length;
for (var i = 0; i < length; i++) {
hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff;
}
return (hash === 0) ? 1 : hash;
}...
return (hash === 0) ? 1 : hash;
};
KeyedPartitioner.prototype.getPartition = function (partitions, key) {
  // Picks a partition by hashing the key (a missing/falsy key hashes as '')
  // and reducing the hash modulo the number of partitions.
  var slot = this.hashCode(key || '') % partitions.length;
  return partitions[slot];
};
// Partitioner whose getPartition is the caller-supplied function itself,
// installed directly on the instance.
var CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
};
util.inherits(CustomPartitioner, Partitioner);
...NotCoordinatorForGroupError = function (message) {
// Error constructor: captures a stack trace onto the new instance, then
// stores `message` on it.
Error.captureStackTrace(this, this);
this.message = message;
}n/a
function Error() { [native code] }n/a
Offset = function (client) {
var self = this;
this.client = client;
this.ready = this.client.ready;
this.client.on('ready', function () {
self.ready = true;
self.emit('ready');
});
this.client.once('connect', function () {
self.emit('connect');
});
this.client.on('error', function (err) {
self.emit('error', err);
});
}...
* `cb`: *Function*, the callback
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
...function EventEmitter() {
// Delegates all instance setup to EventEmitter.init, called on this instance.
EventEmitter.init.call(this);
}n/a
buildPayloads = function (payloads) {
return payloads.map(function (p) {
p.partition = p.partition || 0;
p.time = p.time || Date.now();
p.maxNum = p.maxNum || 1;
p.metadata = 'm'; // metadata can be arbitrary
return p;
});
}...
* @param {BaseProducer~sendCallback} cb A function to call once the send has completed
*/
BaseProducer.prototype.send = function (payloads, cb) {
  // Builds produce requests from `payloads` using the client's topic metadata
  // and hands them to the client together with this producer's ack settings.
  var target = this.client;
  var acks = this.requireAcks;
  var timeoutMs = this.ackTimeoutMs;
  var requests = this.buildPayloads(payloads, target.topicMetadata);
  target.sendProduceRequest(requests, acks, timeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition(_.map(topicMetadata[p.topic
], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
/**
 * Sends an offset-commit request for `groupId`. If this object is not ready
 * yet, the same call is replayed once the 'ready' event fires.
 *
 * @param {String} groupId consumer group id
 * @param {Array<Object>} payloads payloads passed through `buildPayloads`
 * @param {Function} cb callback handed to the client
 */
commit = function (groupId, payloads, cb) {
  if (this.ready) {
    const request = this.buildPayloads(payloads);
    this.client.sendOffsetCommitRequest(groupId, request, cb);
  } else {
    const replay = () => this.commit(groupId, payloads, cb);
    this.once('ready', replay);
  }
};
Commit the offsets of the current topics manually. This method should be called when a consumer leaves.
* `cb`: **Function**, the callback
Example:
``` js
consumer.commit(function(err, data) {
});
```
### setOffset(topic, partition, offset)
Set offset of the given topic
* `topic`: **String**
/**
 * Sends an offset request for the given payloads. If this object is not ready
 * yet, the same call is replayed once the 'ready' event fires.
 *
 * @param {Array<Object>} payloads payloads passed through `buildPayloads`
 * @param {Function} cb callback handed to the client
 */
fetch = function (payloads, cb) {
  if (this.ready) {
    const request = this.buildPayloads(payloads);
    this.client.sendOffsetRequest(request, cb);
  } else {
    const replay = () => this.fetch(payloads, cb);
    this.once('ready', replay);
  }
};
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 't': { '0': [999] } }
});
```
/**
 * Sends an offset-fetch request for `groupId`. If this object is not ready
 * yet, the same call is replayed once the 'ready' event fires.
 *
 * @param {String} groupId consumer group id
 * @param {Array<Object>} payloads payloads passed through `buildPayloads`
 * @param {Function} cb callback handed to the client
 */
fetchCommits = function (groupId, payloads, cb) {
  if (this.ready) {
    const request = this.buildPayloads(payloads);
    this.client.sendOffsetFetchRequest(groupId, request, cb);
  } else {
    const replay = () => this.fetchCommits(groupId, payloads, cb);
    this.once('ready', replay);
  }
};
Example
```js
var kafka = require('kafka-node'),
client = new kafka.Client(),
offset = new kafka.Offset(client);
offset.fetchCommits('groupId', [
{ topic: 't', partition: 0 }
], function (err, data) {
});
```
### fetchLatestOffsets(topics, cb)
/**
 * Looks up offsets for `topics` by delegating to `fetchOffsets` with the
 * time value -2.
 *
 * @param {Array<String>} topics topic names
 * @param {Function} cb callback forwarded to `fetchOffsets`
 */
fetchEarliestOffsets = function (topics, cb) {
  const EARLIEST_TIME = -2;
  fetchOffsets(this, topics, cb, EARLIEST_TIME);
};
### fetchEarliestOffsets(topics, cb)
Example
```js
var partition = 0;
var topic = 't';
offset.fetchEarliestOffsets([topic], function (error, offsets) {
if (error)
return handleError(error);
console.log(offsets[topic][partition]);
});
```
# Troubleshooting / FAQ
/**
 * Looks up offsets for `topics` by delegating to `fetchOffsets` with the
 * time value -1.
 *
 * @param {Array<String>} topics topic names
 * @param {Function} cb callback forwarded to `fetchOffsets`
 */
fetchLatestOffsets = function (topics, cb) {
  const LATEST_TIME = -1;
  fetchOffsets(this, topics, cb, LATEST_TIME);
};
### fetchLatestOffsets(topics, cb)
Example
```js
var partition = 0;
var topic = 't';
offset.fetchLatestOffsets([topic], function (error, offsets) {
if (error)
return handleError(error);
console.log(offsets[topic][partition]);
});
```
### fetchEarliestOffsets(topics, cb)
/**
 * Producer built on BaseProducer, using `BaseProducer.PARTITIONER_TYPES.default`
 * as the default partitioner type.
 *
 * @param {Object} client client passed through to BaseProducer
 * @param {Object} [options] producer options passed through to BaseProducer
 * @param {Function} [customPartitioner] passed through to BaseProducer
 */
function Producer (client, options, customPartitioner) {
  const defaultPartitionerType = BaseProducer.PARTITIONER_TYPES.default;
  BaseProducer.call(this, client, options, defaultPartitionerType, customPartitioner);
}
/**
 * Shared producer constructor: stores the client, resolves ack settings from
 * `options` (falling back to DEFAULTS), instantiates the partitioner and then
 * calls `this.connect()`.
 *
 * @param {Object} client client used to send requests
 * @param {Object} [options] may carry `requireAcks`, `ackTimeoutMs`, `partitionerType`
 * @param {*} defaultPartitionerType key into PARTITIONER_MAP used when
 *   `options.partitionerType` does not resolve to a partitioner
 * @param {Function} [customPartitioner] required iff `options.partitionerType`
 *   is PARTITIONER_TYPES.custom
 * @throws {Error} when `customPartitioner` and the custom partitioner type are
 *   not supplied together
 */
function BaseProducer (client, options, defaultPartitionerType, customPartitioner) {
  options = options || {};

  this.ready = false;
  this.client = client;

  this.requireAcks = options.requireAcks === undefined
    ? DEFAULTS.requireAcks
    : options.requireAcks;
  this.ackTimeoutMs = options.ackTimeoutMs === undefined
    ? DEFAULTS.ackTimeoutMs
    : options.ackTimeoutMs;

  if (customPartitioner !== undefined && options.partitionerType !== PARTITIONER_TYPES.custom) {
    throw new Error('Partitioner Type must be custom if providing a customPartitioner.');
  } else if (customPartitioner === undefined && options.partitionerType === PARTITIONER_TYPES.custom) {
    // Message previously read "customer partitioner" (typo).
    throw new Error('No custom partitioner defined');
  }

  // Unknown/absent partitioner types fall back to the caller's default type.
  var partitionerType = PARTITIONER_MAP[options.partitionerType] || PARTITIONER_MAP[defaultPartitionerType];
  // eslint-disable-next-line
  this.partitioner = new partitionerType(customPartitioner);

  this.connect();
}
// Constructor with an empty body: instances receive no own state here.
RandomPartitioner = function () {}n/a
// Empty function recorded under the name `super_`.
// NOTE(review): presumably the parent-constructor reference attached by util.inherits — confirm.
super_ = function () {}n/a
/**
 * Returns one entry of `partitions`, chosen with `Math.random()`.
 *
 * @param {Array} partitions candidate partitions
 * @returns {*} a randomly selected entry (undefined for an empty array)
 */
getPartition = function (partitions) {
  var randomIndex = Math.floor(Math.random() * partitions.length);
  return partitions[randomIndex];
};
client.sendProduceRequest(this.buildPayloads(payloads, client.topicMetadata), requireAcks, ackTimeoutMs, cb);
};
BaseProducer.prototype.buildPayloads = function (payloads, topicMetadata) {
const topicPartitionRequests = Object.create(null);
payloads.forEach((p) => {
p.partition = p.hasOwnProperty('partition') ? p.partition : this.partitioner.getPartition
(_.map(topicMetadata[p.topic], 'partition'), p.key);
p.attributes = p.hasOwnProperty('attributes') ? p.attributes : 0;
let messages = _.isArray(p.messages) ? p.messages : [p.messages];
messages = messages.map(function (message) {
if (message instanceof KeyedMessage) {
return message;
}
/**
 * Error type carrying a caller-supplied message.
 *
 * @param {String} message human-readable description stored on `this.message`
 */
RebalanceInProgressError = function (message) {
  // Assign the message first so it is already present when the trace is captured.
  this.message = message;
  // The second argument must be a function: frames above (and including) it are
  // dropped, so the trace starts at the code that created the error.
  Error.captureStackTrace(this, RebalanceInProgressError);
};
function Error() { [native code] }n/a
/**
 * Error describing one or more topics that do not exist.
 *
 * @param {Array<String>|String} topics missing topic(s); kept on `this.topics`
 *   and rendered into the message via `toString()`
 */
TopicsNotExistError = function (topics) {
  // Populate the fields first so they are already present when the trace is captured.
  this.topics = topics;
  this.message = 'The topic(s) ' + topics.toString() + ' do not exist';
  // The second argument must be a function: frames above (and including) it are
  // dropped, so the trace starts at the code that created the error.
  Error.captureStackTrace(this, TopicsNotExistError);
};
*/
Client.prototype.topicExists = function (topics, cb) {
var notExistsTopics = [];
var self = this;
async.each(topics, checkZK, function (err) {
if (err) return cb(err);
if (notExistsTopics.length) return cb(new errors.TopicsNotExistError(notExistsTopics
));
cb();
});
function checkZK (topic, cb) {
self.zk.topicExists(topic, function (err, existed, topic) {
if (err) return cb(err);
if (!existed) notExistsTopics.push(topic);
...function Error() { [native code] }n/a
/**
 * Error type carrying a caller-supplied message.
 *
 * @param {String} message human-readable description stored on `this.message`
 */
UnknownMemberIdError = function (message) {
  // Assign the message first so it is already present when the trace is captured.
  this.message = message;
  // The second argument must be a function: frames above (and including) it are
  // dropped, so the trace starts at the code that created the error.
  Error.captureStackTrace(this, UnknownMemberIdError);
};
function Error() { [native code] }n/a
/**
 * Coordinates connecting a consumer group. When `options.migrateRolling` is
 * set, a ZooKeeper client is created and the group is only connected once the
 * checked topics report no owners several times in a row; otherwise the group
 * is connected immediately.
 *
 * @param {Object} consumerGroup consumer group exposing `client` and `options`
 */
function ConsumerGroupMigrator(consumerGroup) {
EventEmitter.call(this);
assert(consumerGroup);
const self = this;
this.consumerGroup = consumerGroup;
this.client = consumerGroup.client;
// consecutive "no owners" observations since the last ownership change
var verified = 0;
if (consumerGroup.options.migrateRolling) {
this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10});
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
// nothing to wait for: connect right away
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
// re-check after a delay until the count exceeds NUMER_OF_TIMES_TO_VERIFY, then connect
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified,
NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS);
setTimeout(function () {
self.checkForOwners(topics);
}, VERIFY_WAIT_TIME_MS);
} else {
self.connectConsumerGroup();
}
});
// ownership changes are debounced (250 ms) and restart the verification count
this.on('topicOwnerChange', _.debounce(function (topics) {
verified = 0;
self.checkForOwnersAndListenForChange(topics);
}, 250));
// handlers are registered before the connection attempt starts
this.zk.connect();
} else {
this.connectConsumerGroup();
}
}n/a
// Constructor: all per-instance setup is delegated to EventEmitter.init,
// invoked with the new instance as `this`.
function EventEmitter() {
EventEmitter.init.call(this);
}n/a
/**
 * Counts the children under each topic's owners path in ZooKeeper. Emits
 * 'noOwnersForTopics' when the total is zero, 'error' when any lookup fails,
 * and only logs otherwise.
 *
 * @param {Array<String>} topics topics whose owner nodes are inspected
 * @param {Boolean} [listenForChange] when truthy, a watcher is registered that
 *   emits 'topicOwnerChange' with `topics`
 */
checkForOwners = function (topics, listenForChange) {
  const ownersRoot = '/consumers/' + this.consumerGroup.options.groupId + '/owners/';
  let ownedCount = 0;

  const onOwnerChange = (event) => {
    this.emit('topicOwnerChange', topics);
  };

  const countOwners = (topic, done) => {
    const onChildren = (error, children, stats) => {
      if (error) {
        done(error);
        return;
      }
      ownedCount += children.length;
      done(null);
    };

    if (listenForChange) {
      logger.debug('%s listening for changes in topic %s', this.client.clientId, topic);
      this.zk.getChildren(ownersRoot + topic, onOwnerChange, onChildren);
    } else {
      this.zk.getChildren(ownersRoot + topic, onChildren);
    }
  };

  const report = (error) => {
    if (error) {
      this.emit('error', error);
    } else if (ownedCount === 0) {
      this.emit('noOwnersForTopics', topics);
    } else {
      logger.debug('%s %d partitions are owned by old HLC... waiting...', this.client.clientId, ownedCount);
    }
  };

  async.each(topics, countOwners, report);
};
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
logger.debug('%s verify %d of %d HLC has given up ownership by checking again in %d', self.client.clientId, verified
,
NUMER_OF_TIMES_TO_VERIFY, VERIFY_WAIT_TIME_MS);
setTimeout(function () {
self.checkForOwners(topics);
}, VERIFY_WAIT_TIME_MS);
} else {
self.connectConsumerGroup();
}
});
this.on('topicOwnerChange', _.debounce(function (topics) {
/**
 * Runs `checkForOwners` for `topics` with change listening switched on.
 *
 * @param {Array<String>} topics topics to inspect
 */
checkForOwnersAndListenForChange = function (topics) {
  const listenForChange = true;
  this.checkForOwners(topics, listenForChange);
};
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
/**
 * Connects the consumer group — immediately when the client is ready,
 * otherwise on the client's next 'ready' event — and closes the ZooKeeper
 * client if one was created.
 */
connectConsumerGroup = function () {
  logger.debug('%s connecting consumer group', this.client.clientId);

  if (!this.client.ready) {
    this.client.once('ready', () => {
      this.consumerGroup.connect();
    });
  } else {
    this.consumerGroup.connect();
  }

  if (this.zk) {
    this.zk.close();
  }
};
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
logger.debug('No HLC topics exist in zookeeper.');
self.connectConsumerGroup();
}
});
});
this.on('noOwnersForTopics', function (topics) {
logger.debug('No owners for topics %s reported.', topics);
if (++verified <= NUMER_OF_TIMES_TO_VERIFY) {
/**
 * Filters the consumer group's topics down to those that have an owners node
 * in ZooKeeper.
 *
 * `callback` is invoked exactly once: with the first ZooKeeper error, or with
 * `(null, topics)`. Previously every failing lookup called `callback(error)`,
 * so several failing topics produced several error callbacks.
 *
 * NOTE(review): relies on the truth-only `async.filter` callback style
 * (`cb(truthy)` / `done(results)`) already used here — confirm against the
 * installed async version.
 *
 * @param {Function} callback `function (error, topics)`
 */
filterByExistingZkTopics = function (callback) {
  const self = this;
  const path = '/consumers/' + this.consumerGroup.options.groupId + '/owners/';
  var finished = false;

  async.filter(this.consumerGroup.topics, function (topic, cb) {
    const topicPath = path + topic;
    logger.debug('%s checking zk path %s', self.client.clientId, topicPath);
    self.zk.exists(topicPath, function (error, stat) {
      if (error) {
        // Report only the first failure; `cb` is deliberately not called so the
        // filter never reports a (partial) result after an error.
        if (!finished) {
          finished = true;
          callback(error);
        }
        return;
      }
      cb(stat);
    });
  }, function (result) {
    if (finished) {
      return;
    }
    finished = true;
    callback(null, result);
  });
};
this.consumerGroup = consumerGroup;
this.client = consumerGroup.client;
var verified = 0;
if (consumerGroup.options.migrateRolling) {
this.zk = zookeeper.createClient(consumerGroup.client.connectionString, {retries: 10});
this.zk.on('connected', function () {
self.filterByExistingZkTopics(function (error, topics) {
if (error) {
return self.emit('error', error);
}
if (topics.length) {
self.checkForOwnersAndListenForChange(topics);
} else {
/**
 * Looks up the saved offset for a topic/partition pair in `this.offsets`.
 *
 * @param {Object} tp object with `topic` and `partition`
 * @param {*} defaultOffset returned when nothing is stored or the stored value is -1
 * @returns {*} the stored offset, or `defaultOffset`
 */
getOffset = function (tp, defaultOffset) {
  const saved = _.get(this.offsets, [tp.topic, tp.partition], defaultOffset);
  return saved === -1 ? defaultOffset : saved;
};
if (this.options.outOfRangeOffset === 'none') {
this.emit('error', new errors.InvalidConsumerOffsetError(`Offset out of range for topic "${topic.topic}" partition
${topic.partition}`));
return;
}
topic.time = ACCEPTED_FROM_OFFSET[this.options.outOfRangeOffset];
this.getOffset().fetch([topic], (error, result) => {
if (error) {
this.emit('error', new errors.InvalidConsumerOffsetError(`Fetching ${this.options.outOfRangeOffset} offset failed`,
error));
return;
}
const offset = _.head(result[topic.topic][topic.partition]);
const oldOffset = _.find(this.topicPayloads, {topic: topic.topic, partition: topic.partition}).offset;
/**
 * Fetches the committed offsets for the consumer group's id and stores the
 * response on `this.offsets`.
 *
 * @param {Array<Object>} topicPartitionList payloads for the offset fetch request
 * @param {Function} callback called with the error, or with `null` on success
 */
saveHighLevelConsumerOffsets = function (topicPartitionList, callback) {
  const groupId = this.consumerGroup.options.groupId;
  const onFetched = (error, results) => {
    logger.debug('sendOffsetFetchRequest response:', results, error);
    if (error) {
      return callback(error);
    }
    this.offsets = results;
    callback(null);
  };
  this.client.sendOffsetFetchRequest(groupId, topicPartitionList, onFetched);
};
if (self.options.fromOffset === 'none') {
return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${
self.options.groupId}'`));
}
async.parallel([
function (callback) {
if (self.migrator) {
return self.migrator.saveHighLevelConsumerOffsets(topicPartitionList, callback);
}
callback(null);
},
function (callback) {
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
/**
 * Recovery helper bound to a consumer group; keeps a reference to the group
 * and to the group's options.
 *
 * @param {Object} consumerGroup consumer group exposing `options`
 */
function ConsumerGroupRecovery (consumerGroup) {
  this.consumerGroup = consumerGroup;
  const { options } = consumerGroup;
  this.options = options;
}
// Resets `lastError` to null.
clearError = function () {
this.lastError = null;
}...
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
self.ready = true;
self.recovery.clearError();
logger.debug('generationId', self.generationId);
if (startFetch) {
self.fetch();
}
self.startHeartbeats();
/**
 * Returns the delay to wait before the next retry for `error`.
 *
 * The timeout schedule is built once from the retry options and cached. The
 * position in the schedule restarts whenever there is no previous error or
 * the error code differs from the previous error's code.
 *
 * @param {Object} error the error being recovered from (must be truthy)
 * @returns {Number|Boolean} the next timeout, or `false` once the schedule is exhausted
 */
getRetryTimeout = function (error) {
  assert(error);

  if (!this._timeouts) {
    const { retries, retryFactor, retryMinTimeout } = this.options;
    this._timeouts = retry.timeouts({
      retries: retries,
      factor: retryFactor,
      minTimeout: retryMinTimeout
    });
  }

  const isRepeatOfLastError = this._retryIndex != null &&
    this.lastError != null &&
    error.errorCode === this.lastError.errorCode;
  if (!isRepeatOfLastError) {
    this._retryIndex = 0;
  }

  const attempt = this._retryIndex++;
  return attempt < this._timeouts.length ? this._timeouts[attempt] : false;
};
recoverableItem.handler && recoverableItem.handler.call(this.consumerGroup, error);
return true;
}
return false;
}, this);
if (retry) {
retryTimeout = this.getRetryTimeout(error);
}
if (retry && retryTimeout) {
logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error
);
this.consumerGroup.scheduleReconnect(retryTimeout);
} else {
this.consumerGroup.emit('error', error);
/**
 * Marks the consumer group as not ready, stops its heartbeats and then either
 * schedules a reconnect (when the error matches a recoverable entry and a
 * retry timeout is still available) or emits the error on the group. The
 * error is remembered as `lastError` in both cases.
 *
 * @param {Object} error the error to recover from
 * @param {String} source label used only in the debug log
 */
tryToRecoverFrom = function (error, source) {
  this.consumerGroup.ready = false;
  this.consumerGroup.stopHeartbeats();

  // First recoverable entry matching the error, if any; its optional handler
  // runs with the consumer group as `this`.
  const matched = recoverableErrors.find((item) => isErrorInstanceOf(error, item.errors));
  const retryable = matched !== undefined;
  if (retryable && matched.handler) {
    matched.handler.call(this.consumerGroup, error);
  }

  const retryTimeout = retryable ? this.getRetryTimeout(error) : false;

  if (retryable && retryTimeout) {
    logger.debug('RECOVERY from %s: %s retrying in %s ms', source, this.consumerGroup.client.clientId, retryTimeout, error);
    this.consumerGroup.scheduleReconnect(retryTimeout);
  } else {
    this.consumerGroup.emit('error', error);
  }

  this.lastError = error;
};
function (syncGroupResponse, callback) {
self.handleSyncGroup(syncGroupResponse, callback);
}
], function (error, startFetch) {
self.connecting = false;
self.rebalancing = false;
if (error) {
return self.recovery.tryToRecoverFrom(error, 'connect');
}
self.ready = true;
self.recovery.clearError();
logger.debug('generationId', self.generationId);
// Replaces the `loggerProvider` binding with the supplied provider.
...function setLoggerProvider(provider) {
loggerProvider = provider;
}...
### How do I set a logger provider?
For performance reasons, initialization of the `kafka-node` module creates all necessary loggers. This means that custom logger
providers need to be set *before requiring the `kafka-node` module*. The following example shows how this can be done:
```javascript
// first configure the logger provider
const kafkaLogging = require('kafka-node/logging');
kafkaLogging.setLoggerProvider(consoleLoggerProvider);
// then require kafka-node and continue as normal
const kafka = require('kafka-node');
```
# Running Tests
// Constructor: installs the supplied function as this instance's `getPartition`.
...CustomPartitioner = function (partitioner) {
this.getPartition = partitioner;
}n/a
// Constructor: starts the instance counter `c` at 0.
// NOTE(review): presumably advanced by the cyclic getPartition implementation — not shown here.
CyclicPartitioner = function () {
this.c = 0;
}n/a
// The three constructors below have empty bodies: instances receive no own state here.
DefaultPartitioner = function () {}n/a
KeyedPartitioner = function () {}n/a
RandomPartitioner = function () {}n/a
/**
 * Builds a fetch-response decoder bound to `cb` and `maxTickMessages`.
 *
 * @param {Function} cb callback forwarded to `_decodeFetchResponse`
 * @param {*} maxTickMessages forwarded to `_decodeFetchResponse`
 * @returns {Function} `function (resp)` returning `_decodeFetchResponse(resp, cb, maxTickMessages)`
 */
function decodeFetchResponse (cb, maxTickMessages) {
  function decodeWithBoundCallback (resp) {
    return _decodeFetchResponse(resp, cb, maxTickMessages);
  }
  return decodeWithBoundCallback;
}
broker.socket.end();
});
};
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
return self.emit('brokersChanged');
}
/**
 * Decodes a group-coordinator response.
 *
 * Fields are read in order: size, correlationId, errorCode, coordinatorId,
 * a length-prefixed coordinatorHost string, coordinatorPort.
 *
 * @param {Buffer} resp raw response
 * @returns {Object} the value of `createGroupError(errorCode)` when errorCode
 *   is non-zero, otherwise `{ coordinatorHost, coordinatorPort, coordinatorId }`
 */
...function decodeGroupCoordinatorResponse(resp) {
var result;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.word32bs('coordinatorId')
.word16bs('coordinatorHost')
.tap(function (vars) {
// coordinatorHost first holds the string length, then the string itself
this.buffer('coordinatorHost', vars.coordinatorHost);
vars.coordinatorHost = vars.coordinatorHost.toString();
})
.word32bs('coordinatorPort')
.tap(function (vars) {
if (vars.errorCode !== 0) {
result = createGroupError(vars.errorCode);
return;
}
result = {
coordinatorHost: vars.coordinatorHost,
coordinatorPort: vars.coordinatorPort,
coordinatorId: vars.coordinatorId
};
});
return result;
}n/a
/**
 * Decodes a group heartbeat response (size, correlationId, errorCode).
 *
 * @param {Buffer} resp raw response
 * @returns {*} whatever `createGroupError(errorCode)` returns; stays `null`
 *   if the tap never runs
 */
function decodeGroupHeartbeat(resp) {
var result = null;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.tap(function (vars) {
result = createGroupError(vars.errorCode);
});
return result;
}n/a
/**
 * Decodes a join-group response.
 *
 * Fields are read in order: size, correlationId, errorCode, generationId,
 * then three length-prefixed strings (groupProtocol, leaderId, memberId),
 * then memberNum member entries (member id string + metadata blob).
 *
 * @param {Buffer} resp raw response
 * @returns {Object} the group error when `createGroupError(errorCode)` is
 *   truthy, otherwise `{ members, generationId, groupProtocol, leaderId, memberId }`
 */
function decodeJoinGroupResponse(resp) {
var result = {
members: []
};
var error;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.tap(function (vars) {
error = createGroupError(vars.errorCode);
})
.word32bs('generationId')
.tap(function (vars) {
result.generationId = vars.generationId;
})
.word16bs('groupProtocol')
.tap(function (vars) {
// each string field first holds its length, then is replaced by the decoded string
this.buffer('groupProtocol', vars.groupProtocol);
result.groupProtocol = vars.groupProtocol = vars.groupProtocol.toString();
})
.word16bs('leaderId')
.tap(function (vars) {
this.buffer('leaderId', vars.leaderId);
result.leaderId = vars.leaderId = vars.leaderId.toString();
})
.word16bs('memberId')
.tap(function (vars) {
this.buffer('memberId', vars.memberId);
result.memberId = vars.memberId = vars.memberId.toString();
})
.word32bs('memberNum')
.loop(function (end, vars) {
// members are not parsed at all once an error was decoded
if (error) {
return end();
}
if (vars.memberNum-- === 0) return end();
var memberMetadata;
this
.word16bs('groupMemberId').tap(function (vars) {
this.buffer('groupMemberId', vars.groupMemberId);
// NOTE: this overwrites vars.memberId (already copied to result.memberId above)
vars.memberId = vars.groupMemberId.toString();
})
.word32bs('memberMetadata').tap(function (vars) {
// members with a negative metadata length are skipped
if (vars.memberMetadata > -1) {
this.buffer('memberMetadata', vars.memberMetadata);
memberMetadata = decodeGroupData(this.vars.memberMetadata);
memberMetadata.id = vars.memberId;
result.members.push(memberMetadata);
}
});
});
return error || result;
}n/a
/**
 * Decodes a leave-group response (size, correlationId, errorCode).
 *
 * @param {Buffer} resp raw response
 * @returns {*} whatever `createGroupError(errorCode)` returns; stays `null`
 *   if the tap never runs
 */
function decodeLeaveGroupResponse(resp) {
var error = null;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.tap(function (vars) {
error = createGroupError(vars.errorCode);
});
return error;
}n/a
/**
 * Decodes a metadata response into broker and topic/partition maps.
 *
 * @param {Buffer} resp raw response
 * @returns {Array} `[brokers, out]` where `brokers` maps nodeId to
 *   `{ nodeId, host, port }`, `out.metadata` maps topic -> partition ->
 *   PartitionMetadata, and `out.error` (only set when non-empty) lists the
 *   ERROR_CODE entries collected while decoding
 */
function decodeMetadataResponse(resp) {
var brokers = {};
var out = {};
var topics = {};
var errors = [];
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('brokerNum')
.loop(decodeBrokers)
.word32bs('topicNum')
.loop(_decodeTopics);
// One broker entry per iteration: nodeId, length-prefixed host, port.
function decodeBrokers (end, vars) {
if (vars.brokerNum-- === 0) return end();
this.word32bs('nodeId')
.word16bs('host')
.tap(function (vars) {
this.buffer('host', vars.host);
vars.host = vars.host.toString();
})
.word32bs('port')
.tap(function (vars) {
brokers[vars.nodeId] = { nodeId: vars.nodeId, host: vars.host, port: vars.port };
});
}
// One topic entry per iteration: topicError, length-prefixed name, partitions.
function _decodeTopics (end, vars) {
if (vars.topicNum-- === 0) return end();
this.word16bs('topicError')
.word16bs('topic')
.tap(function (vars) {
this.buffer('topic', vars.topic);
vars.topic = vars.topic.toString();
})
.word32bs('partitionNum')
.tap(function (vars) {
// a topic-level error is recorded and the topic's partition loop is not started
if (vars.topicError !== 0) {
return errors.push(ERROR_CODE[vars.topicError]);
}
this.loop(decodePartitions);
});
}
// One partition entry per iteration: errorCode, partition, leader, replicas, isr.
function decodePartitions (end, vars) {
if (vars.partitionNum-- === 0) return end();
topics[vars.topic] = topics[vars.topic] || {};
this.word16bs('errorCode')
.word32bs('partition')
.word32bs('leader')
.word32bs('replicasNum')
.tap(function (vars) {
// 4 bytes are read per replica id
var buffer = this.buffer('replicas', vars.replicasNum * 4).vars.replicas;
this.vars.replicas = bufferToArray(vars.replicasNum, buffer);
})
.word32bs('isrNum')
.tap(function (vars) {
var buffer = this.buffer('isr', vars.isrNum * 4).vars.isr;
this.vars.isr = bufferToArray(vars.isrNum, buffer);
// error codes 0 and 9 still yield partition metadata; any other code is recorded as an error
if (vars.errorCode === 0 || vars.errorCode === 9) {
topics[vars.topic][vars.partition] =
new PartitionMetadata(vars.topic, vars.partition, vars.leader, vars.replicas, vars.isr);
} else {
errors.push(ERROR_CODE[vars.errorCode]);
}
});
}
if (!_.isEmpty(errors)) out.error = errors;
out.metadata = topics;
return [brokers, out];
}n/a
/**
 * Decodes an offset-commit response.
 *
 * @param {Buffer} resp raw response
 * @returns {Object} map of topic -> `{ partition, errorCode }`
 */
function decodeOffsetCommitResponse(resp) {
var topics = {};
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('topicNum')
.loop(decodeTopics(decodePartitions));
function decodePartitions (end, vars) {
// end() is called without returning, so the rest of this iteration still runs
if (--vars.partitionNum === 0) end();
topics[vars.topic] = topics[vars.topic] || {};
this.word32bs('partition')
.word16bs('errorcode')
.tap(function (vars) {
// NOTE(review): stored under fixed keys, so each partition overwrites the
// previous one and only the last partition per topic is kept — confirm intended.
topics[vars.topic]['partition'] = vars.partition;
topics[vars.topic]['errorCode'] = vars.errorcode;
});
}
return topics;
}n/a
/**
 * Decodes an offset-fetch response.
 *
 * @param {Buffer} resp raw response
 * @returns {Object} map of topic -> partition -> offset, with -1 stored for
 *   any partition whose errorCode is non-zero
 */
function decodeOffsetFetchResponse(resp) {
var topics = {};
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('topicNum')
.loop(decodeTopics(decodePartitions));
function decodePartitions (end, vars) {
// end() is called without returning, so the rest of this iteration still runs
if (--vars.partitionNum === 0) end();
topics[vars.topic] = topics[vars.topic] || {};
this.word32bs('partition')
.word64bs('offset')
.word16bs('metadata')
.tap(function (vars) {
// a metadata length of -1 means there are no metadata bytes to read
if (vars.metadata === -1) {
return;
}
this.buffer('metadata', vars.metadata);
})
.word16bs('errorCode')
.tap(function (vars) {
topics[vars.topic][vars.partition] = vars.errorCode === 0 ? vars.offset : -1;
});
}
return topics;
}n/a
/**
 * Decodes a version 1 offset-fetch response.
 *
 * @param {Buffer} resp raw response
 * @returns {Object} map of topic -> partition -> offset; -1 is stored when
 *   errorCode is non-zero, or when the metadata is empty and the offset is 0
 */
function decodeOffsetFetchV1Response(resp) {
var topics = {};
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('topicNum')
.loop(decodeTopics(decodePartitions));
function decodePartitions (end, vars) {
// end() is called without returning, so the rest of this iteration still runs
if (--vars.partitionNum === 0) end();
topics[vars.topic] = topics[vars.topic] || {};
this.word32bs('partition')
.word64bs('offset')
.word16bs('metadata')
.tap(function (vars) {
// a metadata length of -1 means there are no metadata bytes to read
if (vars.metadata === -1) {
return;
}
this.buffer('metadata', vars.metadata);
})
.word16bs('errorCode')
.tap(function (vars) {
// empty metadata together with offset 0 is reported as -1
if (vars.metadata.length === 0 && vars.offset === 0) {
topics[vars.topic][vars.partition] = -1;
} else {
topics[vars.topic][vars.partition] = vars.errorCode === 0 ? vars.offset : -1;
}
});
}
return topics;
}n/a
/**
 * Decodes an offset response.
 *
 * @param {Buffer} resp raw response
 * @returns {Object} map of topic -> partition -> array of offsets
 */
function decodeOffsetResponse(resp) {
var topics = {};
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('topicNum')
.loop(decodeTopics(decodePartitions));
function decodePartitions (end, vars) {
// end() is called without returning, so the rest of this iteration still runs
if (--vars.partitionNum === 0) end();
topics[vars.topic] = topics[vars.topic] || {};
this.word32bs('partition')
.word16bs('errorCode')
.word32bs('offsetNum')
.loop(decodeOffsets);
}
function decodeOffsets (end, vars) {
if (--vars.offsetNum <= 0) end();
topics[vars.topic][vars.partition] = topics[vars.topic][vars.partition] || [];
this.word64bs('offset')
.tap(function (vars) {
// null/undefined offsets are not recorded
if (vars.offset != null) topics[vars.topic][vars.partition].push(vars.offset);
});
}
return topics;
}n/a
/**
 * Decodes a produce response.
 *
 * @param {Buffer} resp raw response
 * @returns {Error|Object} an Error built from ERROR_CODE when any partition
 *   reported a non-zero errorCode (the last such error wins), otherwise a map
 *   of topic -> partition -> offset
 */
function decodeProduceResponse(resp) {
var topics = {};
var error;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word32bs('topicNum')
.loop(decodeTopics(decodePartitions));
function decodePartitions (end, vars) {
// end() is called without returning, so the rest of this iteration still runs
if (--vars.partitionNum === 0) end();
topics[vars.topic] = topics[vars.topic] || {};
this.word32bs('partition')
.word16bs('errorCode')
.word64bs('offset')
.tap(function (vars) {
if (vars.errorCode) {
error = new Error(ERROR_CODE[vars.errorCode]);
} else {
topics[vars.topic][vars.partition] = vars.offset;
}
});
}
return error || topics;
}n/a
/**
 * Decodes a sync-group response (size, correlationId, errorCode, then a
 * length-prefixed member assignment).
 *
 * @param {Buffer} resp raw response
 * @returns {*} the value of `createGroupError(errorCode)` when it is truthy,
 *   otherwise the result of `decodeMemberAssignment`
 */
function decodeSyncGroupResponse(resp) {
var result;
Binary.parse(resp)
.word32bs('size')
.word32bs('correlationId')
.word16bs('errorCode')
.tap(function (vars) {
result = createGroupError(vars.errorCode);
})
.word32bs('memberAssignment')
.tap(function (vars) {
// an error result is kept; the assignment bytes are not decoded
if (result) {
return;
}
this.buffer('memberAssignment', vars.memberAssignment);
result = decodeMemberAssignment(vars.memberAssignment);
});
return result;
}n/a
/**
 * Builds a fetch-request encoder bound to `maxWaitMs` and `minBytes`.
 *
 * @param {Number} maxWaitMs forwarded to `_encodeFetchRequest`
 * @param {Number} minBytes forwarded to `_encodeFetchRequest`
 * @returns {Function} `function (clientId, correlationId, payloads)`
 */
function encodeFetchRequest (maxWaitMs, minBytes) {
  const boundTail = [maxWaitMs, minBytes];
  return function encodeFetchRequest (clientId, correlationId, payloads) {
    return _encodeFetchRequest(clientId, correlationId, payloads, ...boundTail);
  };
}
broker.socket.closing = true;
broker.socket.end();
});
};
Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition' || err.message === 'UnknownTopicOrPartition') {
return self.emit('brokersChanged');
}
/**
 * Encodes a group-coordinator request for `groupId`.
 *
 * The string's length prefix is its encoded byte length rather than
 * `String#length`, which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} groupId consumer group id
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeGroupCoordinatorRequest (clientId, correlationId, groupId) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.groupCoordinator);
  request.Int16BE(byteLen(groupId)).string(groupId);
  return encodeRequestWithLength(request.make());
}
/**
 * Encodes a group heartbeat request.
 *
 * String length prefixes are encoded byte lengths rather than `String#length`,
 * which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} groupId consumer group id
 * @param {Number} generationId group generation id
 * @param {String} memberId group member id
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeGroupHeartbeat (clientId, correlationId, groupId, generationId, memberId) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.heartbeat);
  request
    .Int16BE(byteLen(groupId)).string(groupId)
    .Int32BE(generationId)
    .Int16BE(byteLen(memberId)).string(memberId);
  return encodeRequestWithLength(request.make());
}
/**
 * Encodes a join-group request.
 *
 * The groupId and memberId length prefixes are encoded byte lengths rather
 * than `String#length`, which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} groupId consumer group id
 * @param {String} memberId group member id
 * @param {Number} sessionTimeout session timeout written after the group id
 * @param {Array} groupProtocols protocols, each encoded by `encodeGroupProtocol`
 *   with the request builder as `this`
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeJoinGroupRequest (clientId, correlationId, groupId, memberId, sessionTimeout, groupProtocols) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.joinGroup);
  request
    .Int16BE(byteLen(groupId)).string(groupId)
    .Int32BE(sessionTimeout)
    .Int16BE(byteLen(memberId)).string(memberId)
    .Int16BE(GROUPS_PROTOCOL_TYPE.length).string(GROUPS_PROTOCOL_TYPE)
    .Int32BE(groupProtocols.length);

  groupProtocols.forEach(encodeGroupProtocol.bind(request));

  return encodeRequestWithLength(request.make());
}
/**
 * Encodes a leave-group request.
 *
 * String length prefixes are encoded byte lengths rather than `String#length`,
 * which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} groupId consumer group id
 * @param {String} memberId group member id
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeLeaveGroupRequest (clientId, correlationId, groupId, memberId) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.leaveGroup);
  request
    .Int16BE(byteLen(groupId)).string(groupId)
    .Int16BE(byteLen(memberId)).string(memberId);
  return encodeRequestWithLength(request.make());
}
/**
 * Encodes a list of messages into one buffer. Every message is written as
 * three fields: a 64-bit 0, the encoded message's length (32-bit), and the
 * encoded message itself.
 *
 * @param {Array} messageSet messages, each encoded with `encodeMessage`
 * @returns {*} the result of the builder's `make()`
 */
function encodeMessageSet (messageSet) {
  const builder = new Buffermaker();
  messageSet
    .map((message) => encodeMessage(message))
    .forEach((encoded) => {
      builder.Int64BE(0).Int32BE(encoded.length).string(encoded);
    });
  return builder.make();
}
/**
 * Encodes a metadata request for the given topics.
 *
 * Each topic's length prefix is its encoded byte length rather than
 * `String#length`, which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {Array<String>} topics topic names
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeMetadataRequest (clientId, correlationId, topics) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.metadata);
  request.Int32BE(topics.length);
  topics.forEach(function (topic) {
    request.Int16BE(byteLen(topic))
      .string(topic);
  });
  return encodeRequestWithLength(request.make());
}
* containing the topic name, partition, leader number, replica count, and in sync replicas per partition.
*
* @param {Array} topics An array of topics to load the metadata for
* @param {Client~loadMetadataForTopicsCallback} cb Function to call once all metadata is loaded
*/
Client.prototype.loadMetadataForTopics = function (topics, cb) {
var correlationId = this.nextId();
var request = protocol.encodeMetadataRequest(this.clientId, correlationId, topics);
var broker = this.brokerForLeader();
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) {
return cb(new errors.BrokerNotAvailableError('Broker not available'));
}
this.queueCallback(broker.socket, correlationId, [protocol.decodeMetadataResponse, cb]);
/**
 * Builds an offset-commit request encoder bound to `group`.
 *
 * @param {String} group consumer group forwarded to `_encodeOffsetCommitRequest`
 * @returns {Function} `function (clientId, correlationId, payloads)`
 */
function encodeOffsetCommitRequest (group) {
  return (clientId, correlationId, payloads) =>
    _encodeOffsetCommitRequest(clientId, correlationId, group, payloads);
}
payload.messages = [ new Message(0, attributes, '', message) ];
cb();
});
}
};
/**
 * Sends an offset-commit request for `group` through `this.send`.
 *
 * @param {String} group consumer group bound into the encoder
 * @param {Array<Object>} payloads commit payloads
 * @param {Function} cb callback handed to `send`
 */
Client.prototype.sendOffsetCommitRequest = function (group, payloads, cb) {
  const encodeForGroup = protocol.encodeOffsetCommitRequest(group);
  this.send(payloads, encodeForGroup, protocol.decodeOffsetCommitResponse, cb);
};
Client.prototype.sendOffsetCommitV2Request = function (group, generationId, memberId, payloads, cb) {
var encoder = protocol.encodeOffsetCommitV2Request;
var decoder = protocol.decodeOffsetCommitResponse;
/**
 * Encodes a version 2 offset-commit request.
 *
 * String length prefixes (group, memberId, topic, metadata) are encoded byte
 * lengths rather than `String#length`, which undercounts any non-ASCII
 * character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} group consumer group id
 * @param {Number} generationId group generation id
 * @param {String} memberId group member id
 * @param {Array<Object>} payloads commit payloads; grouped with `groupByTopic`,
 *   each entry supplying `partition`, `offset` and `metadata`
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeOffsetCommitV2Request (clientId, correlationId, group, generationId, memberId, payloads) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  payloads = groupByTopic(payloads);
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offsetCommit, 2);
  var topics = Object.keys(payloads);

  request.Int16BE(byteLen(group)).string(group)
    .Int32BE(generationId)
    .Int16BE(byteLen(memberId)).string(memberId)
    .Int64BE(-1)
    .Int32BE(topics.length);

  topics.forEach(function (topic) {
    request.Int16BE(byteLen(topic))
      .string(topic);

    var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
    request.Int32BE(partitions.length);
    partitions.forEach(function (p) {
      request.Int32BE(p.partition)
        .Int64BE(p.offset)
        .Int16BE(byteLen(p.metadata))
        .string(p.metadata);
    });
  });

  return encodeRequestWithLength(request.make());
}
/**
 * Builds an offset-fetch request encoder bound to `group`.
 *
 * @param {String} group consumer group forwarded to `_encodeOffsetFetchRequest`
 * @returns {Function} `function (clientId, correlationId, payloads)`
 */
function encodeOffsetFetchRequest (group) {
  return (clientId, correlationId, payloads) =>
    _encodeOffsetFetchRequest(clientId, correlationId, group, payloads);
}
/**
 * Sends a version 1 offset-fetch request through `sendGroupRequest`, passing
 * this call's own `arguments` object (group, payloads, cb) along unchanged.
 *
 * @param {String} group consumer group id
 * @param {Object} payloads offset-fetch payloads
 * @param {Function} cb callback
 */
Client.prototype.sendOffsetFetchV1Request = function (group, payloads, cb) {
  const { encodeOffsetFetchV1Request: encoder, decodeOffsetFetchV1Response: decoder } = protocol;
  this.sendGroupRequest(encoder, decoder, arguments);
};
/**
 * Sends an offset-fetch request for `group` through `this.send`.
 *
 * @param {String} group consumer group bound into the encoder
 * @param {Array<Object>} payloads offset-fetch payloads
 * @param {Function} cb callback handed to `send`
 */
Client.prototype.sendOffsetFetchRequest = function (group, payloads, cb) {
  const encodeForGroup = protocol.encodeOffsetFetchRequest(group);
  this.send(payloads, encodeForGroup, protocol.decodeOffsetFetchResponse, cb);
};
Client.prototype.sendOffsetRequest = function (payloads, cb) {
var encoder = protocol.encodeOffsetRequest;
var decoder = protocol.decodeOffsetResponse;
/**
 * Encodes a version 1 offset-fetch request.
 *
 * The group and topic length prefixes are encoded byte lengths rather than
 * `String#length`, which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} group consumer group id
 * @param {Object} payloads map of topic name -> array of partition numbers
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeOffsetFetchV1Request (clientId, correlationId, group, payloads) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offsetFetch, 1);
  var topics = Object.keys(payloads);

  request.Int16BE(byteLen(group))
    .string(group)
    .Int32BE(topics.length);

  topics.forEach(function (topic) {
    request.Int16BE(byteLen(topic)).string(topic)
      .Int32BE(payloads[topic].length);
    payloads[topic].forEach(function (p) {
      request.Int32BE(p);
    });
  });

  return encodeRequestWithLength(request.make());
}
/**
 * Encodes an offset request.
 *
 * Each topic's length prefix is its encoded byte length rather than
 * `String#length`, which undercounts any non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {Array<Object>} payloads payloads; grouped with `groupByTopic`, each
 *   entry supplying `partition`, `time` and `maxNum`
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeOffsetRequest (clientId, correlationId, payloads) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  payloads = groupByTopic(payloads);
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.offset);
  var topics = Object.keys(payloads);

  request.Int32BE(REPLICA_ID)
    .Int32BE(topics.length);

  topics.forEach(function (topic) {
    request.Int16BE(byteLen(topic))
      .string(topic);

    var partitions = _.toPairs(payloads[topic]).map(function (pairs) { return pairs[1]; });
    request.Int32BE(partitions.length);
    partitions.forEach(function (p) {
      request.Int32BE(p.partition)
        .Int64BE(p.time)
        .Int32BE(p.maxNum);
    });
  });

  return encodeRequestWithLength(request.make());
}
/**
 * Builds a produce-request encoder bound to `requireAcks` and `ackTimeoutMs`.
 *
 * @param {Number} requireAcks forwarded to `_encodeProduceRequest`
 * @param {Number} ackTimeoutMs forwarded to `_encodeProduceRequest`
 * @returns {Function} `function (clientId, correlationId, payloads)`
 */
function encodeProduceRequest (requireAcks, ackTimeoutMs) {
  return (clientId, correlationId, payloads) =>
    _encodeProduceRequest(clientId, correlationId, payloads, requireAcks, ackTimeoutMs);
}
Array.prototype.unshift.call(arguments, 'error');
consumer.emit.apply(consumer, arguments);
}
});
};
Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
var decoder = protocol.decodeProduceResponse;
var self = this;
decoder.requireAcks = requireAcks;
async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
/**
 * Encodes a sync-group request.
 *
 * String length prefixes (groupId, memberId, each assignment's memberId) are
 * encoded byte lengths rather than `String#length`, which undercounts any
 * non-ASCII character.
 * NOTE(review): assumes the request builder's `.string()` writes UTF-8 — confirm.
 *
 * @param {String} clientId client id for the request header
 * @param {Number} correlationId correlation id for the request header
 * @param {String} groupId consumer group id
 * @param {Number} generationId group generation id
 * @param {String} memberId group member id
 * @param {Array<Object>} [groupAssignment] assignments, each with `memberId`;
 *   a missing or empty list is written as a count of 0
 * @returns {*} the result of `encodeRequestWithLength`
 */
function encodeSyncGroupRequest (clientId, correlationId, groupId, generationId, memberId, groupAssignment) {
  // Non-string values keep the previous `.length` behaviour.
  var byteLen = function (value) {
    return typeof value === 'string' ? Buffer.byteLength(value, 'utf8') : value.length;
  };
  var request = encodeRequestHeader(clientId, correlationId, REQUEST_TYPE.syncGroup);
  request
    .Int16BE(byteLen(groupId)).string(groupId)
    .Int32BE(generationId)
    .Int16BE(byteLen(memberId)).string(memberId);

  if (groupAssignment && groupAssignment.length) {
    request.Int32BE(groupAssignment.length);
    groupAssignment.forEach(function (assignment) {
      request.Int16BE(byteLen(assignment.memberId)).string(assignment.memberId)
        .string(_encodeMemberAssignment(assignment));
    });
  } else {
    request.Int32BE(0);
  }

  return encodeRequestWithLength(request.make());
}
/**
 * Struct-style constructor: the i-th call argument is stored on the instance
 * under the i-th field name found in the enclosing `args` list.
 */
FetchRequest = function () {
  for (const [position, field] of Array.from(args).entries()) {
    this[field] = arguments[position];
  }
};
/**
 * Struct-style constructor: the i-th call argument is stored on the instance
 * under the i-th field name found in the enclosing `args` list.
 */
FetchResponse = function () {
  for (const [position, field] of Array.from(args).entries()) {
    this[field] = arguments[position];
  }
};
/**
 * Message variant constructed from just a key and a value.
 *
 * Delegates to `exports.Message` on the same instance, passing 0 for its first
 * two arguments followed by `key` and `value`.
 *
 * @param {*} key - third argument handed to `exports.Message`.
 * @param {*} value - fourth argument handed to `exports.Message`.
 */
function KeyedMessage(key, value) {
  var messageArgs = [0, 0, key, value];
  exports.Message.apply(this, messageArgs);
}n/a
Message = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
OffsetCommitRequest = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
OffsetCommitResponse = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
PartitionMetadata = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
ProduceRequest = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
Request = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
TopicAndPartition = function () {
for (var i = 0; i < args.length; i++) {
this[args[i]] = arguments[i];
}
}n/a
function assignRange(topicPartition, groupMembers, callback) {
logger.debug('topicPartition: %j', topicPartition);
var assignment = _(groupMembers).map('id').reduce(function (obj, id) {
obj[id] = [];
return obj;
}, {});
const topicMemberMap = topicToMemberMap(groupMembers);
for (var topic in topicMemberMap) {
if (!topicMemberMap.hasOwnProperty(topic)) {
continue;
}
logger.debug('For topic %s', topic);
topicMemberMap[topic].sort();
logger.debug(' members: ', topicMemberMap[topic]);
var numberOfPartitionsForTopic = topicPartition[topic].length;
logger.debug(' numberOfPartitionsForTopic', numberOfPartitionsForTopic);
var numberOfMembersForTopic = topicMemberMap[topic].length;
logger.debug(' numberOfMembersForTopic', numberOfMembersForTopic);
var numberPartitionsPerMember = Math.floor(numberOfPartitionsForTopic / numberOfMembersForTopic);
logger.debug(' numberPartitionsPerMember', numberPartitionsPerMember);
var membersWithExtraPartition = numberOfPartitionsForTopic % numberOfMembersForTopic;
var topicPartitionList = createTopicPartitionArray(topic, numberOfPartitionsForTopic);
for (var i = 0, n = numberOfMembersForTopic; i < n; i++) {
var start = numberPartitionsPerMember * i + Math.min(i, membersWithExtraPartition);
var length = numberPartitionsPerMember + (i + 1 > membersWithExtraPartition ? 0 : 1);
var assignedTopicPartitions = assignment[topicMemberMap[topic][i]];
assignedTopicPartitions.push.apply(assignedTopicPartitions, topicPartitionList.slice(start, start + length));
}
}
logger.debug(assignment);
callback(null, convertToAssignmentList(assignment, VERSION));
}...
}
this.protocols = protocols.map(function (protocol) {
if (typeof protocol === 'string') {
if (!(protocol in builtInProtocols)) {
throw new Error('Unknown built in assignment protocol ' + protocol);
}
protocol = _.assign({}, builtInProtocols[protocol]);
} else {
checkProtocol(protocol);
}
protocol.subscription = this.topics;
return protocol;
}, this);
...function assignRoundRobin(topicPartition, groupMembers, callback) {
logger.debug('topicPartition: %j', topicPartition);
logger.debug('groupMembers: %j', groupMembers);
var _members = _(groupMembers).map('id');
var members = _members.value().sort();
logger.debug('members', members);
var assignment = _members.reduce(function (obj, id) {
obj[id] = [];
return obj;
}, {});
var subscriberMap = groupMembers.reduce(function (subscribers, member) {
subscribers[member.id] = member.subscription;
return subscribers;
}, {});
logger.debug('subscribers', subscriberMap);
// layout topic/partitions pairs into a list
var topicPartitionList = _(topicPartition).map(function (partitions, topic) {
return partitions.map(function (partition) {
return {
topic: topic,
partition: partition
};
});
}).flatten().value();
logger.debug('round robin on topic partition pairs: ', topicPartitionList);
var assigner = cycle(members);
topicPartitionList.forEach(function (tp) {
var topic = tp.topic;
while (!_.includes(subscriberMap[assigner.peek()], topic)) {
assigner.next();
}
assignment[assigner.next()].push(tp);
});
var ret = _.map(assignment, function (value, key) {
var ret = {};
ret.memberId = key;
ret.topicPartitions = groupPartitionsByTopic(value);
ret.version = VERSION;
return ret;
});
callback(null, ret);
}...
}
this.protocols = protocols.map(function (protocol) {
if (typeof protocol === 'string') {
if (!(protocol in builtInProtocols)) {
throw new Error('Unknown built in assignment protocol ' + protocol);
}
protocol = _.assign({}, builtInProtocols[protocol]);
} else {
checkProtocol(protocol);
}
protocol.subscription = this.topics;
return protocol;
}, this);
...function decodeSnappy(buffer, cb) {
if (isChunked(buffer)) {
var pos = 16;
var max = buffer.length;
var encoded = [];
var size;
while (pos < max) {
size = buffer.readUInt32BE(pos);
pos += 4;
encoded.push(buffer.slice(pos, pos + size));
pos += size;
}
return async.mapSeries(encoded, snappy.uncompress,
function (err, decodedChunks) {
if (err) return cb(err);
return cb(null, Buffer.concat(decodedChunks));
}
);
}
return snappy.uncompress(buffer, cb);
}...
value: vars.value,
offset: vars.offset,
partition: partition,
highWaterOffset: highWaterOffset,
key: vars.key
});
}
codec.decode(vars.value, function (error, inlineMessageSet) {
if (error) return; // Not sure where to report this
decodeMessageSet(topic, partition, inlineMessageSet, cb, maxTickMessages);
});
}
});
// Defensive code around potential denial of service
if (maxTickMessages && messageCount > maxTickMessages) break;
...encode = function (input, callback) {
if (!(typeof (input) === 'string' || Buffer.isBuffer(input))) {
return callback(new Error('input must be a String or a Buffer'));
}
binding.compress(input, callback);
}...
/**
 * Encodes one payload's messages in place when its attributes select a codec.
 *
 * If `getCodec(payload.attributes)` yields nothing, the payload is left untouched
 * and `cb()` is called immediately. Otherwise the payload's messages are encoded
 * as a message set, passed through `codec.encode`, and replaced by a single
 * `Message` wrapping the encoded result.
 *
 * @param {Object} payload - `attributes` and `messages` are read; `messages` is
 *   overwritten on success.
 * @param {Function} cb - called with the codec's error on failure, with no
 *   arguments otherwise.
 */
function buildRequest (payload, cb) {
  var attributes = payload.attributes;
  var codec = getCodec(attributes);
  if (!codec) {
    return cb();
  }

  function onEncoded (err, message) {
    if (err) {
      return cb(err);
    }
    payload.messages = [ new Message(0, attributes, '', message) ];
    cb();
  }

  var innerSet = encodeMessageSet(payload.messages);
  codec.encode(innerSet, onEncoded);
}
};
...function createTopicPartitionList(topicPartitions) {
var tpList = [];
for (var topic in topicPartitions) {
if (!topicPartitions.hasOwnProperty(topic)) {
continue;
}
topicPartitions[topic].forEach(function (partition) {
tpList.push({
topic: topic,
partition: partition
});
});
}
return tpList;
}n/a
/**
 * Groups a flat list of { topic, partition } pairs into a
 * { topic: [partition, ...] } map, keeping partitions in input order.
 *
 * @param {Array<{topic: string, partition: *}>} topicPartitions - pairs to group.
 * @returns {Object} map from topic name to the partitions seen for it.
 * @throws if `topicPartitions` is not an array (assertion failure).
 */
function groupPartitionsByTopic(topicPartitions) {
  assert(Array.isArray(topicPartitions));
  return topicPartitions.reduce(function (result, tp) {
    // Check own properties only: `in` also sees inherited members, so a topic
    // named e.g. "constructor" or "toString" used to hit Object.prototype and
    // crash on `.push`.
    if (Object.prototype.hasOwnProperty.call(result, tp.topic)) {
      result[tp.topic].push(tp.partition);
    } else {
      // defineProperty creates an ordinary own property even for the key
      // "__proto__", where plain assignment would set the prototype instead.
      Object.defineProperty(result, tp.topic, {
        value: [tp.partition],
        enumerable: true,
        writable: true,
        configurable: true
      });
    }
    return result;
  }, {});
}n/a
function validateConfig(property, value) {
if (!legalChars.test(value)) {
throw new InvalidConfigError([property, value, "is illegal, contains a character other than ASCII alphanumerics, '.', '_' and
'-'"].join(' '));
}
}...
this.paused = this.options.paused;
this.id = nextId();
this.payloads = this.buildPayloads(topics);
this.connect();
this.encoding = this.options.encoding;
if (this.options.groupId) {
utils.validateConfig('options.groupId', this.options.groupId);
}
};
util.inherits(Consumer, events.EventEmitter);
Consumer.prototype.buildPayloads = function (payloads) {
var self = this;
return payloads.map(function (p) {
...function validateTopicNames(topics) {
// Rewriting same validations done by Apache Kafka team for topics
// iterating over topics
topics.some(function (topic) {
if (topic.length <= 0) {
throw new InvalidConfigError('topic name is illegal, cannot be empty');
}
if (topic === '.' || topic === '..') {
throw new InvalidConfigError('topic name cannot be . or ..');
}
if (topic.length > allowedTopicLength) {
throw new InvalidConfigError(`topic name is illegal, cannot be longer than ${allowedTopicLength} characters`);
}
if (!legalChars.test(topic)) {
throw new InvalidConfigError(`topic name ${topic} is illegal, contains a character other than ASCII alphanumerics .,_ and -`);
}
});
return true;
}n/a
function validateTopics(topics) {
if (topics.some(function (topic) {
if ('partition' in topic) {
return typeof topic.partition !== 'number';
}
return false;
})) {
throw new InvalidConfigError('Offset must be a number and can not contain characters');
}
}...
})();
var Consumer = function (client, topics, options) {
if (!topics) {
throw new Error('Must have payloads');
}
utils.validateTopics(topics);
this.fetchCount = 0;
this.client = client;
this.options = _.defaults((options || {}), DEFAULTS);
this.ready = false;
this.paused = this.options.paused;
this.id = nextId();
...Zookeeper = function (connectionString, options) {
this.client = zookeeper.createClient(connectionString, options);
var that = this;
this.client.on('connected', function () {
that.listBrokers();
});
this.client.on('disconnected', function () {
that.emit('disconnected');
});
this.client.connect();
}n/a
ZookeeperConsumerMappings = function () {
this.consumerTopicMap = {};
this.topicConsumerMap = {};
this.topicPartitionMap = {};
}n/a