Skip to content

Commit

Permalink
[0.25] KafkaChannel to init offsets before dispatcher (knative-extens…
Browse files Browse the repository at this point in the history
…ions#913)

* Cherry pick 3f2a9d7

[0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886)

* ./hack/update-codegen.sh
  • Loading branch information
aliok committed Oct 8, 2021
1 parent dcf8e88 commit 0629e17
Show file tree
Hide file tree
Showing 33 changed files with 728 additions and 2,170 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.3
go.opencensus.io v0.23.0
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.18.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand All @@ -37,7 +36,6 @@ require (
knative.dev/control-protocol v0.0.0-20210803191615-72cde96b9f5a
knative.dev/eventing v0.25.0
knative.dev/hack v0.0.0-20210622141627-e28525d8d260
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596
knative.dev/reconciler-test v0.0.0-20210803183715-b61cc77c06f6
)
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down Expand Up @@ -1272,8 +1271,6 @@ knative.dev/eventing v0.25.0/go.mod h1:8jIsrnSONPgv+m63OTzpwZQJiQASYl77C3llCyYlB
knative.dev/hack v0.0.0-20210622141627-e28525d8d260 h1:f2eMtOubAOc/Q7JlvFPDKXiPlJVK+VpX2Cot8hRzCgQ=
knative.dev/hack v0.0.0-20210622141627-e28525d8d260/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210622141627-e28525d8d260/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c h1:7G6TQr7ZyIHx35Dn5zuNKUDhlly3KkFxgrKLXeKmjj8=
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c/go.mod h1:UA9m1M3rGssy63gVwjSh7CYoWTKZNO8cnY9QsIu7tyo=
knative.dev/pkg v0.0.0-20210803160015-21eb4c167cc5/go.mod h1:RPk5txNA3apR9X40D4MpUOP9/VqOG8CrtABWfOwGVS4=
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596 h1:LCSg0O51V8I7sfnhw+j9WLBol8f2lCV5HkPyxJT9zzA=
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596/go.mod h1:RPk5txNA3apR9X40D4MpUOP9/VqOG8CrtABWfOwGVS4=
Expand Down
23 changes: 5 additions & 18 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ import (
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
Expand Down Expand Up @@ -86,7 +82,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
Expand All @@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +158,7 @@ func (k UpdateError) Error() string {
}

// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {
func (d *KafkaDispatcher) ReconcileConsumers(ctx context.Context, config *ChannelConfig) error {
channelNamespacedName := types.NamespacedName{
Namespace: config.Namespace,
Name: config.Name,
Expand Down Expand Up @@ -215,7 +202,7 @@ func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {

failedToSubscribe := make(UpdateError)
for subUid, subSpec := range toAddSubs {
if err := d.subscribe(channelNamespacedName, subSpec); err != nil {
if err := d.subscribe(ctx, channelNamespacedName, subSpec); err != nil {
failedToSubscribe[subUid] = err
}
}
Expand Down Expand Up @@ -286,7 +273,7 @@ func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.NamespacedName, sub Subscription) error {
d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
Expand All @@ -310,7 +297,7 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler)

if err != nil {
// we can not create a consumer - logging that, with reason
Expand Down
6 changes: 4 additions & 2 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func TestDispatcher(t *testing.T) {
t.Skipf("This test can't run in CI")
}

ctx := context.TODO()

logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -179,7 +181,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelAConfig))

channelBConfig := &ChannelConfig{
Namespace: "default",
Expand All @@ -195,7 +197,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelBConfig))

time.Sleep(5 * time.Second)

Expand Down
20 changes: 14 additions & 6 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down Expand Up @@ -267,9 +267,11 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Initialize using oldConfig
require.NoError(t, d.RegisterChannelHost(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(ctx, tc.oldConfig))

oldSubscribers := sets.NewString()
for _, sub := range d.subscriptions {
Expand All @@ -283,7 +285,7 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
}

// Update with new config
err := d.ReconcileConsumers(tc.newConfig)
err := d.ReconcileConsumers(ctx, tc.newConfig)
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr)
Expand Down Expand Up @@ -363,6 +365,8 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Let's register channel configs first
for _, c := range configs {
require.NoError(t, d.RegisterChannelHost(c))
Expand All @@ -374,7 +378,7 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
wg.Add(1)
go func(c *ChannelConfig) {
defer wg.Done()
assert.NoError(t, d.ReconcileConsumers(c))
assert.NoError(t, d.ReconcileConsumers(ctx, c))
}(c)
}
}
Expand Down Expand Up @@ -418,6 +422,8 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

channelConfig := &ChannelConfig{
Namespace: "default",
Name: "test-channel",
Expand All @@ -438,7 +444,7 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
},
}
require.NoError(t, d.RegisterChannelHost(channelConfig))
require.NoError(t, d.ReconcileConsumers(channelConfig))
require.NoError(t, d.ReconcileConsumers(ctx, channelConfig))

require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName))
require.NotContains(t, d.subscriptions, "subscription-1")
Expand All @@ -461,6 +467,8 @@ func TestSubscribeError(t *testing.T) {
channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{},
}

ctx := context.TODO()

channelRef := types.NamespacedName{
Name: "test-channel",
Namespace: "test-ns",
Expand All @@ -470,7 +478,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
Subscription: fanout.Subscription{},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(ctx, channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

Loading

0 comments on commit 0629e17

Please sign in to comment.