From 309a962d2c296341dc8c2cbb6e16de56c2309552 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 9 May 2023 14:58:08 +0800 Subject: [PATCH 1/5] ddl: use session begin TS to read record --- ddl/BUILD.bazel | 1 + ddl/backfilling_scheduler.go | 2 +- ddl/export_test.go | 4 +++- ddl/index_cop.go | 36 +++++++++++++++++++++++++++++++++--- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 90dd1be856986..06d314533ac19 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -224,6 +224,7 @@ go_test( "//config", "//ddl/ingest", "//ddl/internal/callback", + "//ddl/internal/session", "//ddl/placement", "//ddl/schematracker", "//ddl/testutil", diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index ecaec7e5d6b03..4ce09ceda97b4 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -437,7 +437,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) return nil, err } - return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.checkpointMgr), nil + return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.sessPool, b.checkpointMgr), nil } func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) { diff --git a/ddl/export_test.go b/ddl/export_test.go index ed129c695581e..a763f082aab57 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -18,6 +18,7 @@ import ( "context" "time" + "github.com/pingcap/tidb/ddl/internal/session" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -51,7 +52,8 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK } taskCh := make(chan *reorgBackfillTask, 5) resultCh := make(chan idxRecResult, 5) - pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, nil) + sessPool := session.NewSessionPool(nil, store) + pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, sessPool, nil) pool.chunkSender = &resultChanForTest{ch: resultCh} pool.adjustSize(1) pool.tasksCh <- task diff --git a/ddl/index_cop.go b/ddl/index_cop.go index b02f23bfa288f..e23b609fad5ad 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/ingest" + sess "github.com/pingcap/tidb/ddl/internal/session" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -70,6 +71,7 @@ type copReqSenderPool struct { tasksCh chan *reorgBackfillTask chunkSender chunkSender checkpointMgr *ingest.CheckpointManager + sessPool *sess.Pool ctx context.Context copCtx *copContext @@ -96,6 +98,17 @@ func (c *copReqSender) run() { defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() { p.chunkSender.AddTask(idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}) }, false) + sessCtx, err := p.sessPool.Get() + if err != nil { + logutil.BgLogger().Error("[ddl-ingest] copReqSender get session from pool failed", zap.Error(err)) + p.chunkSender.AddTask(idxRecResult{err: err}) + return + } + se := sess.NewSession(sessCtx) + defer func() { + se.Rollback() + p.sessPool.Put(sessCtx) + }() for { if util.HasCancelled(c.ctx) { return @@ -113,12 +126,13 @@ func (c *copReqSender) run() { curTaskID = task.id logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", zap.Int("id", task.id), zap.String("task", task.String())) - ver, err := p.store.CurrentVersion(kv.GlobalTxnScope) + + startTS, err := calculateStartTS(se) if err != nil { p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) return } - rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey()) + rs, err := p.copCtx.buildTableScan(p.ctx, startTS, task.startKey, task.excludedEndKey()) if err != nil { p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) return @@ -155,7 +169,8 @@ func (c *copReqSender) run() { } func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage, - taskCh chan *reorgBackfillTask, checkpointMgr *ingest.CheckpointManager) *copReqSenderPool { + taskCh chan *reorgBackfillTask, sessPool *sess.Pool, + checkpointMgr *ingest.CheckpointManager) *copReqSenderPool { poolSize := copReadChunkPoolSize() srcChkPool := make(chan *chunk.Chunk, poolSize) for i := 0; i < poolSize; i++ { @@ -169,6 +184,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Stora senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), wg: sync.WaitGroup{}, srcChkPool: srcChkPool, + sessPool: sessPool, checkpointMgr: checkpointMgr, } } @@ -496,3 +512,17 @@ type idxRecResult struct { err error done bool } + +func calculateStartTS(se *sess.Session) (uint64, error) { + se.Rollback() + err := se.Begin() + if err != nil { + return 0, errors.Trace(err) + } + var startTS uint64 + sessVars := se.GetSessionVars() + sessVars.TxnCtxMu.Lock() + startTS = sessVars.TxnCtx.StartTS + sessVars.TxnCtxMu.Unlock() + return startTS, nil +} From 160ed338ec0d314d8f353e9eeeb605ca2ec3bc4f Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 9 May 2023 15:05:47 +0800 Subject: [PATCH 2/5] rename function calculateStartTS --- ddl/index_cop.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index e23b609fad5ad..318de85dd11d2 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -127,7 +127,7 @@ func (c *copReqSender) run() { logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", zap.Int("id", task.id), zap.String("task", task.String())) - startTS, err := calculateStartTS(se) + startTS, err := beginAndGetStartTS(se) if err != nil { p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) return @@ -513,7 +513,7 @@ type idxRecResult struct { done bool } -func calculateStartTS(se *sess.Session) (uint64, error) { +func beginAndGetStartTS(se *sess.Session) (uint64, error) { se.Rollback() err := se.Begin() if err != nil { From 67bc2a27e09de50cb23eb4d95218b3faf7fcd7e4 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 9 May 2023 19:24:49 +0800 Subject: [PATCH 3/5] refine the code --- ddl/index_cop.go | 57 ++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 318de85dd11d2..124858988c427 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -94,9 +94,8 @@ type copReqSender struct { func (c *copReqSender) run() { p := c.senderPool defer p.wg.Done() - var curTaskID int defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() { - p.chunkSender.AddTask(idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}) + p.chunkSender.AddTask(idxRecResult{err: dbterror.ErrReorgPanic}) }, false) sessCtx, err := p.sessPool.Get() if err != nil { @@ -105,10 +104,7 @@ func (c *copReqSender) run() { return } se := sess.NewSession(sessCtx) - defer func() { - se.Rollback() - p.sessPool.Put(sessCtx) - }() + defer p.sessPool.Put(sessCtx) for { if util.HasCancelled(c.ctx) { return @@ -123,19 +119,22 @@ func (c *copReqSender) run() { zap.String("task end key", hex.EncodeToString(task.endKey))) continue } - curTaskID = task.id - logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", - zap.Int("id", task.id), zap.String("task", task.String())) - - startTS, err := beginAndGetStartTS(se) + err := scanRecords(p, task, se) if err != nil { p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) return } + } +} + +func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session) error { + logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", + zap.Int("id", task.id), zap.String("task", task.String())) + + return wrapInBeginRollback(se, func(startTS uint64) error { rs, err := p.copCtx.buildTableScan(p.ctx, startTS, task.startKey, task.excludedEndKey()) if err != nil { - p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) - return + return err } failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) { if val.(bool) { @@ -150,10 +149,9 @@ func (c *copReqSender) run() { srcChk := p.getChunk() done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk) if err != nil { - p.chunkSender.AddTask(idxRecResult{id: task.id, err: err}) p.recycleChunk(srcChk) terror.Call(rs.Close) - return + return err } if p.checkpointMgr != nil { p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done) @@ -165,7 +163,22 @@ func (c *copReqSender) run() { p.chunkSender.AddTask(idxRs) } terror.Call(rs.Close) + return nil + }) +} + +func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error { + err := se.Begin() + if err != nil { + return errors.Trace(err) } + defer se.Rollback() + var startTS uint64 + sessVars := se.GetSessionVars() + sessVars.TxnCtxMu.Lock() + startTS = sessVars.TxnCtx.StartTS + sessVars.TxnCtxMu.Unlock() + return f(startTS) } func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage, @@ -512,17 +525,3 @@ type idxRecResult struct { err error done bool } - -func beginAndGetStartTS(se *sess.Session) (uint64, error) { - se.Rollback() - err := se.Begin() - if err != nil { - return 0, errors.Trace(err) - } - var startTS uint64 - sessVars := se.GetSessionVars() - sessVars.TxnCtxMu.Lock() - startTS = sessVars.TxnCtx.StartTS - sessVars.TxnCtxMu.Unlock() - return startTS, nil -} From c4aaae1266a53a4f3b11e2feb763573c82995565 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 15 May 2023 21:37:19 +0800 Subject: [PATCH 4/5] fix metrics --- ddl/backfilling_scheduler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index 4ce09ceda97b4..b0d06b22ef038 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -492,7 +492,6 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) { result.scanCount = count result.nextKey = nextKey } - w.metricCounter.Add(float64(count)) if ResultCounterForTest != nil && result.err == nil { ResultCounterForTest.Add(1) } From a5bf4daf9a948dbc0ebc1ae4bcc8cad186542e14 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 15 May 2023 21:40:08 +0800 Subject: [PATCH 5/5] fix linter --- ddl/backfilling_scheduler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index b0d06b22ef038..64dc82f87093c 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -486,7 +486,6 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) { result.totalCount = cnt result.nextKey = nextKey result.err = w.checkpointMgr.UpdateCurrent(rs.id, count) - count = cnt } else { result.addedCount = count result.scanCount = count