diff --git a/server/filestore.go b/server/filestore.go index 73dd95269d..1a0f993650 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -517,7 +517,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if isPermissionError(err) { + return nil, err + } fs.startAgeChk() } @@ -2090,9 +2093,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -2104,7 +2107,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2120,8 +2123,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if isPermissionError(err) { + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2139,8 +2146,11 @@ func (fs *fileStore) expireMsgsOnRecover() { if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) mb.mu.Unlock() + if isPermissionError(err) { + return err + } continue } @@ -2274,6 +2284,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3709,6 +3720,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { dios <- struct{}{} if err != nil { + if isPermissionError(err) { + return nil, err + } mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) } @@ -8163,9 +8177,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -8187,13 +8201,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if isPermissionError(err) { + return err + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if isPermissionError(err) { + return err + } } } + return nil } // Remove a seq from the fss and select new first. @@ -8610,7 +8631,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if isPermissionError(err) && fs.srv != nil { + fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return + } + case <-qch: return } @@ -8819,7 +8848,11 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable isPermissionError is set to true dios <- struct{}{} + if isPermissionError(err) { + return err + } // Update dirty if successful. if err == nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index 4f9a761a0b..f6d6e87e0c 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math/bits" "math/rand" "os" @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) { func TestFileStoreMsgHeaders(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) defer fs.Stop() - subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8 if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen { @@ -8685,3 +8686,81 @@ func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) { require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge) require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") } + +func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + changeDirectoryPermission(directory, READONLY_MODE) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break + } + } + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break + } + } + changeDirectoryPermission(directory, READONLY_MODE) + err = fs.writeFullState() + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func changeDirectoryPermission(directory string, mode fs.FileMode) error { + err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error accessing path %q: %w", path, err) + } + + // Check if the path is a directory or file and set permissions accordingly + if info.IsDir() { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing directory permissions for %q: %w", path, err) + } + } else { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing file permissions for %q: %w", path, err) + } + } + return nil + }) + return err +} diff --git a/server/jetstream.go b/server/jetstream.go index a0b31cd638..7b58e5b22b 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) { cfg.Duplicates = 0 } } + +func (s *Server) handleWritePermissionError() { + //TODO Check if we should add s.jetStreamOOSPending in condition + if s.JetStreamEnabled() { + s.Errorf("File system permission denied while writing, disabling JetStream") + + go s.DisableJetStream() + + //TODO Send respective advisory if needed, same as in handleOutOfSpace + } +} diff --git a/server/raft.go b/server/raft.go index 5ab37facfd..1c472fcaa1 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) { n.error("Critical write error: %v", err) n.werr = err + if isPermissionError(err) { + go n.s.handleWritePermissionError() + } + if isOutOfSpaceErr(err) { // For now since this can be happening all under the covers, we will call up and disable JetStream. go n.s.handleOutOfSpace(nil) diff --git a/server/store.go b/server/store.go index ca0dc18a6c..56252011ea 100644 --- a/server/store.go +++ b/server/store.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "os" "strings" "time" "unsafe" @@ -788,3 +789,7 @@ func copyString(s string) string { copy(b, s) return bytesToString(b) } + +func isPermissionError(err error) bool { + return err != nil && os.IsPermission(err) +} diff --git a/server/stream.go b/server/stream.go index a6eab08c3f..e64d71f3cf 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5143,6 +5143,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err != nil { + if isPermissionError(err) { + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err) + return err + } // If we did not succeed put those values back and increment clfs in case we are clustered. var state StreamState mset.store.FastState(&state)