diff --git a/kafka/channel/cmd/channel_dispatcher/main.go b/kafka/channel/cmd/channel_dispatcher/main.go index d74869935d..2a198a4621 100644 --- a/kafka/channel/cmd/channel_dispatcher/main.go +++ b/kafka/channel/cmd/channel_dispatcher/main.go @@ -17,8 +17,11 @@ limitations under the License. package main import ( + "context" "os" + "knative.dev/pkg/configmap" + pkgcontroller "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/signals" @@ -35,5 +38,8 @@ func main() { ctx = injection.WithNamespaceScope(ctx, ns) } - sharedmain.MainWithContext(ctx, component, controller.NewController) + cfg := sharedmain.ParseAndGetConfigOrDie() + sharedmain.MainWithConfig(ctx, component, cfg, func(ctx context.Context, watcher configmap.Watcher) *pkgcontroller.Impl { + return controller.NewController(ctx, watcher, cfg) + }) } diff --git a/kafka/channel/pkg/reconciler/controller/controller.go b/kafka/channel/pkg/reconciler/controller/controller.go index 49f7275eb6..ba89415d54 100644 --- a/kafka/channel/pkg/reconciler/controller/controller.go +++ b/kafka/channel/pkg/reconciler/controller/controller.go @@ -20,11 +20,13 @@ import ( "context" "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" - "k8s.io/client-go/tools/cache" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" @@ -33,7 +35,6 @@ import ( "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding" "knative.dev/eventing/pkg/logging" - "knative.dev/eventing/pkg/reconciler" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/system" @@ -59,9 +60,7 @@ func NewController( serviceInformer := service.Get(ctx) r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), - systemNamespace: system.Namespace(), - + systemNamespace: system.Namespace(), KubeClientSet: kubeclient.Get(ctx), kafkaClientSet: kafkaChannelClient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), @@ -89,7 +88,9 @@ func NewController( // Get and Watch the Kakfa config map and dynamically update Kafka configuration. if _, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get("config-kafka", metav1.GetOptions{}); err == nil { - cmw.Watch("config-kafka", r.updateKafkaConfig) + cmw.Watch("config-kafka", func(configMap *v1.ConfigMap) { + r.updateKafkaConfig(ctx, configMap) + }) } else if !apierrors.IsNotFound(err) { logging.FromContext(ctx).With(zap.Error(err)).Fatal("Error reading ConfigMap 'config-kafka'") } diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index 417ddfad03..2f7b815997 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -21,10 +21,11 @@ import ( "errors" "fmt" "reflect" - "time" "github.com/Shopify/sarama" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -37,11 +38,14 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/eventing" + eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" "knative.dev/eventing/pkg/logging" - "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" + "knative.dev/pkg/controller" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" @@ -51,31 +55,21 @@ import ( listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources" "knative.dev/eventing-contrib/kafka/channel/pkg/utils" - eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" ) const ( - // ReconcilerName is the name of the reconciler. - ReconcilerName = "KafkaChannels" - // controllerAgentName is the string used by this controller to identify // itself when creating events. controllerAgentName = "kafka-ch-controller" // Name of the corev1.Events emitted from the reconciliation process. - channelReconciled = "ChannelReconciled" - channelReconcileFailed = "ChannelReconcileFailed" - channelUpdateStatusFailed = "ChannelUpdateStatusFailed" - dispatcherDeploymentCreated = "DispatcherDeploymentCreated" - dispatcherDeploymentUpdated = "DispatcherDeploymentUpdated" - dispatcherDeploymentFailed = "DispatcherDeploymentFailed" - dispatcherDeploymentUpdateFailed = "DispatcherDeploymentUpdateFailed" - dispatcherServiceCreated = "DispatcherServiceCreated" - dispatcherServiceFailed = "DispatcherServiceFailed" - dispatcherServiceAccountFailed = "DispatcherServiceAccountFailed" - dispatcherServiceAccountCreated = "DispatcherServiceAccountCreated" - dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated" - dispatcherRoleBindingFailed = "DispatcherRoleBindingFailed" + dispatcherDeploymentCreated = "DispatcherDeploymentCreated" + dispatcherDeploymentUpdated = "DispatcherDeploymentUpdated" + dispatcherDeploymentFailed = "DispatcherDeploymentFailed" + dispatcherServiceCreated = "DispatcherServiceCreated" + dispatcherServiceFailed = "DispatcherServiceFailed" + dispatcherServiceAccountCreated = "DispatcherServiceAccountCreated" + dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated" dispatcherName = "kafka-ch-dispatcher" ) @@ -108,8 +102,6 @@ func init() { // Reconciler reconciles Kafka Channels. type Reconciler struct { - *reconciler.Base - KubeClientSet kubernetes.Interface EventingClientSet eventingclientset.Interface @@ -135,9 +127,6 @@ type Reconciler struct { } var ( - deploymentGVK = appsv1.SchemeGroupVersion.WithKind("Deployment") - serviceGVK = corev1.SchemeGroupVersion.WithKind("Service") - scopeNamespace = "namespace" scopeCluster = "cluster" ) @@ -293,7 +282,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp if apierrs.IsNotFound(err) { d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Create(expected) if err == nil { - r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created") + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentCreated, "Dispatcher deployment created") kc.Status.PropagateDispatcherStatus(&d.Status) return d, err } else { @@ -306,10 +295,10 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err) return nil, err } else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) { - r.Logger.Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) + logging.FromContext(ctx).Sugar().Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected) if err == nil { - r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated") + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated") kc.Status.PropagateDispatcherStatus(&d.Status) return d, nil } else { @@ -329,7 +318,7 @@ func (r *Reconciler) reconcileServiceAccount(ctx context.Context, dispatcherName expected := resources.MakeServiceAccount(dispatcherNamespace, dispatcherName) sa, err := r.KubeClientSet.CoreV1().ServiceAccounts(dispatcherNamespace).Create(expected) if err == nil { - r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherServiceAccountCreated, "Dispatcher service account created") + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherServiceAccountCreated, "Dispatcher service account created") return sa, nil } else { kc.Status.MarkDispatcherFailed("DispatcherDeploymentFailed", "Failed to create the dispatcher service account: %v", err) @@ -350,7 +339,7 @@ func (r *Reconciler) reconcileRoleBinding(ctx context.Context, name string, ns s expected := resources.MakeRoleBinding(ns, name, sa, clusterRoleName) rb, err := r.KubeClientSet.RbacV1().RoleBindings(ns).Create(expected) if err == nil { - r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherRoleBindingCreated, "Dispatcher role binding created") + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherRoleBindingCreated, "Dispatcher role binding created") return rb, nil } else { kc.Status.MarkDispatcherFailed("DispatcherDeploymentFailed", "Failed to create the dispatcher role binding: %v", err) @@ -371,11 +360,11 @@ func (r *Reconciler) reconcileDispatcherService(ctx context.Context, dispatcherN svc, err := r.KubeClientSet.CoreV1().Services(dispatcherNamespace).Create(expected) if err == nil { - r.Recorder.Event(kc, corev1.EventTypeNormal, dispatcherServiceCreated, "Dispatcher service created") + controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherServiceCreated, "Dispatcher service created") kc.Status.MarkServiceTrue() } else { logging.FromContext(ctx).Error("Unable to create the dispatcher service", zap.Error(err)) - r.Recorder.Eventf(kc, corev1.EventTypeWarning, dispatcherServiceFailed, "Failed to create the dispatcher service: %v", err) + controller.GetEventRecorder(ctx).Eventf(kc, corev1.EventTypeWarning, dispatcherServiceFailed, "Failed to create the dispatcher service: %v", err) kc.Status.MarkServiceFailed("DispatcherServiceFailed", "Failed to create the dispatcher service: %v", err) return svc, err } @@ -436,33 +425,6 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName return svc, nil } -func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaChannel) (*v1alpha1.KafkaChannel, error) { - kc, err := r.kafkachannelLister.KafkaChannels(desired.Namespace).Get(desired.Name) - if err != nil { - return nil, err - } - - if reflect.DeepEqual(kc.Status, desired.Status) { - return kc, nil - } - - becomesReady := desired.Status.IsReady() && !kc.Status.IsReady() - - // Don't modify the informers copy. - existing := kc.DeepCopy() - existing.Status = desired.Status - - new, err := r.kafkaClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing) - if err == nil && becomesReady { - duration := time.Since(new.ObjectMeta.CreationTimestamp.Time) - r.Logger.Infof("KafkaChannel %q became ready after %v", kc.Name, duration) - if err := r.StatsReporter.ReportReady("KafkaChannel", kc.Namespace, kc.Name, duration); err != nil { - r.Logger.Infof("Failed to record ready for KafkaChannel %q: %v", kc.Name, err) - } - } - return new, err -} - func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel) (sarama.ClusterAdmin, error) { // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. @@ -514,17 +476,16 @@ func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1alpha1.KafkaCha return err } -func (r *Reconciler) updateKafkaConfig(configMap *corev1.ConfigMap) { - r.Logger.Info("Reloading Kafka configuration") +func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.ConfigMap) { + logging.FromContext(ctx).Info("Reloading Kafka configuration") kafkaConfig, err := utils.GetKafkaConfig(configMap.Data) if err != nil { - r.Logger.Errorw("Error reading Kafka configuration", zap.Error(err)) + logging.FromContext(ctx).Error("Error reading Kafka configuration", zap.Error(err)) } // For now just override the previous config. // Eventually the previous config should be snapshotted to delete Kafka topics r.kafkaConfig = kafkaConfig r.kafkaConfigError = err - } func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index e894062538..cd50396956 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -22,22 +22,25 @@ import ( "testing" "github.com/Shopify/sarama" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/runtime" - "knative.dev/eventing/pkg/reconciler" - "knative.dev/eventing/pkg/utils" - "knative.dev/pkg/configmap" - "knative.dev/pkg/kmeta" - . "knative.dev/eventing-contrib/kafka/channel/pkg/utils" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + + eventingClient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/pkg/utils" + duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/kmeta" + "knative.dev/pkg/logging" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" @@ -47,12 +50,10 @@ import ( "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources" reconcilekafkatesting "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/testing" reconcilertesting "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/testing" - eventingClient "knative.dev/eventing/pkg/client/injection/client" - kubeclient "knative.dev/pkg/client/injection/kube/client" + . "knative.dev/eventing-contrib/kafka/channel/pkg/utils" ) const ( - systemNS = "knative-eventing" testNS = "test-namespace" kcName = "test-kc" testDispatcherImage = "test-image" @@ -62,11 +63,6 @@ const ( ) var ( - trueVal = true - // deletionTime is used when objects are marked as deleted. Rfc3339Copy() - // truncates to seconds to match the loss of precision during serialization. - deletionTime = metav1.Now().Rfc3339Copy() - finalizerUpdatedEvent = Eventf(corev1.EventTypeNormal, "FinalizerUpdate", `Updated "test-kc" finalizers`) ) @@ -270,7 +266,7 @@ func TestAllCases(t *testing.T) { makeReadyEndpoints(), reconcilekafkatesting.NewKafkaChannel(kcName, testNS, reconcilekafkatesting.WithKafkaFinalizer(finalizerName)), - makeChannelServiceNotOwnedByUs(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)), + makeChannelServiceNotOwnedByUs(), }, WantErr: true, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -328,7 +324,6 @@ func TestAllCases(t *testing.T) { table.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), systemNamespace: testNS, dispatcherImage: testDispatcherImage, kafkaConfig: &KafkaConfig{ @@ -345,7 +340,7 @@ func TestAllCases(t *testing.T) { KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, r.Logger, r.kafkaClientSet, listers.GetKafkaChannelLister(), r.Recorder, r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -387,7 +382,6 @@ func TestTopicExists(t *testing.T) { row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), systemNamespace: testNS, dispatcherImage: testDispatcherImage, kafkaConfig: &KafkaConfig{ @@ -412,7 +406,7 @@ func TestTopicExists(t *testing.T) { KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, r.Logger, r.kafkaClientSet, listers.GetKafkaChannelLister(), r.Recorder, r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -458,7 +452,6 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler { r := &Reconciler{ - Base: reconciler.NewBase(ctx, controllerAgentName, cmw), systemNamespace: testNS, dispatcherImage: testDispatcherImage, kafkaConfig: &KafkaConfig{ @@ -483,7 +476,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, r.Logger, r.kafkaClientSet, listers.GetKafkaChannelLister(), r.Recorder, r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -611,7 +604,7 @@ func makeChannelService(nc *v1alpha1.KafkaChannel) *corev1.Service { } } -func makeChannelServiceNotOwnedByUs(nc *v1alpha1.KafkaChannel) *corev1.Service { +func makeChannelServiceNotOwnedByUs() *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -658,12 +651,3 @@ func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } - -func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - action := clientgotesting.PatchActionImpl{} - action.Name = name - action.Namespace = namespace - patch := `{"metadata":{"finalizers":[],"resourceVersion":""}}` - action.Patch = []byte(patch) - return action -} diff --git a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go index 9dd36924c6..7be3b99f4e 100644 --- a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go @@ -22,18 +22,24 @@ import ( "reflect" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/channel/multichannelfanout" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/logging" - "knative.dev/eventing/pkg/reconciler" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" @@ -71,7 +77,7 @@ const ( // Reconciler reconciles Kafka Channels. type Reconciler struct { - *reconciler.Base + recorder record.EventRecorder kafkaDispatcher *dispatcher.KafkaDispatcher @@ -86,13 +92,9 @@ var _ controller.Reconciler = (*Reconciler)(nil) // NewController initializes the controller and is called by the generated code. // Registers event handlers to enqueue events. -func NewController( - ctx context.Context, - cmw configmap.Watcher, -) *controller.Impl { +func NewController(ctx context.Context, _ configmap.Watcher, cfg *rest.Config) *controller.Impl { logger := logging.FromContext(ctx) - base := reconciler.NewBase(ctx, controllerAgentName, cmw) configMap, err := configmap.Load("/etc/config-kafka") if err != nil { @@ -125,15 +127,15 @@ func NewController( logger.Info("Kafka broker configuration", zap.Strings(utils.BrokerConfigMapKey, kafkaConfig.Brokers)) r := &Reconciler{ - Base: base, + recorder: getRecorder(ctx, cfg), kafkaDispatcher: kafkaDispatcher, + kafkaClientSet: kafkaclientsetinjection.Get(ctx), kafkachannelLister: kafkaChannelInformer.Lister(), kafkachannelInformer: kafkaChannelInformer.Informer(), - kafkaClientSet: kafkaclientsetinjection.Get(ctx), } - r.impl = controller.NewImpl(r, r.Logger, ReconcilerName) + r.impl = controller.NewImpl(r, logger.Sugar(), ReconcilerName) - r.Logger.Info("Setting up event handlers") + logger.Info("Setting up event handlers") // Watch for kafka channels. kafkaChannelInformer.Informer().AddEventHandler( @@ -152,6 +154,29 @@ func NewController( return r.impl } +func getRecorder(ctx context.Context, cfg *rest.Config) record.EventRecorder { + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logging.FromContext(ctx).Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logging.FromContext(ctx).Sugar().Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: kubernetes.NewForConfigOrDie(cfg).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + return recorder +} + func filterWithAnnotation(namespaced bool) func(obj interface{}) bool { if namespaced { return pkgreconciler.AnnotationFilterFunc(eventing.ScopeAnnotationKey, "namespace", false) @@ -189,16 +214,16 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { reconcileErr := r.reconcile(ctx, channel) if reconcileErr != nil { logging.FromContext(ctx).Error("Error reconciling KafkaChannel", zap.Error(reconcileErr)) - r.Recorder.Eventf(channel, corev1.EventTypeWarning, channelReconcileFailed, "KafkaChannel reconciliation failed: %v", reconcileErr) + r.recorder.Eventf(channel, corev1.EventTypeWarning, channelReconcileFailed, "KafkaChannel reconciliation failed: %v", reconcileErr) } else { logging.FromContext(ctx).Debug("KafkaChannel reconciled") - r.Recorder.Event(channel, corev1.EventTypeNormal, channelReconciled, "KafkaChannel reconciled") + r.recorder.Event(channel, corev1.EventTypeNormal, channelReconciled, "KafkaChannel reconciled") } // TODO: Should this check for subscribable status rather than entire status? if _, updateStatusErr := r.updateStatus(ctx, channel); updateStatusErr != nil { logging.FromContext(ctx).Error("Failed to update KafkaChannel status", zap.Error(updateStatusErr)) - r.Recorder.Eventf(channel, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update KafkaChannel's status: %v", updateStatusErr) + r.recorder.Eventf(channel, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update KafkaChannel's status: %v", updateStatusErr) return updateStatusErr }