ddl: directly use BackendConfig rather than use lightning config #55433

Merged: 17 commits, Aug 20, 2024
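This PR drops the intermediate lightning.Config that the DDL ingest path used to build, adjust, and then convert; the backend settings are now written into a local.BackendConfig directly. A minimal sketch of the before/after shape (field names and call signatures are taken from the old and new call sites in the diff below; the helper functions themselves are illustrative):

package ingest

import (
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	lightning "github.com/pingcap/tidb/pkg/lightning/config"
	kvutil "github.com/tikv/client-go/v2/util"
)

// Before: a full lightning.Config was built, adjusted, then converted.
func viaLightningConfig(jobSortPath, keyspace, resourceGroup string, maxOpenFiles int) local.BackendConfig {
	cfg := lightning.NewConfig()
	cfg.TikvImporter.Backend = lightning.BackendLocal
	cfg.TikvImporter.SortedKVDir = jobSortPath
	// ...AdjustForDDL, TLS fields, conflict strategy, etc., then:
	return local.NewBackendConfig(cfg, maxOpenFiles, keyspace, resourceGroup, kvutil.ExplicitTypeDDL, 0)
}

// After: only the fields DDL actually needs, set in one place.
func direct(jobSortPath, keyspace, resourceGroup string, concurrency int) *local.BackendConfig {
	return &local.BackendConfig{
		LocalStoreDir:     jobSortPath,
		KeyspaceName:      keyspace,
		ResourceGroupName: resourceGroup,
		MaxConnPerStore:   concurrency,
		WorkerConcurrency: concurrency * 2, // replaces RangeConcurrency * 2
		TaskType:          kvutil.ExplicitTypeDDL,
	}
}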
1 change: 0 additions & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -31,7 +31,6 @@ go_library(
"//pkg/lightning/checkpoints",
"//pkg/lightning/common",
"//pkg/lightning/config",
"//pkg/lightning/errormanager",
"//pkg/lightning/log",
"//pkg/meta",
"//pkg/parser/model",
6 changes: 2 additions & 4 deletions pkg/ddl/ingest/backend.go
@@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
lightning "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/errormanager"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
@@ -95,7 +94,7 @@ type litBackendCtx struct {
tbl table.Table
backend *local.Backend
ctx context.Context
cfg *lightning.Config
cfg *local.BackendConfig
sysVars map[string]string

flushing atomic.Bool
@@ -148,8 +147,7 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab
}

func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Table) error {
errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.Logger(bc.ctx)})
dupeController := bc.backend.GetDupeController(bc.cfg.TikvImporter.RangeConcurrency*2, errorMgr)
dupeController := bc.backend.GetDupeController(bc.cfg.WorkerConcurrency, nil)
hasDupe, err := dupeController.CollectRemoteDuplicateRows(bc.ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: bc.sysVars,
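The duplicate-row check no longer constructs an errormanager.ErrorManager: DDL only needs to know whether duplicates exist, so the controller gets a nil manager, and the concurrency comes from the config's WorkerConcurrency (which genConfig below sets to concurrency * 2, matching the old RangeConcurrency * 2). A hedged sketch of the new call shape, with the method receiver flattened into parameters for self-containment:

package ingest

import (
	"context"

	"github.com/pingcap/tidb/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/table"
)

// collectDupes mirrors collectRemoteDuplicateRows after this PR: nil error
// manager, concurrency taken straight from the backend config.
func collectDupes(ctx context.Context, be *local.Backend, cfg *local.BackendConfig,
	tbl table.Table, sysVars map[string]string) (bool, error) {
	ctl := be.GetDupeController(cfg.WorkerConcurrency, nil)
	return ctl.CollectRemoteDuplicateRows(ctx, tbl, tbl.Meta().Name.L, &encode.SessionOptions{
		SQLMode: mysql.ModeStrictAllTables,
		SysVars: sysVars,
	})
}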
30 changes: 17 additions & 13 deletions pkg/ddl/ingest/backend_mgr.go
@@ -17,18 +17,19 @@ package ingest
import (
"context"
"math"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
@@ -152,7 +153,7 @@ func (m *litBackendCtxMgr) Register(
return nil, err
}

bcCtx := newBackendContext(ctx, jobID, bd, cfg.lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
bcCtx := newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient)
m.backends.m[jobID] = bcCtx
m.memRoot.Consume(structSizeBackendCtx)
m.backends.mu.Unlock()
@@ -171,23 +172,26 @@ func (m *litBackendCtxMgr) EncodeJobSortPath(jobID int64) string {

func createLocalBackend(
ctx context.Context,
cfg *litConfig,
cfg *local.BackendConfig,
pdSvcDiscovery pd.ServiceDiscovery,
) (*local.Backend, error) {
tls, err := cfg.lightning.ToTLS()
tidbCfg := config.GetGlobalConfig()
tls, err := common.NewTLS(
tidbCfg.Security.ClusterSSLCA,
tidbCfg.Security.ClusterSSLCert,
tidbCfg.Security.ClusterSSLKey,
net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort))),
nil, nil, nil,
)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Error(err))
return nil, err
}

ddllogutil.DDLIngestLogger().Info("create local backend for adding index",
zap.String("sortDir", cfg.lightning.TikvImporter.SortedKVDir),
zap.String("keyspaceName", cfg.keyspaceName))
// We disable the switch TiKV mode feature for now,
// because the impact is not fully tested.
var raftKV2SwitchModeDuration time.Duration
backendConfig := local.NewBackendConfig(cfg.lightning, int(litRLimit), cfg.keyspaceName, cfg.resourceGroup, kvutil.ExplicitTypeDDL, raftKV2SwitchModeDuration)
return local.NewBackend(ctx, tls, backendConfig, pdSvcDiscovery)
zap.String("sortDir", cfg.LocalStoreDir),
zap.String("keyspaceName", cfg.KeyspaceName))
return local.NewBackend(ctx, tls, *cfg, pdSvcDiscovery)
}

const checkpointUpdateInterval = 10 * time.Minute
@@ -196,7 +200,7 @@ func newBackendContext(
ctx context.Context,
jobID int64,
be *local.Backend,
cfg *config.Config,
cfg *local.BackendConfig,
vars map[string]string,
memRoot MemRoot,
diskRoot DiskRoot,
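With genConfig returning a *local.BackendConfig, Register can hand the same value to createLocalBackend and newBackendContext with no conversion step, and TLS is now built from the TiDB server config via common.NewTLS rather than lightning's Config.ToTLS. A compressed, hedged sketch of the flow (function names and parameters follow the diff; the etcd-client parameter type and the single return value of newBackendContext are assumptions, and error handling is simplified):

package ingest

import (
	"context"

	pd "github.com/tikv/pd/client"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// registerSketch: one *local.BackendConfig travels through all three steps.
func registerSketch(ctx context.Context, jobID int64, jobSortPath, resourceGroup string,
	unique bool, concurrency int, memRoot MemRoot, diskRoot DiskRoot,
	disc pd.ServiceDiscovery, etcdCli *clientv3.Client) error {
	cfg, err := genConfig(ctx, jobSortPath, memRoot, unique, resourceGroup, concurrency)
	if err != nil {
		return err
	}
	bd, err := createLocalBackend(ctx, cfg, disc) // passes *cfg to local.NewBackend as-is
	if err != nil {
		return err
	}
	// The backend context keeps the same cfg for later flushes and dupe checks.
	_ = newBackendContext(ctx, jobID, bd, cfg, defaultImportantVariables, memRoot, diskRoot, etcdCli)
	return nil
}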
107 changes: 50 additions & 57 deletions pkg/ddl/ingest/config.go
@@ -17,78 +17,71 @@ package ingest
import (
"context"
"net"
"runtime"
"strconv"
"sync/atomic"

tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
lightning "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

// ImporterRangeConcurrencyForTest is only used for test.
var ImporterRangeConcurrencyForTest *atomic.Int32

// litConfig is the configuration for the lightning local backend used in DDL.
type litConfig struct {
lightning *lightning.Config
keyspaceName string
isRaftKV2 bool
resourceGroup string
}

func genConfig(
ctx context.Context,
jobSortPath string,
memRoot MemRoot,
unique bool,
resourceGroup string,
concurrency int,
) (*litConfig, error) {
tidbCfg := tidb.GetGlobalConfig()
cfg := lightning.NewConfig()
cfg.TikvImporter.Backend = lightning.BackendLocal
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
ResourceGroupName: resourceGroup,
MaxConnPerStore: concurrency,
WorkerConcurrency: concurrency * 2,
KeyspaceName: tidb.GetGlobalKeyspaceName(),
// We disable the switch TiKV mode feature for now, because the impact is not
// fully tested.
ShouldCheckWriteStall: true,

// lightning default values
CheckpointEnabled: true,
BlockSize: lightning.DefaultBlockSize,
KVWriteBatchSize: lightning.KVWriteBatchSize,
RegionSplitBatchSize: lightning.DefaultRegionSplitBatchSize,
RegionSplitConcurrency: runtime.GOMAXPROCS(0),
MemTableSize: lightning.DefaultEngineMemCacheSize,
LocalWriterMemCacheSize: lightning.DefaultLocalWriterMemCacheSize,
ShouldCheckTiKV: true,
MaxOpenFiles: int(litRLimit),
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
}
// Each backend will build a single dir in lightning dir.
cfg.TikvImporter.SortedKVDir = jobSortPath
cfg.TikvImporter.RangeConcurrency = concurrency
if ImporterRangeConcurrencyForTest != nil {
cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
}
err := cfg.AdjustForDDL()
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Error(err))
return nil, err
cfg.WorkerConcurrency = int(ImporterRangeConcurrencyForTest.Load()) * 2
}
adjustImportMemory(ctx, memRoot, cfg)
cfg.Checkpoint.Enable = true
if unique {
cfg.Conflict.Strategy = lightning.ErrorOnDup
cfg.Conflict.Threshold = lightning.DefaultRecordDuplicateThreshold
cfg.DupeDetectEnabled = true
cfg.DuplicateDetectOpt = common.DupDetectOpt{ReportErrOnDup: true}
} else {
cfg.Conflict.Strategy = lightning.NoneOnDup
}
cfg.TiDB.Host = "127.0.0.1"
cfg.TiDB.StatusPort = int(tidbCfg.Status.StatusPort)
// Set TLS related information
cfg.Security.CAPath = tidbCfg.Security.ClusterSSLCA
cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert
cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey
// in DDL scenario, we don't switch import mode
cfg.Cron.SwitchMode = lightning.Duration{Duration: 0}

c := &litConfig{
lightning: cfg,
keyspaceName: tidb.GetGlobalKeyspaceName(),
isRaftKV2: false,
resourceGroup: resourceGroup,
cfg.DupeDetectEnabled = false
}

return c, nil
return cfg, nil
}
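In short, one concurrency parameter now drives both knobs (MaxConnPerStore = concurrency, WorkerConcurrency = concurrency * 2), and duplicate detection is toggled by whether the index is unique. A hedged usage sketch (the sort path and resource-group name are made-up values for illustration):

package ingest

import "context"

func exampleGenConfig(ctx context.Context, memRoot MemRoot) error {
	// unique index: DupeDetectEnabled=true, DuplicateDetectOpt.ReportErrOnDup=true
	cfg, err := genConfig(ctx, "/tmp/tidb/tmp_ddl-4000/1", memRoot, true, "rg1", 4)
	if err != nil {
		return err
	}
	_ = cfg.MaxConnPerStore   // 4
	_ = cfg.WorkerConcurrency // 8, i.e. the old RangeConcurrency*2
	return nil
}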

// CopReadBatchSize is the batch size of coprocessor read.
@@ -144,19 +137,19 @@ func generateLocalEngineConfig(ts uint64) *backend.EngineConfig {
}

// adjustImportMemory adjusts the lightning memory parameters according to the memory root's max limitation.
func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Config) {
func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *local.BackendConfig) {
var scale int64
// Try aggressive resource usage first; return if it succeeds.
if tryAggressiveMemory(ctx, memRoot, cfg) {
return
}

defaultMemSize := int64(cfg.TikvImporter.LocalWriterMemCacheSize) * int64(cfg.TikvImporter.RangeConcurrency)
defaultMemSize += 4 * int64(cfg.TikvImporter.EngineMemCacheSize)
defaultMemSize := int64(int(cfg.LocalWriterMemCacheSize) * cfg.WorkerConcurrency / 2)
defaultMemSize += 4 * int64(cfg.MemTableSize)
logutil.Logger(ctx).Info(LitInfoInitMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))

maxLimit := memRoot.MaxMemoryQuota()
scale = defaultMemSize / maxLimit
@@ -165,28 +158,28 @@ func adjustImportMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Con
return
}

cfg.TikvImporter.LocalWriterMemCacheSize /= lightning.ByteSize(scale)
cfg.TikvImporter.EngineMemCacheSize /= lightning.ByteSize(scale)
cfg.LocalWriterMemCacheSize /= scale
cfg.MemTableSize /= int(scale)

logutil.Logger(ctx).Info(LitInfoChgMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))
}

// tryAggressiveMemory adjusts the lightning memory parameters according to the memory root's max limitation.
func tryAggressiveMemory(ctx context.Context, memRoot MemRoot, cfg *lightning.Config) bool {
func tryAggressiveMemory(ctx context.Context, memRoot MemRoot, cfg *local.BackendConfig) bool {
var defaultMemSize int64
defaultMemSize = int64(int(cfg.TikvImporter.LocalWriterMemCacheSize) * cfg.TikvImporter.RangeConcurrency)
defaultMemSize += int64(cfg.TikvImporter.EngineMemCacheSize)
defaultMemSize = int64(int(cfg.LocalWriterMemCacheSize) * cfg.WorkerConcurrency / 2)
defaultMemSize += int64(cfg.MemTableSize)

if (defaultMemSize + memRoot.CurrentUsage()) > memRoot.MaxMemoryQuota() {
return false
}
logutil.Logger(ctx).Info(LitInfoChgMemSetting,
zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
zap.Int64("engine memory cache size", int64(cfg.TikvImporter.EngineMemCacheSize)),
zap.Int("range concurrency", cfg.TikvImporter.RangeConcurrency))
zap.Int64("local writer memory cache size", cfg.LocalWriterMemCacheSize),
zap.Int("engine memory cache size", cfg.MemTableSize),
zap.Int("worker concurrency", cfg.WorkerConcurrency))
return true
}
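Both functions estimate memory as LocalWriterMemCacheSize * WorkerConcurrency / 2 + k * MemTableSize (k = 4 in adjustImportMemory, k = 1 in tryAggressiveMemory); the division by 2 recovers the old RangeConcurrency from WorkerConcurrency. A worked example under assumed inputs (128 MiB writer cache and 512 MiB memtable mirror lightning's documented defaults, but verify against your build):

package main

import "fmt"

func main() {
	const (
		localWriterMemCacheSize = int64(128 << 20) // 128 MiB, assumed default
		memTableSize            = int64(512 << 20) // 512 MiB, assumed default
		workerConcurrency       = 8                // i.e. genConfig concurrency 4
	)
	// adjustImportMemory's estimate: 128*8/2 + 4*512 = 2560 MiB
	defaultMemSize := localWriterMemCacheSize*workerConcurrency/2 + 4*memTableSize
	maxLimit := int64(1 << 30) // suppose the mem root allows 1 GiB
	scale := defaultMemSize / maxLimit
	fmt.Println(defaultMemSize>>20, scale) // 2560 2: both cache sizes get halved
}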

4 changes: 0 additions & 4 deletions pkg/ddl/ingest/engine.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@@ -55,7 +54,6 @@ type engineInfo struct {

uuid uuid.UUID
cfg *backend.EngineConfig
litCfg *config.Config
writerCache generic.SyncMap[int, backend.EngineWriter]
memRoot MemRoot
flushLock *sync.RWMutex
@@ -67,7 +65,6 @@ func newEngineInfo(
jobID, indexID int64,
unique bool,
cfg *backend.EngineConfig,
litCfg *config.Config,
en *backend.OpenedEngine,
uuid uuid.UUID,
memRoot MemRoot,
@@ -78,7 +75,6 @@
indexID: indexID,
unique: unique,
cfg: cfg,
litCfg: litCfg,
openedEngine: en,
uuid: uuid,
writerCache: generic.NewSyncMap[int, backend.EngineWriter](4),
1 change: 0 additions & 1 deletion pkg/ddl/ingest/engine_mgr.go
@@ -79,7 +79,6 @@ func (bc *litBackendCtx) Register(indexIDs []int64, uniques []bool, tbl table.Ta
indexID,
uniques[i],
cfg,
bc.cfg,
openedEngine,
openedEngine.GetEngineUUID(),
bc.memRoot,