Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Migrate Kafka channel off reconciler.Base (#1081)
Browse files Browse the repository at this point in the history
* Migrate kafka channel off reconciler.Base

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Logger from context and add recorder

- Get the logger from the context
- the EventRecorder must be set in the NewController function
  because with the context passed to the r.Reconcile() method
  controller.GetEventRecorder(ctx) returns nil

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* lint

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Mar 30, 2020
1 parent aa80bc3 commit 25ab82d
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 115 deletions.
8 changes: 7 additions & 1 deletion kafka/channel/cmd/channel_dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package main

import (
"context"
"os"

"knative.dev/pkg/configmap"
pkgcontroller "knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
Expand All @@ -35,5 +38,8 @@ func main() {
ctx = injection.WithNamespaceScope(ctx, ns)
}

sharedmain.MainWithContext(ctx, component, controller.NewController)
cfg := sharedmain.ParseAndGetConfigOrDie()
sharedmain.MainWithConfig(ctx, component, cfg, func(ctx context.Context, watcher configmap.Watcher) *pkgcontroller.Impl {
return controller.NewController(ctx, watcher, cfg)
})
}
13 changes: 7 additions & 6 deletions kafka/channel/pkg/reconciler/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"context"

"github.com/kelseyhightower/envconfig"

"go.uber.org/zap"
"k8s.io/client-go/tools/cache"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints"
Expand All @@ -33,7 +35,6 @@ import (
"knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding"

"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/system"
Expand All @@ -59,9 +60,7 @@ func NewController(
serviceInformer := service.Get(ctx)

r := &Reconciler{
Base: reconciler.NewBase(ctx, controllerAgentName, cmw),
systemNamespace: system.Namespace(),

systemNamespace: system.Namespace(),
KubeClientSet: kubeclient.Get(ctx),
kafkaClientSet: kafkaChannelClient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
Expand Down Expand Up @@ -89,7 +88,9 @@ func NewController(

// Get and Watch the Kakfa config map and dynamically update Kafka configuration.
if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get("config-kafka", metav1.GetOptions{}); err == nil {
cmw.Watch("config-kafka", r.updateKafkaConfig)
cmw.Watch("config-kafka", func(configMap *v1.ConfigMap) {
r.updateKafkaConfig(ctx, configMap)
})
} else if !apierrors.IsNotFound(err) {
logging.FromContext(ctx).With(zap.Error(err)).Fatal("Error reading ConfigMap 'config-kafka'")
}
Expand Down
85 changes: 23 additions & 62 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"errors"
"fmt"
"reflect"
"time"

"github.com/Shopify/sarama"

"go.uber.org/zap"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -37,11 +38,14 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/apis/eventing"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler"
"knative.dev/eventing/pkg/reconciler/names"

"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
pkgreconciler "knative.dev/pkg/reconciler"

"knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1"
Expand All @@ -51,31 +55,21 @@ import (
listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1"
"knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources"
"knative.dev/eventing-contrib/kafka/channel/pkg/utils"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
)

const (
// ReconcilerName is the name of the reconciler.
ReconcilerName = "KafkaChannels"

// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "kafka-ch-controller"

// Name of the corev1.Events emitted from the reconciliation process.
channelReconciled = "ChannelReconciled"
channelReconcileFailed = "ChannelReconcileFailed"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
dispatcherDeploymentCreated = "DispatcherDeploymentCreated"
dispatcherDeploymentUpdated = "DispatcherDeploymentUpdated"
dispatcherDeploymentFailed = "DispatcherDeploymentFailed"
dispatcherDeploymentUpdateFailed = "DispatcherDeploymentUpdateFailed"
dispatcherServiceCreated = "DispatcherServiceCreated"
dispatcherServiceFailed = "DispatcherServiceFailed"
dispatcherServiceAccountFailed = "DispatcherServiceAccountFailed"
dispatcherServiceAccountCreated = "DispatcherServiceAccountCreated"
dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated"
dispatcherRoleBindingFailed = "DispatcherRoleBindingFailed"
dispatcherDeploymentCreated = "DispatcherDeploymentCreated"
dispatcherDeploymentUpdated = "DispatcherDeploymentUpdated"
dispatcherDeploymentFailed = "DispatcherDeploymentFailed"
dispatcherServiceCreated = "DispatcherServiceCreated"
dispatcherServiceFailed = "DispatcherServiceFailed"
dispatcherServiceAccountCreated = "DispatcherServiceAccountCreated"
dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated"

dispatcherName = "kafka-ch-dispatcher"
)
Expand Down Expand Up @@ -108,8 +102,6 @@ func init() {

// Reconciler reconciles Kafka Channels.
type Reconciler struct {
*reconciler.Base

KubeClientSet kubernetes.Interface

EventingClientSet eventingclientset.Interface
Expand All @@ -135,9 +127,6 @@ type Reconciler struct {
}

var (
deploymentGVK = appsv1.SchemeGroupVersion.WithKind("Deployment")
serviceGVK = corev1.SchemeGroupVersion.WithKind("Service")

scopeNamespace = "namespace"
scopeCluster = "cluster"
)
Expand Down Expand Up @@ -293,7 +282,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
if apierrs.IsNotFound(err) {
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Create(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created")
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, err
} else {
Expand All @@ -306,10 +295,10 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
return nil, err
} else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
r.Logger.Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
logging.FromContext(ctx).Sugar().Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
Expand All @@ -329,7 +318,7 @@ func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherName
expected := resources.MakeServiceAccount(dispatcherNamespace, dispatcherName)
sa, err := r.KubeClientSet.CoreV1().ServiceAccounts(dispatcherNamespace).Create(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherServiceAccountCreated, "Dispatcher service account created")
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherServiceAccountCreated, "Dispatcher service account created")
return sa, nil
} else {
kc.Status.MarkDispatcherFailed("DispatcherDeploymentFailed", "Failed to create the dispatcher service account: %v", err)
Expand All @@ -350,7 +339,7 @@ func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns s
expected := resources.MakeRoleBinding(ns, name, sa, clusterRoleName)
rb, err := r.KubeClientSet.RbacV1().RoleBindings(ns).Create(expected)
if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherRoleBindingCreated, "Dispatcher role binding created")
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherRoleBindingCreated, "Dispatcher role binding created")
return rb, nil
} else {
kc.Status.MarkDispatcherFailed("DispatcherDeploymentFailed", "Failed to create the dispatcher role binding: %v", err)
Expand All @@ -371,11 +360,11 @@ func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherN
svc, err := r.KubeClientSet.CoreV1().Services(dispatcherNamespace).Create(expected)

if err == nil {
r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherServiceCreated, "Dispatcher service created")
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherServiceCreated, "Dispatcher service created")
kc.Status.MarkServiceTrue()
} else {
logging.FromContext(ctx).Error("Unable to create the dispatcher service", zap.Error(err))
r.Recorder.Eventf(kc, corev1.EventTypeWarning, dispatcherServiceFailed, "Failed to create the dispatcher service: %v", err)
controller.GetEventRecorder(ctx).Eventf(kc, corev1.EventTypeWarning, dispatcherServiceFailed, "Failed to create the dispatcher service: %v", err)
kc.Status.MarkServiceFailed("DispatcherServiceFailed", "Failed to create the dispatcher service: %v", err)
return svc, err
}
Expand Down Expand Up @@ -436,33 +425,6 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName
return svc, nil
}

func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaChannel) (*v1alpha1.KafkaChannel, error) {
kc, err := r.kafkachannelLister.KafkaChannels(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}

if reflect.DeepEqual(kc.Status, desired.Status) {
return kc, nil
}

becomesReady := desired.Status.IsReady() && !kc.Status.IsReady()

// Don't modify the informers copy.
existing := kc.DeepCopy()
existing.Status = desired.Status

new, err := r.kafkaClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing)
if err == nil && becomesReady {
duration := time.Since(new.ObjectMeta.CreationTimestamp.Time)
r.Logger.Infof("KafkaChannel %q became ready after %v", kc.Name, duration)
if err := r.StatsReporter.ReportReady("KafkaChannel", kc.Namespace, kc.Name, duration); err != nil {
r.Logger.Infof("Failed to record ready for KafkaChannel %q: %v", kc.Name, err)
}
}
return new, err
}

func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel) (sarama.ClusterAdmin, error) {
// We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
// This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
Expand Down Expand Up @@ -514,17 +476,16 @@ func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1alpha1.KafkaCha
return err
}

func (r *Reconciler) updateKafkaConfig(configMap *corev1.ConfigMap) {
r.Logger.Info("Reloading Kafka configuration")
func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.ConfigMap) {
logging.FromContext(ctx).Info("Reloading Kafka configuration")
kafkaConfig, err := utils.GetKafkaConfig(configMap.Data)
if err != nil {
r.Logger.Errorw("Error reading Kafka configuration", zap.Error(err))
logging.FromContext(ctx).Error("Error reading Kafka configuration", zap.Error(err))
}
// For now just override the previous config.
// Eventually the previous config should be snapshotted to delete Kafka topics
r.kafkaConfig = kafkaConfig
r.kafkaConfigError = err

}

func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event {
Expand Down
Loading

0 comments on commit 25ab82d

Please sign in to comment.