Skip to content

Commit

Permalink
Add Consumer priority groups
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Oct 25, 2024
1 parent 7680add commit e3555a4
Show file tree
Hide file tree
Showing 10 changed files with 1,185 additions and 25 deletions.
341 changes: 317 additions & 24 deletions server/consumer.go

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1568,5 +1568,35 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerPriorityPolicyWithoutGroup",
"code": 400,
"error_code": 10159,
"description": "Setting PriorityPolicy requires at least one PriorityGroup to be set",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerInvalidPriorityGroupErr",
"code": 400,
"error_code": 10160,
"description": "Provided priority group does not exist for this consumer",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerEmptyGroupName",
"code": 400,
"error_code": 10161,
"description": "Group name cannot be an empty string",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
126 changes: 126 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ const (
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"

// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"

// jsRequestNextPre
jsRequestNextPre = "$JS.API.CONSUMER.MSG.NEXT."

Expand Down Expand Up @@ -274,6 +278,12 @@ const (
// JSAdvisoryConsumerPausePre notification that a consumer paused/unpaused.
JSAdvisoryConsumerPausePre = "$JS.EVENT.ADVISORY.CONSUMER.PAUSE"

// JSAdvisoryConsumerPinnedPre notification that a consumer was pinned.
JSAdvisoryConsumerPinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.PINNED"

// JSAdvisoryConsumerUnpinnedPre notification that a consumer was unpinned.
JSAdvisoryConsumerUnpinnedPre = "$JS.EVENT.ADVISORY.CONSUMER.UNPINNED"

// JSAdvisoryStreamSnapshotCreatePre notification that a snapshot was created.
JSAdvisoryStreamSnapshotCreatePre = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE"

Expand Down Expand Up @@ -503,6 +513,16 @@ type JSApiStreamPurgeResponse struct {

const JSApiStreamPurgeResponseType = "io.nats.jetstream.api.v1.stream_purge_response"

type JSApiConsumerUnpinRequest struct {
Group string `json:"group"`
}

type JSApiConsumerUnpinResponse struct {
ApiResponse
}

const JSApiConsumerUnpinResponseType = "io.nats.jetstream.api.v1.consumer_unpin_response"

// JSApiStreamUpdateResponse for updating a stream.
type JSApiStreamUpdateResponse struct {
ApiResponse
Expand Down Expand Up @@ -740,6 +760,7 @@ type JSApiConsumerGetNextRequest struct {
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
PriorityGroup
}

// JSApiStreamTemplateCreateResponse for creating templates.
Expand Down Expand Up @@ -979,6 +1000,7 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
{JSApiConsumerPause, s.jsConsumerPauseRequest},
{JSApiConsumerUnpin, s.jsConsumerUnpinRequest},
}

js.mu.Lock()
Expand Down Expand Up @@ -3346,6 +3368,110 @@ func (s *Server) jsMsgGetRequest(sub *subscription, c *client, _ *Account, subje
s.sendInternalAccountMsg(nil, reply, s.jsonResponse(resp))
}

func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

stream := streamNameFromSubject(subject)
consumer := consumerNameFromSubject(subject)

var req JSApiConsumerUnpinRequest
var resp = JSApiConsumerUnpinResponse{ApiResponse: ApiResponse{Type: JSApiConsumerUnpinResponseType}}

if !isEmptyRequest(msg) {
// FIXME(we need to respondwith errors)
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}

// First check if the stream and consumer is there.
sa := js.streamAssignment(acc.Name, stream)
if sa == nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

ca, ok := sa.consumers[consumer]
if !ok || ca == nil {
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// Then check if we are the leader.
mset, err := acc.lookupStream(stream)
if err != nil {
return
}

o := mset.lookupConsumer(consumer)
if o == nil {
return
}
if !o.isLeader() {
return
}
}

if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

mset, err := acc.lookupStream(stream)
if err != nil {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
o := mset.lookupConsumer(consumer)
if o == nil {
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

foundPriority := false
for _, group := range o.config().PriorityGroups {
if group == req.Group {
foundPriority = true
break
}
}
if !foundPriority {
resp.Error = NewJSConsumerInvalidPriorityGroupError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

o.mu.Lock()
o.currentPinId = _EMPTY_
o.sendUnpinnedAdvisoryLocked(req.Group, "admin")
o.mu.Unlock()
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// Request to purge a stream.
func (s *Server) jsStreamPurgeRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7358,6 +7358,10 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
cfg.MaxAckPending = JsDefaultMaxAckPending
}

if cfg.PinnedTTL == 0 {
cfg.PinnedTTL = JsDefaultPinnedTTL
}

var ca *consumerAssignment

// See if we have an existing one already under same durable name or
Expand Down
Loading

0 comments on commit e3555a4

Please sign in to comment.