Skip to content

Commit

Permalink
Add placement tags and preferred to metaleader stepdown request
Browse files Browse the repository at this point in the history
This makes it possible to nominate a JetStream metaleader transfer to a specific server
or servers that match a specific set of tags. Also factors out some of the logic for making
the preferred leader decision so that it could be reused for stream & consumer stepdowns.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Dec 19, 2024
1 parent 87f42d7 commit da0cbf7
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 75 deletions.
6 changes: 6 additions & 0 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,12 @@ func checkConsumerCfg(
}
}
}

// For now don't allow preferred server in placement.
if cfg.Placement != nil && cfg.Placement.Preferred != _EMPTY_ {
return NewJSStreamInvalidConfigError(fmt.Errorf("preferred server not permitted in placement"))
}

return nil
}

Expand Down
110 changes: 81 additions & 29 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -2987,34 +2986,9 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if req.Placement != nil {
if len(req.Placement.Tags) > 0 {
// Tags currently not supported.
resp.Error = NewJSClusterTagsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cn := req.Placement.Cluster
var peers []string
ourID := cc.meta.ID()
for _, p := range cc.meta.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
}
peers = append(peers, p.ID)
}
}
if len(peers) == 0 {
resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
}
preferredLeader = peers[0]
if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil {
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

Expand Down Expand Up @@ -3066,6 +3040,84 @@ func isEmptyRequest(req []byte) bool {
return len(vm) == 0
}

// getStepDownPreferredPlacement attempts to work out what the best placement is
// for a stepdown request. The preferred server name always takes precedence, but
// if not specified, the placement will be used to filter by cluster. The caller
// should check for return API errors and return those to the requestor if needed.
// getStepDownPreferredPlacement attempts to work out what the best placement is
// for a stepdown request. The preferred server name always takes precedence, but
// if not specified, the placement will be used to filter candidate peers by
// cluster name and/or tags. The caller should check for returned API errors and
// return those to the requestor if needed.
func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
	if placement == nil {
		return _EMPTY_, nil
	}
	// Snapshot the peer list and our own ID once, rather than re-fetching
	// them on every pass below.
	peers, ourID := group.Peers(), group.ID()
	if placement.Preferred != _EMPTY_ {
		// Resolve the preferred server name to a peer ID.
		var preferredLeader string
		for _, p := range peers {
			si, ok := s.nodeToInfo.Load(p.ID)
			if !ok || si == nil {
				continue
			}
			if si.(nodeInfo).name == placement.Preferred {
				preferredLeader = p.ID
				break
			}
		}
		if preferredLeader == ourID {
			return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
		}
		if preferredLeader == _EMPTY_ {
			return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
		}
		return preferredLeader, nil
	}
	// No preferred server named, so build up the candidate set in a single
	// pass, filtering out offline peers, ourselves, and anything that does
	// not match the requested cluster and/or all of the requested tags.
	possiblePeers := make(map[string]struct{}, len(peers))
	for _, p := range peers {
		if p == nil {
			continue // ... shouldn't happen.
		}
		si, ok := s.nodeToInfo.Load(p.ID)
		if !ok || si == nil {
			continue
		}
		ni := si.(nodeInfo)
		if ni.offline || p.ID == ourID {
			continue
		}
		if placement.Cluster != _EMPTY_ && ni.cluster != placement.Cluster {
			continue
		}
		matchesAll := true
		for _, tag := range placement.Tags {
			if matchesAll = ni.tags.Contains(tag); !matchesAll {
				break
			}
		}
		if !matchesAll {
			continue
		}
		possiblePeers[p.ID] = struct{}{}
	}
	// If there are no possible peers, return an error.
	if len(possiblePeers) == 0 {
		return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
	}
	// Take advantage of random map iteration order to select the preferred.
	for id := range possiblePeers {
		return id, nil
	}
	return _EMPTY_, nil // Unreachable: the map is known to be non-empty.
}

// Request to delete a stream.
func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
Expand Down
5 changes: 3 additions & 2 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ type inflightInfo struct {

// Used to guide placement of streams and meta controllers in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster,omitempty"`
Tags []string `json:"tags,omitempty"`
Cluster string `json:"cluster,omitempty"`
Tags []string `json:"tags,omitempty"`
Preferred string `json:"preferred,omitempty"`
}

// Define types of the entry.
Expand Down
76 changes: 76 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5153,3 +5153,79 @@ func TestJetStreamClusterExpectedPerSubjectConsistency(t *testing.T) {
require_Len(t, len(mset.expectedPerSubjectSequence), 0)
require_Len(t, len(mset.expectedPerSubjectInProcess), 0)
}

// TestJetStreamClusterMetaStepdownPreferred exercises the "preferred" field of
// the placement in a metaleader stepdown request: handing over to a named
// server, rejecting an unknown server name, and rejecting the current leader.
func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
	defer nc.Close()

	// Sends a stepdown request naming the given preferred server and
	// returns the raw API response payload for the caller to decode.
	requestStepdown := func(t *testing.T, name string) []byte {
		body, err := json.Marshal(JSApiLeaderStepdownRequest{
			Placement: &Placement{
				Preferred: name,
			},
		})
		require_NoError(t, err)

		msg, err := nc.Request(JSApiLeaderStepDown, body, time.Second)
		require_NoError(t, err)
		return msg.Data
	}

	// We know of the preferred server and will successfully hand over to it.
	t.Run("KnownPreferred", func(t *testing.T) {
		leader := c.leader()
		var target *Server
		for _, s := range c.servers {
			if s != leader {
				target = s
				break
			}
		}

		var apiresp JSApiLeaderStepDownResponse
		require_NoError(t, json.Unmarshal(requestStepdown(t, target.Name()), &apiresp))
		require_True(t, apiresp.Success)
		require_Equal(t, apiresp.Error, nil)

		c.waitOnLeader()
		require_Equal(t, target, c.leader())
	})

	// We don't know of a server that matches that name so the stepdown fails.
	t.Run("UnknownPreferred", func(t *testing.T) {
		var apiresp JSApiLeaderStepDownResponse
		require_NoError(t, json.Unmarshal(requestStepdown(t, "i_dont_exist"), &apiresp))
		require_False(t, apiresp.Success)
		require_NotNil(t, apiresp.Error)
		require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF)
	})

	// The preferred server happens to already be the leader so the stepdown fails.
	t.Run("SamePreferred", func(t *testing.T) {
		var apiresp ApiResponse
		require_NoError(t, json.Unmarshal(requestStepdown(t, c.leader().Name()), &apiresp))
		require_NotNil(t, apiresp.Error)
		require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF)
	})
}
9 changes: 6 additions & 3 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,26 +326,29 @@ func createJetStreamTaggedSuperClusterWithGWProxy(t *testing.T, gwm gwProxyMap)
}

// Make first cluster AWS, US country code.
for _, s := range sc.clusterForName("C1").servers {
for i, s := range sc.clusterForName("C1").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:aws")
s.opts.Tags.Add("country:us")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
// Make second cluster GCP, UK country code.
for _, s := range sc.clusterForName("C2").servers {
for i, s := range sc.clusterForName("C2").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:gcp")
s.opts.Tags.Add("country:uk")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
// Make third cluster AZ, JP country code.
for _, s := range sc.clusterForName("C3").servers {
for i, s := range sc.clusterForName("C3").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:az")
s.opts.Tags.Add("country:jp")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
Expand Down
Loading

0 comments on commit da0cbf7

Please sign in to comment.