diff --git a/Dockerfile b/Dockerfile index a22f6c57..5b3b3fd8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,13 +6,14 @@ # # SPDX-License-Identifier: Apache-2.0 -FROM golang:1.21.9 as builder +FROM golang:1.21.9 AS builder ADD . /build WORKDIR /build -RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o gnmic . +#RUN CGO_ENABLED=0 go build -ldflags="-s -w" -o gnmic . +RUN CGO_ENABLED=0 go build -o gnmic . FROM alpine -LABEL org.opencontainers.image.source https://github.com/openconfig/gnmic +LABEL org.opencontainers.image.source=https://github.com/openconfig/gnmic COPY --from=builder /build/gnmic /app/ WORKDIR /app ENTRYPOINT [ "/app/gnmic" ] diff --git a/docs/user_guide/api/cluster.md b/docs/user_guide/api/cluster.md index 86844bd8..78f2595e 100644 --- a/docs/user_guide/api/cluster.md +++ b/docs/user_guide/api/cluster.md @@ -1,8 +1,10 @@ -## `GET /api/v1/cluster` +# Cluster -Request gNMIc cluster state and details +## /api/v1/cluster -Returns gNMIc cluster state and details +### `GET /api/v1/cluster` + +Request gNMIc cluster state and details. === "Request" ```bash @@ -119,7 +121,94 @@ Returns gNMIc cluster state and details } ``` -## `GET /api/v1/cluster/members` +### `POST /api/v1/cluster/rebalance` + +If the cluster load is not balanced, the leader moves targets from the high-load instances to the low-load instances. + +=== "Request" + ```bash + curl --request POST gnmic-api-address:port/api/v1/cluster/rebalance + ``` +=== "200 OK" + ``` + ``` +=== "400 Bad Request" + ```json + { + "errors": [ + "not leader" + ] + } + ``` + +### `GET /api/v1/cluster/leader` + +Returns the cluster leader details. + +=== "Request" + ```bash + curl --request GET gnmic-api-address:port/api/v1/cluster/leader + ``` +=== "200 OK" + ```json + [ + { + "name": "clab-telemetry-gnmic1", + "api-endpoint": "http://clab-telemetry-gnmic1:7890", + "is-leader": true, + "number-of-locked-nodes": 23, + "locked-targets": [ + "clab-lab4-leaf8", + "clab-lab5-leaf8", + "clab-lab1-spine2", + "clab-lab3-leaf7", + "clab-lab4-leaf4", + "clab-lab2-leaf8", + "clab-lab2-spine3", + "clab-lab4-leaf1", + "clab-lab4-leaf2", + "clab-lab4-spine3", + "clab-lab5-spine2", + "clab-lab1-spine1", + "clab-lab2-leaf6", + "clab-lab5-leaf7", + "clab-lab1-leaf8", + "clab-lab3-leaf8", + "clab-lab3-spine2", + "clab-lab3-super-spine1", + "clab-lab5-spine1", + "clab-lab2-super-spine2", + "clab-lab3-leaf2", + "clab-lab2-spine2", + "clab-lab4-spine1" + ] + } + ] + ``` +=== "500 Internal Server Error" + ```json + { + "errors": [ + "Error Text" + ] + } + ``` + +### `DELETE /api/v1/cluster/leader` + +Forces the cluster leader to free its lock to allow another instance to become the leader. + +=== "Request" + ```bash + curl --request DELETE gnmic-api-address:port/api/v1/cluster/leader + ``` +=== "200 OK" + ```json + ``` + +## /api/v1/cluster/members + +### `GET /api/v1/cluster/members` Query gNMIc cluster members @@ -235,57 +324,14 @@ Returns a list of gNMIc cluster members with details } ``` -## `GET /api/v1/cluster/leader` +### `POST /api/v1/cluster/members/{id}/drain` -Queries the cluster leader deatils - -Returns details of the gNMIc cluster leader. +Drains the instance `id` from its targets, moving them to the other instances in the cluster. 
=== "Request" ```bash - curl --request POST gnmic-api-address:port/api/v1/cluster/leader + curl --request POST gnmic-api-address:port/api/v1/cluster/members/{id}/drain ``` === "200 OK" ```json - [ - { - "name": "clab-telemetry-gnmic1", - "api-endpoint": "http://clab-telemetry-gnmic1:7890", - "is-leader": true, - "number-of-locked-nodes": 23, - "locked-targets": [ - "clab-lab4-leaf8", - "clab-lab5-leaf8", - "clab-lab1-spine2", - "clab-lab3-leaf7", - "clab-lab4-leaf4", - "clab-lab2-leaf8", - "clab-lab2-spine3", - "clab-lab4-leaf1", - "clab-lab4-leaf2", - "clab-lab4-spine3", - "clab-lab5-spine2", - "clab-lab1-spine1", - "clab-lab2-leaf6", - "clab-lab5-leaf7", - "clab-lab1-leaf8", - "clab-lab3-leaf8", - "clab-lab3-spine2", - "clab-lab3-super-spine1", - "clab-lab5-spine1", - "clab-lab2-super-spine2", - "clab-lab3-leaf2", - "clab-lab2-spine2", - "clab-lab4-spine1" - ] - } - ] - ``` -=== "500 Internal Server Error" - ```json - { - "errors": [ - "Error Text" - ] - } ``` diff --git a/mkdocs.yml b/mkdocs.yml index 457bda74..82855ab2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -82,9 +82,11 @@ nav: - Duration Convert: user_guide/event_processors/event_duration_convert.md - Extract Tags: user_guide/event_processors/event_extract_tags.md - Group by: user_guide/event_processors/event_group_by.md + - IEEE Float32: user_guide/event_processors/event_ieeefloat32.md - JQ: user_guide/event_processors/event_jq.md - Merge: user_guide/event_processors/event_merge.md - Override TS: user_guide/event_processors/event_override_ts.md + - Plugin: user_guide/event_processors/event_plugin.md - Rate Limit: user_guide/event_processors/event_rate_limit.md - Starlark: user_guide/event_processors/event_starlark.md - Strings: user_guide/event_processors/event_strings.md @@ -104,6 +106,7 @@ nav: - Configuration: user_guide/api/configuration.md - Targets: user_guide/api/targets.md - Cluster: user_guide/api/cluster.md + - Other: user_guide/api/other.md - Golang Package: - Introduction: user_guide/golang_package/intro.md @@ -132,6 +135,8 @@ nav: - Generate: 'cmd/generate.md' - Generate Path: cmd/generate/generate_path.md - Generate Set-Request: cmd/generate/generate_set_request.md + - Processor: cmd/processor.md + - Proxy: cmd/proxy.md - Deployment examples: - Deployments: deployments/deployments_intro.md diff --git a/pkg/app/api.go b/pkg/app/api.go index 9e825767..4217b546 100644 --- a/pkg/app/api.go +++ b/pkg/app/api.go @@ -15,6 +15,8 @@ import ( "fmt" "io" "net/http" + "path/filepath" + "sort" "strings" "github.com/AlekSi/pointer" @@ -304,41 +306,41 @@ func (a *App) handleClusteringGet(w http.ResponseWriter, r *http.Request) { resp := new(clusteringResponse) resp.ClusterName = a.Config.ClusterName - leaderKey := fmt.Sprintf("gnmic/%s/leader", a.Config.ClusterName) - leader, err := a.locker.List(ctx, leaderKey) + var err error + resp.Leader, err = a.getLeaderName(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - resp.Leader = leader[leaderKey] - lockedNodesPrefix := fmt.Sprintf("gnmic/%s/targets", a.Config.ClusterName) - lockedNodes, err := a.locker.List(a.ctx, lockedNodesPrefix) + services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), nil) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - resp.NumberOfLockedTargets = len(lockedNodes) - services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", 
a.Config.ClusterName), nil) + + instanceNodes, err := a.getInstanceToTargetsMapping(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - instanceNodes := make(map[string][]string) - for k, v := range lockedNodes { - name := strings.TrimPrefix(k, fmt.Sprintf("gnmic/%s/targets/", a.Config.ClusterName)) - if _, ok := instanceNodes[v]; !ok { - instanceNodes[v] = make([]string, 0) - } - instanceNodes[v] = append(instanceNodes[v], name) + for _, v := range instanceNodes { + resp.NumberOfLockedTargets += len(v) } + resp.Members = make([]clusterMember, len(services)) for i, s := range services { - resp.Members[i].APIEndpoint = s.Address + scheme := "http://" + for _, t := range s.Tags { + if strings.HasPrefix(t, "protocol=") { + scheme = fmt.Sprintf("%s://", strings.TrimPrefix(t, "protocol=")) + } + } + resp.Members[i].APIEndpoint = fmt.Sprintf("%s%s", scheme, s.Address) resp.Members[i].Name = strings.TrimSuffix(s.ID, "-api") resp.Members[i].IsLeader = resp.Leader == resp.Members[i].Name resp.Members[i].NumberOfLockedTargets = len(instanceNodes[resp.Members[i].Name]) @@ -377,37 +379,26 @@ func (a *App) handleClusteringMembersGet(w http.ResponseWriter, r *http.Request) ctx, cancel := context.WithCancel(r.Context()) defer cancel() // get leader - leaderKey := fmt.Sprintf("gnmic/%s/leader", a.Config.ClusterName) - leader, err := a.locker.List(ctx, leaderKey) + leader, err := a.getLeaderName(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - // get locked targets to instance mapping - lockedNodesPrefix := fmt.Sprintf("gnmic/%s/targets", a.Config.ClusterName) - lockedNodes, err := a.locker.List(a.ctx, lockedNodesPrefix) + + services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), nil) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), nil) + instanceNodes, err := a.getInstanceToTargetsMapping(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - - instanceNodes := make(map[string][]string) - for k, v := range lockedNodes { - name := strings.TrimPrefix(k, fmt.Sprintf("gnmic/%s/targets/", a.Config.ClusterName)) - if _, ok := instanceNodes[v]; !ok { - instanceNodes[v] = make([]string, 0) - } - instanceNodes[v] = append(instanceNodes[v], name) - } members := make([]clusterMember, len(services)) for i, s := range services { scheme := "http://" @@ -418,7 +409,7 @@ func (a *App) handleClusteringMembersGet(w http.ResponseWriter, r *http.Request) } members[i].APIEndpoint = fmt.Sprintf("%s%s", scheme, s.Address) members[i].Name = strings.TrimSuffix(s.ID, "-api") - members[i].IsLeader = leader[leaderKey] == members[i].Name + members[i].IsLeader = leader == members[i].Name members[i].NumberOfLockedTargets = len(instanceNodes[members[i].Name]) members[i].LockedTargets = instanceNodes[members[i].Name] } @@ -439,40 +430,30 @@ func (a *App) handleClusteringLeaderGet(w http.ResponseWriter, r *http.Request) ctx, cancel := context.WithCancel(r.Context()) defer cancel() // get leader - leaderKey := fmt.Sprintf("gnmic/%s/leader", a.Config.ClusterName) - leader, err := a.locker.List(ctx, leaderKey) + leader, err := 
a.getLeaderName(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - // get locked targets to instance mapping - lockedNodesPrefix := fmt.Sprintf("gnmic/%s/targets", a.Config.ClusterName) - lockedNodes, err := a.locker.List(a.ctx, lockedNodesPrefix) + + services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), nil) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), nil) + instanceNodes, err := a.getInstanceToTargetsMapping(ctx) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) return } - instanceNodes := make(map[string][]string) - for k, v := range lockedNodes { - name := strings.TrimPrefix(k, fmt.Sprintf("gnmic/%s/targets/", a.Config.ClusterName)) - if _, ok := instanceNodes[v]; !ok { - instanceNodes[v] = make([]string, 0) - } - instanceNodes[v] = append(instanceNodes[v], name) - } members := make([]clusterMember, 1) for _, s := range services { - if strings.TrimSuffix(s.ID, "-api") != leader[leaderKey] { + if strings.TrimSuffix(s.ID, "-api") != leader { continue } scheme := "http://" @@ -498,6 +479,108 @@ func (a *App) handleClusteringLeaderGet(w http.ResponseWriter, r *http.Request) w.Write(b) } +func (a *App) handleClusteringLeaderDelete(w http.ResponseWriter, r *http.Request) { + if a.Config.Clustering == nil { + return + } + + if !a.isLeader { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{"not leader"}}) + return + } + + err := a.locker.Unlock(r.Context(), a.leaderKey()) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) + return + } +} + +func (a *App) handleClusteringDrainInstance(w http.ResponseWriter, r *http.Request) { + if a.Config.Clustering == nil { + return + } + + if !a.isLeader { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{"not leader"}}) + return + } + + vars := mux.Vars(r) + id := vars["id"] + if id == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + ctx := r.Context() + services, err := a.locker.GetServices(ctx, fmt.Sprintf("%s-gnmic-api", a.Config.ClusterName), + []string{ + fmt.Sprintf("instance-name=%s", id), + }) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) + return + } + if len(services) == 0 { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{"unknown instance: " + id}}) + return + } + targets, err := a.getInstanceTargets(ctx, id) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{err.Error()}}) + return + } + + go func() { + a.dispatchLock.Lock() + defer a.dispatchLock.Unlock() + + for _, t := range targets { + err = a.unassignTarget(a.ctx, t, services[0].ID) + if err != nil { + a.Logger.Printf("failed to unassign target %s: %v", t, err) + continue + } + tc, ok := a.Config.Targets[t] + if !ok { + a.Logger.Printf("could not find target %s config", t) + continue + } + err = a.dispatchTarget(a.ctx, tc, id+"-api") + if err != nil { + a.Logger.Printf("failed to dispatch target %s: %v", t, 
err) + continue + } + } + }() +} + +func (a *App) handleClusterRebalance(w http.ResponseWriter, r *http.Request) { + if a.Config.Clustering == nil { + return + } + + if !a.isLeader { + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(APIErrors{Errors: []string{"not leader"}}) + return + } + + go func() { + err := a.clusterRebalanceTargets() + if err != nil { + a.Logger.Printf("failed to rebalance: %v", err) + } + }() +} + +// helpers func headersMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("Content-Type", "application/json") @@ -525,3 +608,30 @@ func (a *App) handlerCommonGet(w http.ResponseWriter, i interface{}) { } w.Write(b) } + +func (a *App) getLeaderName(ctx context.Context) (string, error) { + leaderKey := fmt.Sprintf("gnmic/%s/leader", a.Config.ClusterName) + leader, err := a.locker.List(ctx, leaderKey) + if err != nil { + return "", err + } + return leader[leaderKey], nil +} + +func (a *App) getInstanceTargets(ctx context.Context, instance string) ([]string, error) { + locks, err := a.locker.List(ctx, fmt.Sprintf("gnmic/%s/targets", a.Config.Clustering.ClusterName)) + if err != nil { + return nil, err + } + if a.Config.Debug { + a.Logger.Println("current locks:", locks) + } + targets := make([]string, 0) + for k, v := range locks { + if v == instance { + targets = append(targets, filepath.Base(k)) + } + } + sort.Strings(targets) + return targets, nil +} diff --git a/pkg/app/app.go b/pkg/app/app.go index 041e8ffe..a4da6e71 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -81,8 +81,9 @@ type App struct { locker lockers.Locker clusteringClient *http.Client // api - apiServices map[string]*lockers.Service - isLeader bool + apiServices map[string]*lockers.Service + isLeader bool + dispatchLock *sync.Mutex // prometheus registry reg *prometheus.Registry // @@ -131,8 +132,10 @@ func New() *App { activeTargets: make(map[string]struct{}), targetsLockFn: make(map[string]context.CancelFunc), // - router: mux.NewRouter(), - apiServices: make(map[string]*lockers.Service), + router: mux.NewRouter(), + apiServices: make(map[string]*lockers.Service), + dispatchLock: new(sync.Mutex), + Logger: log.New(io.Discard, "[gnmic] ", log.LstdFlags|log.Lmsgprefix), out: os.Stdout, PromptHistory: make([]string, 0, 128), @@ -404,7 +407,7 @@ func (a *App) loadTargets(e fsnotify.Event) { return } // in cluster && leader - dist, err := a.getTargetToInstanceMapping() + dist, err := a.getTargetToInstanceMapping(a.ctx) if err != nil { a.Logger.Printf("failed to get target to instance mapping: %v", err) return diff --git a/pkg/app/clustering.go b/pkg/app/clustering.go index bce50b34..6fee0915 100644 --- a/pkg/app/clustering.go +++ b/pkg/app/clustering.go @@ -18,6 +18,7 @@ import ( "net" "net/http" "path/filepath" + "sort" "strconv" "strings" "time" @@ -33,6 +34,7 @@ const ( lockWaitTime = 100 * time.Millisecond apiServiceName = "gnmic-api" protocolTagName = "__protocol" + maxRebalanceLoop = 100 ) var ( @@ -158,17 +160,19 @@ START: go a.dispatchTargets(ctx) }() - doneCh, errCh := a.locker.KeepLock(a.ctx, leaderKey) + doneCh, errCh := a.locker.KeepLock(ctx, leaderKey) select { case <-doneCh: a.Logger.Printf("%q lost leader role", a.Config.Clustering.InstanceName) cancel() a.isLeader = false + time.Sleep(retryTimer) goto START case err := <-errCh: a.Logger.Printf("%q failed to maintain the leader key: %v", a.Config.Clustering.InstanceName, err) cancel() a.isLeader = false + time.Sleep(retryTimer) goto START case 
<-a.ctx.Done(): return @@ -262,28 +266,9 @@ func (a *App) dispatchTargets(ctx context.Context) { time.Sleep(a.Config.Clustering.TargetsWatchTimer) continue } - var err error - //a.m.RLock() - dctx, cancel := context.WithTimeout(ctx, a.Config.Clustering.TargetsWatchTimer) - for _, tc := range a.Config.Targets { - err = a.dispatchTarget(dctx, tc) - if err != nil { - a.Logger.Printf("failed to dispatch target %q: %v", tc.Name, err) - } - if err == errNotFound { - // no registered services, - // no need to continue with other targets, - // break from the targets loop - break - } - if err == errNoMoreSuitableServices { - // target has no suitable matching services, - // continue to next target without wait - continue - } - } - //a.m.RUnlock() - cancel() + a.dispatchLock.Lock() + a.dispatchTargetsOnce(ctx) + a.dispatchLock.Unlock() select { case <-ctx.Done(): return @@ -294,7 +279,29 @@ func (a *App) dispatchTargets(ctx context.Context) { } } -func (a *App) dispatchTarget(ctx context.Context, tc *types.TargetConfig) error { +func (a *App) dispatchTargetsOnce(ctx context.Context) { + dctx, cancel := context.WithTimeout(ctx, a.Config.Clustering.TargetsWatchTimer) + defer cancel() + for _, tc := range a.Config.Targets { + err := a.dispatchTarget(dctx, tc) + if err != nil { + a.Logger.Printf("failed to dispatch target %q: %v", tc.Name, err) + } + if err == errNotFound { + // no registered services, + // no need to continue with other targets, + // break from the targets loop + break + } + if err == errNoMoreSuitableServices { + // target has no suitable matching services, + // continue to next target without wait + continue + } + } +} + +func (a *App) dispatchTarget(ctx context.Context, tc *types.TargetConfig, denied ...string) error { if a.Config.Debug { a.Logger.Printf("checking if %q is locked", tc.Name) } @@ -310,7 +317,9 @@ func (a *App) dispatchTarget(ctx context.Context, tc *types.TargetConfig) error return nil } a.Logger.Printf("dispatching target %q", tc.Name) - denied := make([]string, 0) + if denied == nil { + denied = make([]string, 0) + } SELECTSERVICE: service, err := a.selectService(tc.Tags, denied...) 
if err != nil { @@ -485,8 +494,27 @@ func (a *App) getLowLoadInstance(load map[string]int) string { return ss } -func (a *App) getTargetToInstanceMapping() (map[string]string, error) { - locks, err := a.locker.List(a.ctx, fmt.Sprintf("gnmic/%s/targets", a.Config.Clustering.ClusterName)) +// loop through the current cluster load +// find the instance(s) with the highest and lowest load +func (a *App) getHighAndLowInstance(load map[string]int) (string, string) { + var highIns, lowIns string + var high = -1 + var low = -1 + for s, l := range load { + if high < 0 || l > high { + highIns = s + high = l + } + if low < 0 || l < low { + lowIns = s + low = l + } + } + return highIns, lowIns +} + +func (a *App) getTargetToInstanceMapping(ctx context.Context) (map[string]string, error) { + locks, err := a.locker.List(ctx, fmt.Sprintf("gnmic/%s/targets", a.Config.Clustering.ClusterName)) if err != nil { return nil, err } @@ -500,6 +528,27 @@ func (a *App) getTargetToInstanceMapping() (map[string]string, error) { return locks, nil } +func (a *App) getInstanceToTargetsMapping(ctx context.Context) (map[string][]string, error) { + locks, err := a.locker.List(ctx, fmt.Sprintf("gnmic/%s/targets", a.Config.Clustering.ClusterName)) + if err != nil { + return nil, err + } + if a.Config.Debug { + a.Logger.Println("current locks:", locks) + } + rs := make(map[string][]string) + for k, v := range locks { + if _, ok := rs[v]; !ok { + rs[v] = make([]string, 0) + } + rs[v] = append(rs[v], filepath.Base(k)) + } + for _, ls := range rs { + sort.Strings(ls) + } + return rs, nil +} + func (a *App) getInstancesTagsMatches(tags []string) map[string]int { maxMatch := make(map[string]int) numTags := len(tags) @@ -621,28 +670,20 @@ func (a *App) unassignTarget(ctx context.Context, name string, serviceID string) if err != nil { return err } - for _, s := range a.apiServices { - if s.ID != serviceID { - continue - } + if s, ok := a.apiServices[serviceID]; ok { scheme := a.getServiceScheme(s) url := fmt.Sprintf("%s://%s/api/v1/targets/%s", scheme, s.Address, name) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { - a.Logger.Printf("failed to create HTTP request: %v", err) - continue + return err } rsp, err := a.clusteringClient.Do(req) if err != nil { - // don't close the body here since Body will be nil - a.Logger.Printf("failed HTTP request: %v", err) - continue + return err } - rsp.Body.Close() a.Logger.Printf("received response code=%d, for DELETE %s", rsp.StatusCode, url) - break } return nil } @@ -692,3 +733,59 @@ func (a *App) createAPIClient() error { } return nil } + +func (a *App) clusterRebalanceTargets() error { + a.dispatchLock.Lock() + defer a.dispatchLock.Unlock() + + rebalanceCount := 0 // counts the number of iterations + maxIter := -1 // stores the maximum expected number of iterations + for { + // get most loaded and least loaded + load, err := a.getInstancesLoad() + if err != nil { + return err + } + highest, lowest := a.getHighAndLowInstance(load) + lowLoad := load[lowest] + highLoad := load[highest] + delta := highLoad - lowLoad + if maxIter < 0 { // set max number of iteration to delta/2 + maxIter = delta / 2 + if maxIter > maxRebalanceLoop { + maxIter = maxRebalanceLoop + } + } + a.Logger.Printf("rebalancing: high instance: %s=%d, low instance %s=%d", highest, highLoad, lowest, lowLoad) + // nothing to do + if delta < 2 { + return nil + } + if rebalanceCount >= maxIter { + return nil + } + // 
there is some work to do + // get highest load instance targets + highInstanceTargets, err := a.getInstanceTargets(a.ctx, highest) + if err != nil { + return err + } + if len(highInstanceTargets) == 0 { + return nil + } + // pick one and move it to the lowest load instance + err = a.unassignTarget(a.ctx, highInstanceTargets[0], highest+"-api") + if err != nil { + return err + } + tc, ok := a.Config.Targets[highInstanceTargets[0]] + if !ok { + return fmt.Errorf("could not find target %s config", highInstanceTargets[0]) + } + err = a.dispatchTarget(a.ctx, tc) + if err != nil { + return err + } + rebalanceCount++ + } +} diff --git a/pkg/app/routes.go b/pkg/app/routes.go index 8738acc5..155c7f67 100644 --- a/pkg/app/routes.go +++ b/pkg/app/routes.go @@ -25,8 +25,11 @@ func (a *App) routes() { func (a *App) clusterRoutes(r *mux.Router) { r.HandleFunc("/cluster", a.handleClusteringGet).Methods(http.MethodGet) - r.HandleFunc("/cluster/members", a.handleClusteringMembersGet).Methods(http.MethodGet) + r.HandleFunc("/cluster/rebalance", a.handleClusterRebalance).Methods(http.MethodPost) r.HandleFunc("/cluster/leader", a.handleClusteringLeaderGet).Methods(http.MethodGet) + r.HandleFunc("/cluster/leader", a.handleClusteringLeaderDelete).Methods(http.MethodDelete) + r.HandleFunc("/cluster/members", a.handleClusteringMembersGet).Methods(http.MethodGet) + r.HandleFunc("/cluster/members/{id}/drain", a.handleClusteringDrainInstance).Methods(http.MethodPost) } func (a *App) configRoutes(r *mux.Router) { diff --git a/pkg/app/target.go b/pkg/app/target.go index 69b02a55..91fb7795 100644 --- a/pkg/app/target.go +++ b/pkg/app/target.go @@ -72,10 +72,12 @@ func (a *App) DeleteTarget(ctx context.Context, name string) error { if !a.targetConfigExists(name) { return fmt.Errorf("target %q does not exist", name) } - a.configLock.Lock() - delete(a.Config.Targets, name) - a.configLock.Unlock() - a.Logger.Printf("target %q deleted from config", name) + if !a.isLeader { + a.configLock.Lock() + delete(a.Config.Targets, name) + a.configLock.Unlock() + a.Logger.Printf("target %q deleted from config", name) + } // delete from oper map a.operLock.Lock() defer a.operLock.Unlock() diff --git a/pkg/lockers/consul_locker/consul_locker.go b/pkg/lockers/consul_locker/consul_locker.go index ea9de0f1..cbfbc53b 100644 --- a/pkg/lockers/consul_locker/consul_locker.go +++ b/pkg/lockers/consul_locker/consul_locker.go @@ -33,24 +33,24 @@ const ( func init() { lockers.Register("consul", func() lockers.Locker { return &ConsulLocker{ - Cfg: &config{}, - m: new(sync.Mutex), - acquiredlocks: make(map[string]*locks), - attemtinglocks: make(map[string]*locks), - logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), - services: make(map[string]context.CancelFunc), + Cfg: &config{}, + m: new(sync.Mutex), + acquiredlocks: make(map[string]*locks), + attemptinglocks: make(map[string]*locks), + logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), + services: make(map[string]context.CancelFunc), } }) } type ConsulLocker struct { - Cfg *config - client *api.Client - logger *log.Logger - m *sync.Mutex - acquiredlocks map[string]*locks - attemtinglocks map[string]*locks - services map[string]context.CancelFunc + Cfg *config + client *api.Client + logger *log.Logger + m *sync.Mutex + acquiredlocks map[string]*locks + attemptinglocks map[string]*locks + services map[string]context.CancelFunc } type config struct { @@ -114,7 +114,7 @@ func (c *ConsulLocker) Lock(ctx context.Context, key string, val []byte) (bool, defer 
func() { c.m.Lock() defer c.m.Unlock() - delete(c.attemtinglocks, key) + delete(c.attemptinglocks, key) }() for { select { @@ -138,7 +138,7 @@ func (c *ConsulLocker) Lock(ctx context.Context, key string, val []byte) (bool, continue } c.m.Lock() - c.attemtinglocks[key] = &locks{sessionID: kvPair.Session, doneChan: doneChan} + c.attemptinglocks[key] = &locks{sessionID: kvPair.Session, doneChan: doneChan} c.m.Unlock() acquired, _, err = c.client.KV().Acquire(kvPair, writeOpts) if err != nil { @@ -206,7 +206,7 @@ func (c *ConsulLocker) Unlock(ctx context.Context, key string) error { delete(c.acquiredlocks, key) return nil } - if lock, ok := c.attemtinglocks[key]; ok { + if lock, ok := c.attemptinglocks[key]; ok { close(lock.doneChan) _, err := c.client.Session().Destroy(lock.sessionID, nil) if err != nil { diff --git a/pkg/lockers/consul_locker/consul_registration.go b/pkg/lockers/consul_locker/consul_registration.go index 45aaeb97..5be4b728 100644 --- a/pkg/lockers/consul_locker/consul_registration.go +++ b/pkg/lockers/consul_locker/consul_registration.go @@ -126,9 +126,6 @@ func (c *ConsulLocker) watch(qOpts *api.QueryOptions, serviceName string, tags [ c.logger.Printf("service=%q did not change, lastIndex=%d", serviceName, meta.LastIndex) return meta.LastIndex, nil } - if err != nil { - return meta.LastIndex, err - } if len(se) == 0 { return 1, nil } diff --git a/pkg/lockers/k8s_locker/k8s_locker.go b/pkg/lockers/k8s_locker/k8s_locker.go index 8e053a9b..d51a75d4 100644 --- a/pkg/lockers/k8s_locker/k8s_locker.go +++ b/pkg/lockers/k8s_locker/k8s_locker.go @@ -43,22 +43,22 @@ const ( func init() { lockers.Register("k8s", func() lockers.Locker { return &k8sLocker{ - Cfg: &config{}, - m: new(sync.RWMutex), - acquiredlocks: make(map[string]*lock), - attemtinglocks: make(map[string]*lock), - logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), + Cfg: &config{}, + m: new(sync.RWMutex), + acquiredlocks: make(map[string]*lock), + attemptinglocks: make(map[string]*lock), + logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), } }) } type k8sLocker struct { - Cfg *config - clientset *kubernetes.Clientset - logger *log.Logger - m *sync.RWMutex - acquiredlocks map[string]*lock - attemtinglocks map[string]*lock + Cfg *config + clientset *kubernetes.Clientset + logger *log.Logger + m *sync.RWMutex + acquiredlocks map[string]*lock + attemptinglocks map[string]*lock identity string // hostname } @@ -122,7 +122,7 @@ func (k *k8sLocker) Lock(ctx context.Context, key string, val []byte) (bool, err }, } k.m.Lock() - k.attemtinglocks[nkey] = &lock{ + k.attemptinglocks[nkey] = &lock{ lease: l, doneChan: doneChan, } @@ -131,7 +131,7 @@ func (k *k8sLocker) Lock(ctx context.Context, key string, val []byte) (bool, err defer func() { k.m.Lock() defer k.m.Unlock() - delete(k.attemtinglocks, nkey) + delete(k.attemptinglocks, nkey) }() for { select { @@ -280,8 +280,8 @@ func (k *k8sLocker) unlock(ctx context.Context, key string) error { delete(k.acquiredlocks, key) return k.clientset.CoordinationV1().Leases(k.Cfg.Namespace).Delete(ctx, lock.lease.Name, metav1.DeleteOptions{}) } - if lock, ok := k.attemtinglocks[key]; ok { - delete(k.attemtinglocks, key) + if lock, ok := k.attemptinglocks[key]; ok { + delete(k.attemptinglocks, key) close(lock.doneChan) return k.clientset.CoordinationV1().Leases(k.Cfg.Namespace).Delete(ctx, lock.lease.Name, metav1.DeleteOptions{}) }
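The cluster maintenance endpoints registered in `routes.go` above (`POST /api/v1/cluster/rebalance`, `DELETE /api/v1/cluster/leader`, `POST /api/v1/cluster/members/{id}/drain`) are only accepted by the current leader; other instances answer `400` with `"not leader"`. A minimal Go sketch of driving them from an operator script follows; the API address and the instance name are placeholders, not values taken from this change.

```go
// Minimal sketch of calling the new cluster maintenance endpoints.
// baseURL and the instance name "gnmic1" are placeholders for illustration only.
package main

import (
	"fmt"
	"log"
	"net/http"
)

const baseURL = "http://localhost:7890/api/v1" // placeholder gNMIc API address (must be the leader)

// call sends a bodyless request and reports the HTTP status.
func call(method, path string) error {
	req, err := http.NewRequest(method, baseURL+path, nil)
	if err != nil {
		return err
	}
	rsp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer rsp.Body.Close()
	fmt.Printf("%s %s -> %s\n", method, path, rsp.Status)
	return nil
}

func main() {
	// Drain a member before maintenance: its targets are moved to the other instances.
	if err := call(http.MethodPost, "/cluster/members/gnmic1/drain"); err != nil {
		log.Fatal(err)
	}
	// Ask the leader to move targets from high-load to low-load instances.
	if err := call(http.MethodPost, "/cluster/rebalance"); err != nil {
		log.Fatal(err)
	}
	// Force the leader to release its lock so another instance can take over.
	if err := call(http.MethodDelete, "/cluster/leader"); err != nil {
		log.Fatal(err)
	}
}
```

Draining and rebalancing are asynchronous: the handlers return immediately and the target moves run in a background goroutine, so a follow-up `GET /api/v1/cluster/members` is the way to observe the resulting distribution.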