From af8e92c3dd5397fa375b28e5779a7ed3559ea015 Mon Sep 17 00:00:00 2001
From: Ashley Jeffs
Date: Mon, 7 Oct 2024 10:15:00 +0100
Subject: [PATCH] Add redpanda output

---
 cmd/tools/docs_gen/schema_test.go             |   8 +-
 .../components/pages/outputs/redpanda.adoc    | 336 ++++++++++++++++++
 .../impl/kafka/enterprise/redpanda_output.go  |  94 +++++
 internal/plugins/info.csv                     |   1 +
 4 files changed, 437 insertions(+), 2 deletions(-)
 create mode 100644 docs/modules/components/pages/outputs/redpanda.adoc
 create mode 100644 internal/impl/kafka/enterprise/redpanda_output.go

diff --git a/cmd/tools/docs_gen/schema_test.go b/cmd/tools/docs_gen/schema_test.go
index 3a1a54a5cf..573c06107b 100644
--- a/cmd/tools/docs_gen/schema_test.go
+++ b/cmd/tools/docs_gen/schema_test.go
@@ -21,12 +21,16 @@ import (
 
 	"github.com/redpanda-data/benthos/v4/public/service"
 
+	"github.com/redpanda-data/connect/v4/public/schema"
+
 	_ "github.com/redpanda-data/connect/v4/public/components/all"
 )
 
 func TestComponentExamples(t *testing.T) {
-	env := service.GlobalEnvironment()
-	linter := env.FullConfigSchema("", "").NewStreamConfigLinter()
+	sch := schema.Standard("", "")
+	env := sch.Environment()
+
+	linter := sch.NewStreamConfigLinter()
 	linter.SetRejectDeprecated(true)
 	linter.SetSkipEnvVarCheck(true)
 
diff --git a/docs/modules/components/pages/outputs/redpanda.adoc b/docs/modules/components/pages/outputs/redpanda.adoc
new file mode 100644
index 0000000000..7a0ec71f13
--- /dev/null
+++ b/docs/modules/components/pages/outputs/redpanda.adoc
@@ -0,0 +1,336 @@
= redpanda
:type: output
:status: beta
:categories: ["Services"]



////
    THIS FILE IS AUTOGENERATED!

    To make changes, edit the corresponding source file under:

    https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

    And:

    https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Sends data to a Redpanda broker, using credentials defined in a common `redpanda` config block.


[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
output:
  label: ""
  redpanda:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: ${! meta("partition") } # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
output:
  label: ""
  redpanda:
    topic: "" # No default (required)
    key: "" # No default (optional)
    partition: ${! meta("partition") } # No default (optional)
    metadata:
      include_prefixes: []
      include_patterns: []
    timestamp: ${! timestamp_unix() } # No default (optional)
    max_in_flight: 10
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
```

--
======

== Examples

[tabs]
======
Simple Output::
+
--

Data is generated and written to a topic `bar`, targeting the cluster configured within the `redpanda` block at the bottom. This is useful as it allows us to configure TLS and SASL only once for potentially multiple inputs and outputs.

```yaml
input:
  generate:
    interval: 1s
    mapping: 'root.name = fake("name")'

pipeline:
  processors:
    - mutation: |
        root.id = uuid_v4()
        root.loud_name = this.name.uppercase()

output:
  redpanda:
    topic: bar
    key: ${! @id }

redpanda:
  seed_brokers: [ "127.0.0.1:9093" ]
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-512
      password: bar
      username: foo
```

--
======

== Fields

=== `topic`

A topic to write messages to.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


=== `key`

An optional key to populate for each message.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


=== `partition`

An optional explicit partition to set for each message. This field is only relevant when the `partitioner` is set to `manual`. The provided interpolation string must be a valid integer.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


```yml
# Examples

partition: ${! meta("partition") }
```

=== `metadata`

Determine which (if any) metadata values should be added to messages as headers.


*Type*: `object`


=== `metadata.include_prefixes`

Provide a list of explicit metadata key prefixes to match against.


*Type*: `array`

*Default*: `[]`

```yml
# Examples

include_prefixes:
  - foo_
  - bar_

include_prefixes:
  - kafka_

include_prefixes:
  - content-
```

=== `metadata.include_patterns`

Provide a list of explicit metadata key regular expression (re2) patterns to match against.


*Type*: `array`

*Default*: `[]`

```yml
# Examples

include_patterns:
  - .*

include_patterns:
  - _timestamp_unix$
```

=== `timestamp`

An optional timestamp to set for each message. When left empty, the current timestamp is used.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


```yml
# Examples

timestamp: ${! timestamp_unix() }

timestamp: ${! metadata("kafka_timestamp_unix") }
```

=== `max_in_flight`

The maximum number of batches to send in parallel at any given time.


*Type*: `int`

*Default*: `10`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0`, count-based batching is disabled.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0`, size-based batching is disabled.


*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.

*Type*: `array`


```yml
# Examples

processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```


diff --git a/internal/impl/kafka/enterprise/redpanda_output.go b/internal/impl/kafka/enterprise/redpanda_output.go
new file mode 100644
index 0000000000..2969edc9cc
--- /dev/null
+++ b/internal/impl/kafka/enterprise/redpanda_output.go
@@ -0,0 +1,94 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed as a Redpanda Enterprise file under the Redpanda Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package enterprise

import (
	"context"

	"github.com/redpanda-data/benthos/v4/public/service"

	"github.com/redpanda-data/connect/v4/internal/impl/kafka"
)

func redpandaOutputConfig() *service.ConfigSpec {
	return service.NewConfigSpec().
		Beta().
		Categories("Services").
		Summary("Sends data to a Redpanda broker, using credentials defined in a common `redpanda` config block.").
		Fields(kafka.FranzWriterConfigFields()...).
		Fields(
			service.NewIntField(roFieldMaxInFlight).
				Description("The maximum number of batches to send in parallel at any given time.").
				Default(10),
			service.NewBatchPolicyField(roFieldBatching),
		).
		LintRule(`
root = if this.partitioner == "manual" {
  if this.partition.or("") == "" {
    "a partition must be specified when the partitioner is set to manual"
  }
} else if this.partition.or("") != "" {
  "a partition cannot be specified unless the partitioner is set to manual"
}`).
		Example("Simple Output", "Data is generated and written to a topic `bar`, targeting the cluster configured within the `redpanda` block at the bottom. This is useful as it allows us to configure TLS and SASL only once for potentially multiple inputs and outputs.", `
input:
  generate:
    interval: 1s
    mapping: 'root.name = fake("name")'

pipeline:
  processors:
    - mutation: |
        root.id = uuid_v4()
        root.loud_name = this.name.uppercase()

output:
  redpanda:
    topic: bar
    key: ${! @id }

redpanda:
  seed_brokers: [ "127.0.0.1:9093" ]
  tls:
    enabled: true
  sasl:
    - mechanism: SCRAM-SHA-512
      password: bar
      username: foo
`)
}

const (
	roFieldMaxInFlight = "max_in_flight"
	roFieldBatching    = "batching"
)

func init() {
	err := service.RegisterBatchOutput("redpanda", redpandaOutputConfig(),
		func(conf *service.ParsedConfig, mgr *service.Resources) (
			output service.BatchOutput,
			batchPolicy service.BatchPolicy,
			maxInFlight int,
			err error,
		) {
			if maxInFlight, err = conf.FieldInt(roFieldMaxInFlight); err != nil {
				return
			}
			if batchPolicy, err = conf.FieldBatchPolicy(roFieldBatching); err != nil {
				return
			}
			// The writer borrows the client owned by the global `redpanda`
			// config block, so there is nothing for it to tear down on close.
			output, err = kafka.NewFranzWriterFromConfig(conf, func(fn kafka.FranzSharedClientUseFn) error {
				return kafka.FranzSharedClientUse(sharedGlobalRedpandaClientKey, mgr, fn)
			}, func(context.Context) error { return nil })
			return
		})
	if err != nil {
		panic(err)
	}
}

diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv
index 5e3c362d0d..94632ef7cc 100644
--- a/internal/plugins/info.csv
+++ b/internal/plugins/info.csv
@@ -191,6 +191,7 @@ redis_scan ,input ,Redis ,4.27.0 ,certif
 redis_script ,processor ,Redis Script ,4.11.0 ,certified ,n ,y ,y
 redis_streams ,input ,Redis Streams ,0.0.0 ,certified ,n ,y ,y
 redis_streams ,output ,Redis Streams ,0.0.0 ,certified ,n ,y ,y
+redpanda ,output ,redpanda ,4.38.0 ,enterprise ,n ,y ,y
 redpanda_data_transform ,processor ,redpanda_data_transform ,4.31.0 ,certified ,n ,n ,n
 redpanda_migrator ,input ,redpanda_migrator ,4.37.0 ,enterprise ,n ,y ,y
 redpanda_migrator ,output ,redpanda_migrator ,4.37.0 ,enterprise ,n ,y ,y
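
---

For anyone poking at this locally: the lint pass that the updated `schema_test.go` applies to component examples can be reproduced against a standalone config through the same public API. A minimal sketch, assuming the component bundle import registers the new output as it does in the test; the YAML literal and output handling are illustrative only, not part of this patch:

```go
package main

import (
	"fmt"

	// Importing the component bundle registers the redpanda output.
	_ "github.com/redpanda-data/connect/v4/public/components/all"
	"github.com/redpanda-data/connect/v4/public/schema"
)

func main() {
	// Same linter construction as the updated TestComponentExamples.
	linter := schema.Standard("", "").NewStreamConfigLinter()
	linter.SetRejectDeprecated(true)
	linter.SetSkipEnvVarCheck(true)

	// Lint a config that exercises the new output against the shared
	// top-level `redpanda` block.
	lints, err := linter.LintYAML([]byte(`
output:
  redpanda:
    topic: bar

redpanda:
  seed_brokers: [ "127.0.0.1:9093" ]
`))
	if err != nil {
		panic(err)
	}
	for _, lint := range lints {
		fmt.Println(lint)
	}
}
```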