diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go index 1969445ba22..b31477dabe9 100644 --- a/cmd/yurthub/app/config/config.go +++ b/cmd/yurthub/app/config/config.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" apiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" @@ -54,11 +55,26 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter/manager" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" "github.com/openyurtio/openyurt/pkg/yurthub/network" "github.com/openyurtio/openyurt/pkg/yurthub/storage/disk" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) +var DefaultMultiplexerResources = []schema.GroupVersionResource{ + { + Group: "", + Version: "v1", + Resource: "services", + }, + { + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }, +} + // YurtHubConfiguration represents configuration of yurthub type YurtHubConfiguration struct { LBMode string @@ -101,6 +117,9 @@ type YurtHubConfiguration struct { CoordinatorClient kubernetes.Interface LeaderElection componentbaseconfig.LeaderElectionConfiguration HostControlPlaneAddr string // ip:port + PostStartHooks map[string]func() error + MultiplexerCacheManager multiplexer.MultiplexerManager + MultiplexerResources []schema.GroupVersionResource } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) { CoordinatorStorageAddr: options.CoordinatorStorageAddr, LeaderElection: options.LeaderElection, HostControlPlaneAddr: options.HostControlPlaneAddr, + MultiplexerResources: DefaultMultiplexerResources, + MultiplexerCacheManager: newMultiplexerCacheManager(options), } // if yurthub is in local mode, certMgr and networkMgr are no need to start @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y return nil } + +func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager { + config := newRestConfig(options.YurtHubProxyHost, options.YurtHubProxyPort) + rsm := storage.NewStorageManager(config) + + return multiplexer.NewRequestsMultiplexerManager(rsm) +} + +func newRestConfig(host string, port int) *rest.Config { + return &rest.Config{ + Host: fmt.Sprintf("http://%s:%d", host, port), + UserAgent: util.MultiplexerProxyClientUserAgent, + } +} diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index ca1ca83528c..4d1c62f2309 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -19,10 +19,13 @@ package filter import ( "io" "net/http" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" ) type NodesInPoolGetter func(poolName string) ([]string, error) @@ -59,4 +62,35 @@ type ObjectFilter interface { Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object } +type FilterManager interface { + FindResponseFilter(req *http.Request) (ResponseFilter, bool) + FindObjectFilters(req *http.Request) ObjectFilter +} + type NodeGetter func(name string) (*v1.Node, error) + +type UnionObjectFilter []ObjectFilter + +func (chain UnionObjectFilter) Name() string { + var names []string + for i := range chain { + names = append(names, chain[i].Name()) + } + return strings.Join(names, ",") +} + +func (chain UnionObjectFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { + // do nothing + return map[string]sets.Set[string]{} +} + +func (chain UnionObjectFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { + for i := range chain { + obj = chain[i].Filter(obj, stopCh) + if yurtutil.IsNil(obj) { + break + } + } + + return obj +} diff --git a/pkg/yurthub/filter/manager/manager.go b/pkg/yurthub/filter/manager/manager.go index c5bd712e641..9cfa8f513db 100644 --- a/pkg/yurthub/filter/manager/manager.go +++ b/pkg/yurthub/filter/manager/manager.go @@ -111,3 +111,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, return nil, false } + +func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter { + objectFilters := make([]filter.ObjectFilter, 0) + approved, filterNames := m.Approver.Approve(req) + if !approved { + return nil + } + + for i := range filterNames { + if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok { + objectFilters = append(objectFilters, objectFilter) + } + } + + filters := filter.UnionObjectFilter(objectFilters) + return filters +} diff --git a/pkg/yurthub/multiplexer/cache.go b/pkg/yurthub/multiplexer/cache.go index ac89511e5f9..44155427488 100644 --- a/pkg/yurthub/multiplexer/cache.go +++ b/pkg/yurthub/multiplexer/cache.go @@ -31,7 +31,6 @@ import ( type Interface interface { Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error) - Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error } diff --git a/pkg/yurthub/multiplexer/cache_test.go b/pkg/yurthub/multiplexer/cache_test.go index 21b04e0410d..af2c4ef3d1e 100644 --- a/pkg/yurthub/multiplexer/cache_test.go +++ b/pkg/yurthub/multiplexer/cache_test.go @@ -48,19 +48,60 @@ var newServiceListFunc = func() runtime.Object { } func TestResourceCache_GetList(t *testing.T) { - cache, _, err := NewResourceCache( - ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}), + storage := ystorage.NewFakeServiceStorage( + []v1.Service{ + *newService(metav1.NamespaceSystem, "coredns"), + *newService(metav1.NamespaceDefault, "nginx"), + }) + + cache, _, _ := NewResourceCache( + storage, serviceGVR, &ResourceCacheConfig{ - keyFunc, + KeyFunc, newServiceFunc, newServiceListFunc, - attrsFunc, + AttrsFunc, }, ) - assert.Nil(t, err) - assertCacheGetList(t, cache) + for _, tc := range []struct { + name string + key string + expectedServiceList *v1.ServiceList + }{ + { + "all namespace", + "", + &v1.ServiceList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + *newService(metav1.NamespaceSystem, "coredns"), + }, + }, + }, + { + "default namespace", + "/default", + &v1.ServiceList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + }, + }, + }, + } { + serviceList := &v1.ServiceList{} + err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList) + + assert.Nil(t, err) + assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items) + } } func mockListOptions() storage.ListOptions { @@ -74,16 +115,6 @@ func mockListOptions() storage.ListOptions { } } -func assertCacheGetList(t testing.TB, cache Interface) { - t.Helper() - - serviceList := &v1.ServiceList{} - err := cache.GetList(context.Background(), "", mockListOptions(), serviceList) - - assert.Nil(t, err) - assert.Equal(t, 1, len(serviceList.Items)) -} - func TestResourceCache_Watch(t *testing.T) { fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) @@ -91,10 +122,10 @@ func TestResourceCache_Watch(t *testing.T) { fakeStorage, serviceGVR, &ResourceCacheConfig{ - keyFunc, + KeyFunc, newServiceFunc, newServiceListFunc, - attrsFunc, + AttrsFunc, }, ) @@ -117,7 +148,7 @@ func mockWatchOptions() storage.ListOptions { } func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) { - receive, err := cache.Watch(context.TODO(), "", mockWatchOptions()) + receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions()) go func() { fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2")) @@ -127,30 +158,3 @@ func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceSto event := <-receive.ResultChan() assert.Equal(t, watch.Added, event.Type) } - -func TestResourceCache_Get(t *testing.T) { - cache, _, err := NewResourceCache( - ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}), - serviceGVR, - &ResourceCacheConfig{ - keyFunc, - newServiceFunc, - newServiceListFunc, - attrsFunc, - }, - ) - assert.Nil(t, err) - assertCacheGet(t, cache) -} - -func assertCacheGet(t testing.TB, cache Interface) { - t.Helper() - - service := &v1.Service{} - err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{ - ResourceVersion: "1", - }, service) - - assert.Nil(t, err) - assert.Equal(t, "coredns", service.Name) -} diff --git a/pkg/yurthub/multiplexer/fake_multiplexer_manager.go b/pkg/yurthub/multiplexer/fake_multiplexer_manager.go new file mode 100644 index 00000000000..3e8240bfddd --- /dev/null +++ b/pkg/yurthub/multiplexer/fake_multiplexer_manager.go @@ -0,0 +1,39 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import "k8s.io/apimachinery/pkg/runtime/schema" + +type FakeCacheManager struct { + cacheMap map[string]Interface + resourceConfigMap map[string]*ResourceCacheConfig +} + +func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager { + return &FakeCacheManager{ + cacheMap: cacheMap, + resourceConfigMap: resourceConfigMap, + } +} + +func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { + return fcm.resourceConfigMap[gvr.String()], nil +} + +func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { + return fcm.cacheMap[gvr.String()], nil, nil +} diff --git a/pkg/yurthub/multiplexer/manager.go b/pkg/yurthub/multiplexer/manager.go index aa27a104f71..680c11e6a84 100644 --- a/pkg/yurthub/multiplexer/manager.go +++ b/pkg/yurthub/multiplexer/manager.go @@ -30,7 +30,7 @@ import ( ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" ) -var keyFunc = func(obj runtime.Object) (string, error) { +var KeyFunc = func(obj runtime.Object) (string, error) { accessor, err := meta.Accessor(obj) if err != nil { return "", err @@ -48,7 +48,7 @@ var keyFunc = func(obj runtime.Object) (string, error) { return "/" + ns + "/" + name, nil } -var attrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { +var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { metadata, err := meta.Accessor(obj) if err != nil { return nil, nil, err @@ -93,7 +93,6 @@ func NewRequestsMultiplexerManager( cacheDestroyFuncMap: make(map[string]func()), } } - func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { if config, ok := m.cacheConfigMap[gvr.String()]; ok { return config, nil @@ -127,7 +126,6 @@ func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (sch func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, listGVK schema.GroupVersionKind) *ResourceCacheConfig { - resourceCacheConfig := &ResourceCacheConfig{ NewFunc: func() runtime.Object { obj, _ := scheme.Scheme.New(gvk) @@ -137,8 +135,8 @@ func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, objList, _ := scheme.Scheme.New(listGVK) return objList }, - KeyFunc: keyFunc, - GetAttrsFunc: attrsFunc, + KeyFunc: KeyFunc, + GetAttrsFunc: AttrsFunc, } return resourceCacheConfig diff --git a/pkg/yurthub/multiplexer/manager_test.go b/pkg/yurthub/multiplexer/manager_test.go index 0ee45cde9ec..6cd0865dace 100644 --- a/pkg/yurthub/multiplexer/manager_test.go +++ b/pkg/yurthub/multiplexer/manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package multiplexer import ( + "context" "reflect" "testing" @@ -121,7 +122,6 @@ func TestShareCacheManager_ResourceCacheConfig(t *testing.T) { }) } } - func newService(namespace, name string) *v1.Service { return &v1.Service{ TypeMeta: metav1.TypeMeta{ @@ -166,15 +166,26 @@ func newNode() *v1.Node { } func TestShareCacheManager_ResourceCache(t *testing.T) { - svcStorage := storage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + svcStorage := storage.NewFakeServiceStorage( + []v1.Service{ + *newService(metav1.NamespaceSystem, "coredns"), + *newService(metav1.NamespaceDefault, "nginx"), + }) + storageMap := map[string]kstorage.Interface{ serviceGVR.String(): svcStorage, } dsm := storage.NewDummyStorageManager(storageMap) scm := NewRequestsMultiplexerManager(dsm) - cache, _, err := scm.ResourceCache(serviceGVR) + cache, _, _ := scm.ResourceCache(serviceGVR) + + serviceList := &v1.ServiceList{} + err := cache.GetList(context.Background(), "", mockListOptions(), serviceList) assert.Nil(t, err) - assertCacheGetList(t, cache) + assert.Equal(t, []v1.Service{ + *newService(metav1.NamespaceDefault, "nginx"), + *newService(metav1.NamespaceSystem, "coredns"), + }, serviceList.Items) } diff --git a/pkg/yurthub/multiplexer/storage/fake_storage.go b/pkg/yurthub/multiplexer/storage/fake_storage.go index 9bd8a356bb4..b2123bb2374 100644 --- a/pkg/yurthub/multiplexer/storage/fake_storage.go +++ b/pkg/yurthub/multiplexer/storage/fake_storage.go @@ -18,8 +18,11 @@ package storage import ( "context" + "fmt" + "strings" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -92,3 +95,41 @@ func (fs *FakeServiceStorage) AddWatchObject(svc *v1.Service) { svc.ResourceVersion = "101" fs.watcher.Add(svc) } + +type FakeEndpointSliceStorage struct { + *CommonFakeStorage + items []discovery.EndpointSlice + watcher *watch.FakeWatcher +} + +func NewFakeEndpointSliceStorage(items []discovery.EndpointSlice) *FakeEndpointSliceStorage { + return &FakeEndpointSliceStorage{ + CommonFakeStorage: &CommonFakeStorage{}, + items: items, + watcher: watch.NewFake(), + } +} + +func (fs *FakeEndpointSliceStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + epsList := listObj.(*discovery.EndpointSliceList) + epsList.ListMeta = metav1.ListMeta{ + ResourceVersion: "100", + } + + for _, item := range fs.items { + itemKey := fmt.Sprintf("/%s/%s", item.Namespace, item.Name) + if strings.HasPrefix(itemKey, key) { + epsList.Items = append(epsList.Items, item) + } + } + return nil +} + +func (fs *FakeEndpointSliceStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return fs.watcher, nil +} + +func (fs *FakeEndpointSliceStorage) AddWatchObject(eps *discovery.EndpointSlice) { + eps.ResourceVersion = "101" + fs.watcher.Add(eps) +} diff --git a/pkg/yurthub/proxy/multiplexer/filterwatch.go b/pkg/yurthub/proxy/multiplexer/filterwatch.go new file mode 100644 index 00000000000..b79836f5f1b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/filterwatch.go @@ -0,0 +1,93 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" +) + +type filterWatch struct { + source watch.Interface + filter filter.ObjectFilter + result chan watch.Event + done chan struct{} +} + +func (f *filterWatch) Stop() { + select { + case <-f.done: + default: + close(f.done) + f.source.Stop() + } +} + +func newFilterWatch(source watch.Interface, filter filter.ObjectFilter) watch.Interface { + if filter == nil { + return source + } + + fw := &filterWatch{ + source: source, + filter: filter, + result: make(chan watch.Event), + done: make(chan struct{}), + } + + go fw.receive() + + return fw +} + +func (f *filterWatch) ResultChan() <-chan watch.Event { + return f.result +} + +func (f *filterWatch) receive() { + defer utilruntime.HandleCrash() + defer close(f.result) + defer f.Stop() + + for result := range f.source.ResultChan() { + watchType := result.Type + newObj := result.Object + if co, ok := newObj.(runtime.CacheableObject); ok { + newObj = co.GetObject() + } + + if !(result.Type == watch.Bookmark || result.Type == watch.Error) { + if newObj = f.filter.Filter(newObj, f.done); yurtutil.IsNil(newObj) { + watchType = watch.Deleted + newObj = result.Object + } + } + + select { + case <-f.done: + return + case f.result <- watch.Event{ + Type: watchType, + Object: newObj, + }: + } + } +} diff --git a/pkg/yurthub/proxy/multiplexer/filterwatch_test.go b/pkg/yurthub/proxy/multiplexer/filterwatch_test.go new file mode 100644 index 00000000000..d72d0c1596b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/filterwatch_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + + ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" +) + +func TestFilterWatch_ResultChan(t *testing.T) { + t.Run("test filter endpointslices", func(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + fw := newFilterWatch(source, filter) + + go func() { + source.Add(mockEndpointslices()) + }() + + assertFilterWatchEvent(t, fw) + }) + + t.Run("test cacheable object", func(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + + fw := newFilterWatch(source, filter) + + go func() { + source.Add(mockCacheableObject()) + }() + + assertFilterWatchEvent(t, fw) + }) +} + +func mockEndpointslices() *discoveryv1.EndpointSlice { + node1 := "node1" + node2 := "node2" + + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discoveryv1discoveryv1.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "coredns-12345", + Namespace: "kube-system", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"172.16.0.1"}, + NodeName: &node1, + }, + { + Addresses: []string{"172.17.0.1"}, + NodeName: &node2, + }, + }, + } +} + +func assertFilterWatchEvent(t testing.TB, fw watch.Interface) { + t.Helper() + + event := <-fw.ResultChan() + endpointslice, ok := event.Object.(*discoveryv1.EndpointSlice) + + assert.Equal(t, true, ok) + assert.Equal(t, 1, len(endpointslice.Endpoints)) + assert.Equal(t, *endpointslice.Endpoints[0].NodeName, "node2") +} + +func mockCacheableObject() *ctesting.MockCacheableObject { + return &ctesting.MockCacheableObject{ + Obj: mockEndpointslices(), + } +} + +func TestFilterWatch_Stop(t *testing.T) { + source := watch.NewFake() + filter := &ctesting.IgnoreEndpointslicesWithNodeName{IgnoreNodeName: "node1"} + fw := newFilterWatch(source, filter) + + fw.Stop() + + assert.Equal(t, true, source.IsStopped()) +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerlist.go b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go new file mode 100644 index 00000000000..0e151017842 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerlist.go @@ -0,0 +1,159 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "net/http" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/generic/registry" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/klog/v2" + + yurtutil "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +func (sp *multiplexerProxy) multiplexerList(w http.ResponseWriter, r *http.Request, gvr *schema.GroupVersionResource) { + scope, err := sp.getReqScope(gvr) + if err != nil { + util.Err(errors.Wrapf(err, "failed to get request scope"), w, r) + return + } + + listOpts, err := sp.decodeListOptions(r, scope) + if err != nil { + util.Err(errors.Wrapf(err, "failed to decode list options, url: %v", r.URL), w, r) + return + } + + storageOpts, err := sp.storageOpts(listOpts, gvr) + if err != nil { + util.Err(err, w, r) + return + } + + obj, err := sp.listObject(r, gvr, storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + util.WriteObject(http.StatusOK, obj, w, r) +} + +func (sp *multiplexerProxy) listObject(r *http.Request, gvr *schema.GroupVersionResource, storageOpts *kstorage.ListOptions) (runtime.Object, error) { + rc, _, err := sp.cacheManager.ResourceCache(gvr) + if err != nil { + return nil, errors.Wrap(err, "failed to get resource cache") + } + + obj, err := sp.newListObject(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to new list object") + } + + key, err := sp.getCacheKey(r, storageOpts) + if err != nil { + return nil, errors.Wrapf(err, "failed to get cache key") + } + + if err := rc.GetList(r.Context(), key, *storageOpts, obj); err != nil { + return nil, errors.Wrapf(err, "failed to get list from cache") + } + + if obj, err = sp.filterListObject(obj, sp.filterMgr.FindObjectFilters(r)); err != nil { + return nil, errors.Wrapf(err, "failed to filter list object") + } + + return obj, nil +} + +func (sp *multiplexerProxy) newListObject(gvr *schema.GroupVersionResource) (runtime.Object, error) { + rcc, err := sp.cacheManager.ResourceCacheConfig(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get resource cache config") + } + + return rcc.NewListFunc(), nil +} + +func (sp *multiplexerProxy) getCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + if ns := sp.getNamespace(r); len(ns) > 0 { + return sp.getNamespaceScopedCacheKey(r, storageOpts) + } + + return sp.getClusterScopedCacheKey(r, storageOpts) +} + +func (sp *multiplexerProxy) getNamespaceScopedCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + ctx := request.WithNamespace(r.Context(), sp.getNamespace(r)) + + if name, ok := storageOpts.Predicate.MatchesSingle(); ok { + return registry.NamespaceKeyFunc(ctx, "", name) + } + + return registry.NamespaceKeyRootFunc(ctx, ""), nil +} + +func (sp *multiplexerProxy) getNamespace(r *http.Request) string { + requestInfo, ok := request.RequestInfoFrom(r.Context()) + if !ok { + return "" + } + return requestInfo.Namespace +} + +func (sp *multiplexerProxy) getClusterScopedCacheKey(r *http.Request, storageOpts *kstorage.ListOptions) (string, error) { + if name, ok := storageOpts.Predicate.MatchesSingle(); ok { + return registry.NoNamespaceKeyFunc(r.Context(), "", name) + } + + return "", nil +} + +func (sp *multiplexerProxy) filterListObject(obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) { + if filter == nil { + return obj, nil + } + + items, err := meta.ExtractList(obj) + + if err != nil || len(items) == 0 { + return filter.Filter(obj, sp.stop), nil + } + + list := make([]runtime.Object, 0) + for _, item := range items { + newObj := filter.Filter(item, sp.stop) + if !yurtutil.IsNil(newObj) { + list = append(list, newObj) + } + } + + if err = meta.SetList(obj, list); err != nil { + klog.Warningf("filter %s doesn't work correctly, couldn't set list, %v.", filter.Name(), err) + } + + return obj, nil +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go new file mode 100644 index 00000000000..7abae871476 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go @@ -0,0 +1,251 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "net/http" + + "github.com/pkg/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/authorization/authorizerfactory" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" +) + +type multiplexerProxy struct { + cacheManager multiplexer.MultiplexerManager + filterMgr filter.FilterManager + stop <-chan struct{} +} + +func NewDefaultShareProxy(filterMgr filter.FilterManager, + cacheManager multiplexer.MultiplexerManager, + multiplexerResources []schema.GroupVersionResource, + stop <-chan struct{}) (*multiplexerProxy, error) { + + sp := &multiplexerProxy{ + stop: stop, + cacheManager: cacheManager, + filterMgr: filterMgr, + } + + for _, gvr := range multiplexerResources { + if _, _, err := sp.cacheManager.ResourceCache(&gvr); err != nil { + return sp, errors.Wrapf(err, "failed to init resource cache for %s", gvr.String()) + } + } + + return sp, nil +} + +func (sp *multiplexerProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + reqInfo, _ := request.RequestInfoFrom(r.Context()) + gvr := sp.getRequestGVR(reqInfo) + + switch reqInfo.Verb { + case "list": + sp.multiplexerList(w, r, gvr) + case "watch": + sp.shareWatch(w, r, gvr) + } +} + +func (sp *multiplexerProxy) getRequestGVR(reqInfo *request.RequestInfo) *schema.GroupVersionResource { + return &schema.GroupVersionResource{ + Group: reqInfo.APIGroup, + Version: reqInfo.APIVersion, + Resource: reqInfo.Resource, + } +} + +func (sp *multiplexerProxy) getReqScope(gvr *schema.GroupVersionResource) (*handlers.RequestScope, error) { + namer, err := sp.getNamer(gvr) + if err != nil { + return nil, err + } + + fqKindToRegister, err := sp.findKind(gvr) + if err != nil { + return nil, err + } + + return &handlers.RequestScope{ + Serializer: scheme.Codecs, + ParameterCodec: scheme.ParameterCodec, + Convertor: scheme.Scheme, + Defaulter: scheme.Scheme, + Typer: scheme.Scheme, + UnsafeConvertor: runtime.UnsafeObjectConvertor(scheme.Scheme), + Authorizer: authorizerfactory.NewAlwaysAllowAuthorizer(), + + EquivalentResourceMapper: runtime.NewEquivalentResourceRegistry(), + + // TODO: Check for the interface on storage + TableConvertor: rest.NewDefaultTableConvertor(gvr.GroupResource()), + + // TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this. + Resource: *gvr, + Kind: fqKindToRegister, + + HubGroupVersion: schema.GroupVersion{Group: fqKindToRegister.Group, Version: runtime.APIVersionInternal}, + + MetaGroupVersion: metav1.SchemeGroupVersion, + + MaxRequestBodyBytes: int64(3 * 1024 * 1024), + Namer: namer, + }, nil +} + +func (sp *multiplexerProxy) getNamer(gvr *schema.GroupVersionResource) (handlers.ScopeNamer, error) { + return handlers.ContextBasedNaming{ + Namer: runtime.Namer(meta.NewAccessor()), + }, nil +} + +func (sp *multiplexerProxy) findKind(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, error) { + object, err := sp.newListObject(gvr) + if err != nil { + return schema.GroupVersionKind{}, errors.Wrapf(err, "failed to new list object") + } + + fqKinds, _, err := scheme.Scheme.ObjectKinds(object) + if err != nil { + return schema.GroupVersionKind{}, err + } + + for _, fqKind := range fqKinds { + if fqKind.Group == gvr.Group { + return fqKind, nil + } + } + + return schema.GroupVersionKind{}, nil +} + +func (sp *multiplexerProxy) decodeListOptions(req *http.Request, scope *handlers.RequestScope) (opts metainternalversion.ListOptions, err error) { + if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { + return opts, err + } + + if errs := validation.ValidateListOptions(&opts, false); len(errs) > 0 { + err := kerrors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs) + return opts, err + } + + if opts.FieldSelector != nil { + fn := func(label, value string) (newLabel, newValue string, err error) { + return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value) + } + if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil { + return opts, kerrors.NewBadRequest(err.Error()) + } + } + + hasName := true + _, name, err := scope.Namer.Name(req) + if err != nil { + hasName = false + } + + if hasName { + nameSelector := fields.OneTermEqualSelector("metadata.name", name) + if opts.FieldSelector != nil && !opts.FieldSelector.Empty() { + selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name") + if !ok || name != selectedName { + return opts, kerrors.NewBadRequest("fieldSelector metadata.name doesn't match requested name") + } + } else { + opts.FieldSelector = nameSelector + } + } + + return opts, nil +} + +func (sp *multiplexerProxy) storageOpts(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) (*kstorage.ListOptions, error) { + p := sp.selectionPredicate(listOpts, gvr) + + return &kstorage.ListOptions{ + ResourceVersion: getResourceVersion(listOpts), + ResourceVersionMatch: listOpts.ResourceVersionMatch, + Recursive: getRecursive(p), + Predicate: p, + SendInitialEvents: listOpts.SendInitialEvents, + }, nil +} + +func (sp *multiplexerProxy) selectionPredicate(listOpts metainternalversion.ListOptions, gvr *schema.GroupVersionResource) kstorage.SelectionPredicate { + label := labels.Everything() + if listOpts.LabelSelector != nil { + label = listOpts.LabelSelector + } + + field := fields.Everything() + if listOpts.FieldSelector != nil { + field = listOpts.FieldSelector + } + + return kstorage.SelectionPredicate{ + Label: label, + Field: field, + Limit: listOpts.Limit, + Continue: listOpts.Continue, + GetAttrs: sp.getAttrFunc(gvr), + AllowWatchBookmarks: listOpts.AllowWatchBookmarks, + } +} + +func getResourceVersion(opts metainternalversion.ListOptions) string { + if opts.ResourceVersion == "" { + return "0" + } + return opts.ResourceVersion +} + +func getRecursive(p kstorage.SelectionPredicate) bool { + if _, ok := p.MatchesSingle(); ok { + return false + } + return true +} + +func (sp *multiplexerProxy) getAttrFunc(gvr *schema.GroupVersionResource) kstorage.AttrFunc { + rcc, err := sp.cacheManager.ResourceCacheConfig(gvr) + if err != nil { + klog.Errorf("failed to get cache config for %v, error: %v", gvr, err) + return nil + } + + return rcc.GetAttrsFunc +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go new file mode 100644 index 00000000000..c394f815c0a --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go @@ -0,0 +1,383 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "bytes" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/kubernetes/scheme" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer" + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" + ctesting "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer/testing" +) + +var ( + discoveryGV = schema.GroupVersion{Group: "discovery.k8s.io", Version: "v1"} + + endpointSliceGVR = discoveryGV.WithResource("endpointslices") +) + +var mockEndpoints = []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + NodeName: newStringPointer("node1"), + }, + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, +} + +func mockCacheMap() map[string]multiplexer.Interface { + return map[string]multiplexer.Interface{ + endpointSliceGVR.String(): storage.NewFakeEndpointSliceStorage( + []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", mockEndpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", mockEndpoints), + }, + ), + } +} + +func mockResourceCacheMap() map[string]*multiplexer.ResourceCacheConfig { + return map[string]*multiplexer.ResourceCacheConfig{ + endpointSliceGVR.String(): { + KeyFunc: multiplexer.KeyFunc, + NewListFunc: func() runtime.Object { + return &discovery.EndpointSliceList{} + }, + NewFunc: func() runtime.Object { + return &discovery.EndpointSlice{} + }, + GetAttrsFunc: multiplexer.AttrsFunc, + }, + } +} + +func newEndpointSlice(namespace string, name string, resourceVersion string, endpoints []discovery.Endpoint) *discovery.EndpointSlice { + return &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + Endpoints: endpoints, + } +} + +type wrapResponse struct { + Done chan struct{} + *httptest.ResponseRecorder +} + +func (wr *wrapResponse) Write(buf []byte) (int, error) { + l, err := wr.ResponseRecorder.Write(buf) + wr.Done <- struct{}{} + return l, err +} + +func TestShareProxy_ServeHTTP_LIST(t *testing.T) { + for _, tc := range []struct { + tName string + filterManager filter.FilterManager + url string + expectedEndPointSliceList *discovery.EndpointSliceList + err error + }{ + { + "test list endpoint slices no filter", + &ctesting.EmptyFilterManager{}, + "/apis/discovery.k8s.io/v1/endpointslices", + expectEndpointSliceListNoFilter(), + + nil, + }, + { + "test list endpoint slice with filter", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/endpointslices", + expectEndpointSliceListWithFilter(), + nil, + }, + { + "test list endpoint slice with namespace", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/namespaces/default/endpointslices", + expectEndpointSliceListWithNamespace(), + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + w := &httptest.ResponseRecorder{ + Body: &bytes.Buffer{}, + } + + sp, err := NewDefaultShareProxy(tc.filterManager, + multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()), + []schema.GroupVersionResource{endpointSliceGVR}, + make(<-chan struct{})) + + assert.Equal(t, tc.err, err) + + sp.ServeHTTP(w, newEndpointSliceListRequest(tc.url)) + + assert.Equal(t, string(encodeEndpointSliceList(tc.expectedEndPointSliceList)), w.Body.String()) + }) + } +} + +func expectEndpointSliceListNoFilter() *discovery.EndpointSliceList { + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", mockEndpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", mockEndpoints), + }, + } +} + +func newStringPointer(str string) *string { + return &str +} + +func expectEndpointSliceListWithFilter() *discovery.EndpointSliceList { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceSystem, "coredns-12345", "", endpoints), + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", endpoints), + }, + } +} + +func expectEndpointSliceListWithNamespace() *discovery.EndpointSliceList { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "100", + }, + Items: []discovery.EndpointSlice{ + *newEndpointSlice(metav1.NamespaceDefault, "nginx", "", endpoints), + }, + } +} + +func newEndpointSliceListRequest(url string) *http.Request { + req := httptest.NewRequest("GET", url, &bytes.Buffer{}) + + ctx := req.Context() + req = req.WithContext(request.WithRequestInfo(ctx, resolverRequestInfo(req))) + + return req +} + +func resolverRequestInfo(req *http.Request) *request.RequestInfo { + cfg := &server.Config{ + LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), + } + resolver := server.NewRequestInfoResolver(cfg) + info, _ := resolver.NewRequestInfo(req) + return info +} + +func encodeEndpointSliceList(endpointSliceList *discovery.EndpointSliceList) []byte { + discoveryv1Codec := scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) + + str := runtime.EncodeOrDie(discoveryv1Codec, endpointSliceList) + return []byte(str) +} + +func TestShareProxy_ServeHTTP_WATCH(t *testing.T) { + for _, tc := range []struct { + tName string + filterManager filter.FilterManager + url string + expectedWatchEvent *metav1.WatchEvent + Err error + }{ + {"test watch endpointslice no filter", + &ctesting.EmptyFilterManager{}, + "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", + expectedWatchEventNoFilter(), + nil, + }, + {"test watch endpointslice with filter", + &ctesting.FakeEndpointSliceFilter{ + NodeName: "node1", + }, + "/apis/discovery.k8s.io/v1/endpointslices?watch=true&&resourceVersion=0&&timeoutSeconds=3", + expectedWatchEventWithFilter(), + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + fcm := multiplexer.NewFakeCacheManager(mockCacheMap(), mockResourceCacheMap()) + + sp, _ := NewDefaultShareProxy( + tc.filterManager, + fcm, + []schema.GroupVersionResource{endpointSliceGVR}, + make(<-chan struct{}), + ) + + req := newWatchEndpointSliceRequest(tc.url) + w := newWatchResponse() + + go func() { + sp.ServeHTTP(w, req) + }() + generateWatchEvent(fcm) + + assertWatchResp(t, tc.expectedWatchEvent, w) + }) + } +} + +func expectedWatchEventNoFilter() *metav1.WatchEvent { + return &metav1.WatchEvent{ + Type: "ADDED", + Object: runtime.RawExtension{ + Object: newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "101", mockEndpoints), + }, + } +} + +func expectedWatchEventWithFilter() *metav1.WatchEvent { + endpoints := []discovery.Endpoint{ + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + } + return &metav1.WatchEvent{ + Type: "ADDED", + Object: runtime.RawExtension{ + Object: newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "101", endpoints), + }, + } +} + +func newWatchEndpointSliceRequest(url string) *http.Request { + req := httptest.NewRequest("GET", url, &bytes.Buffer{}) + + ctx := req.Context() + req = req.WithContext(request.WithRequestInfo(ctx, resolverRequestInfo(req))) + + return req +} + +func newWatchResponse() *wrapResponse { + return &wrapResponse{ + make(chan struct{}), + &httptest.ResponseRecorder{ + Body: &bytes.Buffer{}, + }, + } +} + +func generateWatchEvent(fcm *multiplexer.FakeCacheManager) { + fs, _, _ := fcm.ResourceCache(&endpointSliceGVR) + + fess, _ := fs.(*storage.FakeEndpointSliceStorage) + fess.AddWatchObject(newEndpointSlice(metav1.NamespaceSystem, "coredns-23456", "102", mockEndpoints)) +} + +func assertWatchResp(t testing.TB, expectedWatchEvent *metav1.WatchEvent, w *wrapResponse) { + t.Helper() + + select { + case <-time.After(5 * time.Second): + t.Errorf("wait watch timeout") + case <-w.Done: + assert.Equal(t, string(encodeWatchEventList(expectedWatchEvent)), w.Body.String()) + } +} + +func encodeWatchEventList(watchEvent *metav1.WatchEvent) []byte { + metav1Codec := scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) + + str := runtime.EncodeOrDie(metav1Codec, watchEvent) + return []byte(str) +} diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go new file mode 100644 index 00000000000..bddc7631ffb --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/multiplexerwatch.go @@ -0,0 +1,246 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "fmt" + "math/rand" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" + + "github.com/openyurtio/openyurt/pkg/yurthub/util" +) + +const ( + minRequestTimeout = 300 * time.Second +) + +var neverExitWatch <-chan time.Time = make(chan time.Time) + +// realTimeoutFactory implements timeoutFactory +type realTimeoutFactory struct { + timeout time.Duration +} + +// TimeoutCh returns a channel which will receive something when the watch times out, +// and a cleanup function to call when this happens. +func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { + if w.timeout == 0 { + return neverExitWatch, func() bool { return false } + } + t := time.NewTimer(w.timeout) + return t.C, t.Stop +} + +func (sp *multiplexerProxy) shareWatch(w http.ResponseWriter, r *http.Request, gvr *schema.GroupVersionResource) { + reqScope, err := sp.getReqScope(gvr) + if err != nil { + util.Err(err, w, r) + return + } + + listOpts, err := sp.decodeListOptions(r, reqScope) + if err != nil { + util.Err(err, w, r) + return + } + + storageOpts, err := sp.storageOpts(listOpts, gvr) + if err != nil { + util.Err(err, w, r) + } + + timeout := getTimeout(&listOpts) + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + + outputMediaType, _, err := negotiation.NegotiateOutputMediaType(r, reqScope.Serializer, reqScope) + if err != nil { + util.Err(err, w, r) + return + } + + rc, _, err := sp.cacheManager.ResourceCache(gvr) + if err != nil { + util.Err(err, w, r) + return + } + + key, err := sp.getCacheKey(r, storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + watcher, err := rc.Watch(ctx, key, *storageOpts) + if err != nil { + util.Err(err, w, r) + return + } + + klog.V(3).InfoS("Starting watch", "path", r.URL.Path, "resourceVersion", listOpts.ResourceVersion, "labels", listOpts.LabelSelector, "fields", listOpts.FieldSelector, "timeout", timeout) + serveWatch(newFilterWatch(watcher, sp.filterMgr.FindObjectFilters(r)), reqScope, outputMediaType, r, w, timeout) +} + +func getTimeout(opts *metainternalversion.ListOptions) time.Duration { + timeout := time.Duration(0) + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + if timeout == 0 && minRequestTimeout > 0 { + timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0)) + } + return timeout +} + +func serveWatch(watcher watch.Interface, scope *handlers.RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) { + defer watcher.Stop() + + handler, err := serveWatchHandler(watcher, scope, mediaTypeOptions, req, timeout) + if err != nil { + util.Err(err, w, req) + return + } + + handler.ServeHTTP(w, req) +} + +func serveWatchHandler(watcher watch.Interface, scope *handlers.RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, timeout time.Duration) (http.Handler, error) { + + options, err := optionsForTransform(mediaTypeOptions, req) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("failed to get options from transform, error: %v", err)) + } + + // negotiate for the stream serializer from the scope's serializer + serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, scope) + if err != nil { + return nil, errors.NewInternalError(fmt.Errorf("failed to get output media type stream, error: %v", err)) + } + + framer := serializer.StreamSerializer.Framer + streamSerializer := serializer.StreamSerializer.Serializer + encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion()) + useTextFraming := serializer.EncodesAsText + if framer == nil { + return nil, errors.NewInternalError(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType)) + } + // TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here + mediaType := serializer.MediaType + if mediaType != runtime.ContentTypeJSON { + mediaType += ";stream=watch" + } + + // locate the appropriate embedded encoder based on the transform + var embeddedEncoder runtime.Encoder + contentKind, contentSerializer, transform := targetEncodingForTransform(scope, mediaTypeOptions, req) + if transform { + info, ok := runtime.SerializerInfoForMediaType(contentSerializer.SupportedMediaTypes(), serializer.MediaType) + if !ok { + return nil, errors.NewInternalError(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer)) + } + embeddedEncoder = contentSerializer.EncoderForVersion(info.Serializer, contentKind.GroupVersion()) + } else { + embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion()) + } + + var memoryAllocator runtime.MemoryAllocator + + if encoderWithAllocator, supportsAllocator := embeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator { + // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call. + // instead, we allocate the buffer for the entire watch session and release it when we close the connection. + memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator) + embeddedEncoder = runtime.NewEncoderWithAllocator(encoderWithAllocator, memoryAllocator) + } + + var tableOptions *metav1.TableOptions + if options != nil { + if passedOptions, ok := options.(*metav1.TableOptions); ok { + tableOptions = passedOptions + } else { + return nil, errors.NewInternalError(fmt.Errorf("unexpected options type: %T", options)) + } + } + embeddedEncoder = newWatchEmbeddedEncoder(req.Context(), embeddedEncoder, mediaTypeOptions.Convert, tableOptions, scope) + + var serverShuttingDownCh <-chan struct{} + if signals := request.ServerShutdownSignalFrom(req.Context()); signals != nil { + serverShuttingDownCh = signals.ShuttingDown() + } + + server := &handlers.WatchServer{ + Watching: watcher, + Scope: scope, + + UseTextFraming: useTextFraming, + MediaType: mediaType, + Framer: framer, + Encoder: encoder, + EmbeddedEncoder: embeddedEncoder, + + TimeoutFactory: &realTimeoutFactory{timeout}, + ServerShuttingDownCh: serverShuttingDownCh, + } + + return http.HandlerFunc(server.HandleHTTP), nil +} + +func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) { + switch target := mediaType.Convert; { + case target == nil: + case target.Kind == "Table" && (target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion): + opts := &metav1.TableOptions{} + if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, opts); err != nil { + return nil, err + } + switch errs := validation.ValidateTableOptions(opts); len(errs) { + case 0: + return opts, nil + case 1: + return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error())) + default: + return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs)) + } + } + return nil, nil +} + +func targetEncodingForTransform(scope *handlers.RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) { + switch target := mediaType.Convert; { + case target == nil: + case (target.Kind == "PartialObjectMetadata" || target.Kind == "PartialObjectMetadataList" || target.Kind == "Table") && + (target.GroupVersion() == metav1beta1.SchemeGroupVersion || target.GroupVersion() == metav1.SchemeGroupVersion): + return *target, metainternalversionscheme.Codecs, true + } + return scope.Kind, scope.Serializer, false +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go new file mode 100644 index 00000000000..e6be9b9952b --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go @@ -0,0 +1,55 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + discovery "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +type IgnoreEndpointslicesWithNodeName struct { + IgnoreNodeName string +} + +func (ie *IgnoreEndpointslicesWithNodeName) Name() string { + return "ignoreendpointsliceswithname" +} + +func (ie *IgnoreEndpointslicesWithNodeName) SupportedResourceAndVerbs() map[string]sets.Set[string] { + return nil +} + +// Filter is used for filtering runtime object +// all filter logic should be located in it. +func (ie *IgnoreEndpointslicesWithNodeName) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { + endpointslice, ok := obj.(*discovery.EndpointSlice) + if !ok { + return obj + } + + var newEps []discovery.Endpoint + + for _, ep := range endpointslice.Endpoints { + if *ep.NodeName != ie.IgnoreNodeName { + newEps = append(newEps, ep) + } + } + endpointslice.Endpoints = newEps + + return endpointslice +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go new file mode 100644 index 00000000000..26f7489ccc5 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_filtermanager.go @@ -0,0 +1,48 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "net/http" + + "github.com/openyurtio/openyurt/pkg/yurthub/filter" +) + +type EmptyFilterManager struct { +} + +func (fm *EmptyFilterManager) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { + return nil, false +} + +func (fm *EmptyFilterManager) FindObjectFilters(req *http.Request) filter.ObjectFilter { + return nil +} + +type FakeEndpointSliceFilter struct { + NodeName string +} + +func (fm *FakeEndpointSliceFilter) FindResponseFilter(req *http.Request) (filter.ResponseFilter, bool) { + return nil, false +} + +func (fm *FakeEndpointSliceFilter) FindObjectFilters(req *http.Request) filter.ObjectFilter { + return &IgnoreEndpointslicesWithNodeName{ + fm.NodeName, + } +} diff --git a/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go b/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go new file mode 100644 index 00000000000..7c954601493 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/testing/mock_cacheableobject.go @@ -0,0 +1,44 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "io" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type MockCacheableObject struct { + Obj runtime.Object +} + +func (mc *MockCacheableObject) CacheEncode(id runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + return nil +} + +func (mc *MockCacheableObject) GetObject() runtime.Object { + return mc.Obj +} + +func (mc *MockCacheableObject) GetObjectKind() schema.ObjectKind { + return nil +} + +func (mc *MockCacheableObject) DeepCopyObject() runtime.Object { + return mc.Obj +} diff --git a/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go b/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go new file mode 100644 index 00000000000..8ac19ecb3c7 --- /dev/null +++ b/pkg/yurthub/proxy/multiplexer/watchembeddedencoder.go @@ -0,0 +1,303 @@ +/* +Copyright 2024 The OpenYurt Authors. +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "reflect" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/endpoints/handlers" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/klog/v2" +) + +// watchEmbeddedEncoder performs encoding of the embedded object. +// +// NOTE: watchEmbeddedEncoder is NOT thread-safe. +type watchEmbeddedEncoder struct { + encoder runtime.Encoder + + ctx context.Context + + // target, if non-nil, configures transformation type. + // The other options are ignored if target is nil. + target *schema.GroupVersionKind + tableOptions *metav1.TableOptions + scope *handlers.RequestScope + + // identifier of the encoder, computed lazily + identifier runtime.Identifier +} + +func newWatchEmbeddedEncoder(ctx context.Context, encoder runtime.Encoder, target *schema.GroupVersionKind, tableOptions *metav1.TableOptions, scope *handlers.RequestScope) *watchEmbeddedEncoder { + return &watchEmbeddedEncoder{ + encoder: encoder, + ctx: ctx, + target: target, + tableOptions: tableOptions, + scope: scope, + } +} + +// Encode implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Encode(obj runtime.Object, w io.Writer) error { + if co, ok := obj.(runtime.CacheableObject); ok { + return co.CacheEncode(e.Identifier(), e.doEncode, w) + } + return e.doEncode(obj, w) +} + +func (e *watchEmbeddedEncoder) doEncode(obj runtime.Object, w io.Writer) error { + result, err := doTransformObject(e.ctx, obj, e.tableOptions, e.target, e.scope) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err)) + result = obj + } + + // When we are transforming to a table, use the original table options when + // we should print headers only on the first object - headers should be + // omitted on subsequent events. + if e.tableOptions != nil && !e.tableOptions.NoHeaders { + e.tableOptions.NoHeaders = true + // With options change, we should recompute the identifier. + // Clearing this will trigger lazy recompute when needed. + e.identifier = "" + } + + return e.encoder.Encode(result, w) +} + +// Identifier implements runtime.Encoder interface. +func (e *watchEmbeddedEncoder) Identifier() runtime.Identifier { + if e.identifier == "" { + e.identifier = e.embeddedIdentifier() + } + return e.identifier +} + +type watchEmbeddedEncoderIdentifier struct { + Name string `json:"name,omitempty"` + Encoder string `json:"encoder,omitempty"` + Target string `json:"target,omitempty"` + Options metav1.TableOptions `json:"options,omitempty"` + NoHeaders bool `json:"noHeaders,omitempty"` +} + +func (e *watchEmbeddedEncoder) embeddedIdentifier() runtime.Identifier { + if e.target == nil { + // If no conversion is performed, we effective only use + // the embedded identifier. + return e.encoder.Identifier() + } + identifier := watchEmbeddedEncoderIdentifier{ + Name: "watch-embedded", + Encoder: string(e.encoder.Identifier()), + Target: e.target.String(), + } + if e.target.Kind == "Table" && e.tableOptions != nil { + identifier.Options = *e.tableOptions + identifier.NoHeaders = e.tableOptions.NoHeaders + } + + result, err := json.Marshal(identifier) + if err != nil { + klog.Fatalf("Failed marshaling identifier for watchEmbeddedEncoder: %v", err) + } + return runtime.Identifier(result) +} + +// doTransformResponseObject is used for handling all requests, including watch. +func doTransformObject(ctx context.Context, obj runtime.Object, opts interface{}, target *schema.GroupVersionKind, scope *handlers.RequestScope) (runtime.Object, error) { + if _, ok := obj.(*metav1.Status); ok { + return obj, nil + } + + switch { + case target == nil: + // If we ever change that from a no-op, the identifier of + // the watchEmbeddedEncoder has to be adjusted accordingly. + return obj, nil + + case target.Kind == "PartialObjectMetadata": + return asPartialObjectMetadata(obj, target.GroupVersion()) + + case target.Kind == "PartialObjectMetadataList": + return asPartialObjectMetadataList(obj, target.GroupVersion()) + + case target.Kind == "Table": + options, ok := opts.(*metav1.TableOptions) + if !ok { + return nil, fmt.Errorf("unexpected TableOptions, got %T", opts) + } + return asTable(ctx, obj, options, scope, target.GroupVersion()) + + default: + accepted, _ := negotiation.MediaTypesForSerializer(metainternalversionscheme.Codecs) + err := negotiation.NewNotAcceptableError(accepted) + return nil, err + } +} + +func asTable(ctx context.Context, result runtime.Object, opts *metav1.TableOptions, scope *handlers.RequestScope, groupVersion schema.GroupVersion) (runtime.Object, error) { + switch groupVersion { + case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion: + default: + return nil, newNotAcceptableError(fmt.Sprintf("no Table exists in group version %s", groupVersion)) + } + + obj, err := scope.TableConvertor.ConvertToTable(ctx, result, opts) + if err != nil { + return nil, err + } + + table := (*metav1.Table)(obj) + + for i := range table.Rows { + item := &table.Rows[i] + switch opts.IncludeObject { + case metav1.IncludeObject: + item.Object.Object, err = scope.Convertor.ConvertToVersion(item.Object.Object, scope.Kind.GroupVersion()) + if err != nil { + return nil, err + } + // TODO: rely on defaulting for the value here? + case metav1.IncludeMetadata, "": + m, err := meta.Accessor(item.Object.Object) + if err != nil { + return nil, err + } + // TODO: turn this into an internal type and do conversion in order to get object kind automatically set? + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata")) + item.Object.Object = partial + case metav1.IncludeNone: + item.Object.Object = nil + default: + err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject)) + return nil, err + } + } + + return table, nil +} + +// errNotAcceptable indicates Accept negotiation has failed +type errNotAcceptable struct { + message string +} + +func newNotAcceptableError(message string) error { + return errNotAcceptable{message} +} + +func (e errNotAcceptable) Error() string { + return e.message +} + +func (e errNotAcceptable) Status() metav1.Status { + return metav1.Status{ + Status: metav1.StatusFailure, + Code: http.StatusNotAcceptable, + Reason: metav1.StatusReason("NotAcceptable"), + Message: e.Error(), + } +} + +func asPartialObjectMetadata(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) { + if meta.IsListType(result) { + err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result)) + return nil, err + } + switch groupVersion { + case metav1beta1.SchemeGroupVersion, metav1.SchemeGroupVersion: + default: + return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) + } + m, err := meta.Accessor(result) + if err != nil { + return nil, err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(groupVersion.WithKind("PartialObjectMetadata")) + return partial, nil +} + +func asPartialObjectMetadataList(result runtime.Object, groupVersion schema.GroupVersion) (runtime.Object, error) { + li, ok := result.(metav1.ListInterface) + if !ok { + return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result)) + } + + gvk := groupVersion.WithKind("PartialObjectMetadata") + switch { + case groupVersion == metav1beta1.SchemeGroupVersion: + list := &metav1beta1.PartialObjectMetadataList{} + err := meta.EachListItem(result, func(obj runtime.Object) error { + m, err := meta.Accessor(obj) + if err != nil { + return err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(gvk) + list.Items = append(list.Items, *partial) + return nil + }) + if err != nil { + return nil, err + } + list.ResourceVersion = li.GetResourceVersion() + list.Continue = li.GetContinue() + list.RemainingItemCount = li.GetRemainingItemCount() + return list, nil + + case groupVersion == metav1.SchemeGroupVersion: + list := &metav1.PartialObjectMetadataList{} + err := meta.EachListItem(result, func(obj runtime.Object) error { + m, err := meta.Accessor(obj) + if err != nil { + return err + } + partial := meta.AsPartialObjectMetadata(m) + partial.GetObjectKind().SetGroupVersionKind(gvk) + list.Items = append(list.Items, *partial) + return nil + }) + if err != nil { + return nil, err + } + list.ResourceVersion = li.GetResourceVersion() + list.Continue = li.GetContinue() + list.RemainingItemCount = li.GetRemainingItemCount() + return list, nil + + default: + return nil, newNotAcceptableError(fmt.Sprintf("no PartialObjectMetadataList exists in group version %s", groupVersion)) + } +} diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index 7c791e7a129..c1dea9ba37b 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -19,6 +19,7 @@ package proxy import ( "bytes" "errors" + "fmt" "io" "net/http" "net/url" @@ -40,6 +41,7 @@ import ( hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/autonomy" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/local" + "github.com/openyurtio/openyurt/pkg/yurthub/proxy/multiplexer" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/pool" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/remote" "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" @@ -50,6 +52,8 @@ import ( coordinatorconstants "github.com/openyurtio/openyurt/pkg/yurthub/yurtcoordinator/constants" ) +const multiplexerProxyPostHookName = "multiplexerProxy" + type yurtReverseProxy struct { resolver apirequest.RequestInfoResolver loadBalancer remote.LoadBalancer @@ -63,6 +67,8 @@ type yurtReverseProxy struct { isCoordinatorReady func() bool workingMode hubutil.WorkingMode enableYurtCoordinator bool + multiplexerProxy http.Handler + multiplexerResources []schema.GroupVersionResource } // NewYurtReverseProxyHandler creates a http handler for proxying @@ -158,6 +164,20 @@ func NewYurtReverseProxyHandler( enableYurtCoordinator: yurtHubCfg.EnableCoordinator, tenantMgr: tenantMgr, workingMode: yurtHubCfg.WorkingMode, + multiplexerResources: yurtHubCfg.MultiplexerResources, + } + + if yurtHubCfg.PostStartHooks == nil { + yurtHubCfg.PostStartHooks = make(map[string]func() error) + } + yurtHubCfg.PostStartHooks[multiplexerProxyPostHookName] = func() error { + if yurtProxy.multiplexerProxy, err = multiplexer.NewDefaultShareProxy(yurtHubCfg.FilterManager, + yurtHubCfg.MultiplexerCacheManager, + yurtHubCfg.MultiplexerResources, + stopCh); err != nil { + return fmt.Errorf("failed to new default share proxy, error: %v", err) + } + return nil } return yurtProxy.buildHandlerChain(yurtProxy), nil @@ -214,6 +234,10 @@ func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) p.poolScopedResourceHandler(rw, req) case util.IsSubjectAccessReviewCreateGetRequest(req): p.subjectAccessReviewHandler(rw, req) + case util.IsMultiplexerRequest(req, p.multiplexerResources): + if p.multiplexerProxy != nil { + p.multiplexerProxy.ServeHTTP(rw, req) + } default: // For resource request that do not need to be handled by yurt-coordinator, // handling the request with cloud apiserver or local cache. diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index 7dd79be2efc..abc7fad4ba2 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -551,3 +551,37 @@ func ReListWatchReq(rw http.ResponseWriter, req *http.Request) { klog.Infof("this request write error event back finished.") rw.(http.Flusher).Flush() } + +func IsMultiplexerRequest(req *http.Request, multiplexerResources []schema.GroupVersionResource) bool { + ctx := req.Context() + + if req.UserAgent() == util.MultiplexerProxyClientUserAgent { + return false + } + + info, ok := apirequest.RequestInfoFrom(ctx) + if !ok { + return false + } + + if info.Verb != "list" && info.Verb != "watch" { + return false + } + + return isMultiplexerResource(info, multiplexerResources) +} + +func isMultiplexerResource(info *apirequest.RequestInfo, multiplexerResources []schema.GroupVersionResource) bool { + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + for _, resource := range multiplexerResources { + if gvr.String() == resource.String() { + return true + } + } + return false +} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 51b30774c60..ecd851b632a 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -21,6 +21,7 @@ import ( "net/http" "github.com/gorilla/mux" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" @@ -73,6 +74,12 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, return err } } + + for name, hook := range cfg.PostStartHooks { + if err := hook(); err != nil { + return errors.Wrapf(err, "failed to run post start hooks: %s", name) + } + } return nil } diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 51429d9c85f..db9c8f7bc3a 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -86,13 +86,15 @@ const ( CacheUserAgentsKey = "cache_agents" PoolScopeResourcesKey = "pool_scope_resources" + MultiplexerProxyClientUserAgent = "multiplexer-proxy" + YurtHubProxyPort = 10261 YurtHubPort = 10267 YurtHubProxySecurePort = 10268 ) var ( - DefaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", "share-hub", projectinfo.GetAgentName(), projectinfo.GetHubName(), coordinatorconstants.DefaultPoolScopedUserAgent} + DefaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", MultiplexerProxyClientUserAgent, projectinfo.GetAgentName(), projectinfo.GetHubName(), coordinatorconstants.DefaultPoolScopedUserAgent} YurthubConfigMapName = fmt.Sprintf("%s-hub-cfg", strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) ) diff --git a/pkg/yurtmanager/controller/util/node/controller_utils.go b/pkg/yurtmanager/controller/util/node/controller_utils.go index 825db699007..1395d4f3cc5 100644 --- a/pkg/yurtmanager/controller/util/node/controller_utils.go +++ b/pkg/yurtmanager/controller/util/node/controller_utils.go @@ -23,7 +23,7 @@ import ( "fmt" "time" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -69,12 +69,12 @@ var UpdateLabelBackoff = wait.Backoff{ // DeletePods will delete all pods from master running on given node, // and return true if any pods were deleted, or were found pending // deletion. -func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string) (bool, error) { +func DeletePods(ctx context.Context, c client.Client, pods []*corev1.Pod, recorder record.EventRecorder, nodeName, nodeUID string) (bool, error) { remaining := false var updateErrList []error if len(pods) > 0 { - RecordNodeEvent(ctx, recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) + RecordNodeEvent(ctx, recorder, nodeName, nodeUID, corev1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } for i := range pods { @@ -100,7 +100,7 @@ func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder r } klog.InfoS("Starting deletion of pod", "pod", klog.KObj(pod)) - recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) + recorder.Eventf(pod, corev1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) //if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { if err := c.Delete(ctx, pod); err != nil { if apierrors.IsNotFound(err) { @@ -122,7 +122,7 @@ func DeletePods(ctx context.Context, c client.Client, pods []*v1.Pod, recorder r // SetPodTerminationReason attempts to set a reason and message in the // pod status, updates it in the apiserver, and returns an error if it // encounters one. -func SetPodTerminationReason(ctx context.Context, c client.Client, pod *v1.Pod, nodeName string) (*v1.Pod, error) { +func SetPodTerminationReason(ctx context.Context, c client.Client, pod *corev1.Pod, nodeName string) (*corev1.Pod, error) { if pod.Status.Reason == NodeUnreachablePodReason { return pod, nil } @@ -140,7 +140,7 @@ func SetPodTerminationReason(ctx context.Context, c client.Client, pod *v1.Pod, // MarkPodsNotReady updates ready status of given pods running on // given node from master return true if success -func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.EventRecorder, pods []*v1.Pod, nodeName string) error { +func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.EventRecorder, pods []*corev1.Pod, nodeName string) error { klog.V(2).InfoS("Update ready status of pods on node", "node", klog.KRef("", nodeName)) errs := []error{} @@ -153,11 +153,11 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even // Pod will be modified, so making copy is required. pod := pods[i].DeepCopy() for _, cond := range pod.Status.Conditions { - if cond.Type != v1.PodReady { + if cond.Type != corev1.PodReady { continue } - cond.Status = v1.ConditionFalse + cond.Status = corev1.ConditionFalse if !utilpod.UpdatePodCondition(&pod.Status, &cond) { break } @@ -174,7 +174,7 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even errs = append(errs, err) } // record NodeNotReady event after updateStatus to make sure pod still exists - recorder.Event(pod, v1.EventTypeWarning, "NodeNotReady", "Node is not ready") + recorder.Event(pod, corev1.EventTypeWarning, "NodeNotReady", "Node is not ready") break } } @@ -184,7 +184,7 @@ func MarkPodsNotReady(ctx context.Context, c client.Client, recorder record.Even // RecordNodeEvent records a event related to a node. func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { - ref := &v1.ObjectReference{ + ref := &corev1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: nodeName, @@ -196,8 +196,8 @@ func RecordNodeEvent(ctx context.Context, recorder record.EventRecorder, nodeNam } // RecordNodeStatusChange records a event related to a node status change. (Common to lifecycle and ipam) -func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newStatus string) { - ref := &v1.ObjectReference{ +func RecordNodeStatusChange(recorder record.EventRecorder, node *corev1.Node, newStatus string) { + ref := &corev1.ObjectReference{ APIVersion: "v1", Kind: "Node", Name: node.Name, @@ -207,12 +207,12 @@ func RecordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, newSta klog.V(2).InfoS("Recording status change event message for node", "status", newStatus, "node", node.Name) // TODO: This requires a transaction, either both node status is updated // and event is recorded or neither should happen, see issue #6055. - recorder.Eventf(ref, v1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) + recorder.Eventf(ref, corev1.EventTypeNormal, newStatus, "Node %s status is now: %s", node.Name, newStatus) } // SwapNodeControllerTaint returns true in case of success and false // otherwise. -func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*v1.Taint, node *v1.Node) bool { +func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface, taintsToAdd, taintsToRemove []*corev1.Taint, node *corev1.Node) bool { for _, taintToAdd := range taintsToAdd { now := metav1.Now() taintToAdd.TimeAdded = &now @@ -247,7 +247,7 @@ func SwapNodeControllerTaint(ctx context.Context, kubeClient clientset.Interface // AddOrUpdateLabelsOnNode updates the labels on the node and returns true on // success and false on failure. -func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *v1.Node) bool { +func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface, labelsToUpdate map[string]string, node *corev1.Node) bool { if err := addOrUpdateLabelsOnNode(kubeClient, node.Name, labelsToUpdate); err != nil { utilruntime.HandleError( fmt.Errorf( @@ -263,7 +263,7 @@ func AddOrUpdateLabelsOnNode(ctx context.Context, kubeClient clientset.Interface // GetNodeCondition extracts the provided condition from the given status and returns that. // Returns nil and -1 if the condition is not present, and the index of the located condition. -func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) { +func GetNodeCondition(status *corev1.NodeStatus, conditionType corev1.NodeConditionType) (int, *corev1.NodeCondition) { if status == nil { return -1, nil } @@ -277,14 +277,14 @@ func GetNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) // AddOrUpdateTaintOnNode add taints to the node. If taint was added into node, it'll issue API calls // to update nodes; otherwise, no API calls. Return error if any. -func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName string, taints ...*v1.Taint) error { +func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName string, taints ...*corev1.Taint) error { if len(taints) == 0 { return nil } firstTry := true return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error { var err error - var oldNode *v1.Node + var oldNode *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -297,7 +297,7 @@ func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName return err } - var newNode *v1.Node + var newNode *corev1.Node oldNodeCopy := oldNode updated := false for _, taint := range taints { @@ -320,7 +320,7 @@ func AddOrUpdateTaintOnNode(ctx context.Context, c clientset.Interface, nodeName // won't fail if target taint doesn't exist or has been removed. // If passed a node it'll check if there's anything to be done, if taint is not present it won't issue // any API calls. -func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName string, node *v1.Node, taints ...*v1.Taint) error { +func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName string, node *corev1.Node, taints ...*corev1.Taint) error { if len(taints) == 0 { return nil } @@ -341,7 +341,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str firstTry := true return clientretry.RetryOnConflict(UpdateTaintBackoff, func() error { var err error - var oldNode *v1.Node + var oldNode *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -354,7 +354,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str return err } - var newNode *v1.Node + var newNode *corev1.Node oldNodeCopy := oldNode updated := false for _, taint := range taints { @@ -374,7 +374,7 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str } // PatchNodeTaints patches node's taints. -func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error { +func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *corev1.Node, newNode *corev1.Node) error { // Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints. // This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons. // Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal. @@ -393,7 +393,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string return fmt.Errorf("could not marshal new node %#v for node %q: %v", newNodeClone, nodeName, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, corev1.Node{}) if err != nil { return fmt.Errorf("could not create patch for node %q: %v", nodeName, err) } @@ -406,7 +406,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la firstTry := true return clientretry.RetryOnConflict(UpdateLabelBackoff, func() error { var err error - var node *v1.Node + var node *corev1.Node // First we try getting node from the API server cache, as it's cheaper. If it fails // we get it from etcd to be sure to have fresh data. option := metav1.GetOptions{} @@ -436,7 +436,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la if err != nil { return fmt.Errorf("could not marshal the new node %#v: %v", newNode, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &corev1.Node{}) if err != nil { return fmt.Errorf("could not create a two-way merge patch: %v", err) } @@ -447,7 +447,7 @@ func addOrUpdateLabelsOnNode(kubeClient clientset.Interface, nodeName string, la }) } -func IsPodBoundenToNode(node *v1.Node) bool { +func IsPodBoundenToNode(node *corev1.Node) bool { if node.Annotations != nil && (node.Annotations[projectinfo.GetAutonomyAnnotation()] == "true" || node.Annotations[PodBindingAnnotation] == "true") {