[0.24] KafkaChannel to init offsets before dispatcher #886
Conversation
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: aliok

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment.
05189a6 to 1058793
/retest
Codecov Report

@@            Coverage Diff             @@
##           release-0.24      #886   +/-   ##
===============================================
  Coverage          ?        74.06%
===============================================
  Files             ?           134
  Lines             ?          5954
  Branches          ?             0
===============================================
  Hits              ?          4410
  Misses            ?          1319
  Partials          ?           225
===============================================

Continue to review full report at Codecov.
/test pull-knative-sandbox-eventing-kafka-unit-tests
ab79292 to 0903a03
/retest

Let's see what happens
/test pull-knative-sandbox-eventing-kafka-integration-test-channel-consolidated
/assign
I did a short review, will continue later today.
@@ -477,7 +492,20 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName
 	return svc, nil
 }

-func (r *Reconciler) createClient(ctx context.Context) (sarama.ClusterAdmin, error) {
+func (r *Reconciler) createClients(ctx context.Context) (sarama.Client, sarama.ClusterAdmin, error) {
 	kafkaClient := r.kafkaClient
you're not setting r.kafkaClient anywhere, is that intentional? Wouldn't that mean you're creating a new client with every reconcile loop?
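A minimal sketch of the lifecycle this question is about: without stashing the client on the Reconciler, every reconcile pays for a fresh broker connection. The function and parameter names here are illustrative assumptions, not the PR's code.

package main

import (
	"context"

	"github.com/Shopify/sarama"
)

// reconcileOnce (hypothetical) stands in for one reconcile pass: it dials a
// new client on entry and tears it down again on return.
func reconcileOnce(ctx context.Context, brokers []string, config *sarama.Config) error {
	client, err := sarama.NewClient(brokers, config) // new connection per call
	if err != nil {
		return err
	}
	defer client.Close() // closed again when this reconcile returns
	// ... reconcile work using the client ...
	return nil
}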
I refactored the code that creates the admin client and created the normal client there, in the same way.

For clusterAdmin, there's this ticket: IBM/sarama#1162. I see that ticket didn't really get any resolution.

I haven't tried reusing the regular kafkaClient. I can try that once I get the failing tests passing.
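For reference, a sketch of creating both clients "the same way", under the assumption that the admin is derived from the regular client via sarama's NewClusterAdminFromClient (an existing sarama API for sharing one connection; whether it sidesteps the ticket above is not claimed here). Names are assumptions, not the PR's code.

package main

import (
	"github.com/Shopify/sarama"
)

// createClients returns a regular client plus a ClusterAdmin built on top of
// it. Closing the admin also closes the client it wraps, so callers should
// close only one of the two.
func createClients(brokers []string, config *sarama.Config) (sarama.Client, sarama.ClusterAdmin, error) {
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, nil, err
	}
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		client.Close()
		return nil, nil, err
	}
	return client, admin, nil
}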
pretty bad sarama bug 😞
Can we add a comment that this function returns both clients, normal and ClusterAdmin?

Maybe func (r *Reconciler) createSaramaClients instead? Not just client?
Do we really always need both? See Ahmed's comment about stashing and deferring the "regular" client. Just wondering.
One alternative could be to have two functions, one for each.
We now have two functions. The code looks much better, thanks for the suggestions.

About stashing into r.kafkaClient: I will try this once I see the tests passing again.
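A rough sketch of the two-function split being described, with assumed names and fields (the PR's actual identifiers may differ):

package main

import (
	"github.com/Shopify/sarama"
)

// Reconciler is a stand-in for the PR's reconciler type; the fields are
// assumptions made for this sketch.
type Reconciler struct {
	kafkaBrokers []string
	kafkaConfig  *sarama.Config
}

// createKafkaClient creates only the regular client.
func (r *Reconciler) createKafkaClient() (sarama.Client, error) {
	return sarama.NewClient(r.kafkaBrokers, r.kafkaConfig)
}

// createClusterAdmin creates only the admin client.
func (r *Reconciler) createClusterAdmin() (sarama.ClusterAdmin, error) {
	return sarama.NewClusterAdmin(r.kafkaBrokers, r.kafkaConfig)
}

With the split, a caller that needs just one of the two no longer has to create and close the other, which is what the "do we really always need both?" question was getting at.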
OK, when I stash the client in r.kafkaClient and reuse it, things break. I have no desire to debug why. See my comment here: #886 (comment)

You can have a look at the commit list here, https://github.com/knative-sandbox/eventing-kafka/pull/886/commits, and see the job history for each commit there (not always consistent, especially when new code was pushed while the jobs were running).
"I have no desire to debug why." That is OK w/ me. That said, mind creating an issue so we have something to track this?
/test pull-knative-sandbox-eventing-kafka-integration-test-channel-consolidated

Retesting to see if there's a pattern
/test pull-knative-sandbox-eventing-kafka-integration-test-channel-consolidated

Stashing the kafka client broke things. I am gonna try here again, before reverting commit 4d57547.
This reverts commit 4d57547
I reverted it now. See my comment here: #886 (comment)
The following is the coverage report on the affected files.
@aliok: The following test failed, say /retest to rerun all failed tests:

Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
/test pull-knative-sandbox-eventing-kafka-unit-tests

TestEnableSaramaLogging
/lgtm
[0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886)

* Init offsets in Kafka channel controller - first iteration
* Do not check prober for subscription readiness
* Better code style
* Get rid of probing for checking subscription readiness
* Get rid of unused deps
* Ooops, fixed the tests
* Pass context along the functions, which will be necessary later
* Fix unit test
* Move partition retrieval into a separate function, which is going to be used later
* Check if offsets are initialized in the consumerFactory
* IDE "extracts" method LOL
* Make unit tests working
* Move MockClusterAdmin into the common package and reuse it
* Copy paste tests for CheckIfAllOffsetsInitialized
* Unify tests for CheckIfAllOffsetsInitialized and InitOffsets
* Separate tests for CheckIfAllOffsetsInitialized and InitOffsets
* Do not block main reconciliation thread for offset checking
* Remove last crumbs of probing
* Change log level for offset init message
* Move some consts to right place
* Rename checkOffsets.. func to WaitForOffsets...
* Rename consumerOffsetInitializer to ConsumerGroupOffsetsChecker
* Do not handle deleted topics or partitions when checking the offsets
* Copy the partitions array when retrieving partitions
* Address comments
* Separate client and clusteradminclient creation
* Stash kafka client
* Revert "Stash kafka client" (this reverts commit 4d57547)
* Do not do any offset initialization when subscription is already marked ready
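The commit list above names CheckIfAllOffsetsInitialized and InitOffsets. As an illustration of what such a check can look like with sarama (a sketch under assumed names and signature, not the PR's actual implementation):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// checkIfAllOffsetsInitialized reports whether the consumer group has a
// committed offset for every partition of every topic. An offset of -1 in
// the fetch response means nothing has been committed for that partition.
func checkIfAllOffsetsInitialized(client sarama.Client, admin sarama.ClusterAdmin, topics []string, group string) (bool, error) {
	// Build the topic -> partitions map the offset fetch expects.
	tps := make(map[string][]int32, len(topics))
	for _, topic := range topics {
		partitions, err := client.Partitions(topic)
		if err != nil {
			return false, fmt.Errorf("listing partitions of %s: %w", topic, err)
		}
		tps[topic] = partitions
	}
	resp, err := admin.ListConsumerGroupOffsets(group, tps)
	if err != nil {
		return false, err
	}
	for topic, partitions := range tps {
		for _, partition := range partitions {
			block := resp.GetBlock(topic, partition)
			if block == nil || block.Offset == -1 {
				return false, nil
			}
		}
	}
	return true, nil
}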
[0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886) (#383)

* [0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886), with the same commit list as above
* [0.24] KafkaChannel dispatcher offset checking improvements (knative-extensions#924)
  * Change poll loop
  * Change poll loop and requeue channel on failure
  * Get rid of unused func
  * Fix unit tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Pierangelo Di Pilato <[email protected]>
…ions#913)

* Cherry pick 3f2a9d7 [0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886)
* ./hack/update-codegen.sh
* [0.25] KafkaChannel to init offsets before dispatcher (knative-extensions#913)
* Cherry pick 3f2a9d7 [0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886)
* ./hack/update-codegen.sh
* [0.25] KafkaChannel dispatcher offset checking improvements (knative-extensions#929)
* Fix Kafka channel event loss during subscription becoming ready
* Make it look like knative-extensions#926
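The offset-checking improvements referenced above (knative-extensions#924 and its 0.25 counterpart) turn the dispatcher's check into a poll loop. A minimal sketch of that pattern, assuming a hypothetical checkOffsets callback and using the apimachinery wait helpers:

package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForOffsets polls checkOffsets until it reports true or the context is
// cancelled. The interval and error handling here are assumptions for the
// sketch, not the PR's tuned values.
func waitForOffsets(ctx context.Context, checkOffsets func() (bool, error)) error {
	return wait.PollImmediateUntil(500*time.Millisecond, func() (bool, error) {
		ready, err := checkOffsets()
		if err != nil {
			// Treat errors as "not ready yet" so transient failures keep polling.
			return false, nil
		}
		return ready, nil
	}, ctx.Done())
}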
Fixes #549

Proposed Changes

Touches the distributed channel code as I changed some interfaces, but no business logic changes.

Release Note

Docs