Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick PRs for 2.10.15-RC.5 #5376

Merged
merged 7 commits into from
May 1, 2024
52 changes: 30 additions & 22 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type FileStoreConfig struct {
BlockSize uint64
// CacheExpire is how long with no activity until we expire the cache.
CacheExpire time.Duration
// SubjectStateExpire is how long with no activity until we expire a msg block's subject state.
SubjectStateExpire time.Duration
// SyncInterval is how often we sync to disk in the background.
SyncInterval time.Duration
// SyncAlways is when the stream should sync all data writes.
Expand Down Expand Up @@ -222,6 +224,7 @@ type msgBlock struct {
cache *cache
cloads uint64
cexp time.Duration
fexp time.Duration
ctmr *time.Timer
werr error
dmap avl.SequenceSet
Expand Down Expand Up @@ -296,6 +299,8 @@ const (
defaultSyncInterval = 2 * time.Minute
// default idle timeout to close FDs.
closeFDsIdle = 30 * time.Second
// default expiration time for mb.fss when idle.
defaultFssExpiration = 10 * time.Second
// coalesceMinimum
coalesceMinimum = 16 * 1024
// maxFlushWait is maximum we will wait to gather messages to flush.
Expand Down Expand Up @@ -359,6 +364,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
if fcfg.CacheExpire == 0 {
fcfg.CacheExpire = defaultCacheBufferExpiration
}
if fcfg.SubjectStateExpire == 0 {
fcfg.SubjectStateExpire = defaultFssExpiration
}
if fcfg.SyncInterval == 0 {
fcfg.SyncInterval = defaultSyncInterval
}
Expand Down Expand Up @@ -877,7 +885,14 @@ func (fs *fileStore) noTrackSubjects() bool {

// Will init the basics for a message block.
func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire, noTrack: fs.noTrackSubjects(), syncAlways: fs.fcfg.SyncAlways}
mb := &msgBlock{
fs: fs,
index: index,
cexp: fs.fcfg.CacheExpire,
fexp: fs.fcfg.SubjectStateExpire,
noTrack: fs.noTrackSubjects(),
syncAlways: fs.fcfg.SyncAlways,
}

mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, index))
Expand Down Expand Up @@ -4430,9 +4445,16 @@ func (mb *msgBlock) clearCacheAndOffset() {

// Lock should be held.
func (mb *msgBlock) clearCache() {
if mb.ctmr != nil && mb.fss == nil {
mb.ctmr.Stop()
mb.ctmr = nil
if mb.ctmr != nil {
tsla := mb.sinceLastActivity()
if mb.fss == nil || tsla > mb.fexp {
// Force removal of fss: it is either already nil or the time since last activity exceeds fexp.
mb.fss = nil
mb.ctmr.Stop()
mb.ctmr = nil
} else {
mb.resetCacheExpireTimer(mb.fexp - tsla)
}
}

if mb.cache == nil {
Expand Down Expand Up @@ -4497,7 +4519,7 @@ func (mb *msgBlock) tryExpireWriteCache() []byte {

// Lock should be held.
func (mb *msgBlock) expireCacheLocked() {
if mb.cache == nil {
if mb.cache == nil && mb.fss == nil {
if mb.ctmr != nil {
mb.ctmr.Stop()
mb.ctmr = nil
Expand Down Expand Up @@ -5105,7 +5127,6 @@ func (fs *fileStore) syncBlocks() {
}
blks := append([]*msgBlock(nil), fs.blks...)
lmb := fs.lmb
syncInterval := fs.fcfg.SyncInterval
fs.mu.RUnlock()

var markDirty bool
Expand All @@ -5120,11 +5141,6 @@ func (fs *fileStore) syncBlocks() {
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check our fss subject metadata.
// If we have no activity within sync interval remove.
if mb.fssLoaded() && mb.sinceLastActivity() > syncInterval {
mb.fss = nil
}

// Check if we should compact here as well.
// Do not compact last mb.
Expand Down Expand Up @@ -5591,12 +5607,6 @@ func (mb *msgBlock) fssNotLoaded() bool {
return mb.fss == nil && !mb.noTrack
}

// fssLoaded reports whether mb.fss is currently set (non-nil).
// Unlike fssNotLoaded, it looks only at mb.fss and does not consult mb.noTrack.
// Lock should be held.
func (mb *msgBlock) fssLoaded() bool {
return mb.fss != nil
}

// Wrap openBlock for the gated semaphore processing.
// Lock should be held
func (mb *msgBlock) openBlock() (*os.File, error) {
Expand Down Expand Up @@ -7154,8 +7164,6 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
mb.ctmr.Stop()
mb.ctmr = nil
}
// Clear any tracking by subject.
mb.fss = nil
// Close cache
mb.clearCacheAndOffset()
// Quit our loops.
Expand All @@ -7168,6 +7176,8 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
mb.mfd = nil
}
if remove {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
mb.mfn = _EMPTY_
Expand Down Expand Up @@ -7490,9 +7500,7 @@ const (
fullStateVersion = uint8(1)
)

// This go routine runs and receives kicks to write out our full stream state index.
// This will get kicked when we create a new block or when we delete a block in general.
// This is also called during Stop().
// This go routine periodically writes out our full stream state index.
func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
// Signal we are done on exit.
defer close(done)
Expand Down
58 changes: 47 additions & 11 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3793,8 +3793,8 @@ func (fs *fileStore) reportMeta() (hasPSIM, hasAnyFSS bool) {
func TestFileStoreExpireSubjectMeta(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 1024
fcfg.CacheExpire = time.Second
fcfg.SyncInterval = time.Second
fcfg.CacheExpire = 500 * time.Millisecond
fcfg.SubjectStateExpire = time.Second
cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 1}
fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
Expand All @@ -3821,7 +3821,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
}

// Make sure we clear mb fss meta
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
return fmt.Errorf("Still have mb fss state")
}
Expand All @@ -3832,7 +3832,7 @@ func TestFileStoreExpireSubjectMeta(t *testing.T) {
_, err = fs.LoadLastMsg("kv.22", nil)
require_NoError(t, err)
// Make sure we clear mb fss meta
checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
checkFor(t, fcfg.SubjectStateExpire*2, 500*time.Millisecond, func() error {
if _, hasAnyFSS := fs.reportMeta(); hasAnyFSS {
return fmt.Errorf("Still have mb fss state")
}
Expand Down Expand Up @@ -3923,7 +3923,7 @@ func TestFileStoreSubjectStateCacheExpiration(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fcfg.BlockSize = 32
fcfg.CacheExpire = time.Second
fcfg.SyncInterval = time.Second
fcfg.SubjectStateExpire = time.Second
cfg := StreamConfig{Name: "zzz", Subjects: []string{"kv.>"}, Storage: FileStorage, MaxMsgsPer: 2}
fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil)
require_NoError(t, err)
Expand Down Expand Up @@ -6336,7 +6336,7 @@ func TestFileStorePurgeExBufPool(t *testing.T) {
func TestFileStoreFSSMeta(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SyncInterval: time.Second},
FileStoreConfig{StoreDir: sd, BlockSize: 100, CacheExpire: 200 * time.Millisecond, SubjectStateExpire: time.Second},
StreamConfig{Name: "zzz", Subjects: []string{"*"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()
Expand All @@ -6358,23 +6358,22 @@ func TestFileStoreFSSMeta(t *testing.T) {
require_NoError(t, err)
require_Equal(t, p, 2)

// Make sure cache is not loaded but fss state still is.
var stillHasCache, noFSS bool
// Make sure cache is not loaded.
var stillHasCache bool
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
stillHasCache = stillHasCache || mb.cacheAlreadyLoaded()
noFSS = noFSS || mb.fssNotLoaded()
mb.mu.RUnlock()
}
fs.mu.RUnlock()

require_False(t, stillHasCache)
require_False(t, noFSS)

// Let fss expire via syncInterval.
// Let fss expire via SubjectStateExpire.
time.Sleep(time.Second)

var noFSS bool
fs.mu.RLock()
for _, mb := range fs.blks {
mb.mu.RLock()
Expand Down Expand Up @@ -6688,6 +6687,43 @@ func TestFileStoreWriteFullStateAfterPurgeEx(t *testing.T) {
require_Equal(t, ss.LastSeq, 10)
}

// TestFileStoreMB_FSS_Expire checks that the first message block keeps both
// its per-subject state (fss) and its cache loaded while it is still seeing
// activity, even though more than CacheExpire has elapsed since the first
// store that followed the manual fss reset.
//
// SubjectStateExpire is left unset, so the file store falls back to its
// default fss expiration.
func TestFileStoreMB_FSS_Expire(t *testing.T) {
	fs, err := newFileStore(
		FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 8192, CacheExpire: 1 * time.Second, SyncInterval: 2 * time.Second},
		StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, MaxMsgsPer: 1, Storage: FileStorage})
	require_NoError(t, err)
	defer fs.Stop()

	msg := []byte("abc")
	for i := 1; i <= 1000; i++ {
		// Fail fast on a bad store instead of surfacing it later as a
		// misleading "fss/cache not loaded" assertion.
		_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
		require_NoError(t, err)
	}
	// Flush fss by hand on every block; cache should be flushed as well.
	fs.mu.RLock()
	for _, mb := range fs.blks {
		mb.mu.Lock()
		mb.fss = nil
		mb.mu.Unlock()
	}
	fs.mu.RUnlock()

	_, _, err = fs.StoreMsg("foo.11", nil, msg)
	require_NoError(t, err)
	// Stay just under the 1s CacheExpire before touching the store again.
	time.Sleep(900 * time.Millisecond)
	// This should keep fss alive in the first block,
	// as well as the cache itself due to remove activity.
	// NOTE(review): presumably MaxMsgsPer: 1 makes this store remove the
	// older foo.22 message from the first block — confirm.
	_, _, err = fs.StoreMsg("foo.22", nil, msg)
	require_NoError(t, err)
	// Total elapsed is now ~1.2s, i.e. past CacheExpire measured from the
	// foo.11 store but only ~300ms after the foo.22 activity.
	time.Sleep(300 * time.Millisecond)
	// Check that fss and the cache are still loaded.
	fs.mu.RLock()
	mb := fs.blks[0]
	fs.mu.RUnlock()
	mb.mu.RLock()
	cache, fss := mb.cache, mb.fss
	mb.mu.RUnlock()
	require_True(t, fss != nil)
	require_True(t, cache != nil)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7497,7 +7497,7 @@ func encodeStreamMsg(subject, reply string, hdr, msg []byte, lseq uint64, ts int

// Threshold for compression.
// TODO(dlc) - Eventually make configurable.
const compressThreshold = 256
const compressThreshold = 8192 // 8k

// If allowed and contents over the threshold we will compress.
func encodeStreamMsgAllowCompress(subject, reply string, hdr, msg []byte, lseq uint64, ts int64, compressOK bool) []byte {
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
sendBatch(100)
// Need to check both in parallel.
scheck, mcheck := uint64(0), uint64(0)
checkFor(t, 10*time.Second, 50*time.Millisecond, func() error {
checkFor(t, 20*time.Second, 500*time.Millisecond, func() error {
if scheck != 100 {
if si, _ := js.StreamInfo("S"); si != nil {
scheck = si.State.Msgs
Expand Down
6 changes: 5 additions & 1 deletion server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {

sendBatch := func(subject string, n int) {
for i := 0; i < n; i++ {
_, err = js.Publish(subject, []byte("OK"))
_, err = js.Publish(subject, []byte(strconv.Itoa(i)))
require_NoError(t, err)
}
}
Expand All @@ -266,6 +267,9 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {
for i := 0; i < 300; i++ {
m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
require_NoError(t, err)
p, err := strconv.Atoi(string(m[0].Data))
require_NoError(t, err)
require_Equal(t, p, i)
time.Sleep(11 * time.Millisecond)
err = m[0].Ack()
require_NoError(t, err)
Expand Down
Loading