From 8f7c509df9c0bc44ffe56c0a9998433311c8ea8f Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Wed, 20 Nov 2024 17:13:16 +0100 Subject: [PATCH 1/3] in_kafka: add support to switch to auto-commit Polling every 1ms and committing each message individually results in rather pure performance in high volume Kafka clusters. Commiting in batches (relay on auto-commit of kafka) drastically improves performance. Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 12 ++++++++++-- plugins/in_kafka/in_kafka.h | 2 ++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 5d149d731e3..0235a96c57e 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -180,8 +180,11 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - /* TO-DO: commit the record based on `ret` */ - rd_kafka_commit(ctx->kafka.rk, NULL, 0); + + if(!ctx->enable_auto_commit) { + /* TO-DO: commit the record based on `ret` */ + rd_kafka_commit(ctx->kafka.rk, NULL, 0); + } /* Break from the loop when reaching the limit of polling if available */ if (ctx->polling_threshold != FLB_IN_KAFKA_UNLIMITED && @@ -428,6 +431,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, buffer_max_size), "Set the maximum size of chunk" }, + { + FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, + 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), + "Relay on kafka auto-commit and commit messages in batches" + }, /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index b56d9c66893..41328193b0e 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -32,6 +32,7 @@ #define FLB_IN_KAFKA_DEFAULT_FORMAT "none" #define FLB_IN_KAFKA_UNLIMITED (size_t)-1 #define FLB_IN_KAFKA_BUFFER_MAX_SIZE "4M" +#define FLB_IN_KAFKA_ENABLE_AUTO_COMMIT "false" enum { FLB_IN_KAFKA_FORMAT_NONE, @@ -48,6 +49,7 @@ struct flb_in_kafka_config { int coll_fd; size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; + bool enable_auto_commit; }; #endif From 6357338f983ebcf69f9e1714dc8ccbf2f842cdef Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Wed, 20 Nov 2024 17:22:27 +0100 Subject: [PATCH 2/3] in_kafka: increase the poll-timeout if we run in own own thread having 1ms timeout might make sense if the input plugin is running in the main thread (not introducing delay for others). but if we run in our very own thread then we should not over- ride the fetch.wait.max.ms configuration value from the kafka-consumer. this in conjuntion with using autocommit again boosts the throuhput significantly. Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 17 ++++++++++++++++- plugins/in_kafka/in_kafka.h | 1 + 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 0235a96c57e..b20d59add98 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, 1); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms); if (!rkm) { break; @@ -246,6 +246,21 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } + /* Set the kafka poll timeout dependend on wether we run in our own + * or in the main event thread. + * a) run in main event thread: + * -> minimize the delay we might create + * b) run in our own thread: + * -> optimize for throuput and relay on 'fetch.wait.max.ms' + * which is set to 500 by default default. lets set it to + * twice that so that increasing fetch.wait.max.ms still + * has an effect. + */ + ctx->poll_timeount_ms = 1; + if(ins->is_threaded) { + ctx->poll_timeount_ms = 1000; + } + if (ctx->buffer_max_size > 0) { ctx->polling_threshold = ctx->buffer_max_size; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 41328193b0e..7eca5f5341f 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -50,6 +50,7 @@ struct flb_in_kafka_config { size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; bool enable_auto_commit; + int poll_timeount_ms; }; #endif From f94cf72154f77659d8b54c12f2f45a4abd21dd50 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Fri, 22 Nov 2024 18:38:03 +0100 Subject: [PATCH 3/3] in_kafka: fix type in help text Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index b20d59add98..df1605af31c 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -449,7 +449,7 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), - "Relay on kafka auto-commit and commit messages in batches" + "Rely on kafka auto-commit and commit messages in batches" }, /* EOF */ {0}