diff --git a/server/filestore.go b/server/filestore.go index 7d4b2637e3..0ebf485006 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -68,6 +68,8 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression + // Allows disabling jetstream when fs is not writable + JetStreamDisableOnDiskError bool // Internal reference to our server. srv *Server @@ -496,7 +498,14 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError { + fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return nil, err + } fs.startAgeChk() } @@ -1979,9 +1988,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -1993,7 +2002,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2009,8 +2018,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2024,7 +2037,10 @@ func (fs *fileStore) expireMsgsOnRecover() { if mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) + if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{ + return err + } mb.mu.Unlock() continue } @@ -2149,6 +2165,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3576,6 +3593,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms) dios <- struct{}{} + if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { + return nil, err + } if err != nil { mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) @@ -6428,6 +6448,7 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") errNoBlkData = errors.New("message block data missing") errStateTooBig = errors.New("store state too big for optional write") + errFileSystemPermissionDenied = errors.New("storage directory not writeable") ) const ( @@ -7894,9 +7915,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -7918,13 +7939,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + return errFileSystemPermissionDenied + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{ + return errFileSystemPermissionDenied + } } } + return nil } // Remove a seq from the fss and select new first. @@ -8545,6 +8573,14 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable os.IsPermission is set to true + if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError { + fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return err + } dios <- struct{}{} // Update dirty if successful. diff --git a/server/opts.go b/server/opts.go index 172377253e..45b41f00fe 100644 --- a/server/opts.go +++ b/server/opts.go @@ -338,6 +338,7 @@ type Options struct { JetStreamMaxCatchup int64 JetStreamRequestQueueLimit int64 StreamMaxBufferedMsgs int `json:"-"` + JetStreamDisableOnDiskError bool `json:"-"` StreamMaxBufferedSize int64 `json:"-"` StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` @@ -2445,6 +2446,10 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } opts.JetStreamRequestQueueLimit = lim + case "disable_js_on_disk_error": + if v, ok := mv.(bool); ok { + opts.JetStreamDisableOnDiskError = v + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -6086,4 +6091,4 @@ func expandPath(p string) (string, error) { } return filepath.Join(home, p[1:]), nil -} +} \ No newline at end of file diff --git a/server/stream.go b/server/stream.go index 995d129363..490953ace2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -711,6 +711,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.SyncInterval = s.getOpts().SyncInterval fsCfg.SyncAlways = s.getOpts().SyncAlways fsCfg.Compression = config.Compression + fsCfg.JetStreamDisableOnDiskError = s.getOpts().JetStreamDisableOnDiskError if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false) @@ -5013,6 +5014,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } err = store.StoreRawMsg(subject, hdr, msg, seq, ts) } + if err != nil && os.IsPermission(err) && mset.srv.getOpts().JetStreamDisableOnDiskError { + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) + return err + } if err != nil { // If we did not succeed put those values back and increment clfs in case we are clustered.