-
Notifications
You must be signed in to change notification settings - Fork 831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor kafka components to reduce duplication #2918
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stellar job on this refactor 🏆 Thank you for taking the time to clean up all of this! I left a few nits in the comments, but it looks great otherwise. Feel free to
_, exists := r.clients[name] | ||
if exists { | ||
return errSharedClientNameDuplicate | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks redundant.
root = if this.partitioner == "manual" { | ||
if this.partition.or("") == "" { | ||
"a partition must be specified when the partitioner is set to manual" | ||
} | ||
} else if this.partition.or("") != "" { | ||
"a partition cannot be specified unless the partitioner is set to manual" | ||
}`). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The indentation looks a bit off
// Deprecated | ||
service.NewStringField(rmoFieldRackID).Deprecated(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, thanks! ❤️
|
||
if w.rackID, err = conf.FieldString("rack_id"); err != nil { | ||
var err error | ||
if w.recordConverter, err = kafka.NewFranzWriterFromConfig(conf, nil, nil); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docs for NewFranzWriterFromConfig
say:
A closure function must be provided that is responsible for granting access to a connected client.
Might be worth updating that to mention it's optional.
internal/impl/kafka/franz_writer.go
Outdated
const ( | ||
rwFieldTopic = "topic" | ||
rwFieldKey = "key" | ||
rwFieldPartition = "partition" | ||
rwFieldMetadata = "metadata" | ||
rwFieldTimestamp = "timestamp" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be prefixed with fw
(or kfw
like the ones above) instead of rw
?
@@ -55,76 +56,20 @@ if this.partition.or("") == "" { | |||
// FranzKafkaOutputConfigFields returns the full suite of config fields for a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The lint rule indentation above is a bit off.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Legendary - thanks for cleaning this up! A couple nits for you.
service.NewIntField(roFieldMaxInFlight). | ||
Description("The maximum number of batches to be sending in parallel at any given time."). | ||
Default(10), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could use service.NewOutputMaxInFlightField()
maxInFlight int, | ||
err error, | ||
) { | ||
if maxInFlight, err = conf.FieldInt(roFieldMaxInFlight); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: conf.FieldMaxInFlight()
Default(""). | ||
Example("__redpanda.connect.status"), | ||
|
||
// Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we mark it deprecated in the config too?
af8e92c
to
4956c6a
Compare
9021637
to
307f70f
Compare
fc0f280
to
01886c6
Compare
I've done a broad refactor of the various kafka input/output components in order to:
There's quite a lot going on here and it touches lots of the existing connectors. The integration tests run fine but we'll need to do some manual testing of the migrator components before merging/releasing.
One facet of this work is the idea of bringing out an implementation of a "consumer" and "producer" into a reusable type, this is important as we will have multiple plugins essentially doing the same thing (reading or writing benthos messages from/to kafka records) where the implementation is non-trivial (ordering, checkpointing, batching).
Eventually I want to have a solid mechanism for instantiating inputs and outputs that will be more user friendly for guaranteeing things like strict ordering. However, this is a complicated body of work and so it's important to first ensure that it will only need to be implemeted in one place, and that the codebase is tidy enough to reason about.