From c8d1cbbf57d4071f87dfbd19ac309534a8ec98f9 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 7 Aug 2019 15:15:38 -0400 Subject: [PATCH 1/2] storage: make ProposalData.finishApplication safe to call multiple times This allows us to remove some messy code around unsetting a local proposal in replicaStateMachine.ApplySideEffects. Release note: None --- pkg/storage/replica.go | 2 ++ .../replica_application_state_machine.go | 24 +++++++++---------- pkg/storage/replica_proposal.go | 6 +++++ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 6e9ebf6dc1eb..499b6afcdaf4 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -1075,11 +1075,13 @@ func (ec *endCmds) move() endCmds { // the timestamp cache using the final timestamp of each command. // // No-op if the receiver has been zeroed out by a call to move. +// Idempotent and is safe to call more than once. func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) { if ec.repl == nil { // The endCmds were cleared. return } + defer ec.move() // clear // Update the timestamp cache if the request is not being re-evaluated. Each // request is considered in turn; only those marked as affecting the cache are diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index 3d98db170c76..fc644d74320a 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -839,22 +839,20 @@ func (sm *replicaStateMachine) ApplySideEffects( // Mark the command as applied and return it as an apply.AppliedCommand. if cmd.IsLocal() { - if !cmd.Rejected() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { - log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") - } - if cmd.proposal.applied { - // If the command already applied then we shouldn't be "finishing" its - // application again because it should only be able to apply successfully - // once. We expect that when any reproposal for the same command attempts - // to apply it will be rejected by the below raft lease sequence or lease - // index check in checkForcedErr. - if !cmd.Rejected() { + if !cmd.Rejected() { + if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") + } + if cmd.proposal.applied { + // If the command already applied then we shouldn't be "finishing" its + // application again because it should only be able to apply successfully + // once. We expect that when any reproposal for the same command attempts + // to apply it will be rejected by the below raft lease sequence or lease + // index check in checkForcedErr. log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) } - cmd.proposal = nil - } else { - cmd.proposal.applied = true } + cmd.proposal.applied = true } return cmd, nil } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 75b5bb4032f2..a63dea6704b0 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -126,6 +126,9 @@ type ProposalData struct { // order to allow the original client to be canceled. (When the original client // is canceled, it won't be listening to this done channel, and so it can't be // counted on to invoke endCmds itself.) +// +// The method is safe to call more than once, but only the first result will be +// returned to the client. func (proposal *ProposalData) finishApplication(pr proposalResult) { proposal.ec.done(proposal.Request, pr.Reply, pr.Err) proposal.signalProposalResult(pr) @@ -139,6 +142,9 @@ func (proposal *ProposalData) finishApplication(pr proposalResult) { // has not already been signaled. The method can be called even before the // proposal has finished replication and command application, and does not // release the request's latches. +// +// The method is safe to call more than once, but only the first result will be +// returned to the client. func (proposal *ProposalData) signalProposalResult(pr proposalResult) { if proposal.doneCh != nil { proposal.doneCh <- pr From de8b047b32ea6e42108453faa0f598a5f5f05213 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 7 Aug 2019 15:19:23 -0400 Subject: [PATCH 2/2] storage: create new tracing span for each replicatedCmd during application This change adjusts our handling of contexts and replicatedCmds. We no longer assign a local proposal's context to its replicatedCmds directly. Instead, we always create a new tracing span that follows from this context. This eliminates a whole class of bugs that we have fought to fix in changes like #39203. In fact, we are still seeing issues after the recent refactor when stressing #39390. The change also paves the way for tracing the application of command application on follower replicas. Release note: None --- pkg/storage/replica_application_cmd.go | 16 ++++--- pkg/storage/replica_application_decoder.go | 49 +++++++++++++--------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/pkg/storage/replica_application_cmd.go b/pkg/storage/replica_application_cmd.go index 508a1404946b..5524bcc0c47a 100644 --- a/pkg/storage/replica_application_cmd.go +++ b/pkg/storage/replica_application_cmd.go @@ -19,6 +19,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + opentracing "github.com/opentracing/opentracing-go" "go.etcd.io/etcd/raft/raftpb" ) @@ -48,9 +50,13 @@ type replicatedCmd struct { // proposal is populated on the proposing Replica only and comes from the // Replica's proposal map. proposal *ProposalData - // ctx will be the proposal's context if proposed locally, otherwise it will - // be populated with the handleCommittedEntries ctx. + + // ctx is a context that follows from the proposal's context if it was + // proposed locally. Otherwise, it will follow from the context passed to + // ApplyCommittedEntries. sp is the corresponding tracing span, which is + // closed in FinishAndAckOutcome. ctx context.Context + sp opentracing.Span // The following fields are set in shouldApplyCommand when we validate that // a command applies given the current lease and GC threshold. The process @@ -112,10 +118,10 @@ func (c *replicatedCmd) Rejected() bool { // FinishAndAckOutcome implements the apply.AppliedCommand interface. func (c *replicatedCmd) FinishAndAckOutcome() error { - if !c.IsLocal() { - return nil + tracing.FinishSpan(c.sp) + if c.IsLocal() { + c.proposal.finishApplication(c.response) } - c.proposal.finishApplication(c.response) return nil } diff --git a/pkg/storage/replica_application_decoder.go b/pkg/storage/replica_application_decoder.go index 92ad6358cae6..966db382e98f 100644 --- a/pkg/storage/replica_application_decoder.go +++ b/pkg/storage/replica_application_decoder.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "go.etcd.io/etcd/raft/raftpb" ) @@ -52,7 +53,9 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry) if err := d.decode(ctx, ents); err != nil { return false, err } - return d.retrieveLocalProposals(ctx), nil + anyLocal := d.retrieveLocalProposals(ctx) + d.createTracingSpans(ctx) + return anyLocal, nil } // decode decodes the provided entries into the decoder. @@ -81,23 +84,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() cmd.proposal = d.r.mu.proposals[cmd.idKey] - if cmd.IsLocal() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { - // If this entry does not have the most up-to-date view of the - // corresponding proposal's maximum lease index then the proposal - // must have been reproposed with a higher lease index. (see - // tryReproposeWithNewLeaseIndex). In that case, there's a newer - // version of the proposal in the pipeline, so don't consider this - // entry to have been proposed locally. The entry must necessarily be - // rejected by checkForcedErr. - cmd.proposal = nil - } - if cmd.IsLocal() { - // We initiated this command, so use the caller-supplied context. - cmd.ctx = cmd.proposal.ctx - anyLocal = true - } else { - cmd.ctx = ctx - } + anyLocal = anyLocal || cmd.IsLocal() } if !anyLocal && d.r.mu.proposalQuota == nil { // Fast-path. @@ -106,7 +93,16 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() toRelease := int64(0) - if cmd.IsLocal() { + shouldRemove := cmd.IsLocal() && + // If this entry does not have the most up-to-date view of the + // corresponding proposal's maximum lease index then the proposal + // must have been reproposed with a higher lease index. (see + // tryReproposeWithNewLeaseIndex). In that case, there's a newer + // version of the proposal in the pipeline, so don't remove the + // proposal from the map. We expect this entry to be rejected by + // checkForcedErr. + cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex + if shouldRemove { // Delete the proposal from the proposals map. There may be reproposals // of the proposal in the pipeline, but those will all have the same max // lease index, meaning that they will all be rejected after this entry @@ -131,6 +127,21 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b return anyLocal } +// createTracingSpans creates and assigns a new tracing span for each decoded +// command. If a command was proposed locally, it will be given a tracing span +// that follows from its proposal's span. +func (d *replicaDecoder) createTracingSpans(ctx context.Context) { + var it replicatedCmdBufSlice + for it.init(&d.cmdBuf); it.Valid(); it.Next() { + cmd := it.cur() + parentCtx := ctx + if cmd.IsLocal() { + parentCtx = cmd.proposal.ctx + } + cmd.ctx, cmd.sp = tracing.ForkCtxSpan(parentCtx, "raft application") + } +} + // NewCommandIter implements the apply.Decoder interface. func (d *replicaDecoder) NewCommandIter() apply.CommandIterator { it := d.cmdBuf.newIter()