
Commit

[0.25] KafkaChannel to init offsets before dispatcher (knative-extensions#913)

* Cherry pick 3f2a9d7

[0.24] KafkaChannel to init offsets before dispatcher (knative-extensions#886)

* ./hack/update-codegen.sh
aliok committed Oct 8, 2021
1 parent dcf8e88 commit af0f8b6
Showing 33 changed files with 728 additions and 2,170 deletions.
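
At a high level, this commit moves consumer-group offset initialization into the KafkaChannel controller and makes the consolidated dispatcher wait for those offsets before it starts consuming. A minimal sketch of the dispatcher-side gate, using only the names introduced in this diff and assuming the eventing-kafka packages at this revision (error handling trimmed):

package example

import (
	"context"

	"github.com/Shopify/sarama"

	"knative.dev/eventing-kafka/pkg/common/consumer"
)

func startSubscription(ctx context.Context, brokers []string, cfg *sarama.Config,
	groupID, topic string, handler consumer.KafkaConsumerHandler) (sarama.ConsumerGroup, error) {

	// The factory now takes an offsets checker; the Kafka-backed implementation
	// polls ListConsumerGroupOffsets until no partition reports offset -1.
	factory := consumer.NewConsumerGroupFactory(brokers, cfg, &consumer.KafkaConsumerGroupOffsetsChecker{})

	// StartConsumerGroup now takes a context (its logger comes from the context),
	// and its consume goroutine only proceeds once WaitForOffsetsInitialization returns.
	return factory.StartConsumerGroup(ctx, groupID, []string{topic}, handler)
}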
2 changes: 0 additions & 2 deletions go.mod
@@ -24,7 +24,6 @@ require (
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.3
go.opencensus.io v0.23.0
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.18.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
@@ -37,7 +36,6 @@ require (
knative.dev/control-protocol v0.0.0-20210803191615-72cde96b9f5a
knative.dev/eventing v0.25.0
knative.dev/hack v0.0.0-20210622141627-e28525d8d260
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596
knative.dev/reconciler-test v0.0.0-20210803183715-b61cc77c06f6
)
3 changes: 0 additions & 3 deletions go.sum
@@ -687,7 +687,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
@@ -1272,8 +1271,6 @@ knative.dev/eventing v0.25.0/go.mod h1:8jIsrnSONPgv+m63OTzpwZQJiQASYl77C3llCyYlB
knative.dev/hack v0.0.0-20210622141627-e28525d8d260 h1:f2eMtOubAOc/Q7JlvFPDKXiPlJVK+VpX2Cot8hRzCgQ=
knative.dev/hack v0.0.0-20210622141627-e28525d8d260/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210622141627-e28525d8d260/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c h1:7G6TQr7ZyIHx35Dn5zuNKUDhlly3KkFxgrKLXeKmjj8=
knative.dev/networking v0.0.0-20210803181815-acdfd41c575c/go.mod h1:UA9m1M3rGssy63gVwjSh7CYoWTKZNO8cnY9QsIu7tyo=
knative.dev/pkg v0.0.0-20210803160015-21eb4c167cc5/go.mod h1:RPk5txNA3apR9X40D4MpUOP9/VqOG8CrtABWfOwGVS4=
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596 h1:LCSg0O51V8I7sfnhw+j9WLBol8f2lCV5HkPyxJT9zzA=
knative.dev/pkg v0.0.0-20210902173607-844a6bc45596/go.mod h1:RPk5txNA3apR9X40D4MpUOP9/VqOG8CrtABWfOwGVS4=
23 changes: 5 additions & 18 deletions pkg/channel/consolidated/dispatcher/dispatcher.go
@@ -43,10 +43,6 @@ import (
"knative.dev/eventing-kafka/pkg/common/tracing"
)

const (
dispatcherReadySubHeader = "K-Subscriber-Status"
)

type TopicFunc func(separator, namespace, name string) string

type KafkaDispatcherArgs struct {
@@ -86,7 +82,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(logging.FromContext(ctx).Desugar()),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config),
kafkaConsumerFactory: consumer.NewConsumerGroupFactory(args.Brokers, args.Config.Sarama.Config, &consumer.KafkaConsumerGroupOffsetsChecker{}),
channelSubscriptions: make(map[types.NamespacedName]*KafkaSubscription),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]Subscription),
@@ -95,15 +91,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
topicFunc: args.TopicFunc,
}

// initialize and start the subscription endpoint server
subscriptionEndpoint := &subscriptionEndpoint{
dispatcher: dispatcher,
logger: logging.FromContext(ctx),
}
go func() {
subscriptionEndpoint.start()
}()

podName, err := env.GetRequiredConfigValue(logging.FromContext(ctx).Desugar(), env.PodNameEnvVarKey)
if err != nil {
return nil, err
@@ -171,7 +158,7 @@ func (k UpdateError) Error() string {
}

// ReconcileConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {
func (d *KafkaDispatcher) ReconcileConsumers(ctx context.Context, config *ChannelConfig) error {
channelNamespacedName := types.NamespacedName{
Namespace: config.Namespace,
Name: config.Name,
@@ -215,7 +202,7 @@ func (d *KafkaDispatcher) ReconcileConsumers(config *ChannelConfig) error {

failedToSubscribe := make(UpdateError)
for subUid, subSpec := range toAddSubs {
if err := d.subscribe(channelNamespacedName, subSpec); err != nil {
if err := d.subscribe(ctx, channelNamespacedName, subSpec); err != nil {
failedToSubscribe[subUid] = err
}
}
@@ -286,7 +273,7 @@ func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef types.NamespacedName, sub Subscription) error {
d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
@@ -310,7 +297,7 @@ func (d *KafkaDispatcher) subscribe(channelRef types.NamespacedName, sub Subscri
}
d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef),
zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID))
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(ctx, groupID, []string{topicName}, handler)

if err != nil {
// we can not create a consumer - logging that, with reason
6 changes: 4 additions & 2 deletions pkg/channel/consolidated/dispatcher/dispatcher_it_test.go
@@ -55,6 +55,8 @@ func TestDispatcher(t *testing.T) {
t.Skipf("This test can't run in CI")
}

ctx := context.TODO()

logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel))
if err != nil {
t.Fatal(err)
@@ -179,7 +181,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelAConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelAConfig))

channelBConfig := &ChannelConfig{
Namespace: "default",
@@ -195,7 +197,7 @@ func TestDispatcher(t *testing.T) {
},
}
require.NoError(t, dispatcher.RegisterChannelHost(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(channelBConfig))
require.NoError(t, dispatcher.ReconcileConsumers(ctx, channelBConfig))

time.Sleep(5 * time.Second)

20 changes: 14 additions & 6 deletions pkg/channel/consolidated/dispatcher/dispatcher_test.go
@@ -50,7 +50,7 @@ type mockKafkaConsumerFactory struct {
createErr bool
}

func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c mockKafkaConsumerFactory) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
if c.createErr {
return nil, errors.New("error creating consumer")
}
@@ -267,9 +267,11 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Initialize using oldConfig
require.NoError(t, d.RegisterChannelHost(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(tc.oldConfig))
require.NoError(t, d.ReconcileConsumers(ctx, tc.oldConfig))

oldSubscribers := sets.NewString()
for _, sub := range d.subscriptions {
@@ -283,7 +285,7 @@ func TestDispatcher_UpdateConsumers(t *testing.T) {
}

// Update with new config
err := d.ReconcileConsumers(tc.newConfig)
err := d.ReconcileConsumers(ctx, tc.newConfig)
if tc.createErr != "" {
if err == nil {
t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr)
@@ -363,6 +365,8 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

// Let's register channel configs first
for _, c := range configs {
require.NoError(t, d.RegisterChannelHost(c))
@@ -374,7 +378,7 @@ func TestDispatcher_MultipleChannelsInParallel(t *testing.T) {
wg.Add(1)
go func(c *ChannelConfig) {
defer wg.Done()
assert.NoError(t, d.ReconcileConsumers(c))
assert.NoError(t, d.ReconcileConsumers(ctx, c))
}(c)
}
}
@@ -418,6 +422,8 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
logger: zaptest.NewLogger(t).Sugar(),
}

ctx := context.TODO()

channelConfig := &ChannelConfig{
Namespace: "default",
Name: "test-channel",
@@ -438,7 +444,7 @@ func TestKafkaDispatcher_CleanupChannel(t *testing.T) {
},
}
require.NoError(t, d.RegisterChannelHost(channelConfig))
require.NoError(t, d.ReconcileConsumers(channelConfig))
require.NoError(t, d.ReconcileConsumers(ctx, channelConfig))

require.NoError(t, d.CleanupChannel(channelConfig.Name, channelConfig.Namespace, channelConfig.HostName))
require.NotContains(t, d.subscriptions, "subscription-1")
@@ -461,6 +467,8 @@ func TestSubscribeError(t *testing.T) {
channelSubscriptions: map[types.NamespacedName]*KafkaSubscription{},
}

ctx := context.TODO()

channelRef := types.NamespacedName{
Name: "test-channel",
Namespace: "test-ns",
@@ -470,7 +478,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
Subscription: fanout.Subscription{},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(ctx, channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
60 changes: 0 additions & 60 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint.go

This file was deleted.

155 changes: 0 additions & 155 deletions pkg/channel/consolidated/dispatcher/subscription_endpoint_test.go

This file was deleted.

35 changes: 1 addition & 34 deletions pkg/channel/consolidated/reconciler/controller/controller.go
@@ -26,11 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingClient "knative.dev/eventing/pkg/client/injection/client"
kubeclient "knative.dev/pkg/client/injection/kube/client"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
@@ -45,9 +43,6 @@ import (
knativeReconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
kafkamessagingv1beta1 "knative.dev/eventing-kafka/pkg/client/informers/externalversions/messaging/v1beta1"
kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client"
"knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
@@ -110,17 +105,6 @@ func NewController(

impl := kafkaChannelReconciler.NewImpl(ctx, r)

statusProber := status.NewProber(
logger.Named("status-manager"),
NewProbeTargetLister(logger, endpointsInformer.Lister()),
func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) {
logger.Debugf("Ready callback triggered for channel: %s/%s subscription: %s", c.Namespace, c.Name, string(s.UID))
impl.EnqueueKey(types.NamespacedName{Namespace: c.Namespace, Name: c.Name})
},
)
r.statusManager = statusProber
statusProber.Start(ctx.Done())

// Call GlobalResync on kafkachannels.
grCh := func(obj interface{}) {
logger.Debug("Changes detected, doing global resync")
@@ -167,34 +151,17 @@ func NewController(
FilterFunc: filterFn,
Handler: controller.HandleAll(grCh),
})

podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: knativeReconciler.ChainFilterFuncs(
knativeReconciler.LabelFilterFunc(channelLabelKey, channelLabelValue, false),
knativeReconciler.LabelFilterFunc(roleLabelKey, dispatcherRoleLabelValue, false),
),
Handler: cache.ResourceEventHandlerFuncs{
// Cancel probing when a Pod is deleted
DeleteFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Delete"),
AddFunc: getPodInformerEventHandler(ctx, logger, statusProber, impl, kafkaChannelInformer, "Add"),
},
Handler: controller.HandleAll(grCh),
})

return impl
}

func getPodInformerEventHandler(ctx context.Context, logger *zap.SugaredLogger, statusProber *status.Prober, impl *controller.Impl, kafkaChannelInformer kafkamessagingv1beta1.KafkaChannelInformer, handlerType string) func(obj interface{}) {
return func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if ok && pod != nil {
logger.Debugw("%s pods. Refreshing pod probing.", handlerType,
zap.String("pod", pod.GetName()))
statusProber.RefreshPodProbing(ctx)
impl.GlobalResync(kafkaChannelInformer.Informer())
}
}
}

func getControllerOwnerRef(ctx context.Context) (*metav1.OwnerReference, error) {
logger := logging.FromContext(ctx)
ctrlDeploymentLabels := labels.Set{
111 changes: 90 additions & 21 deletions pkg/channel/consolidated/reconciler/controller/kafkachannel.go
@@ -36,6 +36,7 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing-kafka/pkg/common/kafka/offset"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned"
@@ -48,7 +49,6 @@ import (

"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
"knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources"
"knative.dev/eventing-kafka/pkg/channel/consolidated/status"
"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned"
kafkaScheme "knative.dev/eventing-kafka/pkg/client/clientset/versioned/scheme"
@@ -123,6 +123,7 @@ type Reconciler struct {
kafkaAuthConfig *client.KafkaAuthConfig
kafkaConfigError error
kafkaClientSet kafkaclientset.Interface
kafkaClient sarama.Client
// Using a shared kafkaClusterAdmin does not work currently because of an issue with
// Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162.
kafkaClusterAdmin sarama.ClusterAdmin
@@ -133,7 +134,6 @@ type Reconciler struct {
endpointsLister corev1listers.EndpointsLister
serviceAccountLister corev1listers.ServiceAccountLister
roleBindingLister rbacv1listers.RoleBindingLister
statusManager status.Manager
controllerRef metav1.OwnerReference
}

@@ -163,8 +163,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel
return r.kafkaConfigError
}

kafkaClusterAdmin, err := r.createClient(ctx)
kafkaClient, err := r.createKafkaClient()
if err != nil {
logger.Errorw("Can't obtain Kafka Client", zap.Any("channel", kc), zap.Error(err))
kc.Status.MarkConfigFailed("InvalidConfiguration", "Unable to build Kafka client for channel %s: %v", kc.Name, err)
return err
}
defer kafkaClient.Close()

kafkaClusterAdmin, err := r.createClusterAdmin()
if err != nil {
logger.Errorw("Can't obtain Kafka cluster admin", zap.Any("channel", kc), zap.Error(err))
kc.Status.MarkConfigFailed("InvalidConfiguration", "Unable to build Kafka admin client for channel %s: %v", kc.Name, err)
return err
}
@@ -239,7 +248,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel
Scheme: "http",
Host: network.GetServiceHostname(svc.Name, svc.Namespace),
})
err = r.reconcileSubscribers(ctx, kc)
err = r.reconcileSubscribers(ctx, kc, kafkaClient, kafkaClusterAdmin)
if err != nil {
return fmt.Errorf("error reconciling subscribers %v", err)
}
@@ -249,12 +258,24 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel
return newReconciledNormal(kc.Namespace, kc.Name)
}

func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.KafkaChannel) error {
func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.KafkaChannel, kafkaClient sarama.Client, kafkaClusterAdmin sarama.ClusterAdmin) error {
after := ch.DeepCopy()
after.Status.Subscribers = make([]v1.SubscriberStatus, 0)
logger := logging.FromContext(ctx)
for _, s := range ch.Spec.Subscribers {
if r, _ := r.statusManager.IsReady(ctx, *ch, s); r {
logging.FromContext(ctx).Debugw("marking subscription", zap.Any("subscription", s))
logger.Debugw("Reconciling initial offset for subscription", zap.Any("subscription", s), zap.Any("channel", ch))
err := r.reconcileInitialOffset(ctx, ch, s, kafkaClient, kafkaClusterAdmin)

if err != nil {
logger.Errorw("reconcile failed to initial offset for subscription. Marking the subscription not ready", zap.String("channel", fmt.Sprintf("%s.%s", ch.Namespace, ch.Name)), zap.Any("subscription", s), zap.Error(err))
after.Status.Subscribers = append(after.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
Ready: corev1.ConditionFalse,
Message: fmt.Sprintf("Initial offset cannot be committed: %v", err),
})
} else {
logger.Debugw("Reconciled initial offset for subscription. Marking the subscription ready", zap.String("channel", fmt.Sprintf("%s.%s", ch.Namespace, ch.Name)), zap.Any("subscription", s))
after.Status.Subscribers = append(after.Status.Subscribers, v1.SubscriberStatus{
UID: s.UID,
ObservedGeneration: s.Generation,
@@ -281,7 +302,7 @@ func (r *Reconciler) reconcileSubscribers(ctx context.Context, ch *v1beta1.Kafka
if err != nil {
return fmt.Errorf("Failed patching: %w", err)
}
logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched))
logger.Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched))
return nil
}

@@ -477,7 +498,24 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName
return svc, nil
}

func (r *Reconciler) createClient(ctx context.Context) (sarama.ClusterAdmin, error) {
func (r *Reconciler) createKafkaClient() (sarama.Client, error) {
kafkaClient := r.kafkaClient
if kafkaClient == nil {
var err error

if r.kafkaConfig.EventingKafka.Sarama.Config == nil {
return nil, fmt.Errorf("error creating Kafka client: Sarama config is nil")
}
kafkaClient, err = sarama.NewClient(r.kafkaConfig.Brokers, r.kafkaConfig.EventingKafka.Sarama.Config)
if err != nil {
return nil, err
}
}

return kafkaClient, nil
}

func (r *Reconciler) createClusterAdmin() (sarama.ClusterAdmin, error) {
// We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
// This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
// Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently
@@ -527,6 +565,23 @@ func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.KafkaC
return err
}

func (r *Reconciler) reconcileInitialOffset(ctx context.Context, channel *v1beta1.KafkaChannel, sub v1.SubscriberSpec, kafkaClient sarama.Client, kafkaClusterAdmin sarama.ClusterAdmin) error {
subscriptionStatus := findSubscriptionStatus(channel, sub.UID)
if subscriptionStatus != nil && subscriptionStatus.Ready == corev1.ConditionTrue {
// subscription is ready, the offsets must have been initialized already
return nil
}

topicName := utils.TopicName(utils.KafkaChannelSeparator, channel.Namespace, channel.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", channel.Namespace, channel.Name, string(sub.UID))
_, err := offset.InitOffsets(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID)
if err != nil {
logger := logging.FromContext(ctx)
logger.Errorw("error reconciling initial offset", zap.String("channel", fmt.Sprintf("%s.%s", channel.Namespace, channel.Name)), zap.Any("subscription", sub), zap.Error(err))
}
return err
}

func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1beta1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error {
logger := logging.FromContext(ctx)

@@ -564,6 +619,11 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co
return
}

// Manually commit the offsets in KafkaChannel controller.
// That's because we want to make sure we initialize the offsets within the controller
// before dispatcher actually starts consuming messages.
kafkaConfig.EventingKafka.Sarama.Config.Consumer.Offsets.AutoCommit.Enable = false

r.kafkaAuthConfig = kafkaConfig.EventingKafka.Auth
// For now just override the previous config.
// Eventually the previous config should be snapshotted to delete Kafka topics
@@ -578,20 +638,29 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel)
logger := logging.FromContext(ctx)
channel := fmt.Sprintf("%s/%s", kc.GetNamespace(), kc.GetName())
logger.Debugw("FinalizeKind", zap.String("channel", channel))
kafkaClusterAdmin, err := r.createClient(ctx)

kafkaClusterAdmin, err := r.createClusterAdmin()
if err != nil || r.kafkaConfig == nil {
logger.Errorw("Can't obtain Kafka Client", zap.String("channel", channel), zap.Error(err))
} else {
defer kafkaClusterAdmin.Close()
logger.Debugw("Got client, about to delete topic")
if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil {
logger.Errorw("Error deleting Kafka channel topic", zap.String("channel", channel), zap.Error(err))
return err
}
logger.Errorw("cannot obtain Kafka cluster admin", zap.String("channel", channel), zap.Error(err))
// even in error case, we return `normal`, since we are fine with leaving the
// topic undeleted e.g. when we lose connection
return newReconciledNormal(kc.Namespace, kc.Name)
}
for _, s := range kc.Spec.Subscribers {
logger.Debugw("Canceling probing", zap.String("channel", channel), zap.Any("subscription", s))
r.statusManager.CancelProbing(s)
defer kafkaClusterAdmin.Close()

logger.Debugw("got client, about to delete topic")
if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil {
logger.Errorw("error deleting Kafka channel topic", zap.String("channel", channel), zap.Error(err))
return err
}
return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer
}

func findSubscriptionStatus(kc *v1beta1.KafkaChannel, subUID types.UID) *v1.SubscriberStatus {
for _, subStatus := range kc.Status.Subscribers {
if subStatus.UID == subUID {
return &subStatus
}
}
return nil
}
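
Taken together, the hunks above boil down to one controller-side step per subscriber: derive the channel topic and the subscription's consumer-group id, then let offset.InitOffsets commit a starting offset for every partition that still reports -1. The sketch below shows that step in isolation; initOffsetsForSubscription is an illustrative name, not a function in this commit (the real code is reconcileSubscribers/reconcileInitialOffset above), and it assumes the eventing-kafka packages at this revision:

package example

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"

	"knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1"
	"knative.dev/eventing-kafka/pkg/channel/consolidated/utils"
	"knative.dev/eventing-kafka/pkg/common/kafka/offset"
	v1 "knative.dev/eventing/pkg/apis/duck/v1"
)

func initOffsetsForSubscription(ctx context.Context, ch *v1beta1.KafkaChannel, sub v1.SubscriberSpec,
	kafkaClient sarama.Client, kafkaClusterAdmin sarama.ClusterAdmin) error {

	topicName := utils.TopicName(utils.KafkaChannelSeparator, ch.Namespace, ch.Name)
	groupID := fmt.Sprintf("kafka.%s.%s.%s", ch.Namespace, ch.Name, string(sub.UID))

	// With Consumer.Offsets.AutoCommit disabled on the controller's Sarama config,
	// this explicit commit is what the dispatcher's offsets checker later observes.
	_, err := offset.InitOffsets(ctx, kafkaClient, kafkaClusterAdmin, []string{topicName}, groupID)
	return err
}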
274 changes: 135 additions & 139 deletions pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go

Large diffs are not rendered by default.

83 changes: 0 additions & 83 deletions pkg/channel/consolidated/reconciler/controller/lister.go

This file was deleted.

@@ -183,7 +183,7 @@ func (r *Reconciler) syncChannel(ctx context.Context, kc *v1beta1.KafkaChannel)
}

// Update dispatcher side
err := r.kafkaDispatcher.ReconcileConsumers(config)
err := r.kafkaDispatcher.ReconcileConsumers(ctx, config)
if err != nil {
logging.FromContext(ctx).Errorw("Some kafka subscriptions failed to subscribe", zap.Error(err))
return fmt.Errorf("some kafka subscriptions failed to subscribe: %v", err)
533 changes: 0 additions & 533 deletions pkg/channel/consolidated/status/status.go

This file was deleted.

340 changes: 0 additions & 340 deletions pkg/channel/consolidated/status/status_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/channel/distributed/dispatcher/controller/kafkachannel.go
@@ -194,7 +194,7 @@ func (r Reconciler) Reconcile(ctx context.Context, key string) error {
channel := original.DeepCopy()

// Perform the reconciliation (will update KafkaChannel.Status)
reconcileError := r.reconcile(channel)
reconcileError := r.reconcile(ctx, channel)
if reconcileError != nil {
r.logger.Error("Error Reconciling KafkaChannel", zap.Error(reconcileError))
r.recorder.Eventf(channel, corev1.EventTypeWarning, channelReconcileFailed, "KafkaChannel Reconciliation Failed: %v", reconcileError)
@@ -218,7 +218,7 @@ func (r Reconciler) Reconcile(ctx context.Context, key string) error {
}

// Reconcile The Specified KafkaChannel
func (r Reconciler) reconcile(channel *kafkav1beta1.KafkaChannel) error {
func (r Reconciler) reconcile(ctx context.Context, channel *kafkav1beta1.KafkaChannel) error {

// The KafkaChannel's Subscribers
var subscribers []eventingduck.SubscriberSpec
@@ -231,7 +231,7 @@ func (r Reconciler) reconcile(channel *kafkav1beta1.KafkaChannel) error {
}

// Update The ConsumerGroups To Align With Current KafkaChannel Subscribers
subscriptions := r.dispatcher.UpdateSubscriptions(subscribers)
subscriptions := r.dispatcher.UpdateSubscriptions(ctx, subscribers)

// Update The KafkaChannel Subscribable Status Based On ConsumerGroup Creation Status
channel.Status.SubscribableStatus = r.createSubscribableStatus(channel.Spec.Subscribers, subscriptions)
@@ -365,7 +365,7 @@ func (m *MockDispatcher) Shutdown() {
m.Called()
}

func (m *MockDispatcher) UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) consumer.SubscriberStatusMap {
func (m *MockDispatcher) UpdateSubscriptions(_ context.Context, subscriberSpecs []eventingduck.SubscriberSpec) consumer.SubscriberStatusMap {
args := m.Called(subscriberSpecs)
return args.Get(0).(consumer.SubscriberStatusMap)
}
8 changes: 4 additions & 4 deletions pkg/channel/distributed/dispatcher/dispatcher/dispatcher.go
@@ -65,7 +65,7 @@ func NewSubscriberWrapper(subscriberSpec eventingduck.SubscriberSpec, groupId st
type Dispatcher interface {
SecretChanged(ctx context.Context, secret *corev1.Secret)
Shutdown()
UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap
UpdateSubscriptions(ctx context.Context, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap
}

// DispatcherImpl Is A Struct With Configuration & ConsumerGroup State
@@ -85,7 +85,7 @@ var _ Dispatcher = &DispatcherImpl{}
// NewDispatcher Is The Dispatcher Constructor
func NewDispatcher(dispatcherConfig DispatcherConfig, controlServer controlprotocol.ServerHandler) (Dispatcher, <-chan commonconsumer.ManagerEvent) {

consumerGroupManager := commonconsumer.NewConsumerGroupManager(dispatcherConfig.Logger, controlServer, dispatcherConfig.Brokers, dispatcherConfig.SaramaConfig)
consumerGroupManager := commonconsumer.NewConsumerGroupManager(dispatcherConfig.Logger, controlServer, dispatcherConfig.Brokers, dispatcherConfig.SaramaConfig, &commonconsumer.NoopConsumerGroupOffsetsChecker{})

// Create The DispatcherImpl With Specified Configuration
dispatcher := &DispatcherImpl{
@@ -123,7 +123,7 @@ func (d *DispatcherImpl) Shutdown() {
}

// UpdateSubscriptions manages the Dispatcher's Subscriptions to align with new state
func (d *DispatcherImpl) UpdateSubscriptions(subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap {
func (d *DispatcherImpl) UpdateSubscriptions(ctx context.Context, subscriberSpecs []eventingduck.SubscriberSpec) commonconsumer.SubscriberStatusMap {

if d.SaramaConfig == nil {
d.Logger.Error("Dispatcher has no config!")
@@ -151,7 +151,7 @@ func (d *DispatcherImpl) UpdateSubscriptions(subscriberSpecs []eventingduck.Subs

// Create/Start A New ConsumerGroup With Custom Handler
handler := NewHandler(logger, groupId, &subscriberSpec)
err := d.consumerMgr.StartConsumerGroup(groupId, []string{d.Topic}, d.Logger.Sugar(), handler)
err := d.consumerMgr.StartConsumerGroup(ctx, groupId, []string{d.Topic}, handler)
if err != nil {

// Log & Return Failure
@@ -365,7 +365,7 @@ func TestUpdateSubscriptions(t *testing.T) {
if !testCase.wantFailure {
mockManager.On("ClearNotifications").Return()
for _, id := range testCase.expectStarted {
mockManager.On("StartConsumerGroup", "kafka."+id, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testCase.createErr)
mockManager.On("StartConsumerGroup", mock.Anything, "kafka."+id, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testCase.createErr)
}
for _, id := range testCase.expectErrors {
mockManager.On("Errors", "kafka."+id).Return((<-chan error)(errorSource))
@@ -380,7 +380,7 @@ func TestUpdateSubscriptions(t *testing.T) {
}

// Perform The Test
result := dispatcher.UpdateSubscriptions(testCase.args.subscriberSpecs)
result := dispatcher.UpdateSubscriptions(ctx, testCase.args.subscriberSpecs)

close(errorSource)

33 changes: 25 additions & 8 deletions pkg/common/consumer/consumer_factory.go
@@ -22,22 +22,24 @@ import (

"github.com/Shopify/sarama"
"go.uber.org/zap"
"knative.dev/pkg/logging"
)

// newConsumerGroup is a wrapper for the Sarama NewConsumerGroup function, to facilitate unit testing
// wrapper functions for the Sarama functions, to facilitate unit testing
var newConsumerGroup = sarama.NewConsumerGroup

// consumeFunc is a function type that matches the Sarama ConsumerGroup's Consume function
type consumeFunc func(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

// KafkaConsumerGroupFactory creates the ConsumerGroup and start consuming the specified topic
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error)
}

type kafkaConsumerGroupFactoryImpl struct {
config *sarama.Config
addrs []string
config *sarama.Config
addrs []string
offsetsChecker ConsumerGroupOffsetsChecker
}

type customConsumerGroup struct {
@@ -64,13 +66,17 @@ func (c *customConsumerGroup) Close() error {
var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

// StartConsumerGroup creates a new customConsumerGroup and starts a Consume goroutine on it
func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(ctx context.Context, groupID string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
logger := logging.FromContext(ctx)

consumerGroup, err := c.createConsumerGroup(groupID)
if err != nil {
logger.Errorw("unable to create consumer group", zap.String("groupId", groupID), zap.Error(err))
return nil, err
}

// Start the consumerGroup.Consume function in a separate goroutine
return c.startExistingConsumerGroup(consumerGroup, consumerGroup.Consume, topics, logger, handler, options...), nil
return c.startExistingConsumerGroup(groupID, consumerGroup, consumerGroup.Consume, topics, logger, handler, options...), nil
}

// createConsumerGroup creates a Sarama ConsumerGroup using the newConsumerGroup wrapper, with the
@@ -82,6 +88,7 @@ func (c kafkaConsumerGroupFactoryImpl) createConsumerGroup(groupID string) (sara
// startExistingConsumerGroup creates a goroutine that begins a custom Consume loop on the provided ConsumerGroup
// This loop is cancelable via the function provided in the returned customConsumerGroup.
func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
groupID string,
saramaGroup sarama.ConsumerGroup,
consume consumeFunc,
topics []string,
@@ -94,6 +101,16 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
ctx, cancel := context.WithCancel(context.Background())

go func() {
// this is a blocking func
// do not proceed until the check is done
err := c.offsetsChecker.WaitForOffsetsInitialization(ctx, groupID, topics, logger, c.addrs, c.config)
if err != nil {
logger.Errorw("error while checking if offsets are initialized", zap.Any("topics", topics), zap.String("groupId", groupID), zap.Error(err))
errorCh <- err
}

logger.Debugw("all offsets are initialized", zap.Any("topics", topics), zap.Any("groupID", groupID))

defer func() {
close(errorCh)
releasedCh <- true
@@ -119,8 +136,8 @@ func (c kafkaConsumerGroupFactoryImpl) startExistingConsumerGroup(
return &customConsumerGroup{cancel, errorCh, saramaGroup, releasedCh}
}

func NewConsumerGroupFactory(addrs []string, config *sarama.Config) KafkaConsumerGroupFactory {
return kafkaConsumerGroupFactoryImpl{addrs: addrs, config: config}
func NewConsumerGroupFactory(addrs []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker) KafkaConsumerGroupFactory {
return kafkaConsumerGroupFactoryImpl{addrs: addrs, config: config, offsetsChecker: offsetsChecker}
}

var _ KafkaConsumerGroupFactory = (*kafkaConsumerGroupFactoryImpl)(nil)
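
One consequence of the wiring above is easy to miss: a failed or timed-out offsets check does not make StartConsumerGroup return an error, because the check runs inside the consume goroutine; the failure is delivered on the group's Errors() channel instead. An assumed caller-side sketch, not code from this commit:

package example

import (
	"context"

	"github.com/Shopify/sarama"
	"go.uber.org/zap"
	"knative.dev/pkg/logging"

	"knative.dev/eventing-kafka/pkg/common/consumer"
)

func consumeWhenOffsetsReady(ctx context.Context, factory consumer.KafkaConsumerGroupFactory,
	groupID string, topics []string, handler consumer.KafkaConsumerHandler) (sarama.ConsumerGroup, error) {

	group, err := factory.StartConsumerGroup(ctx, groupID, topics, handler)
	if err != nil {
		return nil, err // creating the underlying Sarama consumer group failed outright
	}

	go func() {
		// This range also receives the error sent when WaitForOffsetsInitialization
		// fails or times out in the factory's consume goroutine.
		for e := range group.Errors() {
			logging.FromContext(ctx).Errorw("consumer group error",
				zap.String("groupId", groupID), zap.Error(e))
		}
	}()
	return group, nil
}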
66 changes: 55 additions & 11 deletions pkg/common/consumer/consumer_factory_test.go
@@ -24,6 +24,8 @@ import (

"github.com/Shopify/sarama"
"go.uber.org/zap"
controllertesting "knative.dev/eventing-kafka/pkg/common/commands/resetoffset/controller/testing"
commontesting "knative.dev/eventing-kafka/pkg/common/testing"
)

//------ Mocks
@@ -85,18 +87,58 @@ func mockedNewConsumerGroupFromClient(mockInputMessageCh chan *sarama.ConsumerMe
}
}

func mockedNewSaramaClient(client *controllertesting.MockClient, mustFail bool) func(addrs []string, config *sarama.Config) (sarama.Client, error) {
if !mustFail {
return func(addrs []string, config *sarama.Config) (sarama.Client, error) {
return client, nil
}
} else {
return func(addrs []string, config *sarama.Config) (sarama.Client, error) {
return nil, errors.New("failed")
}
}
}

func mockedNewSaramaClusterAdmin(clusterAdmin sarama.ClusterAdmin, mustFail bool) func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) {
if !mustFail {
return func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) {
return clusterAdmin, nil
}
} else {
return func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) {
return nil, errors.New("failed")
}
}
}

//------ Tests

type mockConsumerGroupOffsetsChecker struct {
}

func (m mockConsumerGroupOffsetsChecker) WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error {
return nil
}

func TestErrorPropagationCustomConsumerGroup(t *testing.T) {
ctx := context.TODO()
client := controllertesting.NewMockClient(
controllertesting.WithClientMockClosed(false),
controllertesting.WithClientMockClose(nil))
clusterAdmin := &commontesting.MockClusterAdmin{}

// override some functions
newConsumerGroup = mockedNewConsumerGroupFromClient(nil, true, true, false, false)
newSaramaClient = mockedNewSaramaClient(client, false)
newSaramaClusterAdmin = mockedNewSaramaClusterAdmin(clusterAdmin, false)

factory := kafkaConsumerGroupFactoryImpl{
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}

consumerGroup, err := factory.StartConsumerGroup("bla", []string{}, zap.NewNop().Sugar(), nil)
consumerGroup, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)
if err != nil {
t.Errorf("Should not throw error %v", err)
}
@@ -137,29 +179,31 @@ func assertContainsError(t *testing.T, collection []error, errorStr string) {
}

func TestErrorWhileCreatingNewConsumerGroup(t *testing.T) {

ctx := context.TODO()
newConsumerGroup = mockedNewConsumerGroupFromClient(nil, true, true, false, true)

factory := kafkaConsumerGroupFactoryImpl{
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}
_, err := factory.StartConsumerGroup("bla", []string{}, zap.L().Sugar(), nil)
_, err := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)

if err == nil || err.Error() != "failed" {
t.Errorf("Should contain an error with message failed. Got %v", err)
}
}

func TestErrorWhileNewConsumerGroup(t *testing.T) {

ctx := context.TODO()
newConsumerGroup = mockedNewConsumerGroupFromClient(nil, false, false, true, false)

factory := kafkaConsumerGroupFactoryImpl{
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
config: sarama.NewConfig(),
addrs: []string{"b1", "b2"},
offsetsChecker: &mockConsumerGroupOffsetsChecker{},
}
consumerGroup, _ := factory.StartConsumerGroup("bla", []string{}, zap.L().Sugar(), nil)
consumerGroup, _ := factory.StartConsumerGroup(ctx, "bla", []string{}, nil)

consumerGroup.(*customConsumerGroup).cancel() // Stop the consume loop from spinning after the error is generated
err := <-consumerGroup.Errors()
27 changes: 16 additions & 11 deletions pkg/common/consumer/consumer_manager.go
@@ -43,6 +43,7 @@ import (
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
ctrlservice "knative.dev/control-protocol/pkg/service"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka/pkg/common/controlprotocol"
"knative.dev/eventing-kafka/pkg/common/controlprotocol/commands"
@@ -93,7 +94,7 @@ type ManagerEvent struct {
// KafkaConsumerGroupManager keeps track of Sarama consumer groups and handles messages from control-protocol clients
type KafkaConsumerGroupManager interface {
Reconfigure(brokers []string, config *sarama.Config) *ReconfigureError
StartConsumerGroup(groupId string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) error
StartConsumerGroup(ctx context.Context, groupId string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) error
CloseConsumerGroup(groupId string) error
Errors(groupId string) <-chan error
IsManaged(groupId string) bool
@@ -115,6 +116,7 @@ type kafkaConsumerGroupManagerImpl struct {
groupLock sync.RWMutex // Synchronizes write access to the groupMap
notifyChannels []chan ManagerEvent
eventLock sync.Mutex
offsetsChecker ConsumerGroupOffsetsChecker
}

// Verify that the kafkaConsumerGroupManagerImpl satisfies the KafkaConsumerGroupManager interface
@@ -139,15 +141,16 @@ func (r ReconfigureError) Error() string {
var _ error = (*ReconfigureError)(nil)

// NewConsumerGroupManager returns a new kafkaConsumerGroupManagerImpl as a KafkaConsumerGroupManager interface
func NewConsumerGroupManager(logger *zap.Logger, serverHandler controlprotocol.ServerHandler, brokers []string, config *sarama.Config) KafkaConsumerGroupManager {
func NewConsumerGroupManager(logger *zap.Logger, serverHandler controlprotocol.ServerHandler, brokers []string, config *sarama.Config, offsetsChecker ConsumerGroupOffsetsChecker) KafkaConsumerGroupManager {

manager := &kafkaConsumerGroupManagerImpl{
logger: logger,
server: serverHandler,
groups: make(groupMap),
factory: &kafkaConsumerGroupFactoryImpl{addrs: brokers, config: config},
groupLock: sync.RWMutex{},
eventLock: sync.Mutex{},
logger: logger,
server: serverHandler,
groups: make(groupMap),
factory: &kafkaConsumerGroupFactoryImpl{addrs: brokers, config: config, offsetsChecker: offsetsChecker},
groupLock: sync.RWMutex{},
eventLock: sync.Mutex{},
offsetsChecker: offsetsChecker,
}

logger.Info("Registering Consumer Group Manager Control-Protocol Handlers")
@@ -236,7 +239,7 @@ func (m *kafkaConsumerGroupManagerImpl) Reconfigure(brokers []string, config *sa
}
}

m.factory = &kafkaConsumerGroupFactoryImpl{addrs: brokers, config: config}
m.factory = &kafkaConsumerGroupFactoryImpl{addrs: brokers, config: config, offsetsChecker: m.offsetsChecker}

// Restart any groups this function stopped
m.logger.Info("Reconfigure Consumer Group Manager - Starting All Managed Consumer Groups")
@@ -261,7 +264,9 @@ func (m *kafkaConsumerGroupManagerImpl) Reconfigure(brokers []string, config *sa

// StartConsumerGroup uses the consumer factory to create a new ConsumerGroup, add it to the list
// of managed groups (for start/stop functionality) and start the Consume loop.
func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(groupId string, topics []string, logger *zap.SugaredLogger, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) error {
func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(ctx context.Context, groupId string, topics []string, handler KafkaConsumerHandler, options ...SaramaConsumerHandlerOption) error {
logger := logging.FromContext(ctx)

groupLogger := m.logger.With(zap.String("GroupId", groupId))
groupLogger.Info("Creating New Managed ConsumerGroup")
group, err := m.factory.createConsumerGroup(groupId)
@@ -281,7 +286,7 @@ func (m *kafkaConsumerGroupManagerImpl) StartConsumerGroup(groupId string, topic
}

// The only thing we really want from the factory is the cancel function for the customConsumerGroup
customGroup := m.factory.startExistingConsumerGroup(group, consume, topics, logger, handler, options...)
customGroup := m.factory.startExistingConsumerGroup(groupId, group, consume, topics, logger, handler, options...)
managedGrp := createManagedGroup(ctx, m.logger, group, cancel, customGroup.cancel)

// Add the Sarama ConsumerGroup we obtained from the factory to the managed group map,
9 changes: 5 additions & 4 deletions pkg/common/consumer/consumer_manager_test.go
@@ -38,7 +38,7 @@ import (

func TestNewConsumerGroupManager(t *testing.T) {
server := getMockServerHandler()
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), server, []string{}, &sarama.Config{})
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), server, []string{}, &sarama.Config{}, &NoopConsumerGroupOffsetsChecker{})
assert.NotNil(t, manager)
assert.NotNil(t, server.Router[commands.StopConsumerGroupOpCode])
assert.NotNil(t, server.Router[commands.StartConsumerGroupOpCode])
@@ -155,7 +155,8 @@ func TestStartConsumerGroup(t *testing.T) {
},
} {
t.Run(testCase.name, func(t *testing.T) {
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), getMockServerHandler(), []string{}, &sarama.Config{})
ctx := context.TODO()
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), getMockServerHandler(), []string{}, &sarama.Config{}, &NoopConsumerGroupOffsetsChecker{})
mockGroup := kafkatesting.NewMockConsumerGroup()
newConsumerGroup = func(addrs []string, groupID string, config *sarama.Config) (sarama.ConsumerGroup, error) {
if testCase.factoryErr {
@@ -164,7 +165,7 @@ func TestStartConsumerGroup(t *testing.T) {
mockGroup.On("Errors").Return(mockGroup.ErrorChan)
return mockGroup, nil
}
err := manager.StartConsumerGroup("testid", []string{}, nil, nil)
err := manager.StartConsumerGroup(ctx, "testid", []string{}, nil, nil)
assert.Equal(t, testCase.factoryErr, err != nil)
time.Sleep(5 * time.Millisecond) // Give the transferErrors routine a chance to call Errors()
mockGroup.AssertExpectations(t)
@@ -620,7 +621,7 @@ func getManagerWithMockGroup(t *testing.T, groupId string, factoryErr bool) (Kaf
mockGroup.On("Errors").Return(make(chan error))
return mockGroup, nil
}
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), serverHandler, []string{}, &sarama.Config{})
manager := NewConsumerGroupManager(logtesting.TestLogger(t).Desugar(), serverHandler, []string{}, &sarama.Config{}, &NoopConsumerGroupOffsetsChecker{})
if groupId != "" {
mockGroup, managedGrp := createMockAndManagedGroups(t)
manager.(*kafkaConsumerGroupManagerImpl).groups[groupId] = managedGrp
74 changes: 74 additions & 0 deletions pkg/common/consumer/consumergroup_offsets_checker.go
@@ -0,0 +1,74 @@
package consumer

import (
"context"
"fmt"
"time"

"github.com/Shopify/sarama"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/eventing-kafka/pkg/common/kafka/offset"
)

const (
OffsetCheckRetryTimeout = 60 * time.Second
OffsetCheckRetryInterval = 5 * time.Second
)

// wrapper functions for the Sarama functions, to facilitate unit testing
var newSaramaClient = sarama.NewClient
var newSaramaClusterAdmin = sarama.NewClusterAdmin

type ConsumerGroupOffsetsChecker interface {
WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error
}

type NoopConsumerGroupOffsetsChecker struct {
}

func (c *NoopConsumerGroupOffsetsChecker) WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error {
return nil
}

type KafkaConsumerGroupOffsetsChecker struct {
}

func (k *KafkaConsumerGroupOffsetsChecker) WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error {
logger.Debugw("checking if all offsets are initialized", zap.Any("topics", topics), zap.Any("groupID", groupID))

client, err := newSaramaClient(addrs, config)
if err != nil {
logger.Errorw("unable to create Kafka client", zap.Any("topics", topics), zap.String("groupId", groupID), zap.Error(err))
return err
}
defer client.Close()

clusterAdmin, err := newSaramaClusterAdmin(addrs, config)
if err != nil {
logger.Errorw("unable to create Kafka cluster admin client", zap.Any("topics", topics), zap.String("groupId", groupID), zap.Error(err))
return err
}
defer clusterAdmin.Close()

check := func() (bool, error) {
if initialized, err := offset.CheckIfAllOffsetsInitialized(client, clusterAdmin, topics, groupID); err == nil {
if initialized {
return true, nil
} else {
logger.Debugw("offsets not yet initialized, going to try again")
return false, nil
}
} else {
return false, fmt.Errorf("error checking if offsets are initialized. stopping trying. %w", err)
}
}
pollCtx, pollCtxCancel := context.WithTimeout(ctx, OffsetCheckRetryTimeout)
err = wait.PollUntil(OffsetCheckRetryInterval, check, pollCtx.Done())
defer pollCtxCancel()

if err != nil {
return fmt.Errorf("failed to check if offsets are initialized %w", err)
}
return nil
}
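
The checker interface is the plug-in point: the consolidated dispatcher wires in KafkaConsumerGroupOffsetsChecker, the distributed dispatcher keeps its previous behaviour via NoopConsumerGroupOffsetsChecker, and tests can supply a stub such as the sketch below (equivalent to the mockConsumerGroupOffsetsChecker added in consumer_factory_test.go; the type name here is illustrative):

package example

import (
	"context"

	"github.com/Shopify/sarama"
	"go.uber.org/zap"

	"knative.dev/eventing-kafka/pkg/common/consumer"
)

// alwaysReadyOffsetsChecker pretends that offsets are already initialized,
// so consumption starts immediately.
type alwaysReadyOffsetsChecker struct{}

var _ consumer.ConsumerGroupOffsetsChecker = alwaysReadyOffsetsChecker{}

func (alwaysReadyOffsetsChecker) WaitForOffsetsInitialization(ctx context.Context, groupID string,
	topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error {
	return nil
}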
12 changes: 6 additions & 6 deletions pkg/common/consumer/testing/mocks.go
@@ -17,10 +17,10 @@ limitations under the License.
package testing

import (
"context"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"

"knative.dev/eventing-kafka/pkg/common/consumer"
)

@@ -33,8 +33,8 @@ type MockKafkaConsumerGroupFactory struct {
mock.Mock
}

func (c *MockKafkaConsumerGroupFactory) StartConsumerGroup(groupId string, topics []string, logger *zap.SugaredLogger, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
args := c.Called(groupId, topics, logger, handler, options)
func (c *MockKafkaConsumerGroupFactory) StartConsumerGroup(ctx context.Context, groupId string, topics []string, handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) (sarama.ConsumerGroup, error) {
args := c.Called(ctx, groupId, topics, handler, options)
return args.Get(0).(sarama.ConsumerGroup), args.Error(1)
}

@@ -60,9 +60,9 @@ func (m *MockConsumerGroupManager) Reconfigure(brokers []string, config *sarama.
return m.Called(brokers, config).Get(0).(*consumer.ReconfigureError)
}

func (m *MockConsumerGroupManager) StartConsumerGroup(groupId string, topics []string, logger *zap.SugaredLogger,
func (m *MockConsumerGroupManager) StartConsumerGroup(ctx context.Context, groupId string, topics []string,
handler consumer.KafkaConsumerHandler, options ...consumer.SaramaConsumerHandlerOption) error {
return m.Called(groupId, topics, logger, handler, options).Error(0)
return m.Called(ctx, groupId, topics, handler, options).Error(0)
}

func (m *MockConsumerGroupManager) CloseConsumerGroup(groupId string) error {
58 changes: 47 additions & 11 deletions pkg/common/kafka/offset/offsets.go
@@ -33,22 +33,14 @@ import (
// Without InitOffsets, an event sent to a partition with an uninitialized offset
// will not be forwarded when the session is closed (or a rebalancing is in progress).
func InitOffsets(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error) {
totalPartitions := 0
offsetManager, err := sarama.NewOffsetManagerFromClient(consumerGroup, kafkaClient)
if err != nil {
return -1, err
}

// Retrieve all partitions
topicPartitions := make(map[string][]int32)
for _, topic := range topics {
partitions, err := kafkaClient.Partitions(topic)
totalPartitions += len(partitions)
if err != nil {
return -1, fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
}

topicPartitions[topic] = partitions
totalPartitions, topicPartitions, err := retrieveAllPartitions(topics, kafkaClient)
if err != nil {
return -1, err
}

// Fetch topic offsets
@@ -101,3 +93,47 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClien
return int32(totalPartitions), nil

}

func CheckIfAllOffsetsInitialized(kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (bool, error) {
_, topicPartitions, err := retrieveAllPartitions(topics, kafkaClient)
if err != nil {
return false, err
}

// Look for uninitialized offset (-1)
offsets, err := kafkaAdminClient.ListConsumerGroupOffsets(consumerGroup, topicPartitions)
if err != nil {
return false, err
}

for _, partitions := range offsets.Blocks {
for _, block := range partitions {
if block.Offset == -1 { // not initialized?
return false, nil
}
}
}

return true, nil
}

func retrieveAllPartitions(topics []string, kafkaClient sarama.Client) (int, map[string][]int32, error) {
totalPartitions := 0

// Retrieve all partitions
topicPartitions := make(map[string][]int32)
for _, topic := range topics {
partitions, err := kafkaClient.Partitions(topic)
totalPartitions += len(partitions)
if err != nil {
return -1, nil, fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
}

// return a copy of the partitions array in the map
// Sarama is caching this array and we don't want nobody to mess with it
clone := make([]int32, len(partitions))
copy(clone, partitions)
topicPartitions[topic] = clone
}
return totalPartitions, topicPartitions, nil
}
212 changes: 125 additions & 87 deletions pkg/common/kafka/offset/offsets_test.go
@@ -25,110 +25,105 @@ import (
logtesting "knative.dev/pkg/logging/testing"
)

func TestInitOffsets(t *testing.T) {
testCases := map[string]struct {
topics []string
topicOffsets map[string]map[int32]int64
cgOffsets map[string]map[int32]int64
wantCommit bool
}{
"one topic, one partition, initialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: 2,
},
var testCases = map[string]struct {
topics []string
topicOffsets map[string]map[int32]int64
cgOffsets map[string]map[int32]int64
initialized bool
}{
"one topic, one partition, initialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
wantCommit: false,
},
"one topic, one partition, uninitialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: 2,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: -1,
},
},
wantCommit: true,
},
"several topics, several partitions, not all initialized": {
topics: []string{"my-topic", "my-topic-2", "my-topic-3"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {0: 5, 1: 7},
"my-topic-2": {0: 5, 1: 7, 2: 9},
"my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10},
initialized: true,
},
"one topic, one partition, uninitialized": {
topics: []string{"my-topic"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {
0: 5,
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {0: -1, 1: 7},
"my-topic-2": {0: 5, 1: -1, 2: -1},
"my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {
0: -1,
},
wantCommit: true,
},
}
initialized: false,
},
"several topics, several partitions, not all initialized": {
topics: []string{"my-topic", "my-topic-2", "my-topic-3"},
topicOffsets: map[string]map[int32]int64{
"my-topic": {0: 5, 1: 7},
"my-topic-2": {0: 5, 1: 7, 2: 9},
"my-topic-3": {0: 5, 1: 7, 2: 2, 3: 10},
},
cgOffsets: map[string]map[int32]int64{
"my-topic": {0: -1, 1: 7},
"my-topic-2": {0: 5, 1: -1, 2: -1},
"my-topic-3": {0: 5, 1: 7, 2: -1, 3: 10},
},
initialized: false,
},
}

func TestInitOffsets(t *testing.T) {
	for n, tc := range testCases {
		t.Run(n, func(t *testing.T) {
			broker := sarama.NewMockBroker(t, 1)
			defer broker.Close()

			group := "my-group"

			configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker)

			config := sarama.NewConfig()
			config.Version = sarama.MaxVersion

			sc, err := sarama.NewClient([]string{broker.Addr()}, config)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			defer sc.Close()

			kac, err := sarama.NewClusterAdminFromClient(sc)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			defer kac.Close()

			// test InitOffsets
			ctx := logtesting.TestContextWithLogger(t)
			partitionCt, err := InitOffsets(ctx, sc, kac, tc.topics, group)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			total := 0
			for _, partitions := range tc.topicOffsets {
				total += len(partitions)
			}
			assert.Equal(t, int(partitionCt), total)
		})
	}
}

func TestCheckIfAllOffsetsInitialized(t *testing.T) {
	for n, tc := range testCases {
		t.Run(n, func(t *testing.T) {
			broker := sarama.NewMockBroker(t, 1)
			defer broker.Close()

			group := "my-group"

			configureMockBroker(t, group, tc.topicOffsets, tc.cgOffsets, tc.initialized, broker)

			config := sarama.NewConfig()
			config.Version = sarama.MaxVersion

			sc, err := sarama.NewClient([]string{broker.Addr()}, config)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			defer sc.Close()

			kac, err := sarama.NewClusterAdminFromClient(sc)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			defer kac.Close()

			// test CheckIfAllOffsetsInitialized
			retrieved, err := CheckIfAllOffsetsInitialized(sc, kac, tc.topics, group)
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			assert.Equal(t, retrieved, tc.initialized)
		})
	}
}

func configureMockBroker(t *testing.T, group string, topicOffsets map[string]map[int32]int64, cgOffsets map[string]map[int32]int64, initialized bool, broker *sarama.MockBroker) {
offsetResponse := sarama.NewMockOffsetResponse(t).SetVersion(1)
for topic, partitions := range topicOffsets {
for partition, offset := range partitions {
offsetResponse = offsetResponse.SetOffset(topic, partition, -1, offset)
}
}

offsetFetchResponse := sarama.NewMockOffsetFetchResponse(t).SetError(sarama.ErrNoError)
for topic, partitions := range cgOffsets {
for partition, offset := range partitions {
offsetFetchResponse = offsetFetchResponse.SetOffset(group, topic, partition, offset, "", sarama.ErrNoError)
}
}

offsetCommitResponse := sarama.NewMockOffsetCommitResponse(t)
serr := sarama.ErrNoError
	if initialized {
		// Offsets are already initialized, so no commit is expected;
		// return an error so any unexpected commit attempt fails the test.
		serr = sarama.ErrUnknown // could be anything
	}

for topic, partitions := range cgOffsets {
for partition := range partitions {
offsetCommitResponse = offsetCommitResponse.SetError(group, topic, partition, serr)
}
}

metadataResponse := sarama.NewMockMetadataResponse(t).
SetController(broker.BrokerID()).
SetBroker(broker.Addr(), broker.BrokerID())
for topic, partitions := range topicOffsets {
for partition := range partitions {
metadataResponse = metadataResponse.SetLeader(topic, partition, broker.BrokerID())
}
}

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"OffsetRequest": offsetResponse,
"OffsetFetchRequest": offsetFetchResponse,
"OffsetCommitRequest": offsetCommitResponse,

"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, group, broker),

"MetadataRequest": metadataResponse,
})
}
113 changes: 113 additions & 0 deletions pkg/common/testing/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package testing

import "github.com/Shopify/sarama"

var _ sarama.ClusterAdmin = (*MockClusterAdmin)(nil)

type MockClusterAdmin struct {
MockCreateTopicFunc func(topic string, detail *sarama.TopicDetail, validateOnly bool) error
MockDeleteTopicFunc func(topic string) error
MockListConsumerGroupsFunc func() (map[string]string, error)
}

func (ca *MockClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
return nil
}

func (ca *MockClusterAdmin) ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, err error) {
return nil, nil
}

func (ca *MockClusterAdmin) DescribeLogDirs(brokers []int32) (map[int32][]sarama.DescribeLogDirsResponseDirMetadata, error) {
return nil, nil
}

func (ca *MockClusterAdmin) DescribeUserScramCredentials(users []string) ([]*sarama.DescribeUserScramCredentialsResult, error) {
return nil, nil
}

func (ca *MockClusterAdmin) DeleteUserScramCredentials(delete []sarama.AlterUserScramCredentialsDelete) ([]*sarama.AlterUserScramCredentialsResult, error) {
return nil, nil
}

func (ca *MockClusterAdmin) UpsertUserScramCredentials(upsert []sarama.AlterUserScramCredentialsUpsert) ([]*sarama.AlterUserScramCredentialsResult, error) {
return nil, nil
}

func (ca *MockClusterAdmin) CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
if ca.MockCreateTopicFunc != nil {
return ca.MockCreateTopicFunc(topic, detail, validateOnly)
}
return nil
}

func (ca *MockClusterAdmin) Close() error {
return nil
}

func (ca *MockClusterAdmin) DeleteTopic(topic string) error {
if ca.MockDeleteTopicFunc != nil {
return ca.MockDeleteTopicFunc(topic)
}
return nil
}

func (ca *MockClusterAdmin) DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error) {
return nil, nil
}

func (ca *MockClusterAdmin) ListTopics() (map[string]sarama.TopicDetail, error) {
return nil, nil
}

func (ca *MockClusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
return nil
}

func (ca *MockClusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
return nil
}

func (ca *MockClusterAdmin) DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) {
return nil, nil
}

func (ca *MockClusterAdmin) AlterConfig(resourceType sarama.ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
return nil
}

func (ca *MockClusterAdmin) CreateACL(resource sarama.Resource, acl sarama.Acl) error {
return nil
}

func (ca *MockClusterAdmin) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error) {
return nil, nil
}

func (ca *MockClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool) ([]sarama.MatchingAcl, error) {
return nil, nil
}

func (ca *MockClusterAdmin) ListConsumerGroups() (map[string]string, error) {
if ca.MockListConsumerGroupsFunc != nil {
return ca.MockListConsumerGroupsFunc()
}
return nil, nil
}

func (ca *MockClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error) {
return nil, nil
}

func (ca *MockClusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*sarama.OffsetFetchResponse, error) {
return &sarama.OffsetFetchResponse{}, nil
}

func (ca *MockClusterAdmin) DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) {
return nil, 0, nil
}

// DeleteConsumerGroup deletes a consumer group.
func (ca *MockClusterAdmin) DeleteConsumerGroup(group string) error {
return nil
}
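
The new MockClusterAdmin satisfies sarama.ClusterAdmin with no-op methods, while the three Mock*Func fields let a test override CreateTopic, DeleteTopic, or ListConsumerGroups. A minimal usage sketch (not part of this commit; the test name and expectations are illustrative only) could look like:

package testing_test

import (
	"errors"
	"testing"

	"github.com/Shopify/sarama"

	commontesting "knative.dev/eventing-kafka/pkg/common/testing"
)

// TestCreateTopicFailure is an illustrative test, not part of the commit: it wires
// an error into the CreateTopic hook and checks that callers of the mock see that
// error, while every other ClusterAdmin method stays a harmless no-op.
func TestCreateTopicFailure(t *testing.T) {
	wantErr := errors.New("create failed")

	var admin sarama.ClusterAdmin = &commontesting.MockClusterAdmin{
		MockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
			return wantErr
		},
	}

	err := admin.CreateTopic("my-topic", &sarama.TopicDetail{NumPartitions: 1, ReplicationFactor: 1}, false)
	if !errors.Is(err, wantErr) {
		t.Errorf("expected %v, got %v", wantErr, err)
	}
}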
4 changes: 2 additions & 2 deletions pkg/source/adapter/adapter.go
Original file line number Diff line number Diff line change
@@ -128,11 +128,11 @@ func (a *Adapter) Start(ctx context.Context) (err error) {
a.saramaConfig = config

options := []consumer.SaramaConsumerHandlerOption{consumer.WithSaramaConsumerLifecycleListener(a)}
consumerGroupFactory := consumer.NewConsumerGroupFactory(addrs, config)
consumerGroupFactory := consumer.NewConsumerGroupFactory(addrs, config, &consumer.NoopConsumerGroupOffsetsChecker{})
group, err := consumerGroupFactory.StartConsumerGroup(
ctx,
a.config.ConsumerGroup,
a.config.Topics,
a.logger,
a,
options...,
)
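
The source adapter now hands the consumer-group factory a NoopConsumerGroupOffsetsChecker, since only the consolidated KafkaChannel dispatcher needs to block until InitOffsets has run. As a hypothetical sketch of the checker abstraction this call implies (the names and the exact method signature are assumptions; the interface that actually lives under pkg/common/consumer may differ), it could be as small as:

package consumer

// Hypothetical sketch of the offsets-checker abstraction; the real interface
// in this repository may differ in name and signature.

import (
	"context"

	"github.com/Shopify/sarama"
	"go.uber.org/zap"
)

// ConsumerGroupOffsetsChecker lets the consumer-group factory wait until a
// group's offsets are initialized before consumers start claiming partitions.
type ConsumerGroupOffsetsChecker interface {
	WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error
}

// NoopConsumerGroupOffsetsChecker never waits, which is what the source adapter
// wants: it does not depend on the KafkaChannel's init-offsets guarantee.
type NoopConsumerGroupOffsetsChecker struct{}

func (c *NoopConsumerGroupOffsetsChecker) WaitForOffsetsInitialization(ctx context.Context, groupID string, topics []string, logger *zap.SugaredLogger, addrs []string, config *sarama.Config) error {
	return nil
}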
201 changes: 0 additions & 201 deletions third_party/VENDOR-LICENSE/knative.dev/networking/pkg/prober/LICENSE

This file was deleted.

201 changes: 0 additions & 201 deletions vendor/knative.dev/networking/LICENSE

This file was deleted.

18 changes: 0 additions & 18 deletions vendor/knative.dev/networking/pkg/prober/doc.go

This file was deleted.

199 changes: 0 additions & 199 deletions vendor/knative.dev/networking/pkg/prober/prober.go

This file was deleted.

4 changes: 0 additions & 4 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
@@ -401,7 +401,6 @@ go.opentelemetry.io/otel/label
go.opentelemetry.io/otel/propagation
go.opentelemetry.io/otel/trace
# go.uber.org/atomic v1.9.0
## explicit
go.uber.org/atomic
# go.uber.org/automaxprocs v1.4.0
go.uber.org/automaxprocs/internal/cgroups
@@ -1173,9 +1172,6 @@ knative.dev/eventing/test/upgrade/prober/wathola/sender
## explicit
knative.dev/hack
knative.dev/hack/shell
# knative.dev/networking v0.0.0-20210803181815-acdfd41c575c
## explicit
knative.dev/networking/pkg/prober
# knative.dev/pkg v0.0.0-20210902173607-844a6bc45596
## explicit
knative.dev/pkg/apiextensions/storageversion
