Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Updated subject state expiration #5377

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type FileStoreConfig struct {
BlockSize uint64
// CacheExpire is how long with no activity until we expire the cache.
CacheExpire time.Duration
// SubjectStateExpire is how long with no activity until we expire a msg block's subject state.
SubjectStateExpire time.Duration
// SyncInterval is how often we sync to disk in the background.
SyncInterval time.Duration
// SyncAlways is when the stream should sync all data writes.
Expand Down Expand Up @@ -222,6 +224,7 @@ type msgBlock struct {
cache *cache
cloads uint64
cexp time.Duration
fexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
Expand Down Expand Up @@ -296,8 +299,8 @@ const (
defaultSyncInterval = 2 * time.Minute
// default idle timeout to close FDs.
closeFDsIdle = 30 * time.Second
// maximum flush time for mb.fss when idle.
maxFssFlushInterval = 10 * time.Second
// default expiration time for mb.fss when idle.
defaultFssExpiration = 10 * time.Second
// coalesceMinimum
coalesceMinimum = 16 * 1024
// maxFlushWait is maximum we will wait to gather messages to flush.
Expand Down Expand Up @@ -361,6 +364,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
if fcfg.CacheExpire == 0 {
fcfg.CacheExpire = defaultCacheBufferExpiration
}
if fcfg.SubjectStateExpire == 0 {
fcfg.SubjectStateExpire = defaultFssExpiration
}
if fcfg.SyncInterval == 0 {
fcfg.SyncInterval = defaultSyncInterval
}
Expand Down Expand Up @@ -879,7 +885,14 @@ func (fs *fileStore) noTrackSubjects() bool {

// Will init the basics for a message block.
func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}
mb := &msgBlock{
fs: fs,
index: index,
cexp: fs.fcfg.CacheExpire,
fexp: fs.fcfg.SubjectStateExpire,
noTrack: fs.noTrackSubjects(),
syncAlways: fs.fcfg.SyncAlways,
}

mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, index))
Expand Down Expand Up @@ -4545,9 +4558,16 @@ func (mb *msgBlock) clearCacheAndOffset() {

// Lock should be held.
func (mb *msgBlock) clearCache() {
if mb.ctmr != nil && mb.fss == nil {
mb.ctmr.Stop()
mb.ctmr = nil
if mb.ctmr != nil {
tsla := mb.sinceLastActivity()
if mb.fss == nil || tsla > mb.fexp {
// Subject state is already gone or has been idle longer than fexp: drop it and stop the timer.
mb.fss = nil
mb.ctmr.Stop()
mb.ctmr = nil
} else {
mb.resetCacheExpireTimer(mb.fexp - tsla)
}
}

if mb.cache == nil {
Expand Down Expand Up @@ -4612,7 +4632,7 @@ func (mb *msgBlock) tryExpireWriteCache() []byte {

// Lock should be held.
func (mb *msgBlock) expireCacheLocked() {
if mb.cache == nil {
if mb.cache == nil && mb.fss == nil {
if mb.ctmr != nil {
mb.ctmr.Stop()
mb.ctmr = nil
Expand Down Expand Up @@ -5220,10 +5240,6 @@ func (fs *fileStore) syncBlocks() {
}
blks := append([]*msgBlock(nil), fs.blks...)
lmb := fs.lmb
fssFlushInterval := fs.fcfg.SyncInterval
if fssFlushInterval > maxFssFlushInterval {
fssFlushInterval = maxFssFlushInterval
}
fs.mu.RUnlock()

var markDirty bool
Expand All @@ -5238,11 +5254,6 @@ func (fs *fileStore) syncBlocks() {
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check our fss subject metadata.
// If we have no activity within sync interval remove.
if mb.fssLoaded() && mb.sinceLastActivity() > fssFlushInterval {
mb.fss = nil
}

// Check if we should compact here as well.
// Do not compact last mb.
Expand Down Expand Up @@ -5709,12 +5720,6 @@ func (mb *msgBlock) fssNotLoaded() bool {
return mb.fss == nil && !mb.noTrack
}

// fssLoaded reports whether this block's subject state (mb.fss) is
// currently populated, i.e. non-nil.
// Lock should be held.
func (mb *msgBlock) fssLoaded() bool {
	hasState := mb.fss != nil
	return hasState
}

// Wrap openBlock for the gated semaphore processing.
// Lock should be held.
func (mb *msgBlock) openBlock() (*os.File, error) {
Expand Down Expand Up @@ -7608,9 +7613,7 @@ const (
fullStateVersion = uint8(1)
)

// This go routine runs and receives kicks to write out our full stream state index.
// This will get kicked when we create a new block or when we delete a block in general.
// This is also called during Stop().
// This go routine periodically writes out our full stream state index.
func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
// Signal we are done on exit.
defer close(done)
Expand Down
21 changes: 10 additions & 11 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3793,8 +3793,8 @@ func (fs *fileStore) reportMeta() (hasPSIM, hasAnyFSS bool) {
func TestFileStoreExpireSubjectMeta(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 1024
fcfg.CacheExpire = time.Second
fcfg.SyncInterval = time.Second
fcfg.CacheExpire = 500 * time.Millisecond
fcfg.SubjectStateExpire = time.Second
cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1}
fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
Expand All @@ -3821,7 +3821,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
}

// Make sure we clear mb fss meta
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
return fmt.Errorf("Still have mb fss state")
}
Expand All @@ -3832,7 +3832,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
_, err = fs.LoadLastMsg("kv.22", nil)
require_NoError(t, err)
// Make sure we clear mb fss meta
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
return fmt.Errorf("Still have mb fss state")
}
Expand Down Expand Up @@ -3923,7 +3923,7 @@ func TestFileStoreSubjectStateCacheExpiration(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 32
fcfg.CacheExpire = time.Second
fcfg.SyncInterval = time.Second
fcfg.SubjectStateExpire = time.Second
cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 2}
fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
Expand Down Expand Up @@ -6336,7 +6336,7 @@ func TestFileStorePurgeExBufPool(t *testing.T) {
func TestFileStoreFSSMeta(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SyncInterval: time.Second},
FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SubjectStateExpire: time.Second},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
Expand All @@ -6358,23 +6358,22 @@ func TestFileStoreFSSMeta(t *testing.T) {
require_NoError(t, err)
require_Equal(t, p, 2)

// Make sure cache is not loaded but fss state still is.
var stillHasCache, noFSS bool
// Make sure cache is not loaded.
var stillHasCache bool
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
stillHasCache = stillHasCache || mb.cacheAlreadyLoaded()
noFSS = noFSS || mb.fssNotLoaded()
mb.mu.RUnlock()
}
fs.mu.RUnlock()

require_False(t, stillHasCache)
require_False(t, noFSS)

// Let fss expire via syncInterval.
// Let fss expire via SubjectStateExpire.
time.Sleep(time.Second)

var noFSS bool
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
Expand Down