diff --git a/internal/atc/atc.go b/internal/atc/atc.go index f435881..97f9103 100644 --- a/internal/atc/atc.go +++ b/internal/atc/atc.go @@ -27,6 +27,7 @@ import ( "k8s.io/utils/ptr" "github.com/tetratelabs/wazero" + "github.com/yokecd/yoke/internal" "github.com/yokecd/yoke/internal/atc/wasm" "github.com/yokecd/yoke/internal/k8s" diff --git a/internal/k8s/ctrl/controller.go b/internal/k8s/ctrl/controller.go index 32aeefb..54d1b66 100644 --- a/internal/k8s/ctrl/controller.go +++ b/internal/k8s/ctrl/controller.go @@ -10,7 +10,6 @@ import ( "sync" "time" - kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -18,6 +17,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/metadata" + kcache "k8s.io/client-go/tools/cache" + retryWatcher "k8s.io/client-go/tools/watch" "github.com/yokecd/yoke/internal/k8s" ) @@ -67,7 +68,10 @@ func (ctrl Instance) ProcessGroupKind(ctx context.Context, gk schema.GroupKind, intf := ctrl.Client.Meta.Resource(mapping.Resource) - events := ctrl.eventsFromMetaGetter(ctx, intf, mapping) + events, err := ctrl.eventsFromMetaGetter(ctx, intf, mapping) + if err != nil { + return fmt.Errorf("failed to setup event stream: %w", err) + } return ctrl.process(ctx, events, handler) } @@ -169,24 +173,20 @@ func (ctrl Instance) process(ctx context.Context, events chan Event, handle Hand return context.Cause(ctx) } -func (ctrl Instance) eventsFromMetaGetter(ctx context.Context, getter metadata.Getter, mapping *meta.RESTMapping) chan Event { +func (ctrl Instance) eventsFromMetaGetter(ctx context.Context, getter metadata.Getter, mapping *meta.RESTMapping) (chan Event, error) { events := make(chan Event) cache := make(map[Event]*unstructured.Unstructured) - backoff := time.Second - setupWatcher := func() (watch.Interface, bool) { - for { - watcher, err := getter.Watch(context.Background(), metav1.ListOptions{}) - if err != nil { - if kerrors.IsNotFound(err) { - return nil, false - } - Logger(ctx).Error("failed to setup watcher", "error", err, "backoff", backoff.String()) - time.Sleep(backoff) - continue - } - return watcher, true - } + list, err := getter.List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to list resources: %w", err) + } + + watcher, err := retryWatcher.NewRetryWatcher(list.ResourceVersion, &kcache.ListWatch{WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return getter.Watch(ctx, options) + }}) + if err != nil { + return nil, fmt.Errorf("failed to initiate watch: %w", err) } go func() { @@ -195,13 +195,8 @@ func (ctrl Instance) eventsFromMetaGetter(ctx context.Context, getter metadata.G close(events) }() - watcher, ok := setupWatcher() - if !ok { - return - } - defer watcher.Stop() - kubeEvents := watcher.ResultChan() + defer watcher.Stop() for { select { @@ -210,13 +205,7 @@ func (ctrl Instance) eventsFromMetaGetter(ctx context.Context, getter metadata.G case event, ok := <-kubeEvents: if !ok { Logger(ctx).Error("unexpected close of kube events channel") - watcher.Stop() - watcher, ok = setupWatcher() - if !ok { - return - } - kubeEvents = watcher.ResultChan() - continue + return } if event.Type == watch.Error { @@ -263,7 +252,7 @@ func (ctrl Instance) eventsFromMetaGetter(ctx context.Context, getter metadata.G } }() - return events + return events, nil } func powInt(base int, up int) int {