Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Nov 13, 2024
1 parent 7ba1591 commit b388a3b
Show file tree
Hide file tree
Showing 24 changed files with 1,988 additions and 85 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var DefaultMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
MultiplexerCacheManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: DefaultMultiplexerResources,
MultiplexerCacheManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgent,
}
}
34 changes: 34 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package filter
import (
"io"
"net/http"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
)

type NodesInPoolGetter func(poolName string) ([]string, error)
Expand Down Expand Up @@ -59,4 +62,35 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterManager interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)

type UnionObjectFilter []ObjectFilter

func (chain UnionObjectFilter) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain UnionObjectFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain UnionObjectFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break
}
}

return obj
}
17 changes: 17 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)
}
}

filters := filter.UnionObjectFilter(objectFilters)
return filters
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
96 changes: 50 additions & 46 deletions pkg/yurthub/multiplexer/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,60 @@ var newServiceListFunc = func() runtime.Object {
}

func TestResourceCache_GetList(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
storage := ystorage.NewFakeServiceStorage(
[]v1.Service{
*newService(metav1.NamespaceSystem, "coredns"),
*newService(metav1.NamespaceDefault, "nginx"),
})

cache, _, _ := NewResourceCache(
storage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

assert.Nil(t, err)
assertCacheGetList(t, cache)
for _, tc := range []struct {
name string
key string
expectedServiceList *v1.ServiceList
}{
{
"all namespace",
"",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
*newService(metav1.NamespaceSystem, "coredns"),
},
},
},
{
"default namespace",
"/default",
&v1.ServiceList{
ListMeta: metav1.ListMeta{
ResourceVersion: "100",
},
Items: []v1.Service{
*newService(metav1.NamespaceDefault, "nginx"),
},
},
},
} {
serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), tc.key, mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, tc.expectedServiceList.Items, serviceList.Items)
}
}

func mockListOptions() storage.ListOptions {
Expand All @@ -74,27 +115,17 @@ func mockListOptions() storage.ListOptions {
}
}

func assertCacheGetList(t testing.TB, cache Interface) {
t.Helper()

serviceList := &v1.ServiceList{}
err := cache.GetList(context.Background(), "", mockListOptions(), serviceList)

assert.Nil(t, err)
assert.Equal(t, 1, len(serviceList.Items))
}

func TestResourceCache_Watch(t *testing.T) {
fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")})

cache, _, err := NewResourceCache(
fakeStorage,
serviceGVR,
&ResourceCacheConfig{
keyFunc,
KeyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
AttrsFunc,
},
)

Expand All @@ -117,7 +148,7 @@ func mockWatchOptions() storage.ListOptions {
}

func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) {
receive, err := cache.Watch(context.TODO(), "", mockWatchOptions())
receive, err := cache.Watch(context.TODO(), "/kube-system", mockWatchOptions())

go func() {
fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2"))
Expand All @@ -127,30 +158,3 @@ func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceSto
event := <-receive.ResultChan()
assert.Equal(t, watch.Added, event.Type)
}

func TestResourceCache_Get(t *testing.T) {
cache, _, err := NewResourceCache(
ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}),
serviceGVR,
&ResourceCacheConfig{
keyFunc,
newServiceFunc,
newServiceListFunc,
attrsFunc,
},
)
assert.Nil(t, err)
assertCacheGet(t, cache)
}

func assertCacheGet(t testing.TB, cache Interface) {
t.Helper()

service := &v1.Service{}
err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{
ResourceVersion: "1",
}, service)

assert.Nil(t, err)
assert.Equal(t, "coredns", service.Name)
}
23 changes: 23 additions & 0 deletions pkg/yurthub/multiplexer/fake_multiplexer_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package multiplexer

import "k8s.io/apimachinery/pkg/runtime/schema"

type FakeCacheManager struct {
cacheMap map[string]Interface
resourceConfigMap map[string]*ResourceCacheConfig
}

func NewFakeCacheManager(cacheMap map[string]Interface, resourceConfigMap map[string]*ResourceCacheConfig) *FakeCacheManager {
return &FakeCacheManager{
cacheMap: cacheMap,
resourceConfigMap: resourceConfigMap,
}
}

func (fcm *FakeCacheManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
return fcm.resourceConfigMap[gvr.String()], nil
}

func (fcm *FakeCacheManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) {
return fcm.cacheMap[gvr.String()], nil, nil
}
10 changes: 4 additions & 6 deletions pkg/yurthub/multiplexer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
)

var keyFunc = func(obj runtime.Object) (string, error) {
var KeyFunc = func(obj runtime.Object) (string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return "", err
Expand All @@ -48,7 +48,7 @@ var keyFunc = func(obj runtime.Object) (string, error) {
return "/" + ns + "/" + name, nil
}

var attrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
var AttrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -93,7 +93,6 @@ func NewRequestsMultiplexerManager(
cacheDestroyFuncMap: make(map[string]func()),
}
}

func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) {
if config, ok := m.cacheConfigMap[gvr.String()]; ok {
return config, nil
Expand Down Expand Up @@ -127,7 +126,6 @@ func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (sch

func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
listGVK schema.GroupVersionKind) *ResourceCacheConfig {

resourceCacheConfig := &ResourceCacheConfig{
NewFunc: func() runtime.Object {
obj, _ := scheme.Scheme.New(gvk)
Expand All @@ -137,8 +135,8 @@ func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind,
objList, _ := scheme.Scheme.New(listGVK)
return objList
},
KeyFunc: keyFunc,
GetAttrsFunc: attrsFunc,
KeyFunc: KeyFunc,
GetAttrsFunc: AttrsFunc,
}

return resourceCacheConfig
Expand Down
Loading

0 comments on commit b388a3b

Please sign in to comment.