Add watch request timeout to prevent watch request hang #5732
base: master
pkg/search/proxy/store/multi_cluster_cache.go

```diff
@@ -334,15 +334,44 @@ func (c *MultiClusterCache) Watch(ctx context.Context, gvr schema.GroupVersionRe
 		if cache == nil {
 			continue
 		}
-		w, err := cache.Watch(ctx, options)
-		if err != nil {
-			return nil, err
-		}
+		// The following logic adds a 30-second timeout to prevent watch requests to member clusters from hanging,
+		// which could cause the client watch to hang.
+		watchChan := make(chan watch.Interface, 1)
+		errChan := make(chan error, 1)
+
+		go func(cluster string) {
+			w, err := cache.Watch(ctx, options)
+			if err != nil {
+				select {
+				case errChan <- fmt.Errorf("failed to start watch for resource %v in cluster %q: %v", gvr.String(), cluster, err):
+				case <-ctx.Done():
+				}
+				return
+			}
+
+			select {
+			case watchChan <- w:
+			case <-ctx.Done():
+				w.Stop()
+			}
+		}(cluster)
+
+		select {
+		case w := <-watchChan:
+			mux.AddSource(w, func(e watch.Event) {
+				setObjectResourceVersionFunc(cluster, e.Object)
+				addCacheSourceAnnotation(e.Object, cluster)
+			})
+		case err := <-errChan:
+			// If the watch request fails, return the error, and the client will retry.
+			return nil, err
+		case <-time.After(30 * time.Second):
```
Review comments on this line:

It seems we wait up to 30s for each cluster. Should we wait for all clusters in parallel?

@ikaven1024 There's no issue here; as long as a single …

But if creating the watch for every cluster takes 20s and never hits the timeout, the total time spent is 20s * N.
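Regarding the parallelism question, here is a minimal sketch (not code from this PR) of how the per-cluster watches could be started concurrently and collected under one shared deadline. `clusterNames`, `getCache`, and `watchResult` are hypothetical names; `mux`, `options`, `gvr`, and the annotation helpers are assumed to be those of the surrounding Watch method, and cleanup of watches already added to `mux` on the error and timeout paths is omitted.

```go
// Sketch only: start every member cluster's watch concurrently and wait for
// all of them under one shared 30s deadline, so the worst case is roughly one
// timeout rather than 30s multiplied by the number of clusters.
type watchResult struct {
	cluster string
	w       watch.Interface
	err     error
}
results := make(chan watchResult, len(clusterNames)) // buffered: senders never block

for _, cluster := range clusterNames {
	go func(cluster string) {
		w, err := getCache(cluster).Watch(ctx, options) // hypothetical helper
		results <- watchResult{cluster: cluster, w: w, err: err}
	}(cluster)
}

deadline := time.After(30 * time.Second) // one shared deadline for all clusters
for range clusterNames {
	select {
	case r := <-results:
		if r.err != nil {
			return nil, fmt.Errorf("failed to start watch in cluster %q: %v", r.cluster, r.err)
		}
		mux.AddSource(r.w, func(e watch.Event) {
			setObjectResourceVersionFunc(r.cluster, e.Object)
			addCacheSourceAnnotation(e.Object, r.cluster)
		})
	case <-deadline:
		return nil, fmt.Errorf("timeout waiting for watches of resource %v", gvr.String())
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
```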
```diff
+			// If the watch request times out, return an error, and the client will retry.
+			return nil, fmt.Errorf("timeout waiting for watch for resource %v in cluster %q", gvr.String(), cluster)
```
Review comments on this line:

@xigang Hi, if a watch request is hanging and causes a timeout, will the hanging watch request continue to exist in the spawned goroutine?

@zhzhuang-zju Yes, there is this issue. When a watch request times out, the goroutine needs to be terminated.

Good point! In that case we have to cancel the context passed to cache.Watch().

So this patch intends to break the hang by returning an error after a period of time. Is that the idea?

Another question, regarding karmada/pkg/search/proxy/store/multi_cluster_cache.go, lines 333 to 336 (e7b6513).

@RainbowMango When the member cluster goes offline but the Cluster resources in the control plane are not deleted, it can prevent the offline clusters in the ResourceRegistry from being removed, resulting in the resource cache being retained for a short time.

@RainbowMango @zhzhuang-zju Fixed, please take a look.
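A minimal sketch of the cancellation idea discussed in this thread, assuming cache.Watch honors context cancellation. The per-cluster `watchCtx` and `watchCancel` names are illustrative; this is not necessarily the exact change that was pushed.

```go
// Sketch: derive a per-cluster context so that the timeout and error paths can
// cancel an in-flight cache.Watch call instead of leaving its goroutine hanging.
watchCtx, watchCancel := context.WithCancel(ctx)

go func(cluster string) {
	w, err := cache.Watch(watchCtx, options)
	if err != nil {
		select {
		case errChan <- fmt.Errorf("failed to start watch for resource %v in cluster %q: %v", gvr.String(), cluster, err):
		case <-watchCtx.Done():
		}
		return
	}
	select {
	case watchChan <- w:
	case <-watchCtx.Done():
		w.Stop()
	}
}(cluster)

select {
case w := <-watchChan:
	mux.AddSource(w, func(e watch.Event) {
		setObjectResourceVersionFunc(cluster, e.Object)
		addCacheSourceAnnotation(e.Object, cluster)
	})
	// watchCancel is deliberately not called on the success path; cancelling
	// here would tear down the watch that was just registered. Tying it to the
	// watcher's own shutdown is a separate design decision.
case err := <-errChan:
	watchCancel()
	return nil, err
case <-time.After(30 * time.Second):
	// Cancelling unblocks the goroutine that may still be stuck in cache.Watch.
	watchCancel()
	return nil, fmt.Errorf("timeout waiting for watch for resource %v in cluster %q", gvr.String(), cluster)
case <-ctx.Done():
	watchCancel()
	return nil, ctx.Err()
}
```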
```diff
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
-		mux.AddSource(w, func(e watch.Event) {
-			setObjectResourceVersionFunc(cluster, e.Object)
-			addCacheSourceAnnotation(e.Object, cluster)
-		})
 	}
 	mux.Start()
 	return mux, nil
```
Review comments:

If this watcher is only created after the 30s timeout, there seems to be no way to stop it. Is that a leak?

If the watcher creation times out after 30 seconds, it triggers the time.After timeout, returns an error, and calls cancel to stop the watcher goroutine. See karmada/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go, line 263 (e65e993).

ok
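To make the leak scenario concrete, here is a small self-contained illustration (not code from this PR) using the fake watcher from k8s.io/apimachinery, with durations shortened for the demo. The drain-and-stop at the end is just one option; the approach actually discussed above is cancelling the context passed to cache.Watch.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/watch"
)

// Illustration: the member-cluster watch only becomes ready after the timeout
// has fired. Because the channel is buffered, the late watcher sits in the
// buffer; unless something drains it and calls Stop, it is never shut down.
func main() {
	watchChan := make(chan watch.Interface, 1)

	go func() {
		time.Sleep(150 * time.Millisecond) // stands in for cache.Watch hanging past the deadline
		watchChan <- watch.NewFake()       // late watcher lands in the buffer
	}()

	select {
	case w := <-watchChan:
		// Not reached in this demo: the watcher arrives too late.
		w.Stop()
	case <-time.After(100 * time.Millisecond): // stands in for the 30s timeout
		fmt.Println("timed out; the watcher may still arrive later")
		// One way to avoid leaking it: drain the buffered channel and stop
		// whatever eventually shows up.
		go func() {
			w := <-watchChan
			w.Stop()
			fmt.Println("late watcher stopped:", w.(*watch.FakeWatcher).IsStopped())
		}()
	}

	time.Sleep(300 * time.Millisecond) // give the drain goroutine time to run
}
```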