diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index dafa093e6813..aa3a0116a841 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -107,7 +107,7 @@ func clearTrivialReplicatedEvalResultFields(r *storagepb.ReplicatedEvalResult) { // engine but before its side-effects have been applied to the Replica's // in-memory state. This method gives the command an opportunity to interact // with testing knobs and to set up its local result if it was proposed -// locally. This is performed prior to handling the command's +// locally. This is performed prior to handling the command's // ReplicatedEvalResult because the process of handling the replicated eval // result will zero-out the struct to ensure that is has properly performed all // of the implied side-effects. diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index d7c78e376ed9..ac8432388c09 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -884,19 +884,34 @@ func (sm *replicaStateMachine) ApplySideEffects( } // Mark the command as applied and return it as an apply.AppliedCommand. + // NB: Commands which were reproposed at a higher MaxLeaseIndex will not be + // considered local at this point as their proposal will have been detached + // in prepareLocalResult(). if cmd.IsLocal() { - if !cmd.Rejected() { - if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { - log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") - } - if cmd.proposal.applied { - // If the command already applied then we shouldn't be "finishing" its - // application again because it should only be able to apply successfully - // once. We expect that when any reproposal for the same command attempts - // to apply it will be rejected by the below raft lease sequence or lease - // index check in checkForcedErr. - log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) - } + rejected := cmd.Rejected() + higherReproposalsExist := cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex + if !rejected && higherReproposalsExist { + log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") + } + if !rejected && cmd.proposal.applied { + // If the command already applied then we shouldn't be "finishing" its + // application again because it should only be able to apply successfully + // once. We expect that when any reproposal for the same command attempts + // to apply it will be rejected by the below raft lease sequence or lease + // index check in checkForcedErr. + log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) + } + // If any reproposals at a higher MaxLeaseIndex exist we know that they will + // never successfully apply, remove them from the map to avoid future + // reproposals. If there is no command referencing this proposal at a higher + // MaxLeaseIndex then it will already have been removed (see + // shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible + // that a later command in this batch referred to this proposal but it must + // have failed because it carried the same MaxLeaseIndex. + if higherReproposalsExist { + sm.r.mu.Lock() + delete(sm.r.mu.proposals, cmd.idKey) + sm.r.mu.Unlock() } cmd.proposal.applied = true } diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 264b17b965c0..52b577da569c 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -780,7 +780,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } applicationElapsed := timeutil.Since(applicationStart).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) - + if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady { + refreshReason = reasonNewLeaderOrConfigChange + } if refreshReason != noReason { r.mu.Lock() r.refreshProposalsLocked(0, refreshReason) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index f103ba04015d..ddf29aab62c0 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -11568,16 +11568,35 @@ func TestSplitSnapshotWarningStr(t *testing.T) { ) } -// TestHighestMaxLeaseIndexReproposalFinishesCommand exercises a case where a -// command is reproposed twice at different MaxLeaseIndex values to ultimately -// fail with an error which cannot be reproposed (say due to a lease transfer -// or change to the gc threshold). This test works to exercise the invariant -// that when a proposal has been reproposed at different MaxLeaseIndex value -// the client is ultimately acknowledged with an error from a reproposal with -// the largest index. The test verfies this condition by asserting that the +// TestProposalNotAcknowledgedOrReproposedAfterApplication exercises a case +// where a command is reproposed twice at different MaxLeaseIndex values to +// ultimately fail with an error which cannot be reproposed (say due to a lease +// transfer or change to the gc threshold). This test works to exercise the +// invariant that when a proposal has been reproposed at different MaxLeaseIndex +// values are not additionally reproposed or acknowledged after applying +// locally. The test verfies this condition by asserting that the // span used to trace the execution of the proposal is not used after the -// proposal has been finished. -func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { +// proposal has been finished as it would be if the proposal were reproposed +// after applying locally. +// +// The test does the following things: +// +// * Propose cmd at an initial MaxLeaseIndex. +// * Refresh that cmd immediately. +// * Fail the initial command with an injected error which will lead to a +// reproposal at a higher MaxLeaseIndex. +// * Simultaneously update the lease sequence number on the replica so all +// future commands will fail with NotLeaseHolderError. +// * Enable unconditional refreshes of commands after a raft ready so that +// higher MaxLeaseIndex commands are refreshed. +// +// This order of events ensures that there will be a committed command which +// experiences the lease mismatch error but does not carry the highest +// MaxLeaseIndex for the proposal. The test attempts to verify that once a +// proposal has been acknowledged it will not be reproposed or acknowledged +// again by asserting that the proposal's context is not reused after it is +// finished by the waiting client. +func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) { defer leaktest.AfterTest(t)() // Set the trace infrastructure to log if a span is used after being finished. @@ -11608,12 +11627,12 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { // In the TestingProposalFilter we populater cmdID with the id of the proposal // which corresponds to txnID. var cmdID storagebase.CmdIDKey - // After we evalAndPropose the command we populate prop with the ProposalData - // value to enable reproposing the same command more than once. - var prop *ProposalData // seen is used to detect the first application of our proposal. var seen bool cfg.TestingKnobs = StoreTestingKnobs{ + // Constant reproposals are the worst case which this test is trying to + // examine. + EnableUnconditionalRefreshesInRaftReady: true, // Set the TestingProposalFilter in order to know the CmdIDKey for our // request by detecting its txnID. TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error { @@ -11629,30 +11648,21 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { return 0, nil } seen = true - // Repropose on a separate location to not mess with the - // goldenProtosBelowRaft checks. - reproposed := make(chan struct{}) - go func() { - if _, pErr := tc.repl.propose(prop.ctx, prop); pErr != nil { - panic(pErr) - } - close(reproposed) - }() - <-reproposed tc.repl.mu.Lock() defer tc.repl.mu.Unlock() - // Flush the proposalBuf to ensure that the reproposal makes it into the - // Replica's proposal map. - if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil { - panic(err) - } + // Increase the lease sequence so that future reproposals will fail with // NotLeaseHolderError. This mimics the outcome of a leaseholder change // slipping in between the application of the first proposal and the // reproposals. tc.repl.mu.state.Lease.Sequence++ // This return value will force another retry which will carry a yet - // higher MaxLeaseIndex and will trigger our invariant violation. + // higher MaxLeaseIndex. The first reproposal will fail and return to the + // client but the second (which hasn't been applied due to the + // MaxCommittedSizePerReady setting) will be reproposed again. This test + // ensure that it does not reuse the original proposal's context for that + // reproposal by ensuring that no event is recorded after the original + // proposal has been finished. return int(proposalIllegalLeaseIndex), roachpb.NewErrorf("forced error that can be reproposed at a higher index") }, @@ -11678,8 +11688,6 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { // Hold the RaftLock to ensure that after evalAndPropose our proposal is in // the proposal map. Entries are only removed from that map underneath raft. - // We want to grab the proposal so that we can shove in an extra reproposal - // while the first proposal is being applied. tc.repl.RaftLock() tracedCtx, cleanup := tracing.EnsureContext(ctx, cfg.AmbientCtx.Tracer, "replica send") ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{}) @@ -11693,6 +11701,8 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { errCh <- res.Err }() + // While still holding the raftMu, repropose the initial proposal so we know + // that there will be two instances func() { tc.repl.mu.Lock() defer tc.repl.mu.Unlock() @@ -11700,7 +11710,6 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { t.Fatal(err) } tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange) - prop = tc.repl.mu.proposals[cmdID] }() tc.repl.RaftUnlock() diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 8a8548d5bac5..3b1310c82dee 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -184,6 +184,9 @@ type StoreTestingKnobs struct { // TraceAllRaftEvents enables raft event tracing even when the current // vmodule would not have enabled it. TraceAllRaftEvents bool + // EnableUnconditionalRefreshesInRaftReady will always set the refresh reason + // in handleRaftReady to refreshReasonNewLeaderOrConfigChange. + EnableUnconditionalRefreshesInRaftReady bool // ReceiveSnapshot is run after receiving a snapshot header but before // acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an