Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for per-message TTLs #6272

Merged
merged 2 commits into from
Dec 24, 2024
Merged

Initial support for per-message TTLs #6272

merged 2 commits into from
Dec 24, 2024

Conversation

neilalexander
Copy link
Member

@neilalexander neilalexander commented Dec 17, 2024

This is an incomplete but somewhat-working implementations of per-message TTLs.

Notes:

  • Adds AllowMsgTTL to stream config;
  • Parses the Nats-TTL message header as a parsable duration, or if not, second precision;
  • Bumps the index.db magic version so that we can add a new field for tracking how many TTL'd messages are in each message block (means that index.db needs to be rebuilt if downgrading);
  • Adds a new thw.db to store the timed hash wheel state, and tries to rebuild it from a linear scan if it is missing or out-of-date.

Future work:

  • Memory store support;
  • Expires "never" including not aging out with MaxAge;
  • Ability to update a TTL for a message;
  • Decide what we want to do for enabling/disabling TTL support;
  • Decide how mirroring/sourcing streams will handle TTL headers.

Signed-off-by: Neil Twigg [email protected]

Copy link
Member

@MauriceVanVeen MauriceVanVeen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Found and fixed two bugs; if there was no index.db the TTLs would not be recovered (with or without thw.db), and if a single message needed to be recovered (when no thw.db) it would not due to an off-by-one.

Still work to be done in future PRs, but think this one is good. 👍

@derekcollison derekcollison self-requested a review December 19, 2024 15:09
Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general looks good.

Let's move hdr to a parseable duration (see inline comments). Also we should reject messages when the stream has not opted in to per msg TTLs.

server/filestore.go Show resolved Hide resolved
server/filestore.go Show resolved Hide resolved
fs.ttls.ExpireTasks(func(seq uint64, ts int64) {
fs.removeMsgViaLimits(seq)
})
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only pass max int if we are not doing max age, if max age then we should capture the next fireIn from the loop above and pass that here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we get this changed as discussed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet, I'm still fiddling with this and want to write a proper test for it. Happy to do in either this PR or in a follow-up one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update this one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done.

server/filestore.go Outdated Show resolved Hide resolved
server/stream.go Outdated Show resolved Hide resolved
server/stream.go Outdated Show resolved Hide resolved
server/stream.go Outdated
@@ -4996,17 +5011,20 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}

// Find the message TTL if any.
ttl := getMessageTTL(hdr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reject if ttl present and stream is not opted in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can do this, otherwise we can run into a problem when we source/mirror messages with a TTL into another stream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need special processing for streams and mirrors anyway that ignores (for ingest criteria) certain headers. This would be one added to the list.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add that to the list of follow-up changes to make.

@neilalexander neilalexander marked this pull request as ready for review December 20, 2024 15:14
@neilalexander neilalexander requested a review from a team as a code owner December 20, 2024 15:14
@derekcollison derekcollison self-requested a review December 20, 2024 15:38
fs.ttls.ExpireTasks(func(seq uint64, ts int64) {
fs.removeMsgViaLimits(seq)
})
nextTTL = fs.ttls.GetNextExpiration(math.MaxInt64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we get this changed as discussed?

server/filestore.go Show resolved Hide resolved
server/filestore.go Show resolved Hide resolved

func TestFileStoreMessageTTL(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: t.TempDir(), EnforceTTLs: true},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not inherit enforeTTLS from the stream config?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In stream.go when creating the store for streams, we do indeed set it from the AllowMsgTTLs value from the stream config. In the filestore-specific tests, like here, we set it to true directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we pass in a stream config in the tests we could do it there, or you feel we should keep the redundancy and have it in two places?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not attached to the duplication at all, will remove it and use the passed-in StreamConfig one.

require_Equal(t, ss.LastSeq, 1)
require_Equal(t, ss.Msgs, 0)
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add in test with max age but with msgs with never expire and make sure they do not get expired.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment this PR doesn't have "never expire" behaviour for MaxAge, that's one of the things I have queued up for a follow-up PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok maybe remove the header then from this PR. Confusing as just a placeholder.

@@ -100,6 +100,10 @@ type StreamConfig struct {
// TODO(nat): Can/should we name these better?
ConsumerLimits StreamConsumerLimits `json:"consumer_limits"`

// AllowMsgTTL allows header initiated per-message TTLs. If disabled,
// then the `NATS-TTL` header will be ignored.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment if we reject these vs ignore.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment the comment is correct, as of right now the header is passed through and ignored. We probably want to figure out the sourcing/mirroring behaviour at the same time as changing this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the thread between you and R.I. and Byron. From an app perspective if I set a header and get no feedback it is not honored that is not good IMO.

Yes we need to fixup the sourcing and mirroring to ignore certain headers. It does so already on some but we need a better solution here.

server/stream.go Show resolved Hide resolved
@derekcollison derekcollison self-requested a review December 20, 2024 16:19
@neilalexander neilalexander force-pushed the neil/ttl branch 2 times, most recently from 621a876 to 4d4187b Compare December 23, 2024 13:05
@neilalexander
Copy link
Member Author

Have rebased the PR, I think it's best to deal with rejecting the TTL'd messages in a separate PR at the same time as stripping them from sourcing/mirroring.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@neilalexander
Copy link
Member Author

Fixed merge conflicts.

@derekcollison derekcollison merged commit 1a0d55f into main Dec 24, 2024
5 checks passed
@derekcollison derekcollison deleted the neil/ttl branch December 24, 2024 15:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants