Skip to content

Commit

Permalink
Disable jetstream if filesystem permission denied detected during write
Browse files Browse the repository at this point in the history
Signed-off-by: Sourabh Agrawal <[email protected]>
  • Loading branch information
souravagrawal committed Jan 16, 2025
1 parent 0af3ce5 commit 5dc283f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 31 deletions.
35 changes: 19 additions & 16 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
// Allows disabling jetstream when fs is not writable
JetStreamDisableOnDiskError bool

// Internal reference to our server.
srv *Server
Expand Down Expand Up @@ -519,7 +517,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
err := fs.expireMsgsOnRecover()
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError {
if err != nil && err == errFileSystemPermissionDenied {
fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
Expand Down Expand Up @@ -2129,7 +2127,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
return true
})
err := mb.dirtyCloseWithRemove(true)
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{
if err != nil && err == errFileSystemPermissionDenied {
return err
}
deleted++
Expand All @@ -2148,7 +2146,8 @@ func (fs *fileStore) expireMsgsOnRecover() error {
purged += mb.msgs
bytes += mb.bytes
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{
if err != nil && err == errFileSystemPermissionDenied {
mb.mu.Unlock()
return err
}
mb.mu.Unlock()
Expand Down Expand Up @@ -3703,10 +3702,10 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
dios <- struct{}{}

if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
return nil, err
}
if err != nil {
if os.IsPermission(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
Expand Down Expand Up @@ -8133,14 +8132,14 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
mb.fss = nil
if mb.mfn != _EMPTY_ {
err := os.Remove(mb.mfn)
if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
err := os.Remove(mb.kfn)
if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
}
}
Expand Down Expand Up @@ -8562,7 +8561,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if err != nil && os.IsPermission(err) {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return
}

case <-qch:
return
}
Expand Down Expand Up @@ -8772,11 +8779,7 @@ func (fs *fileStore) _writeFullState(force bool) error {
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// if file system is not writable os.IsPermission is set to true
if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
if err != nil && os.IsPermission(err) {
return err
}
dios <- struct{}{}
Expand Down
81 changes: 80 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math/bits"
"math/rand"
"os"
Expand Down Expand Up @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) {
func TestFileStoreMsgHeaders(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)

require_NoError(t, err)
defer fs.Stop()

subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
Expand Down Expand Up @@ -8562,3 +8563,81 @@ func TestFileStoreDontSpamCompactWhenMostlyTombstones(t *testing.T) {
fmb.bytes /= 2
require_True(t, fmb.shouldCompactInline())
}

// TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly checks that
// StoreMsg reports a permission error once the store directory tree has been
// switched to a read-only mode.
func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
	require_NoError(t, err)
	defer fs.Stop()

	directory := fs.fcfg.StoreDir

	// Remember the original mode so it can be put back after the writes.
	origInfo, err := os.Stat(directory)
	require_NoError(t, err)

	// Fail fast if the tree could not be made read-only; otherwise the test
	// would exercise a writable directory and report a misleading failure.
	readOnlyMode := os.FileMode(0o555)
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	msg := bytes.Repeat([]byte("Z"), 1024)
	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		// Stop at the first failure; err is inspected after the loop.
		if _, _, err = fs.StoreMsg("ev.1", nil, msg, 0); err != nil {
			break
		}
	}

	// Restore the original mode before asserting, passing the restore error
	// by value so the StoreMsg error held in err is not overwritten.
	require_NoError(t, changeDirectoryPermission(directory, origInfo.Mode()))
	require_Error(t, err, os.ErrPermission)
}

// TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly checks that
// writeFullState reports a permission error when the store directory tree is
// switched to a read-only mode after messages have been stored.
func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
	require_NoError(t, err)
	defer fs.Stop()

	directory := fs.fcfg.StoreDir

	// Remember the original mode so it can be put back after the write attempt.
	origInfo, err := os.Stat(directory)
	require_NoError(t, err)

	// Populate the store while the directory is still writable. A failure
	// here is a setup problem, so surface it instead of silently continuing.
	msg := bytes.Repeat([]byte("Z"), 1024)
	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		_, _, err = fs.StoreMsg("ev.1", nil, msg, 0)
		require_NoError(t, err)
	}

	// Fail fast if the tree could not be made read-only; otherwise the
	// assertion below would run against a writable directory.
	readOnlyMode := os.FileMode(0o555)
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	err = fs.writeFullState()

	// Restore the original mode before asserting, passing the restore error
	// by value so the writeFullState error held in err is not overwritten.
	require_NoError(t, changeDirectoryPermission(directory, origInfo.Mode()))
	require_Error(t, err, os.ErrPermission)
}

// changeDirectoryPermission walks the tree rooted at directory and applies
// mode to every entry it visits, directories and files alike. It returns the
// first error reported by the walk or by os.Chmod, wrapped with the path that
// caused it.
func changeDirectoryPermission(directory string, mode fs.FileMode) error {
	return filepath.Walk(directory, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return fmt.Errorf("error accessing path %q: %w", path, walkErr)
		}

		// The same mode is applied to every entry; only the wording of the
		// error differs between directories and regular files.
		chmodErr := os.Chmod(path, mode)
		switch {
		case chmodErr == nil:
			return nil
		case info.IsDir():
			return fmt.Errorf("error changing directory permissions for %q: %w", path, chmodErr)
		default:
			return fmt.Errorf("error changing file permissions for %q: %w", path, chmodErr)
		}
	})
}
5 changes: 0 additions & 5 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ type Options struct {
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
StreamMaxBufferedMsgs int `json:"-"`
JetStreamDisableOnDiskError bool `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
Expand Down Expand Up @@ -2446,10 +2445,6 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamRequestQueueLimit = lim
case "disable_js_on_disk_error":
if v, ok := mv.(bool); ok {
opts.JetStreamDisableOnDiskError = v
}
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
17 changes: 8 additions & 9 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.SyncInterval = s.getOpts().SyncInterval
fsCfg.SyncAlways = s.getOpts().SyncAlways
fsCfg.Compression = config.Compression
fsCfg.JetStreamDisableOnDiskError = s.getOpts().JetStreamDisableOnDiskError

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down Expand Up @@ -5097,16 +5096,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
err = store.StoreRawMsg(subject, hdr, msg, seq, ts, ttl)
}
if err != nil && os.IsPermission(err) && mset.srv.getOpts().JetStreamDisableOnDiskError {
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err)
return err
}

if err != nil {
if os.IsPermission(err){
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
Expand Down

0 comments on commit 5dc283f

Please sign in to comment.