Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add global consume timeout #1061

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
100 changes: 97 additions & 3 deletions e2e/both.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ describe('Consumer/Producer', function() {

});
});

it('should return ready messages on partition EOF', function(done) {
crypto.randomBytes(4096, function(ex, buffer) {
producer.setPollInterval(10);
Expand Down Expand Up @@ -221,7 +221,7 @@ describe('Consumer/Producer', function() {
consumer.once('data', function(msg) {
events.push("data");
});

consumer.once('partition.eof', function(eof) {
events.push("partition.eof");
});
Expand Down Expand Up @@ -272,6 +272,100 @@ describe('Consumer/Producer', function() {
});
});

it('should fill a batch when there is no total consume timeout set', function(done) {
  crypto.randomBytes(4096, function(ex, buffer) {
    producer.setPollInterval(10);

    producer.once('delivery-report', function(err, report) {
      t.ifError(err);
    });

    consumer.subscribe([topic]);

    var events = [];

    consumer.once('data', function(msg) {
      events.push("data");
    });

    consumer.on('partition.eof', function(eof) {
      events.push("partition.eof");
    });

    // Produce 10 messages, 500ms apart — the whole batch should fill, but
    // producing them spans 9 * 500 = 4500ms, so the consume call must block
    // at least that long while it waits for the batch to fill.
    let timeoutId;
    let toProduce = 10;
    // Declared with `const` — the original assigned to an undeclared name,
    // leaking an implicit global (a ReferenceError in strict mode).
    const produceLoop = () => {
      producer.produce(topic, null, buffer, null);
      if (--toProduce > 0) {
        timeoutId = setTimeout(produceLoop, 500);
      }
    };
    produceLoop();

    consumer.setDefaultConsumeTimeout(1000);
    const started = Date.now();
    consumer.consume(10, function(err, messages) {
      t.ifError(err);
      t(messages.length >= 10, 'Too few messages consumed within batch');
      // With no total timeout set, consume(10) cannot return until all 10
      // messages exist, i.e. not before ~4500ms have elapsed. (The original
      // asserted `< 5000`, contradicting its own failure message.)
      t(Date.now() - started >= 4500, 'Consume finished too fast, filling the batch should have taken at least ~4.5 seconds');
      clearTimeout(timeoutId);
      done();
    });
  });
});

it('should not fill a batch when there is a total consume timeout set', function(done) {
  crypto.randomBytes(4096, function(ex, buffer) {
    producer.setPollInterval(10);

    producer.once('delivery-report', function(err, report) {
      t.ifError(err);
    });

    consumer.subscribe([topic]);

    var events = [];

    consumer.once('data', function(msg) {
      events.push("data");
    });

    consumer.on('partition.eof', function(eof) {
      events.push("partition.eof");
    });

    // Produce 20 messages, 900ms apart — the whole batch should *not* fill,
    // because the 10s total timeout elapses first (11 * 900 = 9900 < 10000).
    let timeoutId;
    let toProduce = 20;
    // Declared with `const` — the original assigned to an undeclared name,
    // leaking an implicit global (a ReferenceError in strict mode).
    const produceLoop = () => {
      producer.produce(topic, null, buffer, null);
      if (--toProduce > 0) {
        timeoutId = setTimeout(produceLoop, 900);
      }
    };
    produceLoop();

    consumer.setDefaultConsumeTimeout(1000);
    consumer.setDefaultTotalConsumeTimeout(10000);
    const started = Date.now();
    consumer.consume(100, function(err, messages) {
      t.ifError(err);
      // Why 13? The first message is produced immediately, then 11 more are
      // produced over the next 9,900 milliseconds, and a 13th is produced and
      // consumed in the "final" loop iteration where elapsed time > 10,000ms.
      t(messages.length === 13, 'Batch should have consumed 13 messages, instead consumed ' + messages.length + ' messages');
      t((Date.now() - started) < (10000 + 1000), 'Consume took ' + (Date.now() - started) + ', longer than global timeout + single message timeout of ' + (10000 + 1000));
      clearTimeout(timeoutId);
      done();
    });
  });
});

it('should be able to produce and consume messages: consumeLoop', function(done) {
var key = 'key';

Expand Down Expand Up @@ -409,7 +503,7 @@ describe('Consumer/Producer', function() {
];
run_headers_test(done, headers);
});

it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) {
var headers = [
{ key1: 'value1' },
Expand Down
4 changes: 3 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;

consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
// The callback-only overload also delivers a batch: `messages` is an array,
// matching the numbered overload above (the PR diff regressed it to `Message`).
consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
consume(): void;

getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
Expand All @@ -237,6 +237,8 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {

setDefaultConsumeTimeout(timeoutMs: number): void;

setDefaultTotalConsumeTimeout(totalTimeoutMs: number): void;

setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;

subscribe(topics: SubscribeTopicList): this;
Expand Down
24 changes: 18 additions & 6 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ function KafkaConsumer(conf, topicConf) {
this.topicConfig = topicConf;

this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
this._consumeTotalTimeout = undefined;
this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
}

Expand All @@ -143,6 +144,15 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
};

/**
 * Set the default total timeout, in milliseconds, for a batch consume call.
 *
 * The stored value is handed down to the native consume implementation;
 * leaving it unset preserves the historical behavior of placing no upper
 * bound on how long a batch consume may run.
 *
 * @param {number} totalTimeoutMs - maximum number of milliseconds to allow a
 * batch consume operation to run
 */
KafkaConsumer.prototype.setDefaultTotalConsumeTimeout = function(totalTimeoutMs) {
  this._consumeTotalTimeout = totalTimeoutMs;
};

/**
* Set the default sleep delay for the next consume loop after the previous one has timed out.
* @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
Expand Down Expand Up @@ -386,19 +396,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
*/
KafkaConsumer.prototype.consume = function(number, cb) {
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
var self = this;

if ((number && typeof number === 'number') || (number && cb)) {
// Undefined is OK here — the default is to not upper bound the consume call's
// total time spent, to avoid breaking changes.
var totalTimeoutMs = this._consumeTotalTimeout

if ((number && typeof number === 'number') || (number && cb)) {
if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}

this._consumeNum(timeoutMs, number, cb);
this._consumeNum(timeoutMs, totalTimeoutMs, number, cb);
} else {

// See https://github.com/Blizzard/node-rdkafka/issues/220
// Docs specify just a callback can be provided but really we needed
// a fallback to the number argument
Expand Down Expand Up @@ -426,6 +437,7 @@ KafkaConsumer.prototype.consume = function(number, cb) {
KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
var self = this;
var retryReadInterval = this._consumeLoopTimeoutDelay;

self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent, warning) {

if (err) {
Expand Down Expand Up @@ -460,10 +472,10 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
* @private
* @see consume
*/
KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
KafkaConsumer.prototype._consumeNum = function(timeoutMs, totalTimeoutMs, numMessages, cb) {
var self = this;

this._client.consume(timeoutMs, numMessages, function(err, messages, eofEvents) {
this._client.consume(timeoutMs, totalTimeoutMs, numMessages, function(err, messages, eofEvents) {
if (err) {
err = LibrdKafkaError.create(err);
if (cb) {
Expand Down
67 changes: 32 additions & 35 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {

if (info.Length() < 3) {
// Just throw an exception
return Nan::ThrowError("Invalid number of parameters");
return Nan::ThrowError("Invalid number of parameters, expected 3");
}

if (!info[0]->IsNumber()) {
Expand Down Expand Up @@ -1106,11 +1106,12 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
NAN_METHOD(KafkaConsumer::NodeConsume) {
  Nan::HandleScope scope;

  if (info.Length() != 4) {
    // Just throw an exception — we didn't get enough arguments
    return Nan::ThrowError("Invalid number of arguments, expected 4");
  }

  // Argument 0: the per-message timeout, in milliseconds.
  int timeout_ms;
  Nan::Maybe<uint32_t> maybeTimeout =
    Nan::To<uint32_t>(info[0].As<v8::Number>());
  if (maybeTimeout.IsNothing()) {
    timeout_ms = 1000;
  } else {
    timeout_ms = static_cast<int>(maybeTimeout.FromJust());
  }

  // Argument 1: the total (whole-batch) timeout, in milliseconds.
  // `undefined` means "no upper bound" (the historical behavior). Gate on
  // IsNumber() rather than the Maybe: Nan::To<uint32_t> coerces `undefined`
  // to 0 instead of Nothing, which would silently become a 0ms timeout.
  // Use a signed type so the -1 "no bound" sentinel is not wrapped to
  // UINT_MAX. (The previous code also threw "Need to specify a callback"
  // whenever a *valid* number was passed here — inverted leftover check.)
  int total_timeout_ms = -1;
  if (info[1]->IsNumber()) {
    Nan::Maybe<uint32_t> maybeTotalTimeout = Nan::To<uint32_t>(info[1]);
    if (maybeTotalTimeout.IsJust()) {
      total_timeout_ms = static_cast<int>(maybeTotalTimeout.FromJust());
    }
  }

  // Argument 2: the number of messages to read.
  v8::Local<v8::Number> numMessagesNumber = info[2].As<v8::Number>();
  Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
  uint32_t numMessages;
  if (numMessagesMaybe.IsNothing()) {
    return Nan::ThrowError("Number of messages to consume must be a number over 0");
  } else {
    numMessages = numMessagesMaybe.FromJust();
  }

  // Argument 3: the completion callback.
  if (!info[3]->IsFunction()) {
    return Nan::ThrowError("Need to specify a callback");
  }
  v8::Local<v8::Function> cb = info[3].As<v8::Function>();
  Nan::Callback *callback = new Nan::Callback(cb);

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
  Nan::AsyncQueueWorker(
    new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, total_timeout_ms));  // NOLINT

  info.GetReturnValue().Set(Nan::Null());
}
Expand Down Expand Up @@ -1195,7 +1192,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
// cleanup the async worker
consumeLoop->WorkComplete();
consumeLoop->Destroy();

consumer->m_consume_loop = nullptr;
}

Expand Down
Loading