ddl: support session level tidb_ddl_reorg_worker_cnt and batch_size (p…
tangenta authored Aug 13, 2024
1 parent 9a7e5cc commit 3db0322
Showing 24 changed files with 141 additions and 77 deletions.
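For context, a minimal usage sketch of what this change enables, written in the repository's testkit style. The table, index, and values below are illustrative, and the assumption (consistent with the executor.go hunk further down) is that the session-scoped settings are captured into the job's ReorgMeta when the DDL statement is submitted:

package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
)

// Illustrative only: the session-scoped settings are expected to be recorded
// in the job's ReorgMeta at submission time, so they apply to this ALTER TABLE
// without changing the global variables for other sessions.
func TestSessionLevelReorgSettings(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t (a int, b int)")
	tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 8")
	tk.MustExec("set @@tidb_ddl_reorg_batch_size = 1024")
	tk.MustExec("alter table t add index idx_b(b)")
}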
17 changes: 9 additions & 8 deletions pkg/ddl/backfilling.go
@@ -178,6 +178,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
}

exprCtx := sessCtx.GetExprCtx()
batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
return &backfillCtx{
id: id,
ddlCtx: rInfo.d,
@@ -188,7 +189,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
batchCnt: batchCnt,
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
@@ -415,7 +416,8 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
})

// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
w.GetCtx().batchCnt = newBatchCnt
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)

@@ -675,8 +677,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
if err != nil {
return errors.Trace(err)
}
@@ -705,16 +708,15 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
bcCtx.AttachCheckpointManager(cpMgr)
}

reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
reorgCtx := dc.getReorgCtx(job.ID)
rowCntListener := &localRowCntListener{
prevPhysicalRowCnt: reorgCtx.getRowCount(),
reorgCtx: dc.getReorgCtx(reorgInfo.Job.ID),
reorgCtx: reorgCtx,
counter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
}

avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
concurrency := int(variable.GetDDLReorgWorkerCounter())

engines, err := bcCtx.Register(indexIDs, uniques, t)
if err != nil {
@@ -724,7 +726,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}

pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
@@ -738,7 +739,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
reorgInfo.EndKey,
job.ReorgMeta,
avgRowSize,
concurrency,
importConc,
cpMgr,
rowCntListener,
)
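The GetConcurrencyOrDefault and GetBatchSizeOrDefault helpers used above are added elsewhere in this commit and are not among the files loaded here. A self-contained sketch of the assumed semantics, using a simplified stand-in for model.DDLReorgMeta: the value recorded at job submission wins, and the caller-supplied global value only serves as a fallback (for example, for jobs created before these fields existed):

package main

import "fmt"

// reorgMetaSketch is a simplified stand-in for model.DDLReorgMeta, used only
// to illustrate the assumed fallback behaviour of the new accessors.
type reorgMetaSketch struct {
	Concurrency int // 0 means the submitting session recorded nothing
	BatchSize   int // 0 means the submitting session recorded nothing
}

// GetConcurrencyOrDefault returns the concurrency captured at job submission,
// falling back to the supplied default (normally the current global
// tidb_ddl_reorg_worker_cnt) when no value was recorded.
func (m *reorgMetaSketch) GetConcurrencyOrDefault(def int) int {
	if m == nil || m.Concurrency == 0 {
		return def
	}
	return m.Concurrency
}

// GetBatchSizeOrDefault applies the same pattern to tidb_ddl_reorg_batch_size.
func (m *reorgMetaSketch) GetBatchSizeOrDefault(def int) int {
	if m == nil || m.BatchSize == 0 {
		return def
	}
	return m.BatchSize
}

func main() {
	meta := &reorgMetaSketch{Concurrency: 8} // only the worker count was set by the session
	fmt.Println(meta.GetConcurrencyOrDefault(4)) // 8: the job-level value wins
	fmt.Println(meta.GetBatchSizeOrDefault(256)) // 256: falls back to the global default
}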
10 changes: 9 additions & 1 deletion pkg/ddl/backfilling_dist_executor.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
@@ -147,7 +148,14 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
ddlObj := s.d
discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()

return ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), job.ID, hasUnique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
return ingest.LitBackCtxMgr.Register(
s.BaseTaskExecutor.Ctx(),
job.ID, hasUnique,
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
)
}

func hasUniqueIndex(job *model.Job) (bool, error) {
45 changes: 25 additions & 20 deletions pkg/ddl/backfilling_operators.go
@@ -171,15 +171,11 @@ func NewAddIndexIngestPipeline(
if err != nil {
return nil, err
}
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
@@ -226,11 +222,7 @@ func NewWriteIndexToExternalStoragePipeline(
if err != nil {
return nil, err
}
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
@@ -248,7 +240,7 @@ func NewWriteIndexToExternalStoragePipeline(
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener)
@@ -270,6 +262,16 @@ func NewWriteIndexToExternalStoragePipeline(
), nil
}

func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
poolSize := ingest.CopReadChunkPoolSize(hintConc)
batchSize := ingest.CopReadBatchSize(hintBatchSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
}
return srcChkPool
}

// TableScanTask contains the start key and the end key of a region.
type TableScanTask struct {
ID int
@@ -457,19 +459,21 @@ func NewTableScanOperator(
srcChkPool chan *chunk.Chunk,
concurrency int,
cpMgr *ingest.CheckpointManager,
hintBatchSize int,
) *TableScanOperator {
pool := workerpool.NewWorkerPool(
"TableScanOperator",
util.DDL,
concurrency,
func() workerpool.Worker[TableScanTask, IndexRecordChunk] {
return &tableScanWorker{
ctx: ctx,
copCtx: copCtx,
sessPool: sessPool,
se: nil,
srcChkPool: srcChkPool,
cpMgr: cpMgr,
ctx: ctx,
copCtx: copCtx,
sessPool: sessPool,
se: nil,
srcChkPool: srcChkPool,
cpMgr: cpMgr,
hintBatchSize: hintBatchSize,
}
})
return &TableScanOperator{
@@ -484,7 +488,8 @@ type tableScanWorker struct {
se *session.Session
srcChkPool chan *chunk.Chunk

cpMgr *ingest.CheckpointManager
cpMgr *ingest.CheckpointManager
hintBatchSize int
}

func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecordChunk)) {
@@ -554,7 +559,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor

func (w *tableScanWorker) getChunk() *chunk.Chunk {
chk := <-w.srcChkPool
newCap := copReadBatchSize()
newCap := ingest.CopReadBatchSize(w.hintBatchSize)
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
}
7 changes: 4 additions & 3 deletions pkg/ddl/backfilling_scheduler.go
@@ -85,6 +85,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
if err != nil {
return nil, err
}
workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
@@ -93,7 +94,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
tbl: tbl,
decodeColMap: decColMap,
jobCtx: jobCtx,
workers: make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()),
workers: make([]*backfillWorker, 0, workerCnt),
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
}, nil
@@ -230,8 +231,8 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
}
}

func (*txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := int(variable.GetDDLReorgWorkerCounter())
func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
return min(workerCnt, maxBackfillWorkerSize)
}

4 changes: 2 additions & 2 deletions pkg/ddl/cancel_test.go
@@ -242,8 +242,8 @@ func TestCancel(t *testing.T) {

// Change some configurations.
ddl.ReorgWaitTimeout = 10 * time.Millisecond
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 8")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1")
tk.MustExec("set @@tidb_ddl_reorg_batch_size = 8")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1")
tk = testkit.NewTestKit(t, store)
tk.MustExec("use test")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockBackfillSlow", "return"))
17 changes: 17 additions & 0 deletions pkg/ddl/executor.go
@@ -4721,6 +4721,12 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
@@ -4736,6 +4742,17 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
LastReorgMetaFastReorgDisabled = true
})
}

logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
)
return reorgMeta, nil
}

2 changes: 1 addition & 1 deletion pkg/ddl/export_test.go
@@ -47,7 +47,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey,
}
opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1)
src := testutil.NewOperatorTestSource(ddl.TableScanTask{1, startKey, endKey})
scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil)
scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0)
sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]()

operator.Compose[ddl.TableScanTask](src, scanOp)
4 changes: 2 additions & 2 deletions pkg/ddl/index.go
@@ -1952,7 +1952,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
if indexInfo.Unique {
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName)
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1)
if err != nil {
return err
}
@@ -2027,7 +2027,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
})
} else {
job := reorgInfo.Job
workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
workerCntLimit := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
cpuCount, err := handle.GetCPUCountOfNode(ctx)
if err != nil {
return err
15 changes: 0 additions & 15 deletions pkg/ddl/index_cop.go
@@ -28,7 +28,6 @@ import (
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand All @@ -41,20 +40,6 @@ import (
kvutil "github.com/tikv/client-go/v2/util"
)

// copReadBatchSize is the batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
// sending too many cop requests for the same handle range.
func copReadBatchSize() int {
return 10 * int(variable.GetDDLReorgBatchSize())
}

// copReadChunkPoolSize is the size of chunk pool, which
// represents the max concurrent ongoing coprocessor requests.
// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func copReadChunkPoolSize() int {
return 10 * int(variable.GetDDLReorgWorkerCounter())
}

func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
err := se.Begin(context.Background())
if err != nil {
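The two helpers deleted above move into the ingest package as CopReadBatchSize and CopReadChunkPoolSize, which now accept the per-job hints threaded through createChunkPool and tableScanWorker earlier in this diff. The relocated code is not among the loaded files, so the following is only a sketch of the assumed behaviour, with placeholder constants standing in for the global system variables; the exact scaling applied to a positive hint is an assumption:

package main

import "fmt"

// Placeholder constants standing in for the globals tidb_ddl_reorg_batch_size
// and tidb_ddl_reorg_worker_cnt.
const (
	globalReorgBatchSize = 256
	globalReorgWorkerCnt = 4
)

// copReadBatchSize sketches the relocated ingest.CopReadBatchSize: a positive
// per-job hint takes precedence, otherwise the previous behaviour is kept
// (10x the global batch size, to avoid sending too many cop requests for the
// same handle range).
func copReadBatchSize(hintSize int) int {
	if hintSize > 0 {
		return hintSize
	}
	return 10 * globalReorgBatchSize
}

// copReadChunkPoolSize sketches the relocated ingest.CopReadChunkPoolSize: the
// chunk pool bounds the number of in-flight coprocessor requests, so it scales
// with the effective per-job concurrency instead of only the global worker count.
func copReadChunkPoolSize(hintConc int) int {
	if hintConc > 0 {
		return 10 * hintConc
	}
	return 10 * globalReorgWorkerCnt
}

func main() {
	fmt.Println(copReadBatchSize(0), copReadChunkPoolSize(0))    // globals as fallback
	fmt.Println(copReadBatchSize(1024), copReadChunkPoolSize(8)) // per-job hints respected
}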
2 changes: 1 addition & 1 deletion pkg/ddl/index_modify_test.go
@@ -1047,7 +1047,7 @@ func TestAddIndexUniqueFailOnDuplicate(t *testing.T) {
tk.MustExec("create table t (a bigint primary key clustered, b int);")
// The subtask execution order is not guaranteed in distributed reorg. We need to disable it first.
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
for i := 1; i <= 12; i++ {
tk.MustExec("insert into t values (?, ?)", i, i)
}
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -49,6 +49,7 @@ type BackendCtxMgr interface {
etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
) (BackendCtx, error)
Unregister(jobID int64)
// EncodeJobSortPath encodes the job ID to the local disk sort path.
@@ -114,6 +115,7 @@ func (m *litBackendCtxMgr) Register(
etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if exist {
Expand All @@ -131,7 +133,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err