Merge #40505
40505: storage: prevent reproposals of applied commands r=nvanbenschoten a=ajwerner

TestHighestMaxLeaseIndexReproposalFinishesCommand rotted when #39425 was
merged. Prior to that change there was an invariant that if a command was
reproposed at a higher MaxLeaseIndex, then the client would only be
acknowledged by a command which applied at that higher MaxLeaseIndex. That
change also worked to simplify context lifecycle management for replicatedCmds
by creating individual child spans for each application. This was not a
complete solution, however, because all of the commands derived from the same
proposal share a context when used for reproposals. As a consequence, a
command reproposed at a higher MaxLeaseIndex than an already applied command
would use a context carrying a tracing span that might already have been
finished.
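
To make the failure mode concrete, here is a minimal, self-contained Go sketch of the hazard; the `span` type below is a toy stand-in for a tracing span, not the actual tracing package:

```go
package main

import "fmt"

// span is a toy stand-in for a tracing span that complains when an event is
// recorded after Finish, mirroring the use-after-finish bug described above.
type span struct{ finished bool }

func (s *span) LogEvent(msg string) {
	if s.finished {
		panic("use of finished span: " + msg)
	}
	fmt.Println("event:", msg)
}

func (s *span) Finish() { s.finished = true }

func main() {
	// All commands derived from one proposal share the proposal's context,
	// and with it this span.
	shared := &span{}

	// The command applies and the client is acknowledged, finishing the span.
	shared.LogEvent("applied; acknowledging client")
	shared.Finish()

	// A reproposal at a higher MaxLeaseIndex still holds the shared context
	// and records into the already-finished span.
	shared.LogEvent("reproposed at a higher MaxLeaseIndex") // panics
}
```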

Several approaches were explored to fix this issue. The one chosen here seems
to be the least invasive.

The rotten test has been simplified to cover the now-problematic case. The
enabling mechanism for the test is a hammer of a TestingKnob which
unconditionally refreshes all pending proposals in the proposals map at the
end of each raft ready iteration. Without the change to delete proposals from
the map after they have been applied, the new test fails reliably under stress
within tens of iterations and in under five seconds.
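
Roughly, the knob turns the end of every raft ready iteration into a refresh of the whole proposals map, which is what makes deleting applied proposals necessary; a simplified sketch (the names and types here are illustrative, not the real ones):

```go
package main

import "fmt"

type proposal struct{ id string }

// refreshAllProposals models what the knob forces after each raft ready
// iteration: every proposal still in the map is reproposed.
func refreshAllProposals(proposals map[string]*proposal) {
	for _, p := range proposals {
		fmt.Println("reproposing", p.id)
	}
}

func main() {
	proposals := map[string]*proposal{"cmd-1": {id: "cmd-1"}}
	refreshAllProposals(proposals) // reproposes cmd-1

	// Deleting a proposal once it has applied is what stops later refreshes
	// from reproposing it forever.
	delete(proposals, "cmd-1")
	refreshAllProposals(proposals) // reproposes nothing
}
```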

An alternative approach would have been to partially revert #39425 and ensure
that only commands which carry the highest MaxLeaseIndex for a proposal may be
locally applied. If that is deemed cleaner and simpler, then we can go with
it. The approach taken here prevents some reproposals and lets clients of
commands which will fail due to non-equivalent lease changes learn sooner that
they need to retry.
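
For contrast, a sketch of the guard that the alternative would hinge on; the types are simplified stand-ins for the proposal and raft command structures, not the real ones:

```go
package main

import "fmt"

type proposal struct{ maxLeaseIndex uint64 } // highest MaxLeaseIndex reproposed so far

type command struct {
	maxLeaseIndex uint64 // MaxLeaseIndex carried by this raft entry
	prop          *proposal
}

// mayApplyLocally models the alternative: only the command carrying the
// proposal's current (highest) MaxLeaseIndex may apply locally and
// acknowledge the client.
func mayApplyLocally(c command) bool {
	return c.maxLeaseIndex == c.prop.maxLeaseIndex
}

func main() {
	p := &proposal{maxLeaseIndex: 7}
	stale := command{maxLeaseIndex: 5, prop: p}  // earlier reproposal
	latest := command{maxLeaseIndex: 7, prop: p} // latest reproposal
	fmt.Println(mayApplyLocally(stale), mayApplyLocally(latest)) // false true
}
```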

Fixes #40478

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
craig[bot] and ajwerner committed Sep 5, 2019
2 parents e283f27 + 0e81674 commit 9ad3ce1
Showing 5 changed files with 74 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/replica_application_result.go
@@ -107,7 +107,7 @@ func clearTrivialReplicatedEvalResultFields(r *storagepb.ReplicatedEvalResult) {
// engine but before its side-effects have been applied to the Replica's
// in-memory state. This method gives the command an opportunity to interact
// with testing knobs and to set up its local result if it was proposed
-// locally. This is performed prior to handling the command's
+// locally. This is performed prior to handling the command's
// ReplicatedEvalResult because the process of handling the replicated eval
// result will zero-out the struct to ensure that is has properly performed all
// of the implied side-effects.
39 changes: 27 additions & 12 deletions pkg/storage/replica_application_state_machine.go
@@ -884,19 +884,34 @@ func (sm *replicaStateMachine) ApplySideEffects(
}

// Mark the command as applied and return it as an apply.AppliedCommand.
+// NB: Commands which were reproposed at a higher MaxLeaseIndex will not be
+// considered local at this point as their proposal will have been detached
+// in prepareLocalResult().
if cmd.IsLocal() {
-if !cmd.Rejected() {
-if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
-log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
-}
-if cmd.proposal.applied {
-// If the command already applied then we shouldn't be "finishing" its
-// application again because it should only be able to apply successfully
-// once. We expect that when any reproposal for the same command attempts
-// to apply it will be rejected by the below raft lease sequence or lease
-// index check in checkForcedErr.
-log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
-}
+rejected := cmd.Rejected()
+higherReproposalsExist := cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex
+if !rejected && higherReproposalsExist {
+log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
+}
+if !rejected && cmd.proposal.applied {
+// If the command already applied then we shouldn't be "finishing" its
+// application again because it should only be able to apply successfully
+// once. We expect that when any reproposal for the same command attempts
+// to apply it will be rejected by the below raft lease sequence or lease
+// index check in checkForcedErr.
+log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
+// If any reproposals at a higher MaxLeaseIndex exist we know that they will
+// never successfully apply, remove them from the map to avoid future
+// reproposals. If there is no command referencing this proposal at a higher
+// MaxLeaseIndex then it will already have been removed (see
+// shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible
+// that a later command in this batch referred to this proposal but it must
+// have failed because it carried the same MaxLeaseIndex.
+if higherReproposalsExist {
+sm.r.mu.Lock()
+delete(sm.r.mu.proposals, cmd.idKey)
+sm.r.mu.Unlock()
+}
cmd.proposal.applied = true
}
4 changes: 3 additions & 1 deletion pkg/storage/replica_raft.go
@@ -780,7 +780,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
applicationElapsed := timeutil.Since(applicationStart).Nanoseconds()
r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed)
-
+if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady {
+refreshReason = reasonNewLeaderOrConfigChange
+}
if refreshReason != noReason {
r.mu.Lock()
r.refreshProposalsLocked(0, refreshReason)
71 changes: 40 additions & 31 deletions pkg/storage/replica_test.go
@@ -11568,16 +11568,35 @@ func TestSplitSnapshotWarningStr(t *testing.T) {
)
}

-// TestHighestMaxLeaseIndexReproposalFinishesCommand exercises a case where a
-// command is reproposed twice at different MaxLeaseIndex values to ultimately
-// fail with an error which cannot be reproposed (say due to a lease transfer
-// or change to the gc threshold). This test works to exercise the invariant
-// that when a proposal has been reproposed at different MaxLeaseIndex value
-// the client is ultimately acknowledged with an error from a reproposal with
-// the largest index. The test verfies this condition by asserting that the
+// TestProposalNotAcknowledgedOrReproposedAfterApplication exercises a case
+// where a command is reproposed twice at different MaxLeaseIndex values to
+// ultimately fail with an error which cannot be reproposed (say due to a lease
+// transfer or change to the gc threshold). This test works to exercise the
+// invariant that when a proposal has been reproposed at different MaxLeaseIndex
+// values are not additionally reproposed or acknowledged after applying
+// locally. The test verfies this condition by asserting that the
// span used to trace the execution of the proposal is not used after the
-// proposal has been finished.
-func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
+// proposal has been finished as it would be if the proposal were reproposed
+// after applying locally.
+//
+// The test does the following things:
+//
+// * Propose cmd at an initial MaxLeaseIndex.
+// * Refresh that cmd immediately.
+// * Fail the initial command with an injected error which will lead to a
+// reproposal at a higher MaxLeaseIndex.
+// * Simultaneously update the lease sequence number on the replica so all
+// future commands will fail with NotLeaseHolderError.
+// * Enable unconditional refreshes of commands after a raft ready so that
+// higher MaxLeaseIndex commands are refreshed.
+//
+// This order of events ensures that there will be a committed command which
+// experiences the lease mismatch error but does not carry the highest
+// MaxLeaseIndex for the proposal. The test attempts to verify that once a
+// proposal has been acknowledged it will not be reproposed or acknowledged
+// again by asserting that the proposal's context is not reused after it is
+// finished by the waiting client.
+func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set the trace infrastructure to log if a span is used after being finished.
@@ -11608,12 +11627,12 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
// In the TestingProposalFilter we populater cmdID with the id of the proposal
// which corresponds to txnID.
var cmdID storagebase.CmdIDKey
-// After we evalAndPropose the command we populate prop with the ProposalData
-// value to enable reproposing the same command more than once.
-var prop *ProposalData
// seen is used to detect the first application of our proposal.
var seen bool
cfg.TestingKnobs = StoreTestingKnobs{
+// Constant reproposals are the worst case which this test is trying to
+// examine.
+EnableUnconditionalRefreshesInRaftReady: true,
// Set the TestingProposalFilter in order to know the CmdIDKey for our
// request by detecting its txnID.
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
@@ -11629,30 +11648,21 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
return 0, nil
}
seen = true
-// Repropose on a separate location to not mess with the
-// goldenProtosBelowRaft checks.
-reproposed := make(chan struct{})
-go func() {
-if _, pErr := tc.repl.propose(prop.ctx, prop); pErr != nil {
-panic(pErr)
-}
-close(reproposed)
-}()
-<-reproposed
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
-// Flush the proposalBuf to ensure that the reproposal makes it into the
-// Replica's proposal map.
-if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
-panic(err)
-}
+
// Increase the lease sequence so that future reproposals will fail with
// NotLeaseHolderError. This mimics the outcome of a leaseholder change
// slipping in between the application of the first proposal and the
// reproposals.
tc.repl.mu.state.Lease.Sequence++
// This return value will force another retry which will carry a yet
-// higher MaxLeaseIndex and will trigger our invariant violation.
+// higher MaxLeaseIndex. The first reproposal will fail and return to the
+// client but the second (which hasn't been applied due to the
+// MaxCommittedSizePerReady setting) will be reproposed again. This test
+// ensure that it does not reuse the original proposal's context for that
+// reproposal by ensuring that no event is recorded after the original
+// proposal has been finished.
return int(proposalIllegalLeaseIndex),
roachpb.NewErrorf("forced error that can be reproposed at a higher index")
},
@@ -11678,8 +11688,6 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {

// Hold the RaftLock to ensure that after evalAndPropose our proposal is in
// the proposal map. Entries are only removed from that map underneath raft.
-// We want to grab the proposal so that we can shove in an extra reproposal
-// while the first proposal is being applied.
tc.repl.RaftLock()
tracedCtx, cleanup := tracing.EnsureContext(ctx, cfg.AmbientCtx.Tracer, "replica send")
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{})
@@ -11693,14 +11701,15 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
errCh <- res.Err
}()

+// While still holding the raftMu, repropose the initial proposal so we know
+// that there will be two instances
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
-prop = tc.repl.mu.proposals[cmdID]
}()
tc.repl.RaftUnlock()

3 changes: 3 additions & 0 deletions pkg/storage/testing_knobs.go
@@ -184,6 +184,9 @@ type StoreTestingKnobs struct {
// TraceAllRaftEvents enables raft event tracing even when the current
// vmodule would not have enabled it.
TraceAllRaftEvents bool
+// EnableUnconditionalRefreshesInRaftReady will always set the refresh reason
+// in handleRaftReady to refreshReasonNewLeaderOrConfigChange.
+EnableUnconditionalRefreshesInRaftReady bool

// ReceiveSnapshot is run after receiving a snapshot header but before
// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
