Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable JetStream on disk errors #6292

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
fs.expireMsgsOnRecover()
err := fs.expireMsgsOnRecover()
if isPermissionError(err) {
return nil, err
}
fs.startAgeChk()
}

Expand Down Expand Up @@ -2090,9 +2093,9 @@ func (fs *fileStore) recoverMsgs() error {
// We will treat this differently in case we have a recovery
// that will expire a lot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
func (fs *fileStore) expireMsgsOnRecover() error {
if fs.state.Msgs == 0 {
return
return nil
}

var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
Expand All @@ -2104,7 +2107,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// usually taken care of by fs.removeMsgBlock() but we do not call that here.
var last msgId

deleteEmptyBlock := func(mb *msgBlock) {
deleteEmptyBlock := func(mb *msgBlock) error {
// If we are the last keep state to remember first/last sequence.
// Do this part by hand since not deleting one by one.
if mb == fs.lmb {
Expand All @@ -2120,8 +2123,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
return true
})
mb.dirtyCloseWithRemove(true)
err := mb.dirtyCloseWithRemove(true)
if isPermissionError(err) {
return err
}
deleted++
return nil
}

for _, mb := range fs.blks {
Expand All @@ -2139,8 +2146,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
err := deleteEmptyBlock(mb)
mb.mu.Unlock()
if isPermissionError(err) {
return err
}
continue
}

Expand Down Expand Up @@ -2274,6 +2284,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if purged > 0 {
fs.dirty++
}
return nil
}

func copyMsgBlocks(src []*msgBlock) []*msgBlock {
Expand Down Expand Up @@ -3709,6 +3720,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}

if err != nil {
if isPermissionError(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
Expand Down Expand Up @@ -8163,9 +8177,9 @@ func (mb *msgBlock) dirtyClose() {
}

// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
if mb == nil {
return
return nil
}
// Stop cache expiration timer.
if mb.ctmr != nil {
Expand All @@ -8187,13 +8201,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
err := os.Remove(mb.mfn)
if isPermissionError(err) {
return err
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
err := os.Remove(mb.kfn)
if isPermissionError(err) {
return err
}
}
}
return nil
}

// Remove a seq from the fss and select new first.
Expand Down Expand Up @@ -8610,7 +8631,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if isPermissionError(err) && fs.srv != nil {
fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
return
}

case <-qch:
return
}
Expand Down Expand Up @@ -8819,7 +8848,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// If the file system is not writable, os.WriteFile returns a permission error (see isPermissionError).
dios <- struct{}{}
if isPermissionError(err) {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// Update dirty if successful.
if err == nil {
Expand Down
81 changes: 80 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math/bits"
"math/rand"
"os"
Expand Down Expand Up @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) {
func TestFileStoreMsgHeaders(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)

require_NoError(t, err)
defer fs.Stop()

subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
Expand Down Expand Up @@ -8685,3 +8686,81 @@ func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) {
require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge)
require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s")
}

// TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly makes the
// store directory read-only and then stores messages until StoreMsg fails,
// expecting the failure to be a permission error.
func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		cfg)
	require_NoError(t, err)
	defer fs.Stop()

	msg := bytes.Repeat([]byte("Z"), 1024)
	directory := fs.fcfg.StoreDir

	// Remember the original mode so it can be restored afterwards.
	origInfo, err := os.Stat(directory)
	require_NoError(t, err)
	readOnlyMode := os.FileMode(0o555)

	// Always restore the original permissions, even if an assertion below
	// aborts the test. Deferred after fs.Stop() so it runs before it (LIFO).
	defer func() {
		if err := changeDirectoryPermission(directory, origInfo.Mode()); err != nil {
			t.Errorf("restoring permissions on %q: %v", directory, err)
		}
	}()
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	// Keep storing until a write fails; err holds the first failure, if any.
	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		if _, _, err = fs.StoreMsg("ev.1", nil, msg, 0); err != nil {
			break
		}
	}
	require_Error(t, err, os.ErrPermission)
}

// TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly stores
// messages while the store directory is writable, then makes it read-only and
// expects writeFullState to fail with a permission error.
func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir()},
		cfg)
	require_NoError(t, err)
	defer fs.Stop()

	msg := bytes.Repeat([]byte("Z"), 1024)
	directory := fs.fcfg.StoreDir

	// Remember the original mode so it can be restored afterwards.
	origInfo, err := os.Stat(directory)
	require_NoError(t, err)
	readOnlyMode := os.FileMode(0o555)

	// The directory is still writable here, so any store failure is a setup
	// problem and must not be silently overwritten by the check below.
	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		_, _, err = fs.StoreMsg("ev.1", nil, msg, 0)
		require_NoError(t, err)
	}

	// Always restore the original permissions, even if an assertion below
	// aborts the test. Deferred after fs.Stop() so it runs before it (LIFO).
	defer func() {
		if err := changeDirectoryPermission(directory, origInfo.Mode()); err != nil {
			t.Errorf("restoring permissions on %q: %v", directory, err)
		}
	}()
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	err = fs.writeFullState()
	require_Error(t, err, os.ErrPermission)
}

// changeDirectoryPermission walks the tree rooted at directory and applies
// mode to every entry it visits, directories and files alike. It stops at the
// first failure and returns that error wrapped with the offending path.
func changeDirectoryPermission(directory string, mode fs.FileMode) error {
	visit := func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return fmt.Errorf("error accessing path %q: %w", path, walkErr)
		}

		chmodErr := os.Chmod(path, mode)
		if chmodErr == nil {
			return nil
		}

		// Same chmod for both kinds of entry; only the message differs.
		if info.IsDir() {
			return fmt.Errorf("error changing directory permissions for %q: %w", path, chmodErr)
		}
		return fmt.Errorf("error changing file permissions for %q: %w", path, chmodErr)
	}
	return filepath.Walk(directory, visit)
}
11 changes: 11 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
cfg.Duplicates = 0
}
}

// handleWritePermissionError reacts to a file system permission failure on
// write: if JetStream is currently enabled it logs an error and disables
// JetStream in a separate goroutine. It does nothing otherwise.
func (s *Server) handleWritePermissionError() {
	// TODO: Check if we should add s.jetStreamOOSPending in condition.
	if !s.JetStreamEnabled() {
		return
	}

	s.Errorf("File system permission denied while writing, disabling JetStream")

	go s.DisableJetStream()

	// TODO: Send respective advisory if needed, same as in handleOutOfSpace.
}
4 changes: 4 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) {
n.error("Critical write error: %v", err)
n.werr = err

if isPermissionError(err) {
go n.s.handleWritePermissionError()
}

if isOutOfSpaceErr(err) {
// For now since this can be happening all under the covers, we will call up and disable JetStream.
go n.s.handleOutOfSpace(nil)
Expand Down
5 changes: 5 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"unsafe"
Expand Down Expand Up @@ -788,3 +789,7 @@ func copyString(s string) string {
copy(b, s)
return bytesToString(b)
}

// isPermissionError reports whether err, or any error in its wrap chain,
// is a permission error. A nil err yields false.
//
// errors.Is is used rather than os.IsPermission because the latter does not
// unwrap errors, so it would miss a permission error wrapped with
// fmt.Errorf("...: %w", err).
func isPermissionError(err error) bool {
	return errors.Is(err, os.ErrPermission)
}
8 changes: 8 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5143,6 +5143,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if err != nil {
if isPermissionError(err) {
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
Expand Down
Loading