Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding comments to cache and generic server #475

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
changelog:
- type: NON_USER_FACING
description: update comments
8 changes: 4 additions & 4 deletions pkg/api/v1/control-plane/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Request = envoy_service_discovery_v3.DiscoveryRequest
//
// roughly copied from https://github.com/envoyproxy/go-control-plane/blob/dcf5642c8e54496938e0311fe9c48e39b609e583/pkg/cache/v3/cache.go#L45
type ConfigWatcher interface {
// CreateWatch returns a new open watch from a non-empty request.
// CreateWatch returns a new open watch from a non-empty request to receive a response.
//
// Value channel produces requested resources, once they are available. If
// the channel is closed prior to cancellation of the watch, an unrecoverable
Expand Down Expand Up @@ -60,15 +60,15 @@ type Cache interface {
GetStatusKeys() []string
}

// Response is a pre-serialized xDS response.
// Response is a pre-serialized xDS response. It contains the updated resources for the original xDS request, the xDS request, and the version.
type Response struct {
// Request is the original request.
// Request is the original xDS request.
Request envoy_service_discovery_v3.DiscoveryRequest

// Version of the resources as tracked by the cache for the given type.
// Proxy responds with this version as an acknowledgement.
Version string

// Resources to be included in the response.
// Resources to be included in the response. Should match the requested resources in the xDS request.
Resources []Resource
}
34 changes: 30 additions & 4 deletions pkg/api/v1/control-plane/cache/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache
}
}

// SetSnapshotCache updates a snapshot for a node.
// SetSnapshotCache sets the node's snapshot in the cache. Then submits a response snapshot for any open watches.
//
// Looks for any response watches awaiting for a response and submits a response based off the snapshots resources.
// A response is sent if the version is different than the request version.
// This will clean the watch from the watches.
//
// Each snapshot will only submit resources based off the requests type URL.
func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) {
cache.mu.Lock()
defer cache.mu.Unlock()
Expand Down Expand Up @@ -217,7 +223,19 @@ func SupersetWithResource(names map[string]Resource, resources map[string]Resour
return nil
}

// CreateWatch returns a watch for an xDS request.
// CreateWatch returns a response channel for a xDS request and the cancel function.
// A response will be generated by the cache, and sent through the response channel asynchronously.
//
// If the cache snapshot does not exist or the version of the request matches the version of the snapshot,
// create a response watch with and store it in the info watches map so that other processes can send
// information once the snapshot resources exist. IE: SetSnapshot()
//
// Else, if the snapshot exists a response will be submitted to the response channel.
//
// The channel can be used to respond to responses prepared for a request.
//
// Each watch (Response channel) is given an unique watch ID.
// The node from the request is used to get the nodeID from the cache, this is used to get the set of response watches for the node.
func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func()) {
nodeID := cache.hash.ID(request.Node)

Expand Down Expand Up @@ -252,8 +270,10 @@ func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func())
request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
}
info.mu.Lock()
// check SetSnapshot() for responses on the watches map
info.watches[watchID] = ResponseWatch{Request: request, Response: value}
info.mu.Unlock()
// clean the watch from the node watches
return value, cache.cancelWatch(nodeID, watchID)
}

Expand All @@ -265,11 +285,13 @@ func (cache *snapshotCache) CreateWatch(request Request) (chan Response, func())
}
}

// nextWatchID returns the next watch id
func (cache *snapshotCache) nextWatchID() int64 {
return atomic.AddInt64(&cache.watchCount, 1)
}

// cancellation function for cleaning stale watches
// cancelWatch is used to close the response watch and delete it from the nodes set of watches.
// This cleans the watches.
func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {
return func() {
// uses the cache mutex
Expand All @@ -286,7 +308,10 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {
}
}

// Respond to a watch with the snapshot value. The value channel should have capacity not to block.
// respond will create a response and submit it to the value channel. The Response will contain the filtered resources that are requested.
//
// if the cache is ADS and the request contains any resources, the resource list must match all the names in the request resource names.
// Records the request type URL.
// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46
func (cache *snapshotCache) respond(request Request, value chan Response, resources map[string]Resource, version string) {
// for ADS, the request names must match the snapshot names
Expand All @@ -309,6 +334,7 @@ func (cache *snapshotCache) respond(request Request, value chan Response, resour
value <- createResponse(request, resources, version)
}

// createResponse creates a Response based off the requested resources.
func createResponse(request Request, resources map[string]Resource, version string) Response {
filtered := make([]Resource, 0, len(resources))

Expand Down
2 changes: 2 additions & 0 deletions pkg/api/v1/control-plane/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ func NewResources(version string, items []Resource) Resources {
}
}

// Snapshot is a snashot of a cached set of resources.
type Snapshot interface {
Consistent() error
// MakeConsistent should never be called on a generic snapshot as it is not used for snapshots with dependent resources.
MakeConsistent()
// GetResources will return the resources based off the type.
GetResources(typ string) Resources
// Clone shouldn't be called on a generic snapshot until https://github.com/solo-io/solo-kit/issues/461 is resolved.
Clone() Snapshot
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/v1/control-plane/cache/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type StatusInfo interface {
GetLastWatchRequestTime() time.Time
}

// statusInfo contains all the response watches for a node.
type statusInfo struct {
// node is the constant Envoy node metadata.
node *envoy_config_core_v3.Node
Expand All @@ -60,7 +61,7 @@ type ResponseWatch struct {
// Request is the original request for the watch.
Request Request

// Response is the channel to push response to.
// Response is the channel to push a response to.
Response chan Response
}

Expand Down
77 changes: 61 additions & 16 deletions pkg/api/v1/control-plane/server/generic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,41 +96,57 @@ func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Ser
return &server{ctx: ctx, cache: config, callbacks: callbacks}
}

// server sends requests to the cache to be fullfilled.
// This generates responses asynchronously.
// Responses are submitted back to the Envoy client or fetched from the client.
type server struct {
cache cache.Cache
// cache is an interface to handle resource cache, and to create channels when the resources are updated.
cache cache.Cache
// callbacks are pre and post callback used when responses and requests sent/received.
callbacks Callbacks
ctx context.Context

// ctx is the context in which the server is alive.
ctx context.Context
// streamCount for counting bi-di streams
streamCount int64
}

// singleWatch contians a channel that can be used to watch for new responses from the cache.
type singleWatch struct {
resource chan cache.Response
// resource is the channel used to receive the response.
resource chan cache.Response
// resourceCancel is a function that allows you to close the resource channel.
resourceCancel func()
resourceNonce string
// resourceNonce is the nonce used to identify the response to a request.
resourceNonce string
}

// watches for all xDS resource types
type watches struct {
curerntwatches map[string]singleWatch
// currentwatches are the response channels used for each resource type.
// Currently the only purpose is to cancel the stored channels from the Cancel() function.
currentwatches map[string]singleWatch
}

// newWatches returns an instantiation of watches
func newWatches() *watches {
return &watches{
curerntwatches: map[string]singleWatch{},
currentwatches: map[string]singleWatch{},
}
}

// Cancel all watches
func (values *watches) Cancel() {
for _, v := range values.curerntwatches {
for _, v := range values.currentwatches {
if v.resourceCancel != nil {
v.resourceCancel()
}
}
}

// createResponse will use the response (Envoy Request with updated resources) to serialize the resources and create an Envoy Response.
//
// The response tells Envoy that the current SotW, based off the Envoy Requested resources.
// The response contains its assocaited request.
func createResponse(resp *cache.Response, typeURL string) (*envoy_service_discovery_v3.DiscoveryResponse, error) {
if resp == nil {
return nil, errors.New("missing response")
Expand All @@ -154,11 +170,19 @@ func createResponse(resp *cache.Response, typeURL string) (*envoy_service_discov
return out, nil
}

// TypedResponse contains the response from a xDS request with the typeURL
type TypedResponse struct {
// Response is the response from a request
Response *cache.Response
TypeUrl string
// TypeUrl is the type Url of the xDS request
TypeUrl string
}

// StreamEnvoyV3 will create a request channel that will receive requests from the streams Recv() function.
// It will then set up the processes to handle requests when they are received, so that the server can respond to the requests.
// The defaultTypeURL is used to identify the type of the resources that the Envoy stream is watching for.
//
// process is called to handle both the request and the response to the request. It does this by sending the response onto the stream.
func (s *server) StreamEnvoyV3(
stream StreamEnvoyV3,
defaultTypeURL string,
Expand Down Expand Up @@ -240,6 +264,10 @@ func (s *server) sendSolo(
}
}

// sendEnvoyV3 returns a function that is called to send an Envoy response. The cahe response is used to create an Envoy response and update the nonce.
//
// It will then send the response to the stream.
// This will handle any callbacks on the server as well.
func (s *server) sendEnvoyV3(
stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer,
) sendFunc {
Expand All @@ -259,7 +287,15 @@ func (s *server) sendEnvoyV3(
}
}

// process handles a bi-di stream request
// process handles both the request and the response of an Envoy request.
//
// For each request submitted onto the request channel, wait for the corresponding response on the response channel.
//
// For each response received from a request submitted, the send function to send the
// response and translates it to an Envoy Response and send it back on the Envoy client.
//
// Requests are received from the servers stream.Recv() function
// Callbacks are handled as well.
func (s *server) process(
ctx context.Context,
send sendFunc,
Expand Down Expand Up @@ -297,6 +333,7 @@ func (s *server) process(
case <-s.ctx.Done():
return nil
// config watcher can send the requested resources types in any order
// responses come from requests submitted to createWatch()
case resp, more := <-responses:
if !more {
return status.Errorf(codes.Unavailable, "watching failed")
Expand All @@ -305,13 +342,14 @@ func (s *server) process(
return status.Errorf(codes.Unavailable, "watching failed for "+resp.TypeUrl)
}
typeurl := resp.TypeUrl
// send the response of a request
nonce, err := send(*resp.Response, typeurl, streamID, &streamNonce)
if err != nil {
return err
}
sw := values.curerntwatches[typeurl]
sw := values.currentwatches[typeurl]
sw.resourceNonce = nonce
values.curerntwatches[typeurl] = sw
values.currentwatches[typeurl] = sw

case req, more := <-reqCh:
// input stream ended or errored out
Expand Down Expand Up @@ -348,20 +386,26 @@ func (s *server) process(

// cancel existing watches to (re-)request a newer version
typeurl := req.TypeUrl
sw := values.curerntwatches[typeurl]
sw := values.currentwatches[typeurl]
if sw.resourceNonce == "" || sw.resourceNonce == nonce {
if sw.resourceCancel != nil {
sw.resourceCancel()
}

sw.resource, sw.resourceCancel = s.createWatch(responses, req)
values.curerntwatches[typeurl] = sw
// wait for a response on the respones channel. Send the request to generate the response asynchronously.
sw.resource, sw.resourceCancel = s.createResponseWatch(responses, req)
values.currentwatches[typeurl] = sw
}
}
}
}

func (s *server) createWatch(responses chan<- TypedResponse, req *cache.Request) (chan cache.Response, func()) {
// createResponseWatch returns a channel for the response of a request and the cancel function.
// A request is used to generate an async responce that is submitted to the respones channel.
//
// It creates a go routine to send responses onto the respones channel.
// If the watch created canceled, then it will close the go routine, else there was an error and a nil response is sent.
func (s *server) createResponseWatch(responses chan<- TypedResponse, req *cache.Request) (chan cache.Response, func()) {
typeurl := req.TypeUrl

watchedResource, cancelwatch := s.cache.CreateWatch(*req)
Expand All @@ -386,6 +430,7 @@ func (s *server) createWatch(responses chan<- TypedResponse, req *cache.Request)
case <-canceled:
// this was canceled. goodbye
return
// receive responses for the requested resources
case response, ok := <-watchedResource:
if !ok {
// resource chan is closed. this may have happened due to cancel,
Expand Down