From 97a0df3a9d78480ef1eb1749910a4105f27c43fc Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 28 Aug 2024 13:24:17 +0100 Subject: [PATCH 1/5] Update default file and directory permissions Filestore-owned items don't need to be group-accessible and log files don't need to be group-writable. Reported-by: Trail of Bits Signed-off-by: Neil Twigg --- logger/log.go | 7 +++++-- server/filestore.go | 4 ++-- server/raft.go | 4 ++-- server/server.go | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/logger/log.go b/logger/log.go index c0bb7563d4a..c59bffef664 100644 --- a/logger/log.go +++ b/logger/log.go @@ -25,6 +25,9 @@ import ( "time" ) +// Default file permissions for log files. +const defaultLogPerms = os.FileMode(0640) + // Logger is the server logger type Logger struct { sync.Mutex @@ -142,7 +145,7 @@ type fileLogger struct { func newFileLogger(filename, pidPrefix string, time bool) (*fileLogger, error) { fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE - f, err := os.OpenFile(filename, fileflags, 0660) + f, err := os.OpenFile(filename, fileflags, defaultLogPerms) if err != nil { return nil, err } @@ -260,7 +263,7 @@ func (l *fileLogger) Write(b []byte) (int, error) { now.Second(), now.Nanosecond()) os.Rename(fname, bak) fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE - f, err := os.OpenFile(fname, fileflags, 0660) + f, err := os.OpenFile(fname, fileflags, defaultLogPerms) if err != nil { l.Unlock() panic(fmt.Sprintf("Unable to re-open the logfile %q after rotation: %v", fname, err)) diff --git a/server/filestore.go b/server/filestore.go index 45e7ac17c82..ec66ad28f2c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -154,8 +154,8 @@ type FileConsumerInfo struct { // Default file and directory permissions. const ( - defaultDirPerms = os.FileMode(0750) - defaultFilePerms = os.FileMode(0640) + defaultDirPerms = os.FileMode(0700) + defaultFilePerms = os.FileMode(0600) ) type psi struct { diff --git a/server/raft.go b/server/raft.go index 1d75ec3a383..cd8d2d11589 100644 --- a/server/raft.go +++ b/server/raft.go @@ -326,7 +326,7 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer // Check the store directory. If we have a memory based WAL we need to make sure the directory is setup. if stat, err := os.Stat(cfg.Store); os.IsNotExist(err) { - if err := os.MkdirAll(cfg.Store, 0750); err != nil { + if err := os.MkdirAll(cfg.Store, defaultDirPerms); err != nil { return fmt.Errorf("raft: could not create storage directory - %v", err) } } else if stat == nil || !stat.IsDir() { @@ -416,7 +416,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } // Make sure that the snapshots directory exists. - if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), 0750); err != nil { + if err := os.MkdirAll(filepath.Join(n.sd, snapshotsDir), defaultDirPerms); err != nil { return nil, fmt.Errorf("could not create snapshots directory - %v", err) } diff --git a/server/server.go b/server/server.go index 49388d30085..099a466ca81 100644 --- a/server/server.go +++ b/server/server.go @@ -1542,7 +1542,7 @@ func (s *Server) isRunning() bool { func (s *Server) logPid() error { pidStr := strconv.Itoa(os.Getpid()) - return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660) + return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms) } // numReservedAccounts will return the number of reserved accounts configured in the server. From 23de885fdb1158858adbbb7ba7ee5a330f915f10 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:51:25 +0100 Subject: [PATCH 2/5] Revert "Fix consumer start sequence when sequence not yet in stream" This reverts commit ec54164df357fb842ae2b636c56ad99f059bf418. See discussion nats-io/nats-server#6005. --- server/consumer.go | 16 +++++------- server/jetstream_test.go | 56 ---------------------------------------- 2 files changed, 6 insertions(+), 66 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 9ef8fa1b7ab..849fb1c5362 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4907,16 +4907,12 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - // Only clip the sseq if the OptStartSeq is not provided, otherwise - // it's possible that the stream just doesn't contain OptStartSeq yet. - if o.cfg.OptStartSeq == 0 { - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 - } + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fe95d5009ce..d69ddd29a4d 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22836,62 +22836,6 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) { require_Equal(t, ci.NumPending, 100) } -func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) { - // This test is checking that we still correctly set the start - // sequence of a consumer if that start sequence doesn't appear - // in the stream yet. Previously this would have been clipped - // back to between the first and last seq from the stream state. - - s := RunBasicJetStreamServer(t) - defer s.Shutdown() - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, - }) - require_NoError(t, err) - - sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10)) - require_NoError(t, err) - - stream, err := s.gacc.lookupStream("TEST") - require_NoError(t, err) - consumer := stream.lookupConsumer("test_consumer") - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 1) - require_Equal(t, consumer.sseq, 10) - }() - - for i := 1; i <= 10; i++ { - _, err = js.Publish("test", []byte{byte(i)}) - require_NoError(t, err) - } - - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - require_Equal(t, msgs[0].Data[0], 10) - - require_NoError(t, msgs[0].AckSync()) - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 2) - require_Equal(t, consumer.adflr, 1) - require_Equal(t, consumer.sseq, 11) - require_Equal(t, consumer.asflr, 10) - }() -} - func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() From 317f2236f3fd7f443556c3afdff83ff2eed2de4f Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:52:36 +0100 Subject: [PATCH 3/5] Add unit tests that prove sources and mirrors have their consumer start sequences clipped Signed-off-by: Neil Twigg --- server/jetstream_test.go | 84 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index d69ddd29a4d..a8b10e7fc7c 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22864,3 +22864,87 @@ func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) { require_Equal(t, pa.Sequence, 1) require_Equal(t, pa.Duplicate, true) } + +func TestJetStreamSourcingClipStartSeq(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + _, err := js.Publish("test", nil) + require_NoError(t, err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "SOURCING", + Sources: []*nats.StreamSource{ + { + Name: "ORIGIN", + OptStartSeq: 20, + }, + }, + }) + require_NoError(t, err) + + // Wait for sourcing consumer to be created. + time.Sleep(time.Second) + + mset, err := s.GlobalAccount().lookupStream("ORIGIN") + require_NoError(t, err) + require_True(t, mset != nil) + require_Len(t, len(mset.consumers), 1) + for _, o := range mset.consumers { + // Should have been clipped back to below 20 as only + // 10 messages in the origin stream. + require_Equal(t, o.sseq, 11) + } +} + +func TestJetStreamMirroringClipStartSeq(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + _, err := js.Publish("test", nil) + require_NoError(t, err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "MIRRORING", + Mirror: &nats.StreamSource{ + Name: "ORIGIN", + OptStartSeq: 20, + }, + }) + require_NoError(t, err) + + // Wait for mirroring consumer to be created. + time.Sleep(time.Second) + + mset, err := s.GlobalAccount().lookupStream("ORIGIN") + require_NoError(t, err) + require_True(t, mset != nil) + require_Len(t, len(mset.consumers), 1) + for _, o := range mset.consumers { + // Should have been clipped back to below 20 as only + // 10 messages in the origin stream. + require_Equal(t, o.sseq, 11) + } +} From 14c46afb6abe92e92648bda1a960b0eb9df60a32 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 17:31:31 +0100 Subject: [PATCH 4/5] Fix `TestJetStreamClusterConsumeWithStartSequence` to account for clipping of `OptStartSeq` Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 38 +++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 4d8402c59b1..c74ba05a3e6 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3013,6 +3013,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { // This is the success condition for all sub-tests below var ExpectedMsgId = "" checkMessage := func(t *testing.T, msg *nats.Msg) { + t.Helper() + msgMeta, err := msg.Metadata() require_NoError(t, err) @@ -3025,6 +3027,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { } checkRawMessage := func(t *testing.T, msg *nats.RawStreamMsg) { + t.Helper() + // Check sequence number require_Equal(t, msg.Sequence, ChosenSeq) @@ -3058,7 +3062,23 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { }) require_NoError(t, err) - // Setup: create subscriptions before stream is populated + // Setup: populate stream + buf := make([]byte, 100) + for i := uint64(1); i <= NumMessages; i++ { + msgId := nuid.Next() + pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId)) + require_NoError(t, err) + + // Verify assumption made in tests below + require_Equal(t, pubAck.Sequence, i) + + if i == ChosenSeq { + // Save the expected message id for the chosen message + ExpectedMsgId = msgId + } + } + + // Setup: create subscriptions, needs to be after stream creation or OptStartSeq could be clipped var preCreatedSub, preCreatedSubDurable *nats.Subscription { preCreatedSub, err = js.PullSubscribe( @@ -3094,22 +3114,6 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { }() } - // Setup: populate stream - buf := make([]byte, 100) - for i := uint64(1); i <= NumMessages; i++ { - msgId := nuid.Next() - pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId)) - require_NoError(t, err) - - // Verify assumption made in tests below - require_Equal(t, pubAck.Sequence, i) - - if i == ChosenSeq { - // Save the expected message id for the chosen message - ExpectedMsgId = msgId - } - } - // Tests various ways to consume the stream starting at the ChosenSeq sequence t.Run( From 21470897fc16accd83d7748829751515836134eb Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:19:51 +0100 Subject: [PATCH 5/5] Release v2.10.22 Signed-off-by: Neil Twigg --- server/const.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/const.go b/server/const.go index dd350aef31a..69bad3f308a 100644 --- a/server/const.go +++ b/server/const.go @@ -55,7 +55,7 @@ func init() { const ( // VERSION is the current version for the server. - VERSION = "2.10.22-RC.3" + VERSION = "2.10.22" // PROTO is the currently supported protocol. // 0 was the original