diff --git a/distsql/distsql.go b/distsql/distsql.go index 862ed5ec1476c..5a1fca7f0e110 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -24,8 +24,11 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) // Select sends a DAG request, returns SelectResult. @@ -46,7 +49,17 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie kvReq.Streaming = false } enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction - resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction) + originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL + eventCb := func(event trxevents.TransactionEvent) { + // Note: Do not assume this callback will be invoked within the same goroutine. + if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil { + logutil.Logger(ctx).Debug("coprocessor encounters lock", + zap.Uint64("startTS", kvReq.StartTs), + zap.Stringer("lock", copMeetLock.LockInfo), + zap.String("stmt", originalSQL)) + } + } + resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb) if resp == nil { err := errors.New("client returns nil response") return nil, err @@ -107,7 +120,7 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq // Analyze do a analyze request. func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables, isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) { - resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false) + resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil) if resp == nil { return nil, errors.New("client returns nil response") } @@ -129,7 +142,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv. func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.Variables) (SelectResult, error) { // FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit // Checksum function signature. The two-way dependence should be removed in future. - resp := client.Send(ctx, kvReq, vars, nil, false) + resp := client.Send(ctx, kvReq, vars, nil, false, nil) if resp == nil { return nil, errors.New("client returns nil response") } diff --git a/kv/kv.go b/kv/kv.go index cdceb588bb268..ad9904a062c80 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/trxevents" ) // Transaction options @@ -256,7 +257,7 @@ type ReturnedValue struct { // Client is used to send request to KV layer. type Client interface { // Send sends request to KV layer, returns a Response. - Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) Response + Send(ctx context.Context, req *Request, vars *Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response // IsRequestTypeSupported checks if reqType and subType is supported. IsRequestTypeSupported(reqType, subType int64) bool diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 2846159e89331..e1e820b3b63ca 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -59,14 +60,14 @@ type CopClient struct { } // Send builds the request and gets the coprocessor iterator response. -func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool) kv.Response { +func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response { if req.StoreType == kv.TiFlash && req.BatchCop { logutil.BgLogger().Debug("send batch requests") return c.sendBatch(ctx, req, vars) } ctx = context.WithValue(ctx, txnStartKey, req.StartTs) bo := NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars) - tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req) + tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req, eventCb) if err != nil { return copErrorResponse{err} } @@ -125,6 +126,8 @@ type copTask struct { storeAddr string cmdType tikvrpc.CmdType storeType kv.StoreType + + eventCb trxevents.EventCallback } func (r *copTask) String() string { @@ -245,7 +248,7 @@ func (r *copRanges) split(key []byte) (*copRanges, *copRanges) { // rangesPerTask limits the length of the ranges slice sent in one copTask. const rangesPerTask = 25000 -func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request) ([]*copTask, error) { +func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request, eventCb trxevents.EventCallback) ([]*copTask, error) { start := time.Now() cmdType := tikvrpc.CmdCop if req.Streaming { @@ -272,6 +275,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv respChan: make(chan *copResponse, 2), cmdType: cmdType, storeType: req.StoreType, + eventCb: eventCb, }) i = nextI } @@ -1101,11 +1105,18 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon return nil, errors.Trace(err) } // We may meet RegionError at the first packet, but not during visiting the stream. - return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req) + return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req, task.eventCb) } if lockErr := resp.pbResp.GetLocked(); lockErr != nil { - logutil.BgLogger().Debug("coprocessor encounters", - zap.Stringer("lock", lockErr)) + // Be care that we didn't redact the SQL statement because the log is DEBUG level. + if task.eventCb != nil { + task.eventCb(trxevents.WrapCopMeetLock(&trxevents.CopMeetLock{ + LockInfo: lockErr, + })) + } else { + logutil.Logger(bo.ctx).Debug("coprocessor encounters lock", + zap.Stringer("lock", lockErr)) + } msBeforeExpired, err1 := worker.ResolveLocks(bo, worker.req.StartTs, []*Lock{NewLock(lockErr)}) if err1 != nil { return nil, errors.Trace(err1) @@ -1119,7 +1130,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon } if otherErr := resp.pbResp.GetOtherError(); otherErr != "" { err := errors.Errorf("other error: %s", otherErr) - logutil.BgLogger().Warn("other error", + logutil.Logger(bo.ctx).Warn("other error", zap.Uint64("txnStartTS", worker.req.StartTs), zap.Uint64("regionID", task.region.id), zap.String("storeAddr", task.storeAddr), @@ -1248,7 +1259,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang if worker.req.Streaming && lastRange != nil { remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc) } - return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req) + return buildCopTasks(bo, worker.store.regionCache, remainedRanges, worker.req, task.eventCb) } // calculateRemain splits the input ranges into two, and take one of them according to desc flag. diff --git a/store/tikv/coprocessor_test.go b/store/tikv/coprocessor_test.go index f82b605fb5e96..c1605627b564c 100644 --- a/store/tikv/coprocessor_test.go +++ b/store/tikv/coprocessor_test.go @@ -42,49 +42,49 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { req := &kv.Request{} flashReq := &kv.Request{} flashReq.StoreType = kv.TiFlash - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req) + tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[1], "m", "n") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") s.taskEqual(c, tasks[1], regionIDs[1], "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") s.taskEqual(c, tasks[1], regionIDs[1], "g", "k") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 4) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") @@ -92,7 +92,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { s.taskEqual(c, tasks[2], regionIDs[2], "n", "t") s.taskEqual(c, tasks[3], regionIDs[3], "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 4) s.taskEqual(c, tasks[0], regionIDs[0], "a", "g") @@ -100,45 +100,45 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) { s.taskEqual(c, tasks[2], regionIDs[2], "n", "t") s.taskEqual(c, tasks[3], regionIDs[3], "t", "x") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 1) s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") s.taskEqual(c, tasks[1], regionIDs[2], "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "g", "n") s.taskEqual(c, tasks[1], regionIDs[2], "o", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n") s.taskEqual(c, tasks[1], regionIDs[2], "n", "p") - tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n") @@ -209,7 +209,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { bo := NewBackofferWithVars(context.Background(), 3000, nil) req := &kv.Request{} - tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req) + tasks, err := buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 2) s.taskEqual(c, tasks[0], regionIDs[0], "a", "m") @@ -223,7 +223,7 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) { cache.InvalidateCachedRegion(tasks[1].region) req.Desc = true - tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req) + tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "z"), req, nil) c.Assert(err, IsNil) c.Assert(tasks, HasLen, 3) s.taskEqual(c, tasks[2], regionIDs[0], "a", "m") diff --git a/util/mock/client.go b/util/mock/client.go index 56ec53336d2e4..e88ce57ff77e2 100644 --- a/util/mock/client.go +++ b/util/mock/client.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/trxevents" ) // Client implement kv.Client interface, mocked from "CopClient" defined in @@ -28,6 +29,6 @@ type Client struct { } // Send implement kv.Client interface. -func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool) kv.Response { +func (c *Client) Send(ctx context.Context, req *kv.Request, kv *kv.Variables, sessionMemTracker *memory.Tracker, enabledRateLimit bool, eventCb trxevents.EventCallback) kv.Response { return c.MockResponse } diff --git a/util/trxevents/trx_events.go b/util/trxevents/trx_events.go new file mode 100644 index 0000000000000..3b9b0e2ee6a97 --- /dev/null +++ b/util/trxevents/trx_events.go @@ -0,0 +1,58 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trxevents + +import ( + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) + +// EventType represents the type of a transaction event. +type EventType = int + +const ( + // EventTypeCopMeetLock stands for the CopMeetLock event type. + EventTypeCopMeetLock = iota +) + +// CopMeetLock represents an event that coprocessor reading encounters lock. +type CopMeetLock struct { + LockInfo *kvrpcpb.LockInfo +} + +// TransactionEvent represents a transaction event that may belong to any of the possible types. +type TransactionEvent struct { + eventType EventType + inner interface{} +} + +// GetCopMeetLock tries to extract the inner CopMeetLock event from a TransactionEvent. Returns nil if it's not a +// CopMeetLock event. +func (e TransactionEvent) GetCopMeetLock() *CopMeetLock { + if e.eventType == EventTypeCopMeetLock { + return e.inner.(*CopMeetLock) + } + return nil +} + +// WrapCopMeetLock wraps a CopMeetLock event into a TransactionEvent object. +func WrapCopMeetLock(copMeetLock *CopMeetLock) TransactionEvent { + return TransactionEvent{ + eventType: EventTypeCopMeetLock, + inner: copMeetLock, + } +} + +// EventCallback is the callback type that handles `TransactionEvent`s. +type EventCallback = func(event TransactionEvent)