Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Dec 3, 2024
1 parent f2e07f0 commit a2f0237
Show file tree
Hide file tree
Showing 25 changed files with 2,026 additions and 91 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var DefaultMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
MultiplexerCacheManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: DefaultMultiplexerResources,
MultiplexerCacheManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgent,
}
}
34 changes: 34 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package filter
import (
"io"
"net/http"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
)

type NodesInPoolGetter func(poolName string) ([]string, error)
Expand Down Expand Up @@ -59,4 +62,35 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterManager interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)

type UnionObjectFilter []ObjectFilter

func (chain UnionObjectFilter) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain UnionObjectFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain UnionObjectFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break
}
}

return obj
}
17 changes: 17 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)
}
}

filters := filter.UnionObjectFilter(objectFilters)
return filters
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
96 changes: 50 additions & 46 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,60 @@ var newServiceListFunc = func() runtime.Object {
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
storage := ystorage.NewFakeServiceStorage(
[]v1.Service{
*newService(metav1.NamespaceSystem, "coredns"),
*newService(metav1.NamespaceDefault, "nginx"),
})

cache, _, _ := NewResourceCache(
storage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
for _, tc := range []struct {
name string
key string
expectedServiceList *v1.ServiceList
}{
{
"all namespace",
"",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
*newService(metav1.NamespaceSystem, "coredns"),
},
},
},
{
"default namespace",
"/default",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
},
},
},
} {
serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items)
}
}

func mockListOptions() storage.ListOptions {
Expand All @@ -74,27 +115,17 @@ func mockListOptions() storage.ListOptions {
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

Expand All @@ -117,7 +148,7 @@ func mockWatchOptions() storage.ListOptions {
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())
receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
Expand All @@ -127,30 +158,3 @@ func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceSto
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}

func TestResourceCache_Get(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)
assert.Nil(t, err)
assertCacheGet(t, cache)
}

func assertCacheGet(t testing.TB, cache Interface) {
t.Helper()

service := &v1.Service{}
err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{
ResourceVersion: "1",
}, service)

assert.Nil(t, err)
assert.Equal(t, "coredns", service.Name)
}
39 changes: 39 additions & 0 deletions pkg/yurthub/multiplexer/fake_multiplexer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multiplexer

import "k8s.io/apimachinery/pkg/runtime/schema"

type FakeCacheManager struct {
cacheMap map[string]Interface
resourceConfigMap map[string]*ResourceCacheConfig
}

func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager {
return &FakeCacheManager{
cacheMap: cacheMap,
resourceConfigMap: resourceConfigMap,
}
}

func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
return fcm.resourceConfigMap[gvr.String()], nil
}

func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) {
return fcm.cacheMap[gvr.String()], nil, nil
}
10 changes: 4 additions & 6 deletions pkg/yurthub/multiplexer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
)

var keyFunc = func(obj runtime.Object) (string, error) {
var KeyFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
Expand All @@ -48,7 +48,7 @@ var keyFunc = func(obj runtime.Object) (string, error) {
return "/" + ns + "/" + name, nil
}

var attrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -93,7 +93,6 @@ func NewRequestsMultiplexerManager(
cacheDestroyFuncMap: make(map[string]func()),
}
}

func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
if config, ok := m.cacheConfigMap[gvr.String()]; ok {
return config, nil
Expand Down Expand Up @@ -127,7 +126,6 @@ func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (sch

func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
listGVK schema.GroupVersionKind) *ResourceCacheConfig {

resourceCacheConfig := &ResourceCacheConfig{
NewFunc: func() runtime.Object {
obj, _ := scheme.Scheme.New(gvk)
Expand All @@ -137,8 +135,8 @@ func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
objList, _ := scheme.Scheme.New(listGVK)
return objList
},
KeyFunc: keyFunc,
GetAttrsFunc: attrsFunc,
KeyFunc: KeyFunc,
GetAttrsFunc: AttrsFunc,
}

return resourceCacheConfig
Expand Down
Loading

0 comments on commit a2f0237

Please sign in to comment.