Skip to content

Commit

Permalink
Per-message TTLs set metadata API level to 1 (#6363)
Browse files Browse the repository at this point in the history
As per the ADR:

> When either these settings are set the Stream should require API level
`1`.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Jan 10, 2025
2 parents 158d7cd + f053ba0 commit bd4be70
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
24 changes: 20 additions & 4 deletions server/jetstream_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,25 @@ const (
// - cfg!=nil, prevCfg!=nil update stream: required metadata is updated
//
// Any dynamic metadata is removed, it must not be stored and only be added for responses.
func setStaticStreamMetadata(cfg *StreamConfig, prevCfg *StreamConfig) {
func setStaticStreamMetadata(cfg *StreamConfig, _ *StreamConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
} else {
deleteDynamicMetadata(cfg.Metadata)
}

var requiredApiLevel int
requires := func(level int) {
if level > requiredApiLevel {
requiredApiLevel = level
}
}

// TTLs were added in v2.11 and require API level 1.
if cfg.AllowMsgTTL {
requires(1)
}

cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
}

Expand All @@ -59,24 +70,29 @@ func setDynamicStreamMetadata(cfg *StreamConfig) *StreamConfig {
// - cfg!=nil, prevCfg!=nil update consumer: required metadata is updated
//
// Any dynamic metadata is removed, it must not be stored and only be added for responses.
func setStaticConsumerMetadata(cfg *ConsumerConfig, prevCfg *ConsumerConfig) {
func setStaticConsumerMetadata(cfg *ConsumerConfig, _ *ConsumerConfig) {
if cfg.Metadata == nil {
cfg.Metadata = make(map[string]string)
} else {
deleteDynamicMetadata(cfg.Metadata)
}

var requiredApiLevel int
requires := func(level int) {
if level > requiredApiLevel {
requiredApiLevel = level
}
}

// Added in 2.11, absent | zero is the feature is not used.
// one could be stricter and say even if its set but the time
// has already passed it is also not needed to restore the consumer
if cfg.PauseUntil != nil && !cfg.PauseUntil.IsZero() {
requiredApiLevel = 1
requires(1)
}

if cfg.PriorityPolicy != PriorityNone || cfg.PinnedTTL != 0 || len(cfg.PriorityGroups) > 0 {
requiredApiLevel = 1
requires(1)
}

cfg.Metadata[JSRequiredLevelMetadataKey] = strconv.Itoa(requiredApiLevel)
Expand Down
6 changes: 6 additions & 0 deletions server/jetstream_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func TestJetStreamSetStaticStreamMetadata(t *testing.T) {
prev: &StreamConfig{},
expectedMetadata: metadataAtLevel("0"),
},
{
desc: "create/AllowMsgTTL",
cfg: &StreamConfig{AllowMsgTTL: true},
prev: nil,
expectedMetadata: metadataAtLevel("1"),
},
} {
t.Run(test.desc, func(t *testing.T) {
setStaticStreamMetadata(test.cfg, test.prev)
Expand Down

0 comments on commit bd4be70

Please sign in to comment.