Skip to content

Commit

Permalink
add multiplexer proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyjhtangtang committed Jan 2, 2025
1 parent eed8a55 commit 59483aa
Show file tree
Hide file tree
Showing 40 changed files with 2,176 additions and 171 deletions.
35 changes: 35 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
apiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -54,11 +55,26 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/manager"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer"
"github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/network"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

var AllowedMultiplexerResources = []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "services",
},
{
Group: "discovery.k8s.io",
Version: "v1",
Resource: "endpointslices",
},
}

// YurtHubConfiguration represents configuration of yurthub
type YurtHubConfiguration struct {
LBMode string
Expand Down Expand Up @@ -101,6 +117,9 @@ type YurtHubConfiguration struct {
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
HostControlPlaneAddr string // ip:port
PostStartHooks map[string]func() error
RequestMultiplexerManager multiplexer.MultiplexerManager
MultiplexerResources []schema.GroupVersionResource
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -176,6 +195,8 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
HostControlPlaneAddr: options.HostControlPlaneAddr,
MultiplexerResources: AllowedMultiplexerResources,
RequestMultiplexerManager: newMultiplexerCacheManager(options),
}

// if yurthub is in local mode, certMgr and networkMgr are no need to start
Expand Down Expand Up @@ -403,3 +424,17 @@ func prepareServerServing(options *options.YurtHubOptions, certMgr certificate.Y

return nil
}

func newMultiplexerCacheManager(options *options.YurtHubOptions) multiplexer.MultiplexerManager {
config := newRestConfig(options.NodeName, options.YurtHubProxyHost, options.YurtHubProxyPort)
rsm := storage.NewStorageManager(config)

return multiplexer.NewRequestsMultiplexerManager(rsm)
}

func newRestConfig(nodeName string, host string, port int) *rest.Config {
return &rest.Config{
Host: fmt.Sprintf("http://%s:%d", host, port),
UserAgent: util.MultiplexerProxyClientUserAgentPrefix + nodeName,
}
}
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error {
var cacheMgr cachemanager.CacheManager
if cfg.WorkingMode == util.WorkingModeEdge {
klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr = cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
cacheMgr = cachemanager.NewCacheManager(cfg.NodeName, cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager, cfg.SharedFactory)
} else {
klog.Infof("%d. disable cache manager for node %s because it is a cloud node", trace, cfg.NodeName)
}
Expand Down
Binary file added cmd/yurthub/yurthub
Binary file not shown.
14 changes: 8 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ const (

type CacheAgent struct {
sync.Mutex
agents sets.Set[string]
store StorageWrapper
agents sets.Set[string]
store StorageWrapper
nodeName string
}

func NewCacheAgents(informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
func NewCacheAgents(nodeName string, informerFactory informers.SharedInformerFactory, store StorageWrapper) *CacheAgent {
ca := &CacheAgent{
agents: sets.New(util.DefaultCacheAgents...),
store: store,
agents: sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + nodeName),
store: store,
nodeName: nodeName,
}
configmapInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configmapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -108,7 +110,7 @@ func (ca *CacheAgent) deleteConfigmap(obj interface{}) {

// updateCacheAgents update cache agents
func (ca *CacheAgent) updateCacheAgents(cacheAgents, action string) sets.Set[string] {
newAgents := sets.New(util.DefaultCacheAgents...)
newAgents := sets.New(util.DefaultCacheAgents...).Insert(util.MultiplexerProxyClientUserAgentPrefix + ca.nodeName)
for _, agent := range strings.Split(cacheAgents, sepForAgent) {
agent = strings.TrimSpace(agent)
if len(agent) != 0 {
Expand Down
13 changes: 7 additions & 6 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,38 +36,39 @@ func TestUpdateCacheAgents(t *testing.T) {
"two new agents updated": {
initAgents: []string{},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.Set[string]{},
},
"two new agents updated but an old agent deleted": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent2,agent3",
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent2", "agent3"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New("agent1"),
},
"no agents updated ": {
initAgents: []string{"agent1", "agent2"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"no agents updated with default": {
initAgents: []string{"agent1", "agent2", "kubelet"},
cacheAgents: "agent1,agent2",
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...),
resultAgents: sets.New(append([]string{"agent1", "agent2"}, util.DefaultCacheAgents...)...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
"empty agents added ": {
initAgents: []string{},
cacheAgents: "",
resultAgents: sets.New(util.DefaultCacheAgents...),
resultAgents: sets.New(util.DefaultCacheAgents...).Insert("multiplexer-proxy-iz2ze21g5pq9jbesubrksvz"),
deletedAgents: sets.New[string](),
},
}
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
m := &CacheAgent{
agents: sets.New(tt.initAgents...),
agents: sets.New(tt.initAgents...),
nodeName: "iz2ze21g5pq9jbesubrksvz",
}

m.updateCacheAgents(strings.Join(tt.initAgents, ","), "")
Expand Down
3 changes: 2 additions & 1 deletion pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ type cacheManager struct {

// NewCacheManager creates a new CacheManager
func NewCacheManager(
nodeName string,
storagewrapper StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
sharedFactory informers.SharedInformerFactory,
) CacheManager {
cacheAgents := NewCacheAgents(sharedFactory, storagewrapper)
cacheAgents := NewCacheAgents(nodeName, sharedFactory, storagewrapper)
cm := &cacheManager{
storage: storagewrapper,
serializerManager: serializerMgr,
Expand Down
12 changes: 6 additions & 6 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestCacheGetResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestCacheWatchResponse(t *testing.T) {
}
sWrapper := NewStorageWrapper(dStorage)
serializerM := serializer.NewSerializerManager()
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj []watch.Event
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
inputObj runtime.Object
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo storage.KeyBuildInfo
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to create RESTMapper manager, %v", err)
}
yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)
yurtCM := NewCacheManager("node1", sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory)

testcases := map[string]struct {
keyBuildInfo *storage.KeyBuildInfo
Expand Down Expand Up @@ -3158,7 +3158,7 @@ func TestCanCacheFor(t *testing.T) {
defer close(stop)
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
m := NewCacheManager(s, nil, nil, informerFactory)
m := NewCacheManager("node1", s, nil, nil, informerFactory)
informerFactory.Start(nil)
cache.WaitForCacheSync(stop, informerFactory.Core().V1().ConfigMaps().Informer().HasSynced)
if tt.preRequest != nil {
Expand Down
53 changes: 53 additions & 0 deletions pkg/yurthub/filter/filterchain/filterchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filterchain

import (
"strings"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"

yurtutil "github.com/openyurtio/openyurt/pkg/util"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
)

type FilterChain []filter.ObjectFilter

func (chain FilterChain) Name() string {
var names []string
for i := range chain {
names = append(names, chain[i].Name())
}
return strings.Join(names, ",")
}

func (chain FilterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] {
// do nothing
return map[string]sets.Set[string]{}
}

func (chain FilterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
for i := range chain {
obj = chain[i].Filter(obj, stopCh)
if yurtutil.IsNil(obj) {
break
}
}

return obj
}
5 changes: 5 additions & 0 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ type ObjectFilter interface {
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
}

type FilterFinder interface {
FindResponseFilter(req *http.Request) (ResponseFilter, bool)
FindObjectFilters(req *http.Request) ObjectFilter
}

type NodeGetter func(name string) (*v1.Node, error)
18 changes: 18 additions & 0 deletions pkg/yurthub/filter/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/approver"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/base"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/filterchain"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/responsefilter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
Expand Down Expand Up @@ -111,3 +112,20 @@ func (m *Manager) FindResponseFilter(req *http.Request) (filter.ResponseFilter,

return nil, false
}

func (m *Manager) FindObjectFilters(req *http.Request) filter.ObjectFilter {
objectFilters := make([]filter.ObjectFilter, 0)
approved, filterNames := m.Approver.Approve(req)
if !approved {
return nil
}

for i := range filterNames {
if objectFilter, ok := m.nameToObjectFilter[filterNames[i]]; ok {
objectFilters = append(objectFilters, objectFilter)
}
}

filters := filterchain.FilterChain(objectFilters)
return filters
}
1 change: 0 additions & 1 deletion pkg/yurthub/multiplexer/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

type Interface interface {
Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error)
Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error
GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error
}

Expand Down
Loading

0 comments on commit 59483aa

Please sign in to comment.