Skip to content

Commit

Permalink
storage: create new tracing span for each replicatedCmd during applic…
Browse files Browse the repository at this point in the history
…ation

This change adjusts our handling of contexts and replicatedCmds. We no longer
assign a local proposal's context to its replicatedCmds directly. Instead,
we always create a new tracing span that follows from this context. This
eliminates a whole class of bugs that we have fought to fix in changes
like #39203. In fact, we are still seeing issues after the recent refactor
when stressing #39390.

The change also paves the way for tracing the application of command application
on follower replicas.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 7, 2019
1 parent c8d1cbb commit de8b047
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
16 changes: 11 additions & 5 deletions pkg/storage/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -48,9 +50,13 @@ type replicatedCmd struct {
// proposal is populated on the proposing Replica only and comes from the
// Replica's proposal map.
proposal *ProposalData
// ctx will be the proposal's context if proposed locally, otherwise it will
// be populated with the handleCommittedEntries ctx.

// ctx is a context that follows from the proposal's context if it was
// proposed locally. Otherwise, it will follow from the context passed to
// ApplyCommittedEntries. sp is the corresponding tracing span, which is
// closed in FinishAndAckOutcome.
ctx context.Context
sp opentracing.Span

// The following fields are set in shouldApplyCommand when we validate that
// a command applies given the current lease and GC threshold. The process
Expand Down Expand Up @@ -112,10 +118,10 @@ func (c *replicatedCmd) Rejected() bool {

// FinishAndAckOutcome implements the apply.AppliedCommand interface.
func (c *replicatedCmd) FinishAndAckOutcome() error {
if !c.IsLocal() {
return nil
tracing.FinishSpan(c.sp)
if c.IsLocal() {
c.proposal.finishApplication(c.response)
}
c.proposal.finishApplication(c.response)
return nil
}

Expand Down
49 changes: 30 additions & 19 deletions pkg/storage/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/storage/apply"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -52,7 +53,9 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry)
if err := d.decode(ctx, ents); err != nil {
return false, err
}
return d.retrieveLocalProposals(ctx), nil
anyLocal := d.retrieveLocalProposals(ctx)
d.createTracingSpans(ctx)
return anyLocal, nil
}

// decode decodes the provided entries into the decoder.
Expand Down Expand Up @@ -81,23 +84,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
cmd.proposal = d.r.mu.proposals[cmd.idKey]
if cmd.IsLocal() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't consider this
// entry to have been proposed locally. The entry must necessarily be
// rejected by checkForcedErr.
cmd.proposal = nil
}
if cmd.IsLocal() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
anyLocal = true
} else {
cmd.ctx = ctx
}
anyLocal = anyLocal || cmd.IsLocal()
}
if !anyLocal && d.r.mu.proposalQuota == nil {
// Fast-path.
Expand All @@ -106,7 +93,16 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
toRelease := int64(0)
if cmd.IsLocal() {
shouldRemove := cmd.IsLocal() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
Expand All @@ -131,6 +127,21 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
return anyLocal
}

// createTracingSpans creates and assigns a new tracing span for each decoded
// command. If a command was proposed locally, it will be given a tracing span
// that follows from its proposal's span.
func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
var it replicatedCmdBufSlice
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
parentCtx := ctx
if cmd.IsLocal() {
parentCtx = cmd.proposal.ctx
}
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(parentCtx, "raft application")
}
}

// NewCommandIter implements the apply.Decoder interface.
func (d *replicaDecoder) NewCommandIter() apply.CommandIterator {
it := d.cmdBuf.newIter()
Expand Down

0 comments on commit de8b047

Please sign in to comment.