This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Get rid of probing for checking subscription readiness
aliok committed Sep 22, 2021
1 parent 23993ad commit 1058793
Showing 9 changed files with 6 additions and 1,262 deletions.
13 changes: 0 additions & 13 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -43,10 +43,6 @@ import (
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
@@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
 		topicFunc: args.TopicFunc,
 	}
 
-	// initialize and start the subscription endpoint server
-	subscriptionEndpoint := &subscriptionEndpoint{
-		dispatcher: dispatcher,
-		logger:     logging.FromContext(ctx),
-	}
-	go func() {
-		subscriptionEndpoint.start()
-	}()
-
 	podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
 	if err != nil {
 		return nil, err
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.
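The deleted file itself is not shown in this diff, but the call site removed from `NewDispatcher` above (a `subscriptionEndpoint` built from the dispatcher and its logger, started on a goroutine) and the removed `dispatcherReadySubHeader` constant (`"K-Subscriber-Status"`) outline what it did: serve the per-channel subscription readiness data that the controller's status prober polled. A minimal sketch of that shape, assuming the same package; the route, payload format, and port are illustrative guesses, not the deleted code:

```go
package dispatcher

import (
	"encoding/json"
	"net/http"

	"go.uber.org/zap"
)

// Field names match the removed call site in NewDispatcher.
type subscriptionEndpoint struct {
	dispatcher *KafkaDispatcher
	logger     *zap.SugaredLogger
}

// start blocks while serving readiness data, which is why the removed call
// site ran it on a goroutine. Route, payload, and port are hypothetical.
func (e *subscriptionEndpoint) start() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		// Hypothetical payload shape: channel key -> ready subscription UIDs.
		ready := map[string][]string{}
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(ready); err != nil {
			e.logger.Errorw("failed to encode subscription readiness", zap.Error(err))
		}
	})
	if err := http.ListenAndServe(":8081", mux); err != nil {
		e.logger.Fatalw("subscription endpoint terminated", zap.Error(err))
	}
}
```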

155 changes: 0 additions & 155 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go

This file was deleted.

26 changes: 6 additions & 20 deletions pkg/channel/consolidated/reconciler/controller/controller.go
@@ -26,11 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingClient "knative.dev/eventing/pkg/client/injection/client"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
@@ -45,8 +43,6 @@ import (
knativeReconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
@@ -110,17 +106,6 @@ func NewController(

 	impl := kafkaChannelReconciler.NewImpl(ctx, r)
 
-	statusProber := status.NewProber(
-		logger.Named("status-manager"),
-		NewProbeTargetLister(logger, endpointsInformer.Lister()),
-		func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) {
-			logger.Debugf("Ready callback triggered for channel: %s/%s subscription: %s", c.Namespace, c.Name, string(s.UID))
-			impl.EnqueueKey(types.NamespacedName{Namespace: c.Namespace, Name: c.Name})
-		},
-	)
-	r.statusManager = statusProber
-	statusProber.Start(ctx.Done())
-
 	// Call GlobalResync on kafkachannels.
 	grCh := func(obj interface{}) {
 		logger.Debug("Changes detected, doing global resync")
@@ -174,22 +159,23 @@ func NewController(
 			knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false),
 		),
 		Handler: cache.ResourceEventHandlerFuncs{
-			// Cancel probing when a Pod is deleted
-			DeleteFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Delete"),
-			AddFunc:    getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Add"),
+			// TODO: do we still need this?
+			// Global sync when a Pod is deleted or added
+			DeleteFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Delete"),
+			AddFunc:    getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Add"),
 		},
 	})
 
 	return impl
 }
 
-func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, statusProber *status.Prober, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
+func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
 	return func(obj interface{}) {
 		pod, ok := obj.(*corev1.Pod)
 		if ok && pod != nil {
 			logger.Debugw("%s pods. Refreshing pod probing.", handlerType,
 				zap.String("pod", pod.GetName()))
-			statusProber.RefreshPodProbing(ctx)
+			// TODO: do we still need this? see above!
+			impl.GlobalResync(kafkaChannelInformer.Informer())
 		}
 	}
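With the prober, its ready callback, and `RefreshPodProbing` gone, the pod informer wiring above is the whole replacement: dispatcher pod churn now triggers a blanket resync of every KafkaChannel, and reconciliation (rather than an HTTP probe) is what refreshes subscription readiness. A sketch pulling the fragments of the hunk into one place; `podInformer`, `roleLabelKey`, and `dispatcherRoleLabelValue` are assumed from the surrounding `NewController` scope, not shown in this diff:

```go
// Re-reconcile all KafkaChannels whenever a dispatcher pod (selected by its
// role label) is added or deleted, instead of probing pods individually.
podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
	FilterFunc: knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false),
	Handler: cache.ResourceEventHandlerFuncs{
		AddFunc:    getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Add"),
		DeleteFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Delete"),
	},
})
```

`GlobalResync` re-enqueues every key in the given informer's cache onto the reconciler's work queue, so each KafkaChannel is reconciled again shortly after a dispatcher pod comes or goes.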
@@ -49,7 +49,6 @@ import (

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned"
kafkaScheme "knative.dev/eventing-kafka/pkg/client/clientset/versioned/scheme"
@@ -135,7 +134,6 @@ type Reconciler struct {
 	endpointsLister      corev1listers.EndpointsLister
 	serviceAccountLister corev1listers.ServiceAccountLister
 	roleBindingLister    rbacv1listers.RoleBindingLister
-	statusManager        status.Manager
 	controllerRef        metav1.OwnerReference
 }

@@ -635,9 +633,5 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel)
 			return err
 		}
 	}
-	for _, s := range kc.Spec.Subscribers {
-		logger.Debugw("Canceling probing", zap.String("channel", channel), zap.Any("subscription", s))
-		r.statusManager.CancelProbing(s)
-	}
 	return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer
 }