Skip to content

Commit

Permalink
Release v2.10.22 (#6013)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Oct 17, 2024
2 parents 417ca01 + 2147089 commit 240e9a4
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 85 deletions.
7 changes: 5 additions & 2 deletions logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"
)

// Default file permissions for log files.
const defaultLogPerms = os.FileMode(0640)

// Logger is the server logger
type Logger struct {
sync.Mutex
Expand Down Expand Up @@ -142,7 +145,7 @@ type fileLogger struct {

func newFileLogger(filename, pidPrefix string, time bool) (*fileLogger, error) {
fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
f, err := os.OpenFile(filename, fileflags, 0660)
f, err := os.OpenFile(filename, fileflags, defaultLogPerms)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,7 +263,7 @@ func (l *fileLogger) Write(b []byte) (int, error) {
now.Second(), now.Nanosecond())
os.Rename(fname, bak)
fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
f, err := os.OpenFile(fname, fileflags, 0660)
f, err := os.OpenFile(fname, fileflags, defaultLogPerms)
if err != nil {
l.Unlock()
panic(fmt.Sprintf("Unable to re-open the logfile %q after rotation: %v", fname, err))
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func init() {

const (
// VERSION is the current version for the server.
VERSION = "2.10.22-RC.3"
VERSION = "2.10.22"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
16 changes: 6 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4907,16 +4907,12 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

// Only clip the sseq if the OptStartSeq is not provided, otherwise
// it's possible that the stream just doesn't contain OptStartSeq yet.
if o.cfg.OptStartSeq == 0 {
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}

Expand Down
4 changes: 2 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ type FileConsumerInfo struct {

// Default file and directory permissions.
const (
defaultDirPerms = os.FileMode(0750)
defaultFilePerms = os.FileMode(0640)
defaultDirPerms = os.FileMode(0700)
defaultFilePerms = os.FileMode(0600)
)

type psi struct {
Expand Down
38 changes: 21 additions & 17 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3013,6 +3013,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
// This is the success condition for all sub-tests below
var ExpectedMsgId = ""
checkMessage := func(t *testing.T, msg *nats.Msg) {
t.Helper()

msgMeta, err := msg.Metadata()
require_NoError(t, err)

Expand All @@ -3025,6 +3027,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
}

checkRawMessage := func(t *testing.T, msg *nats.RawStreamMsg) {
t.Helper()

// Check sequence number
require_Equal(t, msg.Sequence, ChosenSeq)

Expand Down Expand Up @@ -3058,7 +3062,23 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
})
require_NoError(t, err)

// Setup: create subscriptions before stream is populated
// Setup: populate stream
buf := make([]byte, 100)
for i := uint64(1); i <= NumMessages; i++ {
msgId := nuid.Next()
pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId))
require_NoError(t, err)

// Verify assumption made in tests below
require_Equal(t, pubAck.Sequence, i)

if i == ChosenSeq {
// Save the expected message id for the chosen message
ExpectedMsgId = msgId
}
}

// Setup: create subscriptions, needs to be after stream creation or OptStartSeq could be clipped
var preCreatedSub, preCreatedSubDurable *nats.Subscription
{
preCreatedSub, err = js.PullSubscribe(
Expand Down Expand Up @@ -3094,22 +3114,6 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
}()
}

// Setup: populate stream
buf := make([]byte, 100)
for i := uint64(1); i <= NumMessages; i++ {
msgId := nuid.Next()
pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId))
require_NoError(t, err)

// Verify assumption made in tests below
require_Equal(t, pubAck.Sequence, i)

if i == ChosenSeq {
// Save the expected message id for the chosen message
ExpectedMsgId = msgId
}
}

// Tests various ways to consume the stream starting at the ChosenSeq sequence

t.Run(
Expand Down
128 changes: 78 additions & 50 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22836,87 +22836,115 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) {
require_Equal(t, ci.NumPending, 100)
}

func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) {
// This test is checking that we still correctly set the start
// sequence of a consumer if that start sequence doesn't appear
// in the stream yet. Previously this would have been clipped
// back to between the first and last seq from the stream state.

func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
cfg := &nats.StreamConfig{
Name: "INTEREST",
Subjects: []string{"interest"},
Replicas: 1,
Retention: nats.InterestPolicy,
}
_, err := js.AddStream(cfg)
require_NoError(t, err)

sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10))
// Publishing the first time should give a sequence, even when there's no interest.
pa, err := js.Publish("interest", nil, nats.MsgId("dedupe"))
require_NoError(t, err)
require_Equal(t, pa.Sequence, 1)
require_Equal(t, pa.Duplicate, false)

stream, err := s.gacc.lookupStream("TEST")
// Publishing a duplicate with no interest should return the same sequence as above.
pa, err = js.Publish("interest", nil, nats.MsgId("dedupe"))
require_NoError(t, err)
consumer := stream.lookupConsumer("test_consumer")
require_Equal(t, pa.Sequence, 1)
require_Equal(t, pa.Duplicate, true)
}

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()
func TestJetStreamSourcingClipStartSeq(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

require_Equal(t, consumer.dseq, 1)
require_Equal(t, consumer.sseq, 10)
}()
nc, js := jsClientConnect(t, s)
defer nc.Close()

for i := 1; i <= 10; i++ {
_, err = js.Publish("test", []byte{byte(i)})
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORIGIN",
Subjects: []string{"test"},
})
require_NoError(t, err)

for i := 0; i < 10; i++ {
_, err := js.Publish("test", nil)
require_NoError(t, err)
}

msgs, err := sub.Fetch(1)
_, err = js.AddStream(&nats.StreamConfig{
Name: "SOURCING",
Sources: []*nats.StreamSource{
{
Name: "ORIGIN",
OptStartSeq: 20,
},
},
})
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Data[0], 10)

require_NoError(t, msgs[0].AckSync())

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()
// Wait for sourcing consumer to be created.
time.Sleep(time.Second)

require_Equal(t, consumer.dseq, 2)
require_Equal(t, consumer.adflr, 1)
require_Equal(t, consumer.sseq, 11)
require_Equal(t, consumer.asflr, 10)
}()
mset, err := s.GlobalAccount().lookupStream("ORIGIN")
require_NoError(t, err)
require_True(t, mset != nil)
require_Len(t, len(mset.consumers), 1)
for _, o := range mset.consumers {
// Should have been clipped back to below 20 as only
// 10 messages in the origin stream.
require_Equal(t, o.sseq, 11)
}
}

func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) {
func TestJetStreamMirroringClipStartSeq(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &nats.StreamConfig{
Name: "INTEREST",
Subjects: []string{"interest"},
Replicas: 1,
Retention: nats.InterestPolicy,
}
_, err := js.AddStream(cfg)
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORIGIN",
Subjects: []string{"test"},
})
require_NoError(t, err)

// Publishing the first time should give a sequence, even when there's no interest.
pa, err := js.Publish("interest", nil, nats.MsgId("dedupe"))
for i := 0; i < 10; i++ {
_, err := js.Publish("test", nil)
require_NoError(t, err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRRORING",
Mirror: &nats.StreamSource{
Name: "ORIGIN",
OptStartSeq: 20,
},
})
require_NoError(t, err)
require_Equal(t, pa.Sequence, 1)
require_Equal(t, pa.Duplicate, false)

// Publishing a duplicate with no interest should return the same sequence as above.
pa, err = js.Publish("interest", nil, nats.MsgId("dedupe"))
// Wait for mirroring consumer to be created.
time.Sleep(time.Second)

mset, err := s.GlobalAccount().lookupStream("ORIGIN")
require_NoError(t, err)
require_Equal(t, pa.Sequence, 1)
require_Equal(t, pa.Duplicate, true)
require_True(t, mset != nil)
require_Len(t, len(mset.consumers), 1)
for _, o := range mset.consumers {
// Should have been clipped back to below 20 as only
// 10 messages in the origin stream.
require_Equal(t, o.sseq, 11)
}
}
4 changes: 2 additions & 2 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer

// Check the store directory. If we have a memory based WAL we need to make sure the directory is setup.
if stat, err := os.Stat(cfg.Store); os.IsNotExist(err) {
if err := os.MkdirAll(cfg.Store, 0750); err != nil {
if err := os.MkdirAll(cfg.Store, defaultDirPerms); err != nil {
return fmt.Errorf("raft: could not create storage directory - %v", err)
}
} else if stat == nil || !stat.IsDir() {
Expand Down Expand Up @@ -416,7 +416,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}

// Make sure that the snapshots directory exists.
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), 0750); err != nil {
if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil {
return nil, fmt.Errorf("could not create snapshots directory - %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,7 @@ func (s *Server) isRunning() bool {

func (s *Server) logPid() error {
pidStr := strconv.Itoa(os.Getpid())
return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
}

// numReservedAccounts will return the number of reserved accounts configured in the server.
Expand Down

0 comments on commit 240e9a4

Please sign in to comment.