Skip to content

Commit

Permalink
etcdserver: add trace for txn request (etcd-io#11491)
Browse files Browse the repository at this point in the history
* etcdserver: add trace for txn request

* pkg/traceutil: added StopSubTrace as a sign of the end of subtrace. Added test case for logging out subtrace.
  • Loading branch information
YoyinZyc authored Apr 4, 2020
1 parent 2092b5b commit c623f79
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 62 deletions.
104 changes: 68 additions & 36 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ type applierV3Internal interface {
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult

Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
Expand Down Expand Up @@ -142,11 +142,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
case r.Range != nil:
ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
case r.Put != nil:
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put)
ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
case r.DeleteRange != nil:
ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
case r.Compaction != nil:
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
Expand Down Expand Up @@ -201,14 +201,18 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
return ar
}

func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
resp = &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
)
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
trace = traceutil.New("put",
a.s.getLogger(),
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(p)},
)
}
val, leaseID := p.Value, lease.LeaseID(p.Lease)
if txn == nil {
if leaseID != lease.NoLease {
Expand All @@ -222,13 +226,13 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu

var rr *mvcc.RangeResult
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.DisableStep()
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
trace.StepWithFunction(func() {
rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")

if err != nil {
return nil, nil, err
}
trace.EnableStep()
trace.Step("get previous kv pair")
}
if p.IgnoreValue || p.IgnoreLease {
if rr == nil || len(rr.KVs) == 0 {
Expand Down Expand Up @@ -378,22 +382,35 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
return resp, nil
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", a.s.getLogger())
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO()))
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))

var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txn, rt)
},
"compare",
)

txnPath := compareToPath(txn, rt)
if isWrite {
trace.AddField(traceutil.Field{Key: "read_only", Value: false})
if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
txn.End()
return nil, err
return nil, nil, err
}
}
if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
txn.End()
return nil, err
return nil, nil, err
}

trace.Step("check requests")
txnResp, _ := newTxnResp(rt, txnPath)

// When executing mutable txn ops, etcd must hold the txn lock so
Expand All @@ -402,17 +419,21 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// be the revision of the write txn.
if isWrite {
txn.End()
txn = a.s.KV().Write(traceutil.TODO())
txn = a.s.KV().Write(trace)
}
a.applyTxn(txn, rt, txnPath, txnResp)
a.applyTxn(ctx, txn, rt, txnPath, txnResp)
rev := txn.Rev()
if len(txn.Changes()) != 0 {
rev++
}
txn.End()

txnResp.Header.Revision = rev
return txnResp, nil
trace.AddField(
traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
)
return txnResp, trace, nil
}

// newTxnResp allocates a txn response for a txn request given a path.
Expand Down Expand Up @@ -543,7 +564,8 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
return true
}

func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) {
trace := traceutil.Get(ctx)
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
Expand All @@ -554,17 +576,27 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
case *pb.RequestOp_RequestRange:
resp, err := a.Range(context.TODO(), txn, tv.RequestRange)
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "range"},
traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
resp, err := a.Range(ctx, txn, tv.RequestRange)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestPut:
resp, _, err := a.Put(txn, tv.RequestPut)
trace.StartSubTrace(
traceutil.Field{Key: "req_type", Value: "put"},
traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
traceutil.Field{Key: "req_size", Value: proto.Size(tv.RequestPut)})
resp, _, err := a.Put(ctx, txn, tv.RequestPut)
if err != nil {
lg.Panic("unexpected error during txn", zap.Error(err))
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
trace.StopSubTrace()
case *pb.RequestOp_RequestDeleteRange:
resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
if err != nil {
Expand All @@ -573,7 +605,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat
respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
case *pb.RequestOp_RequestTxn:
resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp)
applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
txns += applyTxns + 1
txnPath = txnPath[applyTxns+1:]
default:
Expand Down Expand Up @@ -689,15 +721,15 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }

func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrNoSpace
}

func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if a.q.Cost(r) > 0 {
return nil, ErrNoSpace
return nil, nil, ErrNoSpace
}
return a.applierV3.Txn(r)
return a.applierV3.Txn(ctx, r)
}

func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down Expand Up @@ -859,22 +891,22 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")}
}

func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
ok := a.q.Available(p)
resp, trace, err := a.applierV3.Put(txn, p)
resp, trace, err := a.applierV3.Put(ctx, txn, p)
if err == nil && !ok {
err = ErrNoSpace
}
return resp, trace, err
}

func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
ok := a.q.Available(rt)
resp, err := a.applierV3.Txn(rt)
resp, trace, err := a.applierV3.Txn(ctx, rt)
if err == nil && !ok {
err = ErrNoSpace
}
return resp, err
return resp, trace, err
}

func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down
10 changes: 5 additions & 5 deletions etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
return ret
}

func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
return nil, nil, err
}
Expand All @@ -82,7 +82,7 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon
return nil, nil, err
}
}
return aa.applierV3.Put(txn, r)
return aa.applierV3.Put(ctx, txn, r)
}

func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
Expand Down Expand Up @@ -161,11 +161,11 @@ func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error
return checkTxnReqsPermission(as, ai, rt.Failure)
}

func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if err := checkTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
return nil, err
return nil, nil, err
}
return aa.applierV3.Txn(rt)
return aa.applierV3.Txn(ctx, rt)
}

func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
Expand Down
6 changes: 3 additions & 3 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ type applierV3Corrupt struct {

func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, ErrCorrupt
}

Expand All @@ -321,8 +321,8 @@ func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeReque
return nil, ErrCorrupt
}

func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, ErrCorrupt
func (a *applierV3Corrupt) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return nil, nil, ErrCorrupt
}

func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
10 changes: 9 additions & 1 deletion etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,14 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest)

func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
if isTxnReadonly(r) {
trace := traceutil.New("transaction",
s.getLogger(),
traceutil.Field{Key: "read_only", Value: true},
)
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
if !isTxnSerializable(r) {
err := s.linearizableReadNotify(ctx)
trace.Step("agreement among raft nodes before linearized reading")
if err != nil {
return nil, err
}
Expand All @@ -160,15 +166,17 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse

defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err)
trace.LogIfLong(traceThreshold)
}(time.Now())

get := func() { resp, err = s.applyV3Base.Txn(r) }
get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}

ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit c623f79

Please sign in to comment.