Skip to content

Commit

Permalink
Add timeout for watch requests to member clusters to prevent request …
Browse files Browse the repository at this point in the history
…hang

Signed-off-by: xigang <[email protected]>
  • Loading branch information
xigang committed Oct 29, 2024
1 parent 331145f commit a9e3aa1
Showing 1 changed file with 50 additions and 10 deletions.
60 changes: 50 additions & 10 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,25 +327,65 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe

mux := newWatchMux()
clusters := c.getClusterNames()

var wg sync.WaitGroup
errChan := make(chan error, len(clusters))

for i := range clusters {
cluster := clusters[i]
options := options.DeepCopy
options.ResourceVersion = resourceVersion.get(cluster)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.31.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.30.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.31.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.29.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.29.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.30.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.30.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.29.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.31.0)

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)

Check failure on line 337 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / compile

options.ResourceVersion undefined (type func() *internalversion.ListOptions has no field or method ResourceVersion)
cache := c.cacheForClusterResource(cluster, gvr)
if cache == nil {
continue
}
w, err := cache.Watch(ctx, options)
if err != nil {
return nil, err
}

mux.AddSource(w, func(e watch.Event) {
setObjectResourceVersionFunc(cluster, e.Object)
addCacheSourceAnnotation(e.Object, cluster)
})
wg.Add(1)
go func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {
defer wg.Done()

watchChan := make(chan watch.Interface, 1)
watchErrChan := make(chan error, 1)

go func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {
w, err := cache.Watch(ctx, options)
if err != nil {
watchErrChan <- fmt.Errorf("failed to start watch for resource %v in cluster %q: %v", gvr.String(), cluster, err)
return
}

select {
case watchChan <- w:
case <-ctx.Done():
w.Stop()
}
}(cluster, cache, options)

select {
case w := <-watchChan:
mux.AddSource(w, func(e watch.Event) {
setObjectResourceVersionFunc(cluster, e.Object)
addCacheSourceAnnotation(e.Object, cluster)
})
case err := <-watchErrChan:
errChan <- err
case <-time.After(30 * time.Second):
errChan <- fmt.Errorf("timeout waiting for watch for resource %v in cluster %q", gvr.String(), cluster)
case <-ctx.Done():
return
}
}(cluster, cache, options)

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.31.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.30.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.31.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init (v1.29.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.29.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / init with config file (v1.30.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.30.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.29.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / Test on Kubernetes (v1.31.0)

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}) (typecheck)

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}) (typecheck)

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / lint

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}) (typecheck)

Check failure on line 377 in pkg/search/proxy/store/multi_cluster_cache.go

View workflow job for this annotation

GitHub Actions / compile

cannot use options (variable of type func() *internalversion.ListOptions) as *internalversion.ListOptions value in argument to func(cluster string, cache *resourceCache, options *metainternalversion.ListOptions) {…}
}

wg.Wait()

select {
case err := <-errChan:
return nil, err
default:
mux.Start()
return mux, nil
}
mux.Start()
return mux, nil
}

func (c *MultiClusterCache) getClusterNames() []string {
Expand Down

0 comments on commit a9e3aa1

Please sign in to comment.