Skip to content

Commit

Permalink
Merge #39425
Browse files Browse the repository at this point in the history
39425: storage: create new tracing span for each replicatedCmd during application r=nvanbenschoten a=nvanbenschoten

This change adjusts our handling of contexts and replicatedCmds. We no longer
assign a local proposal's context to its replicatedCmds directly. Instead,
we always create a new tracing span that follows from this context. This
eliminates a whole class of bugs that we have fought to fix in changes
like #39203. In fact, we are still seeing issues after the recent refactor
when stressing #39390.

The change also paves the way for tracing the application of command application
on follower replicas.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 7, 2019

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
2 parents 974dc50 + de8b047 commit ae6de17
Showing 5 changed files with 60 additions and 37 deletions.
2 changes: 2 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
@@ -1075,11 +1075,13 @@ func (ec *endCmds) move() endCmds {
// the timestamp cache using the final timestamp of each command.
//
// No-op if the receiver has been zeroed out by a call to move.
// Idempotent and is safe to call more than once.
func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) {
if ec.repl == nil {
// The endCmds were cleared.
return
}
defer ec.move() // clear

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
16 changes: 11 additions & 5 deletions pkg/storage/replica_application_cmd.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"go.etcd.io/etcd/raft/raftpb"
)

@@ -48,9 +50,13 @@ type replicatedCmd struct {
// proposal is populated on the proposing Replica only and comes from the
// Replica's proposal map.
proposal *ProposalData
// ctx will be the proposal's context if proposed locally, otherwise it will
// be populated with the handleCommittedEntries ctx.

// ctx is a context that follows from the proposal's context if it was
// proposed locally. Otherwise, it will follow from the context passed to
// ApplyCommittedEntries. sp is the corresponding tracing span, which is
// closed in FinishAndAckOutcome.
ctx context.Context
sp opentracing.Span

// The following fields are set in shouldApplyCommand when we validate that
// a command applies given the current lease and GC threshold. The process
@@ -112,10 +118,10 @@ func (c *replicatedCmd) Rejected() bool {

// FinishAndAckOutcome implements the apply.AppliedCommand interface.
func (c *replicatedCmd) FinishAndAckOutcome() error {
if !c.IsLocal() {
return nil
tracing.FinishSpan(c.sp)
if c.IsLocal() {
c.proposal.finishApplication(c.response)
}
c.proposal.finishApplication(c.response)
return nil
}

49 changes: 30 additions & 19 deletions pkg/storage/replica_application_decoder.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/storage/apply"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"go.etcd.io/etcd/raft/raftpb"
)

@@ -52,7 +53,9 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry)
if err := d.decode(ctx, ents); err != nil {
return false, err
}
return d.retrieveLocalProposals(ctx), nil
anyLocal := d.retrieveLocalProposals(ctx)
d.createTracingSpans(ctx)
return anyLocal, nil
}

// decode decodes the provided entries into the decoder.
@@ -81,23 +84,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
cmd.proposal = d.r.mu.proposals[cmd.idKey]
if cmd.IsLocal() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't consider this
// entry to have been proposed locally. The entry must necessarily be
// rejected by checkForcedErr.
cmd.proposal = nil
}
if cmd.IsLocal() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
anyLocal = true
} else {
cmd.ctx = ctx
}
anyLocal = anyLocal || cmd.IsLocal()
}
if !anyLocal && d.r.mu.proposalQuota == nil {
// Fast-path.
@@ -106,7 +93,16 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
toRelease := int64(0)
if cmd.IsLocal() {
shouldRemove := cmd.IsLocal() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
@@ -131,6 +127,21 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
return anyLocal
}

// createTracingSpans creates and assigns a new tracing span for each decoded
// command. If a command was proposed locally, it will be given a tracing span
// that follows from its proposal's span.
func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
var it replicatedCmdBufSlice
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
parentCtx := ctx
if cmd.IsLocal() {
parentCtx = cmd.proposal.ctx
}
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(parentCtx, "raft application")
}
}

// NewCommandIter implements the apply.Decoder interface.
func (d *replicaDecoder) NewCommandIter() apply.CommandIterator {
it := d.cmdBuf.newIter()
24 changes: 11 additions & 13 deletions pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
@@ -839,22 +839,20 @@ func (sm *replicaStateMachine) ApplySideEffects(

// Mark the command as applied and return it as an apply.AppliedCommand.
if cmd.IsLocal() {
if !cmd.Rejected() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
if !cmd.Rejected() {
if !cmd.Rejected() {
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
cmd.proposal = nil
} else {
cmd.proposal.applied = true
}
cmd.proposal.applied = true
}
return cmd, nil
}
6 changes: 6 additions & 0 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
@@ -126,6 +126,9 @@ type ProposalData struct {
// order to allow the original client to be canceled. (When the original client
// is canceled, it won't be listening to this done channel, and so it can't be
// counted on to invoke endCmds itself.)
//
// The method is safe to call more than once, but only the first result will be
// returned to the client.
func (proposal *ProposalData) finishApplication(pr proposalResult) {
proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
@@ -139,6 +142,9 @@ func (proposal *ProposalData) finishApplication(pr proposalResult) {
// has not already been signaled. The method can be called even before the
// proposal has finished replication and command application, and does not
// release the request's latches.
//
// The method is safe to call more than once, but only the first result will be
// returned to the client.
func (proposal *ProposalData) signalProposalResult(pr proposalResult) {
if proposal.doneCh != nil {
proposal.doneCh <- pr

0 comments on commit ae6de17

Please sign in to comment.