Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[0.24] KafkaChannel to init offsets before dispatcher (#886) #383

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/channel/distributed/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
injectionclient "knative.dev/pkg/client/injection/kube/client"
Expand Down Expand Up @@ -157,7 +158,7 @@ func main() {
MetricsRegistry: ekConfig.Sarama.Config.MetricRegistry,
SaramaConfig: ekConfig.Sarama.Config,
}
dispatcher = dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer)
dispatcher = dispatch.NewDispatcher(dispatcherConfig, controlProtocolServer, func(ref types.NamespacedName) {})

// Watch The Secret For Changes
err = distributedcommonconfig.InitializeSecretWatcher(ctx, environment.KafkaSecretNamespace, environment.KafkaSecretName, environment.ResyncPeriod, secretObserver)
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.3
go.opencensus.io v0.23.0
go.uber.org/atomic v1.8.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand All @@ -37,7 +36,6 @@ require (
knative.dev/control-protocol v0.0.0-20210622195244-f6f46782b93d
knative.dev/eventing v0.24.2
knative.dev/hack v0.0.0-20210622141627-e28525d8d260
knative.dev/networking v0.0.0-20210903191258-4ee76ca89d62
knative.dev/pkg v0.0.0-20210902173607-953af0138c75
knative.dev/reconciler-test v0.0.0-20210623134345-88c84739abd9
)
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down Expand Up @@ -1274,8 +1273,6 @@ knative.dev/eventing v0.24.2/go.mod h1:9xo0SWkIfpXrx0lvGQO7MUlPF8cu+QCMd2gGxj6wx
knative.dev/hack v0.0.0-20210622141627-e28525d8d260 h1:f2eMtOubAOc/Q7JlvFPDKXiPlJVK+VpX2Cot8hRzCgQ=
knative.dev/hack v0.0.0-20210622141627-e28525d8d260/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210622141627-e28525d8d260/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/networking v0.0.0-20210903191258-4ee76ca89d62 h1:ijcsotBvv74AcuMopX1gdMWi28+hDOra8M9wzHxgEDA=
knative.dev/networking v0.0.0-20210903191258-4ee76ca89d62/go.mod h1:zlhSWauSM+fg+pjiL75R0TStzTCONKosKbA3Z94uox4=
knative.dev/pkg v0.0.0-20210622173328-dd0db4b05c80/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc=
knative.dev/pkg v0.0.0-20210902173607-953af0138c75 h1:U9Im5Wp0oKV2ZWP+V9RZSDgRqv4IhfnzObMrgzWdDRQ=
knative.dev/pkg v0.0.0-20210902173607-953af0138c75/go.mod h1:kGegTnbZ+ljFjAE3E1+8wgaH2LMv8qYi+72o3F3cbdc=
Expand Down
25 changes: 6 additions & 19 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ import (
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
Expand Down Expand Up @@ -77,7 +73,7 @@ type KafkaDispatcher struct {
logger *zap.SugaredLogger
}

func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispatcher, error) {
func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs, enqueue func(ref types.NamespacedName)) (*KafkaDispatcher, error) {

producer, err := sarama.NewSyncProducer(args.Brokers, args.Config.Sarama.Config)
if err != nil {
Expand All @@ -86,7 +82,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}, enqueue),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
Expand All @@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +158,7 @@ func (k UpdateError) Error() string {
}

// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {
func (d *KafkaDispatcher) ReconcileConsumers(ctx context.Context, config *ChannelConfig) error {
channelNamespacedName := types.NamespacedName{
Namespace: config.Namespace,
Name: config.Name,
Expand Down Expand Up @@ -215,7 +202,7 @@ func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {

failedToSubscribe := make(UpdateError)
for subUid, subSpec := range toAddSubs {
if err := d.subscribe(channelNamespacedName, subSpec); err != nil {
if err := d.subscribe(ctx, channelNamespacedName, subSpec); err != nil {
failedToSubscribe[subUid] = err
}
}
Expand Down Expand Up @@ -286,7 +273,7 @@ func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.NamespacedName, sub Subscription) error {
d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
Expand All @@ -310,7 +297,7 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler, channelRef)

if err != nil {
// we can not create a consumer - logging that, with reason
Expand Down
9 changes: 6 additions & 3 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cloudevents/sdk-go/v2/test"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/apis"
Expand All @@ -55,6 +56,8 @@ func TestDispatcher(t *testing.T) {
t.Skipf("This test can't run in CI")
}

ctx := context.TODO()

logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel))
if err != nil {
t.Fatal(err)
Expand All @@ -80,7 +83,7 @@ func TestDispatcher(t *testing.T) {
}

// Create the dispatcher. At this point, if Kafka is not up, this thing fails
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs)
dispatcher, err := NewDispatcher(context.Background(), &dispatcherArgs, func(ref types.NamespacedName) {})
if err != nil {
t.Skipf("no dispatcher: %v", err)
}
Expand Down Expand Up @@ -179,7 +182,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelAConfig))

channelBConfig := &ChannelConfig{
Namespace: "default",
Expand All @@ -195,7 +198,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelBConfig))

time.Sleep(5 * time.Second)

Expand Down
22 changes: 15 additions & 7 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, ref types.NamespacedName, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
Expand Down Expand Up @@ -267,9 +267,11 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Initialize using oldConfig
require.NoError(t, d.RegisterChannelHost(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(ctx, tc.oldConfig))

oldSubscribers := sets.NewString()
for _, sub := range d.subscriptions {
Expand All @@ -283,7 +285,7 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
}

// Update with new config
err := d.ReconcileConsumers(tc.newConfig)
err := d.ReconcileConsumers(ctx, tc.newConfig)
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr)
Expand Down Expand Up @@ -363,6 +365,8 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Let's register channel configs first
for _, c := range configs {
require.NoError(t, d.RegisterChannelHost(c))
Expand All @@ -374,7 +378,7 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
wg.Add(1)
go func(c *ChannelConfig) {
defer wg.Done()
assert.NoError(t, d.ReconcileConsumers(c))
assert.NoError(t, d.ReconcileConsumers(ctx, c))
}(c)
}
}
Expand Down Expand Up @@ -418,6 +422,8 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

channelConfig := &ChannelConfig{
Namespace: "default",
Name: "test-channel",
Expand All @@ -438,7 +444,7 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
},
}
require.NoError(t, d.RegisterChannelHost(channelConfig))
require.NoError(t, d.ReconcileConsumers(channelConfig))
require.NoError(t, d.ReconcileConsumers(ctx, channelConfig))

require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName))
require.NotContains(t, d.subscriptions, "subscription-1")
Expand All @@ -461,6 +467,8 @@ func TestSubscribeError(t *testing.T) {
channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{},
}

ctx := context.TODO()

channelRef := types.NamespacedName{
Name: "test-channel",
Namespace: "test-ns",
Expand All @@ -470,7 +478,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
Subscription: fanout.Subscription{},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(ctx, channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
Expand Down Expand Up @@ -511,7 +519,7 @@ func TestNewDispatcher(t *testing.T) {
Brokers: []string{"localhost:10000"},
TopicFunc: utils.TopicName,
}
_, err := NewDispatcher(context.TODO(), args)
_, err := NewDispatcher(context.TODO(), args, func(ref types.NamespacedName) {})
if err == nil {
t.Errorf("Expected error want %s, got %s", "message receiver is not set", err)
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

Loading