From 20b1f1df6c196cf9090d636d7d42d57291ab0872 Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Sat, 21 Dec 2024 22:54:18 +0530 Subject: [PATCH 1/3] Disable jetstream if filesystem permission denied detected during write Signed-off-by: Sourabh Agrawal --- server/filestore.go | 56 +++++++++++++++++++++++++++++++++++++-------- server/opts.go | 7 +++++- server/stream.go | 9 ++++++++ 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 73dd95269d7..cb4c4ddfeb8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -69,6 +69,8 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression + // Allows disabling jetstream when fs is not writable + JetStreamDisableOnDiskError bool // Internal reference to our server. srv *Server @@ -517,7 +519,14 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError { + fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return nil, err + } fs.startAgeChk() } @@ -2090,9 +2099,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -2104,7 +2113,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2120,8 +2129,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2139,7 +2152,10 @@ func (fs *fileStore) expireMsgsOnRecover() { if !mb.fs.cfg.SubjectDeleteMarkers && mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + return err + } mb.mu.Unlock() continue } @@ -2274,6 +2290,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3708,6 +3725,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) dios <- struct{}{} + if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { + return nil, err + } if err != nil { mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) @@ -6697,6 +6717,7 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") errNoBlkData = errors.New("message block data missing") errStateTooBig = errors.New("store state too big for optional write") + errFileSystemPermissionDenied = errors.New("storage directory not writeable") ) const ( @@ -8163,9 +8184,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -8187,13 +8208,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + return errFileSystemPermissionDenied + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + return errFileSystemPermissionDenied + } } } + return nil } // Remove a seq from the fss and select new first. @@ -8819,6 +8847,14 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable os.IsPermission is set to true + if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { + fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return err + } dios <- struct{}{} // Update dirty if successful. diff --git a/server/opts.go b/server/opts.go index 172377253ee..45b41f00fef 100644 --- a/server/opts.go +++ b/server/opts.go @@ -338,6 +338,7 @@ type Options struct { JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 StreamMaxBufferedMsgs int `json:"-"` + JetStreamDisableOnDiskError bool `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` @@ -2445,6 +2446,10 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "disable_js_on_disk_error": + if v, ok := mv.(bool); ok { + opts.JetStreamDisableOnDiskError = v + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -6086,4 +6091,4 @@ func expandPath(p string) (string, error) { } return filepath.Join(home, p[1:]), nil -} +} \ No newline at end of file diff --git a/server/stream.go b/server/stream.go index a6eab08c3f6..e1459e415a8 100644 --- a/server/stream.go +++ b/server/stream.go @@ -730,6 +730,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.SyncInterval = s.getOpts().SyncInterval fsCfg.SyncAlways = s.getOpts().SyncAlways fsCfg.Compression = config.Compression + fsCfg.JetStreamDisableOnDiskError = s.getOpts().JetStreamDisableOnDiskError if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false) @@ -5141,6 +5142,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } err = store.StoreRawMsg(subject, hdr, msg, seq, ts, ttl) } + if err != nil && os.IsPermission(err) && mset.srv.getOpts().JetStreamDisableOnDiskError { + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) + return err + } if err != nil { // If we did not succeed put those values back and increment clfs in case we are clustered. From 068bd41ea442c87b7429f4b17cbcdcfdda37f497 Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Sat, 21 Dec 2024 22:54:18 +0530 Subject: [PATCH 2/3] Disable jetstream if filesystem permission denied detected during write Signed-off-by: Sourabh Agrawal --- server/filestore.go | 35 +++++++++-------- server/filestore_test.go | 81 +++++++++++++++++++++++++++++++++++++++- server/opts.go | 5 --- server/stream.go | 17 ++++----- 4 files changed, 107 insertions(+), 31 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index cb4c4ddfeb8..f2005c84b2c 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -69,8 +69,6 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression - // Allows disabling jetstream when fs is not writable - JetStreamDisableOnDiskError bool // Internal reference to our server. srv *Server @@ -520,7 +518,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { err := fs.expireMsgsOnRecover() - if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError { + if err != nil && err == errFileSystemPermissionDenied { fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err) // messages in block cache could be lost in the worst case. // In the clustered mode it is very highly unlikely as a result of replication. @@ -2130,7 +2128,7 @@ func (fs *fileStore) expireMsgsOnRecover() error { return true }) err := mb.dirtyCloseWithRemove(true) - if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + if err != nil && err == errFileSystemPermissionDenied { return err } deleted++ @@ -2153,7 +2151,8 @@ func (fs *fileStore) expireMsgsOnRecover() error { purged += mb.msgs bytes += mb.bytes err := deleteEmptyBlock(mb) - if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + if err != nil && err == errFileSystemPermissionDenied { + mb.mu.Unlock() return err } mb.mu.Unlock() @@ -3725,10 +3724,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) dios <- struct{}{} - if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { - return nil, err - } if err != nil { + if os.IsPermission(err) { + return nil, err + } mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) } @@ -8209,14 +8208,14 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { mb.fss = nil if mb.mfn != _EMPTY_ { err := os.Remove(mb.mfn) - if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + if err != nil && os.IsPermission(err){ return errFileSystemPermissionDenied } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { err := os.Remove(mb.kfn) - if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + if err != nil && os.IsPermission(err){ return errFileSystemPermissionDenied } } @@ -8638,7 +8637,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if err != nil && os.IsPermission(err) { + fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return + } + case <-qch: return } @@ -8848,11 +8855,7 @@ func (fs *fileStore) _writeFullState(force bool) error { <-dios err := os.WriteFile(fn, buf, defaultFilePerms) // if file system is not writable os.IsPermission is set to true - if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { - fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) - // messages in block cache could be lost in the worst case. - // In the clustered mode it is very highly unlikely as a result of replication. - fs.srv.DisableJetStream() + if err != nil && os.IsPermission(err) { return err } dios <- struct{}{} diff --git a/server/filestore_test.go b/server/filestore_test.go index 4f9a761a0bc..fe88736c4cc 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math/bits" "math/rand" "os" @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) { func TestFileStoreMsgHeaders(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) defer fs.Stop() - subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8 if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen { @@ -8685,3 +8686,81 @@ func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) { require_Equal(t, bytesToString(getHeader(JSAppliedLimit, sm.hdr)), JSAppliedLimitMaxAge) require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") } + +func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + changeDirectoryPermission(directory, READONLY_MODE) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break + } + } + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE, _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + require_NoError(t, err) + totalMsgs := 10000 + i := 0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break + } + } + changeDirectoryPermission(directory, READONLY_MODE) + err = fs.writeFullState() + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func changeDirectoryPermission(directory string, mode fs.FileMode) error { + err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error accessing path %q: %w", path, err) + } + + // Check if the path is a directory or file and set permissions accordingly + if info.IsDir() { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing directory permissions for %q: %w", path, err) + } + } else { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing file permissions for %q: %w", path, err) + } + } + return nil + }) + return err +} diff --git a/server/opts.go b/server/opts.go index 45b41f00fef..41b660008f5 100644 --- a/server/opts.go +++ b/server/opts.go @@ -338,7 +338,6 @@ type Options struct { JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 StreamMaxBufferedMsgs int `json:"-"` - JetStreamDisableOnDiskError bool `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` @@ -2446,10 +2445,6 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim - case "disable_js_on_disk_error": - if v, ok := mv.(bool); ok { - opts.JetStreamDisableOnDiskError = v - } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/stream.go b/server/stream.go index e1459e415a8..127b2ce6d10 100644 --- a/server/stream.go +++ b/server/stream.go @@ -730,7 +730,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.SyncInterval = s.getOpts().SyncInterval fsCfg.SyncAlways = s.getOpts().SyncAlways fsCfg.Compression = config.Compression - fsCfg.JetStreamDisableOnDiskError = s.getOpts().JetStreamDisableOnDiskError if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false) @@ -5142,16 +5141,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } err = store.StoreRawMsg(subject, hdr, msg, seq, ts, ttl) } - if err != nil && os.IsPermission(err) && mset.srv.getOpts().JetStreamDisableOnDiskError { - mset.mu.Unlock() - // messages in block cache could be lost in the worst case. - // In the clustered mode it is very highly unlikely as a result of replication. - mset.srv.DisableJetStream() - mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) - return err - } if err != nil { + if os.IsPermission(err){ + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) + return err + } // If we did not succeed put those values back and increment clfs in case we are clustered. var state StreamState mset.store.FastState(&state) From 6a4e7fa62ce4e29a5cb9ef531d272746a272dfcb Mon Sep 17 00:00:00 2001 From: souravagrawal Date: Tue, 7 Jan 2025 20:12:24 +0530 Subject: [PATCH 3/3] Disable jetstream if disk permission error while writing raft state --- server/filestore.go | 34 ++++++++++++++-------------------- server/filestore_test.go | 4 ++-- server/jetstream.go | 11 +++++++++++ server/opts.go | 2 +- server/raft.go | 4 ++++ server/store.go | 5 +++++ server/stream.go | 4 ++-- 7 files changed, 39 insertions(+), 25 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f2005c84b2c..1a0f993650d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -518,11 +518,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { err := fs.expireMsgsOnRecover() - if err != nil && err == errFileSystemPermissionDenied { - fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err) - // messages in block cache could be lost in the worst case. - // In the clustered mode it is very highly unlikely as a result of replication. - fs.srv.DisableJetStream() + if isPermissionError(err) { return nil, err } fs.startAgeChk() @@ -2128,7 +2124,7 @@ func (fs *fileStore) expireMsgsOnRecover() error { return true }) err := mb.dirtyCloseWithRemove(true) - if err != nil && err == errFileSystemPermissionDenied { + if isPermissionError(err) { return err } deleted++ @@ -2151,11 +2147,10 @@ func (fs *fileStore) expireMsgsOnRecover() error { purged += mb.msgs bytes += mb.bytes err := deleteEmptyBlock(mb) - if err != nil && err == errFileSystemPermissionDenied { - mb.mu.Unlock() + mb.mu.Unlock() + if isPermissionError(err) { return err } - mb.mu.Unlock() continue } @@ -3725,7 +3720,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { dios <- struct{}{} if err != nil { - if os.IsPermission(err) { + if isPermissionError(err) { return nil, err } mb.dirtyCloseWithRemove(true) @@ -6716,7 +6711,6 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") errNoBlkData = errors.New("message block data missing") errStateTooBig = errors.New("store state too big for optional write") - errFileSystemPermissionDenied = errors.New("storage directory not writeable") ) const ( @@ -8208,15 +8202,15 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { mb.fss = nil if mb.mfn != _EMPTY_ { err := os.Remove(mb.mfn) - if err != nil && os.IsPermission(err){ - return errFileSystemPermissionDenied + if isPermissionError(err) { + return err } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { err := os.Remove(mb.kfn) - if err != nil && os.IsPermission(err){ - return errFileSystemPermissionDenied + if isPermissionError(err) { + return err } } } @@ -8638,8 +8632,8 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { select { case <-t.C: err := fs.writeFullState() - if err != nil && os.IsPermission(err) { - fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) + if isPermissionError(err) && fs.srv != nil { + fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err) // messages in block cache could be lost in the worst case. // In the clustered mode it is very highly unlikely as a result of replication. fs.srv.DisableJetStream() @@ -8854,11 +8848,11 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) - // if file system is not writable os.IsPermission is set to true - if err != nil && os.IsPermission(err) { + // if file system is not writable isPermissionError is set to true + dios <- struct{}{} + if isPermissionError(err) { return err } - dios <- struct{}{} // Update dirty if successful. if err == nil { diff --git a/server/filestore_test.go b/server/filestore_test.go index fe88736c4cc..f6d6e87e0cb 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8687,7 +8687,7 @@ func TestFileStoreSubjectDeleteMarkersOnRestart(t *testing.T) { require_Equal(t, bytesToString(getHeader(JSMessageTTL, sm.hdr)), "1s") } -func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { +func TestFileStoreStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} fs, err := newFileStore( FileStoreConfig{StoreDir: t.TempDir()}, @@ -8714,7 +8714,7 @@ func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { require_Error(t, err, os.ErrPermission) } -func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { +func TestFileStoreWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} fs, err := newFileStore( FileStoreConfig{StoreDir: t.TempDir()}, diff --git a/server/jetstream.go b/server/jetstream.go index a0b31cd6387..7b58e5b22b3 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) { cfg.Duplicates = 0 } } + +func (s *Server) handleWritePermissionError() { + //TODO Check if we should add s.jetStreamOOSPending in condition + if s.JetStreamEnabled() { + s.Errorf("File system permission denied while writing, disabling JetStream") + + go s.DisableJetStream() + + //TODO Send respective advisory if needed, same as in handleOutOfSpace + } +} diff --git a/server/opts.go b/server/opts.go index 41b660008f5..172377253ee 100644 --- a/server/opts.go +++ b/server/opts.go @@ -6086,4 +6086,4 @@ func expandPath(p string) (string, error) { } return filepath.Join(home, p[1:]), nil -} \ No newline at end of file +} diff --git a/server/raft.go b/server/raft.go index 5ab37facfd9..1c472fcaa1d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) { n.error("Critical write error: %v", err) n.werr = err + if isPermissionError(err) { + go n.s.handleWritePermissionError() + } + if isOutOfSpaceErr(err) { // For now since this can be happening all under the covers, we will call up and disable JetStream. go n.s.handleOutOfSpace(nil) diff --git a/server/store.go b/server/store.go index ca0dc18a6c5..56252011eaa 100644 --- a/server/store.go +++ b/server/store.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "io" + "os" "strings" "time" "unsafe" @@ -788,3 +789,7 @@ func copyString(s string) string { copy(b, s) return bytesToString(b) } + +func isPermissionError(err error) bool { + return err != nil && os.IsPermission(err) +} diff --git a/server/stream.go b/server/stream.go index 127b2ce6d10..e64d71f3cf3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5143,12 +5143,12 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err != nil { - if os.IsPermission(err){ + if isPermissionError(err) { mset.mu.Unlock() // messages in block cache could be lost in the worst case. // In the clustered mode it is very highly unlikely as a result of replication. mset.srv.DisableJetStream() - mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) + mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err) return err } // If we did not succeed put those values back and increment clfs in case we are clustered.