This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Get rid of probing for checking subscription readiness
aliok committed Sep 22, 2021
1 parent 23993ad commit 05189a6
Showing 9 changed files with 6 additions and 1,258 deletions.
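
Before the per-file diff, some context on what is being removed: the probing machinery was surfaced to the reconciler through a status manager. The following is a minimal sketch of that interface, reconstructed from the fakeStatusManager deleted in the reconciler test further down; the method set and signatures are taken from that fake, while the doc comments and the placement in pkg/channel/consolidated/status are assumptions based on the imports and the statusManager status.Manager field visible in the diff.

package status

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"

    "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
)

// Manager is the subscription-readiness prober removed by this commit
// (a sketch reconstructed from the deleted fakeStatusManager, not verbatim source).
type Manager interface {
    // IsReady reports whether a subscription on the channel is considered ready.
    IsReady(ctx context.Context, ch v1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error)
    // CancelProbing stops probing a single subscription.
    CancelProbing(sub eventingduckv1.SubscriberSpec)
    // CancelPodProbing stops probing the subscriptions served by a dispatcher pod.
    CancelPodProbing(pod corev1.Pod)
}

The diff below drops every use of this interface: the prober construction in NewController, the statusManager field on the Reconciler, the CancelProbing loop in FinalizeKind, and the status package itself.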
9 changes: 0 additions & 9 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -95,15 +95,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

155 changes: 0 additions & 155 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go

This file was deleted.

26 changes: 6 additions & 20 deletions pkg/channel/consolidated/reconciler/controller/controller.go
@@ -26,11 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingClient "knative.dev/eventing/pkg/client/injection/client"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
@@ -45,8 +43,6 @@ import (
knativeReconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
@@ -110,17 +106,6 @@ func NewController(

impl := kafkaChannelReconciler.NewImpl(ctx, r)

statusProber := status.NewProber(
logger.Named("status-manager"),
NewProbeTargetLister(logger, endpointsInformer.Lister()),
func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) {
logger.Debugf("Ready callback triggered for channel: %s/%s subscription: %s", c.Namespace, c.Name, string(s.UID))
impl.EnqueueKey(types.NamespacedName{Namespace: c.Namespace, Name: c.Name})
},
)
r.statusManager = statusProber
statusProber.Start(ctx.Done())

// Call GlobalResync on kafkachannels.
grCh := func(obj interface{}) {
logger.Debug("Changes detected, doing global resync")
@@ -174,22 +159,23 @@ func NewController(
knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false),
),
Handler: cache.ResourceEventHandlerFuncs{
// Cancel probing when a Pod is deleted
DeleteFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Delete"),
AddFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Add"),
// TODO: do we still need this?
// Global sync when a Pod is deleted or added
DeleteFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Delete"),
AddFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Add"),
},
})

return impl
}

func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, statusProber *status.Prober, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
return func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok && pod != nil {
logger.Debugw("%s pods. Refreshing pod probing.", handlerType,
zap.String("pod", pod.GetName()))
statusProber.RefreshPodProbing(ctx)
// TODO: do we still need this? see above!
impl.GlobalResync(kafkaChannelInformer.Informer())
}
}
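
Since the hunk above interleaves the old and new bodies, here is the pod informer handler as it should read after this commit, reassembled from the added lines and the remaining context (a readability aid, assumed to match the resulting controller.go rather than quoted from it):

// Post-commit shape of the handler: the statusProber parameter and the probing
// refresh are gone; a dispatcher pod Add or Delete now only triggers a global
// resync, which re-enqueues every KafkaChannel in the informer cache.
func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
    return func(obj interface{}) {
        pod, ok := obj.(*corev1.Pod)
        if ok && pod != nil {
            logger.Debugw("%s pods. Refreshing pod probing.", handlerType,
                zap.String("pod", pod.GetName()))
            // TODO: do we still need this? see above!
            impl.GlobalResync(kafkaChannelInformer.Informer())
        }
    }
}

With the prober gone, nothing confirms per-subscription readiness anymore; the controller falls back to a blanket GlobalResync on pod churn, and the TODOs left in the diff question whether even that is still needed.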
6 changes: 0 additions & 6 deletions pkg/channel/consolidated/reconciler/controller/kafkachannel.go
@@ -49,7 +49,6 @@ import (

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned"
kafkaScheme "knative.dev/eventing-kafka/pkg/client/clientset/versioned/scheme"
@@ -135,7 +134,6 @@ type Reconciler struct {
endpointsLister corev1listers.EndpointsLister
serviceAccountLister corev1listers.ServiceAccountLister
roleBindingLister rbacv1listers.RoleBindingLister
statusManager status.Manager
controllerRef metav1.OwnerReference
}

@@ -635,9 +633,5 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel)
return err
}
}
for _, s := range kc.Spec.Subscribers {
logger.Debugw("Canceling probing", zap.String("channel", channel), zap.Any("subscription", s))
r.statusManager.CancelProbing(s)
}
return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer
}
52 changes: 0 additions & 52 deletions pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go
@@ -348,12 +348,6 @@ func TestAllCases(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, ch v1beta1.KafkaChannel,
sub eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -422,12 +416,6 @@ func TestTopicExists(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel,
spec eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -500,12 +488,6 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel,
spec eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -578,12 +560,6 @@ func TestDeploymentZeroReplicas(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel,
spec eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -652,12 +628,6 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel,
spec eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -730,12 +700,6 @@ func TestDeploymentUpdatedOnConfigMapHashChange(t *testing.T) {
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
statusManager: &fakeStatusManager{
FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel,
spec eventingduckv1.SubscriberSpec) (bool, error) {
return true, nil
},
},
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
@@ -970,22 +934,6 @@ func subscribers() []eventingduckv1.SubscriberSpec {
}}
}

type fakeStatusManager struct {
FakeIsReady func(context.Context, v1beta1.KafkaChannel, eventingduckv1.SubscriberSpec) (bool, error)
}

func (m *fakeStatusManager) IsReady(ctx context.Context, ch v1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) {
return m.FakeIsReady(ctx, ch, sub)
}

func (m *fakeStatusManager) CancelProbing(sub eventingduckv1.SubscriberSpec) {
//do nothing
}

func (m *fakeStatusManager) CancelPodProbing(pod corev1.Pod) {
//do nothing
}

func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl {
return clientgotesting.PatchActionImpl{
ActionImpl: clientgotesting.ActionImpl{
83 changes: 0 additions & 83 deletions pkg/channel/consolidated/reconciler/controller/lister.go

This file was deleted.

533 changes: 0 additions & 533 deletions pkg/channel/consolidated/status/status.go

This file was deleted.

340 changes: 0 additions & 340 deletions pkg/channel/consolidated/status/status_test.go

This file was deleted.
