Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of detailed "took too long" warnings to release-3.2 #9821

3 changes: 3 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func (s *EtcdServer) newApplierV3() applierV3 {

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}(time.Now())

// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
switch {
Expand Down
6 changes: 4 additions & 2 deletions etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ func (a *applierV2store) Sync(r *pb.Request) Response {
return Response{}
}

// applyV2Request interprets r as a call to store.X and returns a Response interpreted
// from store.Event
// applyV2Request interprets r as a call to v2store.X
// and returns a Response interpreted from v2store.Event
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
defer warnOfExpensiveRequest(time.Now(), r, nil, nil)
toTTLOptions(r)

switch r.Method {
case "POST":
return s.applyV2.Post(r)
Expand Down
179 changes: 179 additions & 0 deletions etcdserver/etcdserverpb/raft_internal_stringer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserverpb

import (
"fmt"
"strings"

proto "github.com/golang/protobuf/proto"
)

// InternalRaftStringer implements custom proto Stringer:
// redact password, replace value fields with value_size fields.
type InternalRaftStringer struct {
Request *InternalRaftRequest
}

func (as *InternalRaftStringer) String() string {
switch {
case as.Request.LeaseGrant != nil:
return fmt.Sprintf("header:<%s> lease_grant:<ttl:%d-second id:%016x>",
as.Request.Header.String(),
as.Request.LeaseGrant.TTL,
as.Request.LeaseGrant.ID,
)
case as.Request.LeaseRevoke != nil:
return fmt.Sprintf("header:<%s> lease_revoke:<id:%016x>",
as.Request.Header.String(),
as.Request.LeaseRevoke.ID,
)
case as.Request.Authenticate != nil:
return fmt.Sprintf("header:<%s> authenticate:<name:%s simple_token:%s>",
as.Request.Header.String(),
as.Request.Authenticate.Name,
as.Request.Authenticate.SimpleToken,
)
case as.Request.AuthUserAdd != nil:
return fmt.Sprintf("header:<%s> auth_user_add:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserAdd.Name,
)
case as.Request.AuthUserChangePassword != nil:
return fmt.Sprintf("header:<%s> auth_user_change_password:<name:%s>",
as.Request.Header.String(),
as.Request.AuthUserChangePassword.Name,
)
case as.Request.Put != nil:
return fmt.Sprintf("header:<%s> put:<%s>",
as.Request.Header.String(),
newLoggablePutRequest(as.Request.Put).String(),
)
case as.Request.Txn != nil:
return fmt.Sprintf("header:<%s> txn:<%s>",
as.Request.Header.String(),
NewLoggableTxnRequest(as.Request.Txn).String(),
)
default:
// nothing to redact
}
return as.Request.String()
}

// txnRequestStringer implements a custom proto String to replace value bytes fields with value size
// fields in any nested txn and put operations.
type txnRequestStringer struct {
Request *TxnRequest
}

func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer {
return &txnRequestStringer{request}
}

func (as *txnRequestStringer) String() string {
var compare []string
for _, c := range as.Request.Compare {
switch cv := c.TargetUnion.(type) {
case *Compare_Value:
compare = append(compare, newLoggableValueCompare(c, cv).String())
default:
// nothing to redact
compare = append(compare, c.String())
}
}
var success []string
for _, s := range as.Request.Success {
success = append(success, newLoggableRequestOp(s).String())
}
var failure []string
for _, f := range as.Request.Failure {
failure = append(failure, newLoggableRequestOp(f).String())
}
return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>",
strings.Join(compare, " "),
strings.Join(success, " "),
strings.Join(failure, " "),
)
}

// requestOpStringer implements a custom proto String to replace value bytes fields with value
// size fields in any nested txn and put operations.
type requestOpStringer struct {
Op *RequestOp
}

func newLoggableRequestOp(op *RequestOp) *requestOpStringer {
return &requestOpStringer{op}
}

func (as *requestOpStringer) String() string {
switch op := as.Op.Request.(type) {
case *RequestOp_RequestPut:
return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String())
default:
// nothing to redact
}
return as.Op.String()
}

// loggableValueCompare implements a custom proto String for Compare.Value union member types to
// replace the value bytes field with a value size field.
// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here.
type loggableValueCompare struct {
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"`
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3"`
ValueSize int `protobuf:"bytes,7,opt,name=value_size,proto3"`
}

func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare {
return &loggableValueCompare{
c.Result,
c.Target,
c.Key,
len(cv.Value),
}
}

func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} }
func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) }
func (*loggableValueCompare) ProtoMessage() {}

// loggablePutRequest implements a custom proto String to replace value bytes field with a value
// size field.
// To preserve proto encoding of the key bytes, a faked out proto type is used here.
type loggablePutRequest struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3"`
ValueSize int `protobuf:"varint,2,opt,name=value_size,proto3"`
Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"`
PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"`
IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"`
IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"`
}

func newLoggablePutRequest(request *PutRequest) *loggablePutRequest {
return &loggablePutRequest{
request.Key,
len(request.Value),
request.Lease,
request.PrevKv,
request.IgnoreValue,
request.IgnoreLease,
}
}

func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} }
func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) }
func (*loggablePutRequest) ProtoMessage() {}
8 changes: 1 addition & 7 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,14 +818,8 @@ func (s *EtcdServer) run() {

func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
s.applySnapshot(ep, apply)
st := time.Now()
s.applyEntries(ep, apply)
d := time.Since(st)
entriesNum := len(apply.entries)
if entriesNum != 0 && d > time.Duration(entriesNum)*warnApplyDuration {
plog.Warningf("apply entries took too long [%v for %d entries]", d, len(apply.entries))
plog.Warningf("avoid queries with large range/delete range!")
}

proposalsApplied.Set(float64(ep.appliedi))
s.applyWait.Trigger(ep.appliedi)
// wait for the raft routine to finish the disk writes before triggering a
Expand Down
57 changes: 57 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package etcdserver

import (
"fmt"
"reflect"
"strings"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"
"github.com/golang/protobuf/proto"
)

// isConnectedToQuorumSince checks whether the local member is connected to the
Expand Down Expand Up @@ -95,3 +100,55 @@ func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}

func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
}

func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
var resps []string
for _, r := range txnResponse.Responses {
switch op := r.Response.(type) {
case *pb.ResponseOp_ResponseRange:
resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
default:
// only range responses should be in a read only txn request
}
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
// TODO: add metrics
d := time.Since(now)
if d > warnApplyDuration {
var result string
if err != nil {
result = fmt.Sprintf("error:%v", err)
} else {
result = resp
}
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
}
}

func isNil(msg proto.Message) bool {
return msg == nil || reflect.ValueOf(msg).IsNil()
}
24 changes: 17 additions & 7 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ import (
"encoding/binary"
"time"

"github.com/gogo/protobuf/proto"

"github.com/coreos/etcd/auth"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"

"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -82,20 +80,26 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(start, r, resp, err)
}(time.Now())

if !r.Serializable {
err := s.linearizableReadNotify(ctx)
err = s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, err = s.applyV3Base.Range(nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
err = serr
return nil, err
}
return resp, err
}
Expand Down Expand Up @@ -129,12 +133,18 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
chk := func(ai *auth.AuthInfo) error {
return checkTxnAuth(s.authStore, ai, r)
}

defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(start, r, resp, err)
}(time.Now())

get := func() { resp, err = s.applyV3Base.Txn(r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}

resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
Expand Down