diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index b0da8a0e7deb4..e47d1f1b4b53d 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -53,6 +53,10 @@ go_test( embed = [":kv"], flaky = True, race = "on", +<<<<<<< HEAD +======= + shard_count = 16, +>>>>>>> 33236ef1e43 (lightning: fix uuid panic when expression index enabled (#44516)) deps = [ "//br/pkg/lightning/common", "//br/pkg/lightning/log", @@ -64,6 +68,7 @@ go_test( "//parser/ast", "//parser/model", "//parser/mysql", + "//planner/core", "//sessionctx", "//table", "//table/tables", diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go new file mode 100644 index 0000000000000..a9185bc48166a --- /dev/null +++ b/br/pkg/lightning/backend/kv/base.go @@ -0,0 +1,356 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "context" + "math/rand" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/redact" + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// ExtraHandleColumnInfo is the column info of extra handle column. +var ExtraHandleColumnInfo = model.NewExtraHandleColInfo() + +// GeneratedCol generated column info. +type GeneratedCol struct { + // index of the column in the table + Index int + Expr expression.Expression +} + +// AutoIDConverterFn is a function to convert auto id. +type AutoIDConverterFn func(int64) int64 + +// RowArrayMarshaller wraps a slice of types.Datum for logging the content into zap. +type RowArrayMarshaller []types.Datum + +var kindStr = [...]string{ + types.KindNull: "null", + types.KindInt64: "int64", + types.KindUint64: "uint64", + types.KindFloat32: "float32", + types.KindFloat64: "float64", + types.KindString: "string", + types.KindBytes: "bytes", + types.KindBinaryLiteral: "binary", + types.KindMysqlDecimal: "decimal", + types.KindMysqlDuration: "duration", + types.KindMysqlEnum: "enum", + types.KindMysqlBit: "bit", + types.KindMysqlSet: "set", + types.KindMysqlTime: "time", + types.KindInterface: "interface", + types.KindMinNotNull: "min", + types.KindMaxValue: "max", + types.KindRaw: "raw", + types.KindMysqlJSON: "json", +} + +// MarshalLogArray implements the zapcore.ArrayMarshaler interface +func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, datum := range row { + kind := datum.Kind() + var str string + var err error + switch kind { + case types.KindNull: + str = "NULL" + case types.KindMinNotNull: + str = "-inf" + case types.KindMaxValue: + str = "+inf" + default: + str, err = datum.ToString() + if err != nil { + return err + } + } + if err := encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("kind", kindStr[kind]) + enc.AddString("val", redact.String(str)) + return nil + })); err != nil { + return err + } + } + return nil +} + +// BaseKVEncoder encodes a row into a KV pair. +type BaseKVEncoder struct { + GenCols []GeneratedCol + SessionCtx *Session + Table table.Table + Columns []*table.Column + AutoRandomColID int64 + // convert auto id for shard rowid or auto random id base on row id generated by lightning + AutoIDFn AutoIDConverterFn + + logger *zap.Logger + recordCache []types.Datum + // the first auto-generated ID in the current encoder. + // if there's no auto-generated id column or the column value is not auto-generated, it will be 0. + LastInsertID uint64 +} + +// NewBaseKVEncoder creates a new BaseKVEncoder. +func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { + meta := config.Table.Meta() + cols := config.Table.Cols() + se := NewSession(&config.SessionOptions, config.Logger) + // Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord + recordCtx := tables.NewCommonAddRecordCtx(len(cols)) + tables.SetAddRecordCtx(se, recordCtx) + + var autoRandomColID int64 + autoIDFn := func(id int64) int64 { return id } + if meta.ContainsAutoRandomBits() { + col := common.GetAutoRandomColumn(meta) + autoRandomColID = col.ID + + shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) + shard := rand.New(rand.NewSource(config.AutoRandomSeed)).Int63() + autoIDFn = func(id int64) int64 { + return shardFmt.Compose(shard, id) + } + } else if meta.ShardRowIDBits > 0 { + rd := rand.New(rand.NewSource(config.AutoRandomSeed)) // nolint:gosec + mask := int64(1)<