Cherry-picks for 2.10.16-RC.3 (#5456)
Includes:

- #5447 
- #5449
- #5454
wallyqs authored May 20, 2024
2 parents 5286fda + e59f96f commit 314f148
Showing 11 changed files with 364 additions and 52 deletions.
1 change: 0 additions & 1 deletion conf/lex.go
@@ -171,7 +171,6 @@ func (lx *lexer) emitString() {
} else {
finalString = lx.input[lx.start:lx.pos]
}

// Position of string in line where it started.
pos := lx.pos - lx.ilstart - len(finalString)
lx.items <- item{itemString, finalString, lx.line, pos}
18 changes: 18 additions & 0 deletions conf/parse_test.go
@@ -812,6 +812,24 @@ func TestBlocks(t *testing.T) {
},
"", "",
},
{
"comment in block scope after value parse",
`
{
"debug": False
"server_name": "gcp-asianortheast3-natscj1-1"
# Profile port specification.
"prof_port": 8221
}
`,
map[string]any{
"debug": false,
"prof_port": int64(8221),
"server_name": "gcp-asianortheast3-natscj1-1",
},
"", "",
},
} {
t.Run(test.name, func(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "nats.conf-")
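The added case exercises a block where a comment directly follows a value. As a rough illustration of parsing such a snippet outside the test harness, here is a hedged sketch; the conf.Parse call and its map[string]any result are assumed from the test's expectations rather than shown in this diff.

package main

import (
	"fmt"

	"github.com/nats-io/nats-server/v2/conf"
)

func main() {
	// Block scope where a comment immediately follows a value, the shape
	// exercised by the "comment in block scope after value parse" case.
	config := `
{
  "debug": false
  "server_name": "gcp-asianortheast3-natscj1-1"
  # Profile port specification.
  "prof_port": 8221
}
`
	// conf.Parse and its signature are assumptions for this sketch.
	m, err := conf.Parse(config)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(m["server_name"], m["prof_port"])
}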
32 changes: 24 additions & 8 deletions server/consumer.go
@@ -2748,7 +2748,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
return
}

var sagap uint64
var sgap, floor uint64
var needSignal bool

switch o.cfg.AckPolicy {
@@ -2792,12 +2792,29 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
}
sagap = sseq - o.asflr
sgap = sseq - o.asflr
floor = sgap // start at same and set lower as we go.
o.adflr, o.asflr = dseq, sseq
for seq := sseq; seq > sseq-sagap && len(o.pending) > 0; seq-- {

remove := func(seq uint64) {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
if seq < floor {
floor = seq
}
}
// Determine if it is smarter to walk all of pending vs the sequence range.
if sgap > uint64(len(o.pending)) {
for seq := range o.pending {
if seq <= sseq {
remove(seq)
}
}
} else {
for seq := sseq; seq > sseq-sgap && len(o.pending) > 0; seq-- {
remove(seq)
}
}
case AckNone:
// FIXME(dlc) - This is error but do we care?
@@ -2808,20 +2825,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// Update underlying store.
o.updateAcks(dseq, sseq, reply)

clustered := o.node != nil

// In case retention changes for a stream, this ought to have been updated
// using the consumer lock to avoid a race.
retention := o.retention
clustered := o.node != nil
o.mu.Unlock()

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered this will be handled by processReplicatedAck
// after the ack has propagated.
if !clustered && mset != nil && retention != LimitsPolicy {
if sagap > 1 {
// FIXME(dlc) - This is very inefficient, will need to fix.
for seq := sseq; seq > sseq-sagap; seq-- {
if sgap > 1 {
// FIXME(dlc) - This can be very inefficient, will need to fix.
for seq := sseq; seq >= floor; seq-- {
mset.ackMsg(o, seq)
}
} else {
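The core of the consumer.go change above is to bound the cleanup a cumulative ack triggers: pending entries are removed by walking whichever domain is smaller, the pending map or the acked sequence range, and the lowest removed sequence is tracked as a floor so the later per-message ack loop does not revisit the full range. Below is a standalone sketch of that pattern; the ackAllUpTo helper and the simplified pending set are illustrative stand-ins, not part of the diff.

package main

import "fmt"

// ackAllUpTo removes every pending sequence <= sseq, walking whichever domain
// is smaller (the pending map or the acked sequence range), and returns the
// lowest sequence it actually removed so a follow-up pass can stop there.
func ackAllUpTo(pending map[uint64]struct{}, sseq, asflr uint64) (floor uint64) {
	sgap := sseq - asflr
	floor = sseq // Lowest removed sequence; lowered as entries are deleted.

	remove := func(seq uint64) {
		delete(pending, seq)
		if seq < floor {
			floor = seq
		}
	}

	// Determine if it is smarter to walk all of pending vs the sequence range.
	if sgap > uint64(len(pending)) {
		for seq := range pending {
			if seq <= sseq {
				remove(seq)
			}
		}
	} else {
		for seq := sseq; seq > sseq-sgap && len(pending) > 0; seq-- {
			remove(seq)
		}
	}
	return floor
}

func main() {
	pending := map[uint64]struct{}{8: {}, 9: {}, 10: {}}
	floor := ackAllUpTo(pending, 10, 5)
	fmt.Println(floor, len(pending)) // 8 0
}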
26 changes: 17 additions & 9 deletions server/filestore.go
@@ -3430,25 +3430,21 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
lseq := seq + num - 1

mb.mu.Lock()
var needsRecord bool
// If we are empty update meta directly.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, lseq)
mb.last.ts = nowts
atomic.StoreUint64(&mb.first.seq, lseq+1)
mb.first.ts = nowts
} else {
needsRecord = true
for ; seq <= lseq; seq++ {
mb.dmap.Insert(seq)
}
}
mb.mu.Unlock()

// Write out our placeholder.
if needsRecord {
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
}
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)

// Now update FS accounting.
// Update fs state.
@@ -8169,6 +8165,7 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
}

// SyncDeleted will make sure this stream has same deleted state as dbs.
// This will only process deleted state within our current state.
func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
if len(dbs) == 0 {
return
@@ -8177,18 +8174,22 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
fs.mu.Lock()
defer fs.mu.Unlock()

lseq := fs.state.LastSeq
var needsCheck DeleteBlocks

fs.readLockAllMsgBlocks()
mdbs := fs.deleteBlocks()
for i, db := range dbs {
first, last, num := db.State()
// If the block is same as what we have we can skip.
if i < len(mdbs) {
first, last, num := db.State()
eFirst, eLast, eNum := mdbs[i].State()
if first == eFirst && last == eLast && num == eNum {
continue
}
} else if first > lseq {
// Skip blocks not applicable to our current state.
continue
}
// Need to insert these.
needsCheck = append(needsCheck, db)
@@ -8616,9 +8617,16 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
if sgap > uint64(len(o.state.Pending)) {
for seq := range o.state.Pending {
if seq <= sseq {
delete(o.state.Pending, seq)
delete(o.state.Redelivered, seq)
}
}
} else {
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
delete(o.state.Redelivered, seq)
}
}
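For SyncDeleted, the hunk above skips incoming delete blocks that either match our local blocks exactly or start beyond our last sequence, so only deleted state within our current state is processed. A simplified sketch of that filter follows; the deleteBlock struct and blocksToCheck helper are stand-ins for the real DeleteBlocks interface, not part of the diff.

package main

import "fmt"

// deleteBlock is a simplified stand-in for a filestore delete block,
// described only by its first/last sequence and number of deletes.
type deleteBlock struct {
	first, last, num uint64
}

// blocksToCheck mirrors the SyncDeleted filtering: a remote block is skipped
// when it matches the corresponding local block exactly, or when it starts
// beyond our last sequence and so cannot apply to our current state.
func blocksToCheck(remote, local []deleteBlock, lastSeq uint64) []deleteBlock {
	var needsCheck []deleteBlock
	for i, db := range remote {
		if i < len(local) {
			if db == local[i] {
				continue // Same deleted state as ours, nothing to sync.
			}
		} else if db.first > lastSeq {
			continue // Not applicable to our current state.
		}
		needsCheck = append(needsCheck, db)
	}
	return needsCheck
}

func main() {
	remote := []deleteBlock{{1, 5, 3}, {10, 20, 4}, {100, 110, 2}}
	local := []deleteBlock{{1, 5, 3}}
	fmt.Println(blocksToCheck(remote, local, 50)) // keeps {10 20 4}, drops the rest
}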
75 changes: 56 additions & 19 deletions server/jetstream_cluster.go
@@ -2405,9 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// If we are interest based make sure to check consumers if interest retention policy.
// This is to make sure we process any outstanding acks from all consumers.
mset.checkInterestState()
// Make sure we create a new snapshot in case things have changed such that any existing
// snapshot may no longer be valid.
doSnapshot()
// If we became leader during this time and we need to send a snapshot to our
// followers, i.e. as a result of a scale-up from R1, do it now.
if sendSnapshot && isLeader && mset != nil && n != nil {
@@ -2941,6 +2938,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

if err != nil {
if err == errLastSeqMismatch {

var state StreamState
mset.store.FastState(&state)

@@ -2952,6 +2950,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
}
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
// and what we got.
}

// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
@@ -3568,9 +3568,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
js.mu.Unlock()
}

var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
// Since we are scaling up we want to make sure our sync subject
// is registered before we start our raft node.
mset.mu.Lock()
mset.startClusterSubs()
mset.mu.Unlock()

js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
"type": "stream",
"account": mset.accName(),
@@ -3602,16 +3607,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
rg.node = nil
js.mu.Unlock()
}
// Set the new stream assignment.
mset.setStreamAssignment(sa)

// Call update.
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
mset.setStreamAssignment(sa)
// Make sure we are the leader now that we are R1.
if needsSetLeader {
mset.setLeader(true)
}
}

// If not found we must be expanding into this node since if we are here we know we are a member.
@@ -7582,7 +7584,8 @@ func (mset *stream) supportsBinarySnapshotLocked() bool {
// We know we support ourselves.
continue
}
if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots {
// Since release 2.10.16 only deny if we know the other node does not support.
if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).binarySnapshots {
return false
}
}
@@ -8681,7 +8684,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
return 0
}

nextBatchC := make(chan struct{}, 1)
nextBatchC := make(chan struct{}, 4)
nextBatchC <- struct{}{}
remoteQuitCh := make(chan struct{})

@@ -8706,19 +8709,18 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Kick ourselves and anyone else who might have stalled on global state.
select {
case nextBatchC <- struct{}{}:
// Reset our activity
notActive.Reset(activityInterval)
default:
}
// Reset our activity
notActive.Reset(activityInterval)
})
defer s.sysUnsubscribe(ackSub)
ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d")

// Grab our state.
var state StreamState
mset.mu.RLock()
// mset.store never changes after being set, don't need lock.
mset.store.FastState(&state)
mset.mu.RUnlock()

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
@@ -8756,7 +8758,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Wait til we can send at least 4k
const minBatchWait = int32(4 * 1024)
mw := time.NewTimer(minWait)
for done := false; !done; {
for done := maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait; !done; {
select {
case <-nextBatchC:
done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
@@ -8811,9 +8813,33 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
dr.First, dr.Num = 0, 0
}

// See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order of magnitude more interior deletes.
// Only makes sense with delete range capabilities.
useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs)

var smv StoreMsg
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
var sm *StoreMsg
var err error
// If we should use LoadNextMsg, do so here.
if useLoadNext {
var nseq uint64
sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
if err == nil && nseq > seq {
dr.First, dr.Num = seq, nseq-seq
// Jump ahead
seq = nseq
} else if err == ErrStoreEOF {
dr.First, dr.Num = seq, state.LastSeq-seq
// Clear EOF here for normal processing.
err = nil
// Jump ahead
seq = state.LastSeq
}
} else {
sm, err = mset.store.LoadMsg(seq, &smv)
}

// if this is not a deleted msg, bail out.
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
if err == ErrStoreEOF {
@@ -8829,6 +8855,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if n := mset.raftNode(); n != nil {
n.InstallSnapshot(mset.stateSnapshot())
}
// If we allow gap markers check if we have one pending.
if drOk && dr.First > 0 {
sendDR()
}
// Signal EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
@@ -8875,6 +8905,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
}
// Recheck our exit condition.
if seq == last {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
@@ -8890,7 +8923,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}

return true
}

@@ -8930,6 +8962,11 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
mset.clearCatchupPeer(sreq.Peer)
return
}
case <-time.After(500 * time.Millisecond):
if !sendNextBatchAndContinue(qch) {
mset.clearCatchupPeer(sreq.Peer)
return
}
}
}
}
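In runCatchup, when a stream has an order of magnitude more interior deletes than messages, the new path uses LoadNextMsg to jump over gaps and sends one delete-range marker per gap instead of a skip record per missing sequence. The toy sketch below illustrates that bookkeeping over a simple set of present sequences; the catchupRanges helper and deleteRange type are illustrative, and the real code additionally honors the flow-control limits shown above.

package main

import "fmt"

// deleteRange describes a run of missing sequences reported to the follower
// as a single gap marker instead of one skip record per sequence.
type deleteRange struct {
	First, Num uint64
}

// catchupRanges walks [seq, last] over a sparse set of present sequences and
// returns the sequences to send plus a delete range for each gap, mirroring
// how the catchup loop records dr.First/dr.Num and jumps ahead.
func catchupRanges(present map[uint64]bool, seq, last uint64) (send []uint64, gaps []deleteRange) {
	for seq <= last {
		// Find the next present sequence at or after seq.
		next := seq
		for next <= last && !present[next] {
			next++
		}
		if next > seq {
			gaps = append(gaps, deleteRange{First: seq, Num: next - seq})
		}
		if next > last {
			break // Ran off the end; the final gap covers the remainder.
		}
		send = append(send, next)
		seq = next + 1
	}
	return send, gaps
}

func main() {
	present := map[uint64]bool{3: true, 100: true}
	send, gaps := catchupRanges(present, 1, 100)
	fmt.Println(send) // [3 100]
	fmt.Println(gaps) // [{1 2} {4 96}]
}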
6 changes: 3 additions & 3 deletions server/jetstream_helpers_test.go
@@ -38,9 +38,9 @@ import (
func init() {
// Speed up raft for tests.
hbInterval = 50 * time.Millisecond
minElectionTimeout = 750 * time.Millisecond
maxElectionTimeout = 2500 * time.Millisecond
lostQuorumInterval = 1 * time.Second
minElectionTimeout = 1500 * time.Millisecond
maxElectionTimeout = 3500 * time.Millisecond
lostQuorumInterval = 2 * time.Second
lostQuorumCheck = 4 * hbInterval
}
