Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

[WIP] Kafka channel fix scope #1191

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
pkgreconciler "knative.dev/pkg/reconciler"

"knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1"
kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned"
kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme"
kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel"
listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1"
Expand Down Expand Up @@ -109,9 +108,8 @@ type Reconciler struct {
systemNamespace string
dispatcherImage string

kafkaConfig *utils.KafkaConfig
kafkaConfigError error
kafkaClientSet kafkaclientset.Interface
clusterKafkaConfig *utils.KafkaConfig
clusterKafkaConfigError error

// Using a shared kafkaClusterAdmin does not work currently because of an issue with
// Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162.
Expand Down Expand Up @@ -150,12 +148,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne
return err
}

if r.kafkaConfig == nil {
if r.kafkaConfigError == nil {
r.kafkaConfigError = errors.New("The config map 'config-kafka' does not exist")
if r.clusterKafkaConfig == nil {
if r.clusterKafkaConfigError == nil {
r.clusterKafkaConfigError = errors.New("The config map 'config-kafka' does not exist")
}
kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.kafkaConfigError)
return r.kafkaConfigError
kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.clusterKafkaConfigError)
return r.clusterKafkaConfigError
}

kafkaClusterAdmin, err := r.createClient(ctx, kc)
Expand Down Expand Up @@ -433,7 +431,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel
kafkaClusterAdmin := r.kafkaClusterAdmin
if kafkaClusterAdmin == nil {
var err error
kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.kafkaConfig.Brokers)
kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.clusterKafkaConfig.Brokers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -484,14 +482,14 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
}
// For now just override the previous config.
// Eventually the previous config should be snapshotted to delete Kafka topics
r.kafkaConfig = kafkaConfig
r.kafkaConfigError = err
r.clusterKafkaConfig = kafkaConfig
r.clusterKafkaConfigError = err
}

func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event {
// Do not attempt retrying creating the client because it might be a permanent error
// in which case the finalizer will never get removed.
if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil {
if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.clusterKafkaConfig != nil {
if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil {
return err
}
Expand Down
15 changes: 6 additions & 9 deletions kafka/channel/pkg/reconciler/controller/kafkachannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestAllCases(t *testing.T) {
r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
clusterKafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
Expand All @@ -336,11 +336,10 @@ func TestAllCases(t *testing.T) {
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

Expand Down Expand Up @@ -384,7 +383,7 @@ func TestTopicExists(t *testing.T) {
r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
clusterKafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
Expand All @@ -402,11 +401,10 @@ func TestTopicExists(t *testing.T) {
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

Expand Down Expand Up @@ -454,7 +452,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
clusterKafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
Expand All @@ -472,11 +470,10 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

Expand Down