Skip to content

Commit

Permalink
Cherry picks for v2.10.19 RC6 (#5829)
Browse files Browse the repository at this point in the history
Includes:

- #5825
- #5826
- #5821
- #5831
  • Loading branch information
wallyqs authored Aug 27, 2024
2 parents 7ad3b6f + 0b5e1ec commit e5e2d14
Show file tree
Hide file tree
Showing 19 changed files with 545 additions and 44 deletions.
4 changes: 3 additions & 1 deletion internal/ldap/dn_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2011-2015 Michael Mitton ([email protected])
// Portions copyright (c) 2015-2016 go-ldap Authors
// Static-Check Fixes Copyright 2024 The NATS Authors

package ldap

import (
Expand Down Expand Up @@ -53,7 +55,7 @@ func TestSuccessfulDNParsing(t *testing.T) {
for test, answer := range testcases {
dn, err := ParseDN(test)
if err != nil {
t.Errorf(err.Error())
t.Error(err.Error())
continue
}
if !reflect.DeepEqual(dn, &answer) {
Expand Down
4 changes: 2 additions & 2 deletions server/certidp/certidp.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -222,7 +222,7 @@ func CertOCSPEligible(link *ChainLink) bool {
if link == nil || link.Leaf.Raw == nil || len(link.Leaf.Raw) == 0 {
return false
}
if link.Leaf.OCSPServer == nil || len(link.Leaf.OCSPServer) == 0 {
if len(link.Leaf.OCSPServer) == 0 {
return false
}
urls := getWebEndpoints(link.Leaf.OCSPServer)
Expand Down
7 changes: 4 additions & 3 deletions server/certidp/ocsp_responder.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 The NATS Authors
// Copyright 2023-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@ package certidp

import (
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -26,7 +27,7 @@ import (

func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte, error) {
if link == nil || link.Leaf == nil || link.Issuer == nil || opts == nil || log == nil {
return nil, fmt.Errorf(ErrInvalidChainlink)
return nil, errors.New(ErrInvalidChainlink)
}

timeout := time.Duration(opts.Timeout * float64(time.Second))
Expand Down Expand Up @@ -59,7 +60,7 @@ func FetchOCSPResponse(link *ChainLink, opts *OCSPPeerConfig, log *Log) ([]byte,
responders := *link.OCSPWebEndpoints

if len(responders) == 0 {
return nil, fmt.Errorf(ErrNoAvailOCSPServers)
return nil, errors.New(ErrNoAvailOCSPServers)
}

var raw []byte
Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2972,7 +2972,7 @@ func (c *client) addShadowSub(sub *subscription, ime *ime, enact bool) (*subscri
if err := im.acc.sl.Insert(&nsub); err != nil {
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
c.Debugf(errs)
return nil, fmt.Errorf(errs)
return nil, errors.New(errs)
}

// Update our route map here. But only if we are not a leaf node or a hub leafnode.
Expand Down
13 changes: 11 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ type msgBlock struct {
noTrack bool
needSync bool
syncAlways bool
noCompact bool
closed bool

// Used to mock write failures.
Expand Down Expand Up @@ -3959,6 +3960,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
mb.bytes = 0
}

// Allow us to check compaction again.
mb.noCompact = false

// Mark as dirty for stream state.
fs.dirty++

Expand Down Expand Up @@ -4075,7 +4079,7 @@ func (mb *msgBlock) shouldCompactInline() bool {
// Ignores 2MB minimum.
// Lock should be held.
func (mb *msgBlock) shouldCompactSync() bool {
return mb.bytes*2 < mb.rbytes
return mb.bytes*2 < mb.rbytes && !mb.noCompact
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
Expand Down Expand Up @@ -4184,7 +4188,12 @@ func (mb *msgBlock) compact() {
mb.needSync = true

// Capture the updated rbytes.
mb.rbytes = uint64(len(nbuf))
if rbytes := uint64(len(nbuf)); rbytes == mb.rbytes {
// No change, so set our noCompact bool here to avoid attempting to continually compress in syncBlocks.
mb.noCompact = true
} else {
mb.rbytes = rbytes
}

// Remove any seqs from the beginning of the blk.
for seq, nfseq := fseq, atomic.LoadUint64(&mb.first.seq); seq < nfseq; seq++ {
Expand Down
77 changes: 77 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7379,6 +7379,83 @@ func TestFileStoreCheckSkipFirstBlockNotLoadOldBlocks(t *testing.T) {
require_Equal(t, loaded, 1)
}

func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 256, SyncInterval: 250 * time.Millisecond},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")

// 6 msgs per block.
// Fill 2 blocks.
for i := 0; i < 12; i++ {
fs.StoreMsg("foo.BB", nil, msg)
}
// Create third block with just one message in it.
fs.StoreMsg("foo.BB", nil, msg)

// Should have created 3 blocks.
require_Equal(t, fs.numMsgBlocks(), 3)

// Now delete a bunch that will will fill up 3 block with tombstones.
for _, seq := range []uint64{2, 3, 4, 5, 8, 9, 10, 11} {
_, err = fs.RemoveMsg(seq)
require_NoError(t, err)
}
// Now make sure we add 4th block so syncBlocks will try to compress.
for i := 0; i < 6; i++ {
fs.StoreMsg("foo.BB", nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 4)

// All should have compact set.
fs.mu.Lock()
// Only check first 3 blocks.
for i := 0; i < 3; i++ {
mb := fs.blks[i]
mb.mu.Lock()
shouldCompact := mb.shouldCompactSync()
mb.mu.Unlock()
if !shouldCompact {
fs.mu.Unlock()
t.Fatalf("Expected should compact to be true for %d, got false", mb.getIndex())
}
}
fs.mu.Unlock()

// Let sync run.
time.Sleep(300 * time.Millisecond)

// We want to make sure the last block, which is filled with tombstones and is not compactable, returns false now.
fs.mu.Lock()
for _, mb := range fs.blks {
mb.mu.Lock()
shouldCompact := mb.shouldCompactSync()
mb.mu.Unlock()
if shouldCompact {
fs.mu.Unlock()
t.Fatalf("Expected should compact to be false for %d, got true", mb.getIndex())
}
}
fs.mu.Unlock()

// Now remove some from block 3 and verify that compact is not suppressed.
_, err = fs.RemoveMsg(13)
require_NoError(t, err)

fs.mu.Lock()
mb := fs.blks[2] // block 3.
mb.mu.Lock()
noCompact := mb.noCompact
mb.mu.Unlock()
fs.mu.Unlock()
// Verify that since we deleted a message we should be considered for compaction again in syncBlocks().
require_False(t, noCompact)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
6 changes: 3 additions & 3 deletions server/gateway_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2020 The NATS Authors
// Copyright 2018-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -211,7 +211,7 @@ func waitCh(t *testing.T, ch chan bool, errTxt string) {
case <-ch:
return
case <-time.After(5 * time.Second):
t.Fatalf(errTxt)
t.Fatal(errTxt)
}
}

Expand Down Expand Up @@ -5055,7 +5055,7 @@ func TestGatewayMapReplyOnlyForRecentSub(t *testing.T) {
select {
case e := <-errCh:
if e != nil {
t.Fatalf(e.Error())
t.Fatal(e.Error())
}
case <-time.After(time.Second):
t.Fatalf("Did not get replies")
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5857,15 +5857,15 @@ func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
select {
case dl := <-loggers[0].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
errCh <- errors.New(condition)
}
case dl := <-loggers[1].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
errCh <- errors.New(condition)
}
case dl := <-loggers[2].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
errCh <- errors.New(condition)
}
case <-ctx.Done():
return
Expand Down
Loading

0 comments on commit e5e2d14

Please sign in to comment.