diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index 8c0053d25b..cc4a959962 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -43,10 +43,6 @@ import ( "knative.dev/eventing-kafka/pkg/common/tracing" ) -const ( - dispatcherReadySubHeader = "K-Subscriber-Status" -) - type TopicFunc func(separator, namespace, name string) string type KafkaDispatcherArgs struct { @@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat topicFunc: args.TopicFunc, } - // initialize and start the subscription endpoint server - subscriptionEndpoint := &subscriptionEndpoint{ - dispatcher: dispatcher, - logger: logging.FromContext(ctx), - } - go func() { - subscriptionEndpoint.start() - }() - podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey) if err != nil { return nil, err diff --git a/pkg/channel/consolidated/dispatcher/subscription_endpoint.go b/pkg/channel/consolidated/dispatcher/subscription_endpoint.go deleted file mode 100644 index 942d9e26cf..0000000000 --- a/pkg/channel/consolidated/dispatcher/subscription_endpoint.go +++ /dev/null @@ -1,60 +0,0 @@ -package dispatcher - -import ( - "encoding/json" - nethttp "net/http" - "strings" - - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/types" -) - -// subscriptionEndpoint is serving the subscription status of the Kafka channel. -// A prober in the controller calls the endpoint to see if the subscriber is ready. -type subscriptionEndpoint struct { - dispatcher *KafkaDispatcher - logger *zap.SugaredLogger -} - -func (d *subscriptionEndpoint) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) { - if r.Method != nethttp.MethodGet { - w.WriteHeader(nethttp.StatusMethodNotAllowed) - d.logger.Errorf("Received request method that wasn't GET: %s", r.Method) - return - } - uriSplit := strings.Split(r.RequestURI, "/") - if len(uriSplit) != 3 { - w.WriteHeader(nethttp.StatusNotFound) - d.logger.Errorf("Unable to process request: %s", r.RequestURI) - return - } - channelRefNamespace, channelRefName := uriSplit[1], uriSplit[2] - channelRef := types.NamespacedName{ - Name: channelRefName, - Namespace: channelRefNamespace, - } - if _, ok := d.dispatcher.channelSubscriptions[channelRef]; !ok { - w.WriteHeader(nethttp.StatusNotFound) - return - } - d.dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() - defer d.dispatcher.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() - var subscriptions = make(map[string][]int32) - w.Header().Set(dispatcherReadySubHeader, channelRefName) - for s, ps := range d.dispatcher.channelSubscriptions[channelRef].channelReadySubscriptions { - subscriptions[s] = ps.List() - } - jsonResult, err := json.Marshal(subscriptions) - if err != nil { - d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err) - return - } - _, err = w.Write(jsonResult) - if err != nil { - d.logger.Errorf("Error writing jsonResult to serveHTTP writer: %w", err) - } -} - -func (d *subscriptionEndpoint) start() { - d.logger.Fatal(nethttp.ListenAndServe(":8081", d)) -} diff --git a/pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go b/pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go deleted file mode 100644 index 63d7b0a6f9..0000000000 --- a/pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go +++ /dev/null
@@ -1,155 +0,0 @@ -package dispatcher - -import ( - "fmt" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" - - "github.com/google/go-cmp/cmp" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - klogtesting "knative.dev/pkg/logging/testing" -) - -func TestServeHTTP(t *testing.T) { - - httpGet := "GET" - httpPost := "POST" - testCases := []struct { - name string - responseReturnCode int - desiredJson []byte - channelSubs map[types.NamespacedName]*KafkaSubscription - requestURI string - httpMethod string - }{ - { - name: "channelref not found", - httpMethod: httpGet, - responseReturnCode: http.StatusNotFound, - desiredJson: []byte{}, - requestURI: "/exist/thisDoesNot", - }, { - name: "nop", - httpMethod: httpGet, - responseReturnCode: http.StatusNotFound, - desiredJson: []byte{}, - requestURI: "///", - }, { - name: "no ready subscribers", - httpMethod: httpGet, - responseReturnCode: http.StatusOK, - desiredJson: []byte(`{}`), - channelSubs: map[types.NamespacedName]*KafkaSubscription{ - {Name: "foo", Namespace: "bar"}: { - subs: sets.NewString(), - channelReadySubscriptions: map[string]sets.Int32{}, - }, - }, - requestURI: "/bar/foo", - }, { - name: "different channelref called from populated channref (different ns)", - httpMethod: httpGet, - desiredJson: []byte{}, - responseReturnCode: http.StatusNotFound, - channelSubs: map[types.NamespacedName]*KafkaSubscription{ - {Name: "foo", Namespace: "baz"}: { - subs: sets.NewString("a", "b"), - channelReadySubscriptions: map[string]sets.Int32{ - "a": sets.NewInt32(0), - "b": sets.NewInt32(0), - }, - }, - }, - requestURI: "/bar/foo", - }, { - name: "return correct subscription", - httpMethod: httpGet, - desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), - responseReturnCode: http.StatusOK, - channelSubs: map[types.NamespacedName]*KafkaSubscription{ - {Name: "foo", Namespace: "bar"}: { - subs: sets.NewString("a", "b"), - channelReadySubscriptions: map[string]sets.Int32{ - "a": sets.NewInt32(0), - "b": sets.NewInt32(0, 2, 5), - }, - }, - }, - requestURI: "/bar/foo", - }, { - name: "return correct subscription from multiple chanrefs", - httpMethod: httpGet, - desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), - responseReturnCode: http.StatusOK, - channelSubs: map[types.NamespacedName]*KafkaSubscription{ - {Name: "table", Namespace: "flip"}: { - subs: sets.NewString("c", "d"), - channelReadySubscriptions: map[string]sets.Int32{ - "c": sets.NewInt32(0), - "d": sets.NewInt32(0), - }}, - {Name: "foo", Namespace: "bar"}: { - subs: sets.NewString("a", "b"), - channelReadySubscriptions: map[string]sets.Int32{ - "a": sets.NewInt32(0), - "b": sets.NewInt32(0, 2, 5), - }, - }, - }, - requestURI: "/bar/foo", - }, { - name: "bad request uri", - httpMethod: httpGet, - desiredJson: []byte{}, - responseReturnCode: http.StatusNotFound, - requestURI: "/here/be/dragons/there/are/too/many/slashes", - }, { - name: "bad request method (POST)", - httpMethod: httpPost, - desiredJson: []byte{}, - responseReturnCode: http.StatusMethodNotAllowed, - }, - } - logger := klogtesting.TestLogger(t) - d := &KafkaDispatcher{ - channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription), - logger: logger, - } - subscriptionEndpoint := &subscriptionEndpoint{ - dispatcher: d, - logger: logger, - } - - ts := httptest.NewServer(subscriptionEndpoint) - defer ts.Close() - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Logf("Running %s", t.Name()) - d.channelSubscriptions = tc.channelSubs - - request, _ := 
http.NewRequest(tc.httpMethod, fmt.Sprintf("%s%s", ts.URL, tc.requestURI), nil) - // resp, err := http.Get(fmt.Sprintf("%s%s", ts.URL, tc.requestURI)) - resp, err := http.DefaultClient.Do(request) - if err != nil { - t.Errorf("Could not send request to subscriber endpoint: %v", err) - } - if resp.StatusCode != tc.responseReturnCode { - t.Errorf("unexpected status returned: want: %d, got: %d", tc.responseReturnCode, resp.StatusCode) - } - respBody, err := ioutil.ReadAll(resp.Body) - defer resp.Body.Close() - if err != nil { - t.Errorf("Could not read response from subscriber endpoint: %v", err) - } - if testing.Verbose() && len(respBody) > 0 { - t.Logf("http response: %s\n", string(respBody)) - } - if diff := cmp.Diff(tc.desiredJson, respBody); diff != "" { - t.Errorf("unexpected readysubscriber status response: (-want, +got) = %v", diff) - } - }) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/controller.go b/pkg/channel/consolidated/reconciler/controller/controller.go index f96afaf1fe..4da18ab7b1 100644 --- a/pkg/channel/consolidated/reconciler/controller/controller.go +++ b/pkg/channel/consolidated/reconciler/controller/controller.go @@ -26,11 +26,9 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/utils/pointer" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingClient "knative.dev/eventing/pkg/client/injection/client" kubeclient "knative.dev/pkg/client/injection/kube/client" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" @@ -45,8 +43,6 @@ import ( knativeReconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/system" - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/consolidated/status" kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1" kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client" "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" @@ -110,17 +106,6 @@ func NewController( impl := kafkaChannelReconciler.NewImpl(ctx, r) - statusProber := status.NewProber( - logger.Named("status-manager"), - NewProbeTargetLister(logger, endpointsInformer.Lister()), - func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { - logger.Debugf("Ready callback triggered for channel: %s/%s subscription: %s", c.Namespace, c.Name, string(s.UID)) - impl.EnqueueKey(types.NamespacedName{Namespace: c.Namespace, Name: c.Name}) - }, - ) - r.statusManager = statusProber - statusProber.Start(ctx.Done()) - // Call GlobalResync on kafkachannels. grCh := func(obj interface{}) { logger.Debug("Changes detected, doing global resync") @@ -174,22 +159,23 @@ func NewController( knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false), ), Handler: cache.ResourceEventHandlerFuncs{ - // Cancel probing when a Pod is deleted - DeleteFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Delete"), - AddFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Add"), + // TODO: do we still need this?
+ // Global sync when a Pod is deleted or added + DeleteFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Delete"), + AddFunc: getPodInformerEventHandler(ctx, logger, impl, kafkaChannelInformer, "Add"), }, }) return impl } -func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, statusProber *status.Prober, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) { +func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) { return func(obj interface{}) { pod, ok := obj.(*corev1.Pod) if ok && pod != nil { logger.Debugw("%s pods. Refreshing pod probing.", handlerType, zap.String("pod", pod.GetName())) - statusProber.RefreshPodProbing(ctx) + // TODO: do we still need this? see above! impl.GlobalResync(kafkaChannelInformer.Informer()) } } diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go index 352b11076f..c3f7926d37 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go @@ -49,7 +49,6 @@ import ( "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources" - "knative.dev/eventing-kafka/pkg/channel/consolidated/status" "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-kafka/pkg/client/clientset/versioned/scheme" @@ -135,7 +134,6 @@ type Reconciler struct { endpointsLister corev1listers.EndpointsLister serviceAccountLister corev1listers.ServiceAccountLister roleBindingLister rbacv1listers.RoleBindingLister - statusManager status.Manager controllerRef metav1.OwnerReference } @@ -635,9 +633,5 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) return err } } - for _, s := range kc.Spec.Subscribers { - logger.Debugw("Canceling probing", zap.String("channel", channel), zap.Any("subscription", s)) - r.statusManager.CancelProbing(s) - } return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer } diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go index b25d11263c..a82dfaf969 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go @@ -348,12 +348,6 @@ func TestAllCases(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, ch v1beta1.KafkaChannel, - sub eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -422,12 +416,6 @@ func TestTopicExists(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, channel 
v1beta1.KafkaChannel, - spec eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -500,12 +488,6 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, - spec eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -578,12 +560,6 @@ func TestDeploymentZeroReplicas(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, - spec eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -652,12 +628,6 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, - spec eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -730,12 +700,6 @@ func TestDeploymentUpdatedOnConfigMapHashChange(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), - statusManager: &fakeStatusManager{ - FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, - spec eventingduckv1.SubscriberSpec) (bool, error) { - return true, nil - }, - }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -970,22 +934,6 @@ func subscribers() []eventingduckv1.SubscriberSpec { }} } -type fakeStatusManager struct { - FakeIsReady func(context.Context, v1beta1.KafkaChannel, eventingduckv1.SubscriberSpec) (bool, error) -} - -func (m *fakeStatusManager) IsReady(ctx context.Context, ch v1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) { - return m.FakeIsReady(ctx, ch, sub) -} - -func (m *fakeStatusManager) CancelProbing(sub eventingduckv1.SubscriberSpec) { - //do nothing -} - -func (m *fakeStatusManager) CancelPodProbing(pod corev1.Pod) { - //do nothing -} - func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl { return clientgotesting.PatchActionImpl{ ActionImpl: clientgotesting.ActionImpl{ diff --git a/pkg/channel/consolidated/reconciler/controller/lister.go b/pkg/channel/consolidated/reconciler/controller/lister.go deleted file mode 100644 index 07fbc68169..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/lister.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Copyright 2020 The Knative 
Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - "fmt" - "net/url" - - "knative.dev/eventing-kafka/pkg/channel/consolidated/status" - - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/util/sets" - v1 "k8s.io/client-go/listers/core/v1" - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing/pkg/apis/eventing" - "knative.dev/pkg/system" -) - -type DispatcherPodsLister struct { - logger *zap.SugaredLogger - endpointLister v1.EndpointsLister -} - -func (t *DispatcherPodsLister) ListProbeTargets(ctx context.Context, kc v1beta1.KafkaChannel) (*status.ProbeTarget, error) { - scope, ok := kc.Annotations[eventing.ScopeAnnotationKey] - if !ok { - scope = scopeCluster - } - - dispatcherNamespace := system.Namespace() - if scope == scopeNamespace { - dispatcherNamespace = kc.Namespace - } - - // Get the Dispatcher Service Endpoints and propagate the status to the Channel - // endpoints has the same name as the service, so not a bug. - eps, err := t.endpointLister.Endpoints(dispatcherNamespace).Get(dispatcherName) - if err != nil { - return nil, fmt.Errorf("failed to get internal service: %w", err) - } - var readyIPs []string - - for _, sub := range eps.Subsets { - for _, address := range sub.Addresses { - readyIPs = append(readyIPs, address.IP) - } - } - - if len(readyIPs) == 0 { - return nil, fmt.Errorf("no gateway pods available") - } - - u, _ := url.Parse(fmt.Sprintf("http://%s.%s/%s/%s", dispatcherName, dispatcherNamespace, kc.Namespace, kc.Name)) - - return &status.ProbeTarget{ - PodIPs: sets.NewString(readyIPs...), - PodPort: "8081", - URL: u, - }, nil -} - -func NewProbeTargetLister(logger *zap.SugaredLogger, lister v1.EndpointsLister) status.ProbeTargetLister { - tl := DispatcherPodsLister{ - logger: logger, - endpointLister: lister, - } - return &tl -} diff --git a/pkg/channel/consolidated/status/status.go b/pkg/channel/consolidated/status/status.go deleted file mode 100644 index b930a46193..0000000000 --- a/pkg/channel/consolidated/status/status.go +++ /dev/null @@ -1,533 +0,0 @@ -/* -Copyright 2019 The Knative Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package status - -import ( - "context" - "encoding/json" - "fmt" - "net" - "net/http" - "net/url" - "reflect" - "sync" - "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - "golang.org/x/time/rate" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/util/workqueue" - - messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/networking/pkg/prober" - "knative.dev/pkg/logging" -) - -const ( - // probeConcurrency defines how many probing calls can be issued simultaneously - probeConcurrency = 100 - // probeTimeout defines the maximum amount of time a request will wait - probeTimeout = 1 * time.Second - // initialDelay defines the delay before enqueuing a probing request the first time. - // It gives times for the change to propagate and prevents unnecessary retries. - initialDelay = 200 * time.Millisecond -) - -var dialContext = (&net.Dialer{Timeout: probeTimeout}).DialContext - -// targetState represents the probing state of a subscription -type targetState struct { - sub eventingduckv1.SubscriberSpec - ch messagingv1beta1.KafkaChannel - - readyLock sync.RWMutex - // pendingCount is the number of pods that haven't been successfully probed yet - pendingCount atomic.Int32 - // readyCount is the number of pods that have the subscription ready - readyPartitions sets.Int - probedPods sets.String - lastAccessed time.Time - ready bool - cancel func() -} - -// podState represents the probing state of a Pod (for a specific subscription) -type podState struct { - // pendingCount is the number of probes for the Pod - pendingCount atomic.Int32 - - cancel func() -} - -// cancelContext is a pair of a Context and its cancel function -type cancelContext struct { - context context.Context - cancel func() -} - -type workItem struct { - targetStates *targetState - podState *podState - context context.Context - url *url.URL - podIP string - podPort string - logger *zap.SugaredLogger -} - -// ProbeTarget contains the URLs to probes for a set of Pod IPs serving out of the same port. -type ProbeTarget struct { - PodIPs sets.String - PodPort string - URL *url.URL -} - -// ProbeTargetLister lists all the targets that requires probing. -type ProbeTargetLister interface { - // ListProbeTargets returns a list of targets to be probed - ListProbeTargets(ctx context.Context, ch messagingv1beta1.KafkaChannel) (*ProbeTarget, error) -} - -// Manager provides a way to check if a Subscription is ready -type Manager interface { - IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) - CancelProbing(sub eventingduckv1.SubscriberSpec) - CancelPodProbing(pod corev1.Pod) -} - -// Prober provides a way to check if a VirtualService is ready by probing the Envoy pods -// handling that VirtualService. 
-type Prober struct { - logger *zap.SugaredLogger - - // mu guards targetStates and podContexts - mu sync.Mutex - targetStates map[types.UID]*targetState - podContexts map[string]cancelContext - - workQueue workqueue.RateLimitingInterface - - targetLister ProbeTargetLister - - readyCallback func(messagingv1beta1.KafkaChannel, eventingduckv1.SubscriberSpec) - - probeConcurrency int - - opts []interface{} -} - -// NewProber creates a new instance of Prober -func NewProber( - logger *zap.SugaredLogger, - targetLister ProbeTargetLister, - readyCallback func(messagingv1beta1.KafkaChannel, eventingduckv1.SubscriberSpec), opts ...interface{}) *Prober { - return &Prober{ - logger: logger, - targetStates: make(map[types.UID]*targetState), - podContexts: make(map[string]cancelContext), - workQueue: workqueue.NewNamedRateLimitingQueue( - workqueue.NewMaxOfRateLimiter( - // Per item exponential backoff - workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 5*time.Minute), - // Global rate limiter - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 1000)}, - ), - "ProbingQueue"), - targetLister: targetLister, - readyCallback: readyCallback, - probeConcurrency: probeConcurrency, - opts: opts, - } -} - -func (m *Prober) checkReadiness(state *targetState) bool { - state.readyLock.Lock() - defer state.readyLock.Unlock() - partitions := state.ch.Spec.NumPartitions - if !state.ready { - state.ready = state.readyPartitions.Len() == int(partitions) - } - m.logger.Debugw("Checking subscription readiness", - zap.Any("subscription", state.sub.UID), - zap.Any("channel", state.ch.Name), - zap.Any("pod ips", state.probedPods), - zap.Any("channel partitions", partitions), - zap.Any("ready partitions", state.readyPartitions.List()), - zap.Bool("ready", state.ready), - ) - return state.ready -} - -func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) { - logger := logging.FromContext(ctx) - - // Get the probe targets - target, err := m.targetLister.ListProbeTargets(ctx, ch) - if err != nil { - logger.Errorw("Error listing probe targets", zap.Error(err), - zap.Any("subscription", sub.UID)) - return false, err - } - // get the state while locking for very short scope - state, ok := func() (*targetState, bool) { - m.mu.Lock() - defer m.mu.Unlock() - s, o := m.targetStates[sub.UID] - return s, o - }() - if ok { - if !isOutdatedTargetState(state, sub, target.PodIPs) { - return m.checkReadiness(state), nil - } - m.ejectStateUnsafe(sub) - } - m.probeTarget(ctx, ch, sub, target) - return false, nil -} - -func (m *Prober) probeTarget(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec, target *ProbeTarget) { - subscriptionKey := sub.UID - logger := logging.FromContext(ctx) - subCtx, cancel := context.WithCancel(context.Background()) - subscriptionState := &targetState{ - sub: sub, - ch: ch, - lastAccessed: time.Now(), - cancel: cancel, - } - - // Group the probe targets by IP - workItems := make(map[string][]*workItem) - for ip := range target.PodIPs { - workItems[ip] = append(workItems[ip], &workItem{ - targetStates: subscriptionState, - url: target.URL, - podIP: ip, - podPort: target.PodPort, - logger: logger, - }) - } - - subscriptionState.probedPods = target.PodIPs - subscriptionState.pendingCount.Store(int32(len(workItems))) - subscriptionState.readyPartitions = sets.Int{} - - for ip, ipWorkItems := range workItems { - // Get or create the context for that IP - ipCtx := func() 
context.Context { - m.mu.Lock() - defer m.mu.Unlock() - cancelCtx, ok := m.podContexts[ip] - if !ok { - ctx, cancel := context.WithCancel(context.Background()) - cancelCtx = cancelContext{ - context: ctx, - cancel: cancel, - } - m.podContexts[ip] = cancelCtx - } - return cancelCtx.context - }() - - podCtx, cancel := context.WithCancel(subCtx) - podState := &podState{ - pendingCount: *atomic.NewInt32(int32(len(ipWorkItems))), - cancel: cancel, - } - - // Quick and dirty way to join two contexts (i.e. podCtx is cancelled when either subCtx or ipCtx are cancelled) - go func() { - select { - case <-podCtx.Done(): - // This is the actual context, there is nothing to do except - // break to avoid leaking this goroutine. - break - case <-ipCtx.Done(): - // Cancel podCtx - cancel() - } - }() - - // Update the states when probing is cancelled - go func() { - <-podCtx.Done() - m.onProbingCancellation(subscriptionState, podState) - }() - - for _, wi := range ipWorkItems { - wi.podState = podState - wi.context = podCtx - m.workQueue.AddAfter(wi, initialDelay) - logger.Infof("Queuing probe for %s, IP: %s:%s (depth: %d)", - wi.url, wi.podIP, wi.podPort, m.workQueue.Len()) - } - } - - func() { - m.mu.Lock() - defer m.mu.Unlock() - m.targetStates[subscriptionKey] = subscriptionState - }() -} - -// Start starts the Manager background operations -func (m *Prober) Start(done <-chan struct{}) chan struct{} { - var wg sync.WaitGroup - - // Start the worker goroutines - for i := 0; i < m.probeConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for m.processWorkItem() { - } - }() - } - - // Stop processing the queue when cancelled - go func() { - <-done - m.workQueue.ShutDown() - }() - - // Return a channel closed when all work is done - ch := make(chan struct{}) - go func() { - wg.Wait() - close(ch) - }() - return ch -} - -// CancelProbing cancels probing of the provided Subscription -func (m *Prober) CancelProbing(sub eventingduckv1.SubscriberSpec) { - m.mu.Lock() - defer m.mu.Unlock() - m.ejectStateUnsafe(sub) -} - -// ejectStateUnsafe ejects a state from Cache, it's not safe for concurrent access and is meant for internal use only under proper locking. -func (m *Prober) ejectStateUnsafe(sub eventingduckv1.SubscriberSpec) { - if state, ok := m.targetStates[sub.UID]; ok { - m.logger.Debugw("Canceling state", zap.Any("subscription", sub)) - state.cancel() - delete(m.targetStates, sub.UID) - } -} - -// CancelPodProbing cancels probing of the provided Pod IP. -func (m *Prober) CancelPodProbing(pod corev1.Pod) { - m.mu.Lock() - defer m.mu.Unlock() - - if ctx, ok := m.podContexts[pod.Status.PodIP]; ok { - ctx.cancel() - delete(m.podContexts, pod.Status.PodIP) - } -} - -// RefreshPodProbing lists probe targets and invalidates any in-flight (non-ready) states whose initial probed targets changed from the -// current ones. -func (m *Prober) RefreshPodProbing(ctx context.Context) { - m.mu.Lock() - defer m.mu.Unlock() - logger := logging.FromContext(ctx) - for _, state := range m.targetStates { - if !m.checkReadiness(state) { - // This is an in-flight state - sub := state.sub - ch := state.ch - // Get the probe targets - target, err := m.targetLister.ListProbeTargets(ctx, ch) - if err != nil { - logger.Errorw("Error listing probe targets", zap.Error(err), - zap.Any("subscription", sub.UID)) - return - } - m.ejectStateUnsafe(sub) - func() { - // probeTarget requires an unlocked mutex. 
- m.mu.Unlock() - defer m.mu.Lock() - m.probeTarget(ctx, ch, sub, target) - }() - } - } -} - -// processWorkItem processes a single work item from workQueue. -// It returns false when there are no more items to process, true otherwise. -func (m *Prober) processWorkItem() bool { - obj, shutdown := m.workQueue.Get() - if shutdown { - return false - } - - defer m.workQueue.Done(obj) - - // Crash if the item is not of the expected type - item, ok := obj.(*workItem) - if !ok { - m.logger.Fatalf("Unexpected work item type: want: %s, got: %s\n", - reflect.TypeOf(&workItem{}).Name(), reflect.TypeOf(obj).Name()) - } - item.logger.Infof("Processing probe for %s, IP: %s:%s (depth: %d)", - item.url, item.podIP, item.podPort, m.workQueue.Len()) - - transport := http.DefaultTransport.(*http.Transport).Clone() - - transport.DialContext = func(ctx context.Context, network, addr string) (conn net.Conn, e error) { - // http.Request.URL is set to the hostname and it is substituted in here with the target IP. - return dialContext(ctx, network, net.JoinHostPort(item.podIP, item.podPort)) - } - - probeURL := deepCopy(item.url) - - ctx, cancel := context.WithTimeout(item.context, probeTimeout) - defer cancel() - var opts []interface{} - opts = append(opts, m.opts...) - opts = append(opts, m.probeVerifier(item)) - - ok, err := prober.Do( - ctx, - transport, - probeURL.String(), - opts...) - - // In case of cancellation, drop the work item - select { - case <-item.context.Done(): - m.workQueue.Forget(obj) - return true - default: - } - - if err != nil { - // In case of error, enqueue for retry - m.workQueue.AddRateLimited(obj) - item.logger.Errorw("Error occurred while probing", - zap.Any("url", item.url), zap.Any("IP", item.podIP), - zap.Any("port", item.podPort), zap.Bool("ready", ok), zap.Error(err), - zap.Int("depth", m.workQueue.Len())) - } else if !ok { - // No error, but verification failed, enqueue for retry - m.workQueue.AddRateLimited(obj) - item.logger.Debugw("Verification of pod response failed.", - zap.Any("url", item.url), zap.Any("IP", item.podIP), - zap.Any("port", item.podPort), zap.Bool("ready", ok), zap.Error(err), - zap.Int("depth", m.workQueue.Len())) - } else { - m.onProbingSuccess(item.targetStates, item.podState) - } - return true -} - -func (m *Prober) onProbingSuccess(subscriptionState *targetState, podState *podState) { - // The last probe call for the Pod succeeded, the Pod is ready - if podState.pendingCount.Dec() == 0 { - // Unlock the goroutine blocked on <-podCtx.Done() - podState.cancel() - // This is the last pod being successfully probed, the subscription might be ready - if m.checkReadiness(subscriptionState) { - subscriptionState.cancel() - m.readyCallback(subscriptionState.ch, subscriptionState.sub) - } - } -} - -func (m *Prober) onProbingCancellation(subscriptionState *targetState, podState *podState) { - for { - pendingCount := podState.pendingCount.Load() - if pendingCount <= 0 { - // Probing succeeded, nothing to do - return - } - // Attempt to set pendingCount to 0.
- if podState.pendingCount.CAS(pendingCount, 0) { - // This is the last pod being successfully probed, the subscription might be ready - if subscriptionState.pendingCount.Dec() == 0 { - if m.checkReadiness(subscriptionState) { - m.readyCallback(subscriptionState.ch, subscriptionState.sub) - } - } - return - } - } -} - -func (m *Prober) probeVerifier(item *workItem) prober.Verifier { - return func(r *http.Response, b []byte) (bool, error) { - m.logger.Debugw("Verifying response", zap.Int("status code", r.StatusCode), - zap.ByteString("body", b), zap.Any("subscription", item.targetStates.sub.UID), - zap.Any("channel", item.targetStates.ch)) - switch r.StatusCode { - case http.StatusOK: - var subscriptions = make(map[string][]int) - err := json.Unmarshal(b, &subscriptions) - if err != nil { - m.logger.Errorw("error unmarshaling", err) - return false, err - } - uid := string(item.targetStates.sub.UID) - key := fmt.Sprintf("%s/%s", item.targetStates.ch.Namespace, item.targetStates.ch.Name) - m.logger.Debugw("Received proper probing response from target", - zap.Any("found subscriptions", subscriptions), - zap.String("pod ip", item.podIP), - zap.String("want channel", key), - zap.String("want subscription", uid), - ) - if partitions, ok := subscriptions[uid]; ok { - func() { - item.targetStates.readyLock.Lock() - defer item.targetStates.readyLock.Unlock() - item.targetStates.readyPartitions.Insert(partitions...) - }() - return m.checkReadiness(item.targetStates), nil - } else { - return false, nil - } - case http.StatusNotFound, http.StatusServiceUnavailable: - m.logger.Errorf("unexpected status code: want %v, got %v", http.StatusOK, r.StatusCode) - return false, fmt.Errorf("unexpected status code: want %v, got %v", http.StatusOK, r.StatusCode) - default: - item.logger.Errorf("Probing of %s abandoned, IP: %s:%s: the response status is %v, expected one of: %v", - item.url, item.podIP, item.podPort, r.StatusCode, - []int{http.StatusOK, http.StatusNotFound, http.StatusServiceUnavailable}) - return true, nil - } - } -} - -// A target state is outdated if the generation is different or if the target IPs change before it becomes -// ready. -func isOutdatedTargetState(state *targetState, sub eventingduckv1.SubscriberSpec, podIPs sets.String) bool { - state.readyLock.RLock() - defer state.readyLock.RUnlock() - return state.sub.Generation != sub.Generation || (!state.ready && !state.probedPods.Equal(podIPs)) -} - -// deepCopy copies a URL into a new one -func deepCopy(in *url.URL) *url.URL { - // Safe to ignore the error since this is a deep copy - newURL, _ := url.Parse(in.String()) - return newURL -} diff --git a/pkg/channel/consolidated/status/status_test.go b/pkg/channel/consolidated/status/status_test.go deleted file mode 100644 index 4d99781f33..0000000000 --- a/pkg/channel/consolidated/status/status_test.go +++ /dev/null @@ -1,340 +0,0 @@ -/* -Copyright 2019 The Knative Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package status - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "net/url" - "strconv" - "testing" - "time" - - "go.uber.org/atomic" - "go.uber.org/zap/zaptest" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/pkg/apis" -) - -var channelObjectMeta = metav1.ObjectMeta{ - Namespace: "default", - Name: "chan4prober", -} - -const dispatcherReadySubHeader = "K-Subscriber-Status" - -type ReadyPair struct { - c v1beta1.KafkaChannel - s eventingduckv1.SubscriberSpec -} - -func TestProbeSinglePod(t *testing.T) { - ch := getChannel(1) - sub := getSubscription() - var subscriptions = map[string][]int{ - string(sub.UID): {0}, - } - - // This should be called when we want the dispatcher to return a successful result - successHandler := http.HandlerFunc(readyJSONHandler(t, subscriptions)) - - // Probes only succeed if succeed is true - var succeed atomic.Bool - - // This is a latch channel that will lock the handler goroutine until we drain it - probeRequests := make(chan *http.Request) - handler := func(w http.ResponseWriter, r *http.Request) { - probeRequests <- r - if !succeed.Load() { - w.WriteHeader(http.StatusNotFound) - return - } - - successHandler.ServeHTTP(w, r) - } - - ts := getDispatcherServer(handler) - defer ts.Close() - - lister := fakeProbeTargetLister{ - target: getTargetLister(t, ts.URL), - } - - prober, ready := getProber(t, &lister) - - done := make(chan struct{}) - cancelled := prober.Start(done) - defer func() { - close(done) - <-cancelled - }() - - assertEventuallyReady(t, prober, ch, sub, ready, &succeed, &probeRequests) -} - -func TestProbeListerFail(t *testing.T) { - ch := getChannel(1) - sub := getSubscription() - - ready := make(chan *ReadyPair) - defer close(ready) - prober := NewProber( - zaptest.NewLogger(t).Sugar(), - notFoundLister{}, - func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { - ready <- &ReadyPair{ - c, - s, - } - }) - - // If we can't list, this must fail and return false - ok, err := prober.IsReady(context.Background(), *ch, *sub) - if err == nil { - t.Fatal("IsReady returned unexpected success") - } - if ok { - t.Fatal("IsReady() returned true") - } -} - -func TestSucceedAfterRefreshPodProbing(t *testing.T) { - // We have a channel with three partitions - ch := getChannel(3) - sub := getSubscription() - - // Dispatcher D1 will return only one ready partition - var subsD1 = map[string][]int{ - string(sub.UID): {0}, - } - - // Dispatcher D2 will return the three ready partitions - var subsD2 = map[string][]int{ - string(sub.UID): {0, 1, 2}, - } - - // The success handler for dispatcher D1, will return one partition only - successHandlerD1 := http.HandlerFunc(readyJSONHandler(t, subsD1)) - - // This is a latch channel that will lock the handler goroutine until we drain it - probeRequestsD1 := make(chan *http.Request) - - handlerD1 := func(w http.ResponseWriter, r *http.Request) { - probeRequestsD1 <- r - successHandlerD1.ServeHTTP(w, r) - } - - serverD1 := getDispatcherServer(handlerD1) - defer serverD1.Close() - - // Probes only succeed if succeed is true - var succeed atomic.Bool - - // The success handler for dispatcher D2, will return all three needed partitions - probeHandlerD2 := http.HandlerFunc(readyJSONHandler(t, 
subsD2)) - - // This is a latch channel that will lock the handler goroutine until we drain it - probeRequestsD2 := make(chan *http.Request) - handlerD2 := func(w http.ResponseWriter, r *http.Request) { - probeRequestsD2 <- r - if !succeed.Load() { - w.WriteHeader(http.StatusNotFound) - return - } - - probeHandlerD2.ServeHTTP(w, r) - } - - serverD2 := getDispatcherServer(handlerD2) - defer serverD2.Close() - - // Initially, lister points to d1 - lister := fakeProbeTargetLister{ - target: getTargetLister(t, serverD1.URL), - } - - prober, ready := getProber(t, &lister) - - done := make(chan struct{}) - cancelled := prober.Start(done) - defer func() { - close(done) - <-cancelled - }() - - // Assert we're not ready. - assertNeverReady(t, prober, ch, sub, ready, &probeRequestsD1) - - // Switch to new dispatcher - lister.target = getTargetLister(t, serverD2.URL) - - // Assert we're still not ready - assertNeverReady(t, prober, ch, sub, ready, &probeRequestsD1) - - // Refresh the pod probing, now the prober should probe the new dispatcher - prober.RefreshPodProbing(context.Background()) - - // Assert that probing will be successful eventually - assertEventuallyReady(t, prober, ch, sub, ready, &succeed, &probeRequestsD2) -} - -func assertNeverReady(t *testing.T, prober *Prober, ch *messagingv1beta1.KafkaChannel, sub *eventingduckv1.SubscriberSpec, ready chan *ReadyPair, probeRequests *chan *http.Request) { - // The first call to IsReady must succeed and return false - ok, err := prober.IsReady(context.Background(), *ch, *sub) - if err != nil { - t.Fatal("IsReady failed:", err) - } - if ok { - t.Fatal("IsReady() returned true") - } - - // Just drain the requests in the channel to not block the handler - go func() { - for range *probeRequests { - } - }() - - select { - case <-ready: - // Prober shouldn't be ready - t.Fatal("Prober shouldn't be ready") - case <-time.After(1 * time.Second): - // Not ideal but it gives time to the prober to write to ready - break - } -} - -func assertEventuallyReady(t *testing.T, prober *Prober, ch *messagingv1beta1.KafkaChannel, sub *eventingduckv1.SubscriberSpec, ready chan *ReadyPair, succeed *atomic.Bool, probeRequests *chan *http.Request) { - - // Since succeed is still false the prober shouldn't be ready - assertNeverReady(t, prober, ch, sub, ready, probeRequests) - - // Make probes succeed - succeed.Store(true) - - // Just drain the requests in the channel to not block the handler - go func() { - for range *probeRequests { - } - }() - - select { - case <-ready: - // Wait for the probing to eventually succeed - case <-time.After(5 * time.Second): - t.Error("Timed out waiting for probing to succeed.") - } -} - -func getDispatcherServer(handler func(w http.ResponseWriter, r *http.Request)) *httptest.Server { - ts := httptest.NewServer(http.HandlerFunc(handler)) - return ts -} - -func getChannel(partitionsNum int32) *v1beta1.KafkaChannel { - return (&v1beta1.KafkaChannel{ - ObjectMeta: channelObjectMeta, - Spec: v1beta1.KafkaChannelSpec{ - NumPartitions: partitionsNum, - ReplicationFactor: 1, - }, - }).DeepCopy() -} - -func getSubscription() *eventingduckv1.SubscriberSpec { - return (&eventingduckv1.SubscriberSpec{ - UID: types.UID("90713ffd-f527-42bf-b158-57630b68ebe2"), - Generation: 1, - SubscriberURI: getURL("http://subscr.ns.local"), - }).DeepCopy() -} - -func getURL(s string) *apis.URL { - u, _ := apis.ParseURL(s) - return u -} - -// readyJSONHandler is a factory for a handler which responds with a JSON of the ready subscriptions -func readyJSONHandler(t
*testing.T, subscriptions map[string][]int) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - channelRefName := channelObjectMeta.Name - channelRefNamespace := channelObjectMeta.Namespace - w.Header().Set(dispatcherReadySubHeader, channelRefName) - jsonResult, err := json.Marshal(subscriptions) - if err != nil { - t.Fatalf("Error marshalling json for sub-status channelref: %s/%s, %v", channelRefNamespace, channelRefName, err) - } - _, err = w.Write(jsonResult) - if err != nil { - t.Fatalf("Error writing jsonResult to serveHTTP writer: %v", err) - } - } -} - -func getTargetLister(t *testing.T, dURL string) *ProbeTarget { - tsURL, err := url.Parse(dURL) - if err != nil { - t.Fatalf("Failed to parse URL %q: %v", dURL, err) - } - port, err := strconv.Atoi(tsURL.Port()) - if err != nil { - t.Fatalf("Failed to parse port %q: %v", tsURL.Port(), err) - } - hostname := tsURL.Hostname() - return &ProbeTarget{ - PodIPs: sets.NewString(hostname), - PodPort: strconv.Itoa(port), - URL: tsURL, - } -} - -func getProber(t *testing.T, lister ProbeTargetLister) (*Prober, chan *ReadyPair) { - ready := make(chan *ReadyPair) - prober := NewProber( - zaptest.NewLogger(t).Sugar(), - lister, - func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { - ready <- &ReadyPair{ - c, - s, - } - }) - return prober, ready -} - -type fakeProbeTargetLister struct { - target *ProbeTarget -} - -func (l fakeProbeTargetLister) ListProbeTargets(ctx context.Context, kc messagingv1beta1.KafkaChannel) (*ProbeTarget, error) { - return l.target, nil -} - -type notFoundLister struct{} - -func (l notFoundLister) ListProbeTargets(ctx context.Context, kc messagingv1beta1.KafkaChannel) (*ProbeTarget, error) { - return nil, errors.New("not found") -}