De-flake TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm
Signed-off-by: Maurice van Veen <[email protected]>
MauriceVanVeen committed Jan 28, 2025
1 parent 2086e62 commit cb046e5
Showing 1 changed file with 64 additions and 60 deletions.

server/jetstream_cluster_4_test.go
@@ -4276,87 +4276,91 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
 	nc, js := jsClientConnect(t, c.randomServer())
 	defer nc.Close()
 
+	checkConsistency := func() {
+		t.Helper()
+		checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
+			for _, s := range c.servers {
+				acc, err := s.lookupAccount(globalAccountName)
+				if err != nil {
+					return err
+				}
+				mset, err := acc.lookupStream("TEST")
+				if err != nil {
+					return err
+				}
+				state := mset.state()
+				if state.Msgs != 3 || state.Bytes != 99 {
+					return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
+				}
+			}
+			return nil
+		})
+	}
+
 	_, err := js.AddStream(&nats.StreamConfig{
 		Name:     "TEST",
-		Subjects: []string{"foo.>"},
+		Subjects: []string{"foo"},
 		Replicas: 3,
 	})
-	nc.Close()
 	require_NoError(t, err)
 
-	// Pick one server that will only store a part of the messages in its WAL.
-	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
-	ts := time.Now().UnixNano()
-
-	// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
-	var scratch [1024]byte
-	for _, s := range c.servers {
-		for _, n := range s.raftNodes {
-			rn := n.(*raft)
-			if rn.accName == globalAccountName {
-				for i := uint64(0); i < 3; i++ {
-					// One server will be one behind and need to catchup.
-					if s.Name() == rs.Name() && i >= 2 {
-						break
-					}
+	sl := c.streamLeader(globalAccountName, "TEST")
+	require_NoError(t, err)
+	acc, err := sl.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	rn := mset.raftNode().(*raft)
+	leaderId := rn.ID()
 
-					esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true, false)
-					entries := []*Entry{newEntry(EntryNormal, esm)}
-					rn.Lock()
-					ae := rn.buildAppendEntry(entries)
-					ae.buf, err = ae.encode(scratch[:])
-					require_NoError(t, err)
-					err = rn.storeToWAL(ae)
-					rn.Unlock()
-					require_NoError(t, err)
-				}
-			}
-		}
+	for i := 0; i < 3; i++ {
+		_, err = js.Publish("foo", nil)
+		require_NoError(t, err)
 	}
+	nc.Close()
+	checkConsistency()
 
 	// Restart all.
 	c.stopAll()
 	c.restartAll()
 	c.waitOnAllCurrent()
 	c.waitOnStreamLeader(globalAccountName, "TEST")
+	// Pick one server that will only store a part of the messages in its WAL.
+	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err = rs.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err = acc.lookupStream("TEST")
+	require_NoError(t, err)
+	rn = mset.raftNode().(*raft)
+	index, commit, _ := rn.Progress()
+	require_Equal(t, index, 4)
+	require_Equal(t, index, commit)
 
-	rs = c.serverByName(rs.Name())
+	// We'll simulate as-if the last message was never received/stored.
+	// Will need to truncate the stream, correct lseq (so the msg isn't skipped) and truncate the WAL.
+	// This will simulate that the RAFT layer can restore it.
+	mset.mu.Lock()
+	mset.lseq--
+	err = mset.store.Truncate(2)
+	mset.mu.Unlock()
+	require_NoError(t, err)
+	rn.Lock()
+	rn.truncateWAL(rn.pterm, rn.pindex-1)
+	rn.Unlock()
 
-	// Check all servers ended up with all published messages, which had quorum.
-	checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
-		for _, s := range c.servers {
-			acc, err := s.lookupAccount(globalAccountName)
-			if err != nil {
-				return err
-			}
-			mset, err := acc.lookupStream("TEST")
-			if err != nil {
-				return err
-			}
-			state := mset.state()
-			if state.Msgs != 3 || state.Bytes != 99 {
-				return fmt.Errorf("stream state didn't match, got %d messages with %d bytes", state.Msgs, state.Bytes)
-			}
-		}
-		return nil
-	})
+	checkConsistency()
 
-	// Check that the first two published messages came from our WAL, and
-	// the last came from a catchup by another leader.
+	// Check that all entries came from the expected leader.
 	for _, n := range rs.raftNodes {
 		rn := n.(*raft)
 		if rn.accName == globalAccountName {
-			ae, err := rn.loadEntry(2)
+			ae, err := rn.loadEntry(1)
 			require_NoError(t, err)
-			require_True(t, ae.leader == rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 
-			ae, err = rn.loadEntry(3)
+			ae, err = rn.loadEntry(2)
 			require_NoError(t, err)
-			require_True(t, ae.leader == rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 
-			ae, err = rn.loadEntry(4)
+			ae, err = rn.loadEntry(3)
 			require_NoError(t, err)
-			require_True(t, ae.leader != rn.ID())
+			require_Equal(t, ae.leader, leaderId)
 		}
 	}
 }