From ff35e0154f3ed318e09242d49d077528bd4db2cf Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Fri, 27 Sep 2024 11:46:43 +0200 Subject: [PATCH] feat: rdkafka max_send_limit_bytes Signed-off-by: Peter Wilcsinszky --- .../crds/logging.banzaicloud.io_clusteroutputs.yaml | 4 ++++ .../crds/logging.banzaicloud.io_outputs.yaml | 4 ++++ config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml | 4 ++++ config/crd/bases/logging.banzaicloud.io_outputs.yaml | 4 ++++ docs/configuration/plugins/outputs/kafka.md | 6 ++++++ pkg/sdk/logging/model/output/kafka.go | 2 ++ 6 files changed, 24 insertions(+) diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml index ca0421420..49866232a 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_clusteroutputs.yaml @@ -3215,6 +3215,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: @@ -10616,6 +10618,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: diff --git a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml index e9e93c3ec..71a9817b4 100644 --- a/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml +++ b/charts/logging-operator/crds/logging.banzaicloud.io_outputs.yaml @@ -3211,6 +3211,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: @@ -9886,6 +9888,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: diff --git a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml index ca0421420..49866232a 100644 --- a/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_clusteroutputs.yaml @@ -3215,6 +3215,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: @@ -10616,6 +10618,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: diff --git a/config/crd/bases/logging.banzaicloud.io_outputs.yaml b/config/crd/bases/logging.banzaicloud.io_outputs.yaml index e9e93c3ec..71a9817b4 100644 --- a/config/crd/bases/logging.banzaicloud.io_outputs.yaml +++ b/config/crd/bases/logging.banzaicloud.io_outputs.yaml @@ -3211,6 +3211,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: @@ -9886,6 +9888,8 @@ spec: x-kubernetes-map-type: atomic type: object type: object + max_send_limit_bytes: + type: integer max_send_retries: type: integer message_key_key: diff --git a/docs/configuration/plugins/outputs/kafka.md b/docs/configuration/plugins/outputs/kafka.md index 5fd0e135c..e0f29a642 100644 --- a/docs/configuration/plugins/outputs/kafka.md +++ b/docs/configuration/plugins/outputs/kafka.md @@ -146,6 +146,12 @@ Default: nil ### keytab (*secret.Secret, optional) {#kafka-keytab} +### max_send_limit_bytes (int, optional) {#kafka-max_send_limit_bytes} + +Max byte size to send message to avoid MessageSizeTooLarge. Messages over the limit will be dropped + +Default: no limit + ### max_send_retries (int, optional) {#kafka-max_send_retries} Number of times to retry sending of messages to a leader diff --git a/pkg/sdk/logging/model/output/kafka.go b/pkg/sdk/logging/model/output/kafka.go index f920ef2e6..fcbb06aef 100644 --- a/pkg/sdk/logging/model/output/kafka.go +++ b/pkg/sdk/logging/model/output/kafka.go @@ -111,6 +111,8 @@ type KafkaOutputConfig struct { ScramMechanism string `json:"scram_mechanism,omitempty"` // Number of times to retry sending of messages to a leader (default: 1) MaxSendRetries int `json:"max_send_retries,omitempty"` + // Max byte size to send message to avoid MessageSizeTooLarge. Messages over the limit will be dropped (default: no limit) + MaxSendLimitBytes int `json:"max_send_limit_bytes,omitempty"` // The number of acks required per request (default: -1). RequiredAcks int `json:"required_acks,omitempty"` // How long the producer waits for acks. The unit is seconds (default: nil => Uses default of ruby-kafka library)