diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index f880519eb71f4..93c57d2bda0fa 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -49,6 +49,7 @@ go_test( name = "kv_test", timeout = "short", srcs = [ + "allocator_test.go", "base_test.go", "session_internal_test.go", "session_test.go", @@ -57,7 +58,7 @@ go_test( embed = [":kv"], flaky = True, race = "on", - shard_count = 17, + shard_count = 18, deps = [ "//br/pkg/lightning/backend/encode", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/kv/allocator.go b/br/pkg/lightning/backend/kv/allocator.go index 38f6b45050bc6..3a2aca4119e28 100644 --- a/br/pkg/lightning/backend/kv/allocator.go +++ b/br/pkg/lightning/backend/kv/allocator.go @@ -26,7 +26,7 @@ import ( // panickingAllocator is an ID allocator which panics on all operations except Rebase type panickingAllocator struct { autoid.Allocator - base *int64 + base atomic.Int64 ty autoid.AllocatorType } @@ -34,25 +34,30 @@ type panickingAllocator struct { // we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used // during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT // on post-process phase. -func NewPanickingAllocators(base int64) autoid.Allocators { - sharedBase := &base - return autoid.NewAllocators( - false, - &panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType}, - &panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType}, - &panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType}, - ) +// TODO: support save all bases in checkpoint. +func NewPanickingAllocators(sepAutoInc bool, base int64) autoid.Allocators { + allocs := make([]autoid.Allocator, 0, 3) + for _, t := range []autoid.AllocatorType{ + autoid.RowIDAllocType, + autoid.AutoIncrementType, + autoid.AutoRandomType, + } { + pa := &panickingAllocator{ty: t} + pa.base.Store(base) + allocs = append(allocs, pa) + } + return autoid.NewAllocators(sepAutoInc, allocs...) } // Rebase implements the autoid.Allocator interface func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool) error { // CAS for { - oldBase := atomic.LoadInt64(alloc.base) + oldBase := alloc.base.Load() if newBase <= oldBase { break } - if atomic.CompareAndSwapInt64(alloc.base, oldBase, newBase) { + if alloc.base.CompareAndSwap(oldBase, newBase) { break } } @@ -61,7 +66,7 @@ func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool // Base implements the autoid.Allocator interface func (alloc *panickingAllocator) Base() int64 { - return atomic.LoadInt64(alloc.base) + return alloc.base.Load() } func (alloc *panickingAllocator) GetType() autoid.AllocatorType { diff --git a/br/pkg/lightning/backend/kv/allocator_test.go b/br/pkg/lightning/backend/kv/allocator_test.go new file mode 100644 index 0000000000000..018ae14160873 --- /dev/null +++ b/br/pkg/lightning/backend/kv/allocator_test.go @@ -0,0 +1,35 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "testing" + + "github.com/pingcap/tidb/meta/autoid" + "github.com/stretchr/testify/require" +) + +func TestAllocator(t *testing.T) { + alloc := NewPanickingAllocators(true, 0) + require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 123, false)) + // cannot revert back + require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 100, false)) + require.NoError(t, alloc.Get(autoid.AutoIncrementType).Rebase(nil, 456, false)) + require.NoError(t, alloc.Get(autoid.AutoRandomType).Rebase(nil, 789, false)) + + require.EqualValues(t, 123, alloc.Get(autoid.RowIDAllocType).Base()) + require.EqualValues(t, 456, alloc.Get(autoid.AutoIncrementType).Base()) + require.EqualValues(t, 789, alloc.Get(autoid.AutoRandomType).Base()) +} diff --git a/br/pkg/lightning/backend/kv/base_test.go b/br/pkg/lightning/backend/kv/base_test.go index d02956e4016f0..505aeaf6633a8 100644 --- a/br/pkg/lightning/backend/kv/base_test.go +++ b/br/pkg/lightning/backend/kv/base_test.go @@ -43,7 +43,7 @@ func TestLogKVConvertFailed(t *testing.T) { cols := []*model.ColumnInfo{c1} tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} var tbl table.Table - tbl, err = tables.TableFromMeta(NewPanickingAllocators(0), tblInfo) + tbl, err = tables.TableFromMeta(NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) var baseKVEncoder *BaseKVEncoder diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index e1dc1c5d21ccc..7d25968bf93d8 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -80,7 +80,7 @@ func TestEncode(t *testing.T) { c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)} cols := []*model.ColumnInfo{c1} tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) logger := log.Logger{Logger: zap.NewNop()} @@ -163,7 +163,7 @@ func TestDecode(t *testing.T) { c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)} cols := []*model.ColumnInfo{c1} tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) decoder, err := lkv.NewTableKVDecoder(tbl, "`test`.`c1`", &encode.SessionOptions{ SQLMode: mysql.ModeStrictAllTables, @@ -217,7 +217,7 @@ func TestDecodeIndex(t *testing.T) { State: model.StatePublic, PKIsHandle: false, } - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) if err != nil { fmt.Printf("error: %v", err.Error()) } @@ -262,7 +262,7 @@ func TestEncodeRowFormatV2(t *testing.T) { c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)} cols := []*model.ColumnInfo{c1} tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) rows := []types.Datum{ @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) { } cols := []*model.ColumnInfo{c1} tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ @@ -342,7 +342,7 @@ func TestEncodeTimestamp(t *testing.T) { func TestEncodeDoubleAutoIncrement(t *testing.T) { tblInfo := mockTableInfo(t, "create table t (id double not null auto_increment, unique key `u_id` (`id`));") - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ @@ -406,7 +406,7 @@ func TestEncodeMissingAutoValue(t *testing.T) { }, } { tblInfo := mockTableInfo(t, testTblInfo.CreateStmt) - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ @@ -458,7 +458,7 @@ func TestEncodeMissingAutoValue(t *testing.T) { func TestEncodeExpressionColumn(t *testing.T) { tblInfo := mockTableInfo(t, "create table t (id varchar(40) not null DEFAULT uuid(), unique key `u_id` (`id`));") - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ @@ -503,7 +503,7 @@ func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo { func TestDefaultAutoRandoms(t *testing.T) { tblInfo := mockTableInfo(t, "create table t (id bigint unsigned NOT NULL auto_random primary key clustered, a varchar(100));") - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ Table: tbl, @@ -541,7 +541,7 @@ func TestDefaultAutoRandoms(t *testing.T) { func TestShardRowId(t *testing.T) { tblInfo := mockTableInfo(t, "create table t (s varchar(16)) shard_row_id_bits = 3;") - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ Table: tbl, @@ -703,7 +703,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite { tableInfo.State = model.StatePublic // Construct the corresponding KV encoder. - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tableInfo) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0), tableInfo) require.NoError(b, err) encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{ Table: tbl, diff --git a/br/pkg/lightning/backend/local/duplicate_test.go b/br/pkg/lightning/backend/local/duplicate_test.go index f88ea248f956d..c621c86edfdee 100644 --- a/br/pkg/lightning/backend/local/duplicate_test.go +++ b/br/pkg/lightning/backend/local/duplicate_test.go @@ -40,7 +40,7 @@ func TestBuildDupTask(t *testing.T) { info, err := ddl.MockTableInfo(mock.NewContext(), node[0].(*ast.CreateTableStmt), 1) require.NoError(t, err) info.State = model.StatePublic - tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), info) + tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(info.SepAutoInc(), 0), info) require.NoError(t, err) // Test build duplicate detecting task. diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index fb6710163f939..0ae11123ebf7b 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -64,7 +64,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { cols = append(cols, col) } tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) backendObj := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L())) return &mysqlSuite{ @@ -242,7 +242,7 @@ func testStrictMode(t *testing.T) { ft.SetCharset(charset.CharsetASCII) col1 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("s1"), State: model.StatePublic, Offset: 1, FieldType: ft} tblInfo := &model.TableInfo{ID: 1, Columns: []*model.ColumnInfo{col0, col1}, PKIsHandle: false, State: model.StatePublic} - tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo) + tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo) require.NoError(t, err) ctx := context.Background() diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go index c11041dfaa07e..4ae9b5cb4293d 100644 --- a/br/pkg/lightning/importer/get_pre_info.go +++ b/br/pkg/lightning/importer/get_pre_info.go @@ -628,7 +628,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable( if err != nil { return 0.0, false, errors.Trace(err) } - idAlloc := kv.NewPanickingAllocators(0) + idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0) tbl, err := tables.TableFromMeta(idAlloc, tableInfo) if err != nil { return 0.0, false, errors.Trace(err) diff --git a/br/pkg/lightning/importer/meta_manager.go b/br/pkg/lightning/importer/meta_manager.go index fff80ddb09718..d8d5bfa87f013 100644 --- a/br/pkg/lightning/importer/meta_manager.go +++ b/br/pkg/lightning/importer/meta_manager.go @@ -252,6 +252,8 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 if curStatus == metaStatusInitial { if needAutoID { // maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first. + // TODO this is not right when AUTO_ID_CACHE=1 and have auto row id, + // the id allocators are separated in this case. if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil { return errors.Trace(err) } diff --git a/br/pkg/lightning/importer/table_import.go b/br/pkg/lightning/importer/table_import.go index 4371dacd1ce29..12ed8c4887a83 100644 --- a/br/pkg/lightning/importer/table_import.go +++ b/br/pkg/lightning/importer/table_import.go @@ -88,7 +88,7 @@ func NewTableImporter( etcdCli *clientv3.Client, logger log.Logger, ) (*TableImporter, error) { - idAlloc := kv.NewPanickingAllocators(cp.AllocBase) + idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase) tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core) if err != nil { return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName) @@ -898,26 +898,30 @@ func (tr *TableImporter) postProcess( if cp.Status < checkpoints.CheckpointStatusAlteredAutoInc { tblInfo := tr.tableInfo.Core var err error + // TODO why we have to rebase id for tidb backend??? remove it later. if tblInfo.ContainsAutoRandomBits() { ft := &common.GetAutoRandomColumn(tblInfo).FieldType shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits) maxCap := shardFmt.IncrementalBitsCapacity() err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap) } else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil { - // only alter auto increment id iff table contains auto-increment column or generated handle. - // ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming. - // if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid - // allocator, even if the table has NO auto-increment column. - newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1 - err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase) - - if err == nil && isLocalBackend(rc.cfg) { + if isLocalBackend(rc.cfg) { // for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid, // especially when a table has auto_increment non-clustered PK, it will use both allocators. // And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column, // not for allocator of _tidb_rowid. // So we need to rebase IDs for those 2 allocators explicitly. - err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core) + err = common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{ + autoid.RowIDAllocType: tr.alloc.Get(autoid.RowIDAllocType).Base(), + autoid.AutoIncrementType: tr.alloc.Get(autoid.AutoIncrementType).Base(), + }, tr, tr.dbInfo.ID, tr.tableInfo.Core) + } else { + // only alter auto increment id iff table contains auto-increment column or generated handle. + // ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming. + // if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid + // allocator, even if the table has NO auto-increment column. + newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1 + err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase) } } saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc) diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index fa0a24fde8f8d..c041c43e3b57b 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -414,7 +414,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes() mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes() - tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), s.tableInfo.Core) + tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core) require.NoError(s.T(), err) _, indexUUID := backend.MakeUUID("`db`.`table`", -1) _, dataUUID := backend.MakeUUID("`db`.`table`", 0) @@ -1468,7 +1468,7 @@ func (s *tableRestoreSuite) TestEstimate() { controller := gomock.NewController(s.T()) defer controller.Finish() mockEncBuilder := mock.NewMockEncodingBuilder(controller) - idAlloc := kv.NewPanickingAllocators(0) + idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0) tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core) require.NoError(s.T(), err) diff --git a/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid-schema.sql b/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid-schema.sql new file mode 100644 index 0000000000000..4b34758103d03 --- /dev/null +++ b/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid-schema.sql @@ -0,0 +1,9 @@ +/* + 1778961125641936898 is 0001100010110000001000111011011111101011000111011110000000000010 + bigger than the max increment part of sharded auto row id. + */ +CREATE TABLE nonclustered_cache1_shard_autorowid ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + v int, + PRIMARY KEY (id) NONCLUSTERED +) AUTO_ID_CACHE=1 SHARD_ROW_ID_BITS=4; diff --git a/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid.0.csv b/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid.0.csv new file mode 100644 index 0000000000000..75562d4451313 --- /dev/null +++ b/br/tests/lightning_csv/data/auto_incr_id.nonclustered_cache1_shard_autorowid.0.csv @@ -0,0 +1,3 @@ +1778961125641936898,1 +1778961125641936899,2 +1778961125641936900,3 diff --git a/br/tests/lightning_csv/run.sh b/br/tests/lightning_csv/run.sh index 82d645cffd0b7..5328ac248af8b 100755 --- a/br/tests/lightning_csv/run.sh +++ b/br/tests/lightning_csv/run.sh @@ -44,7 +44,7 @@ function run_with() { run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""' check_not_contains 'id:' - for table in clustered nonclustered clustered_cache1 nonclustered_cache1; do + for table in clustered nonclustered clustered_cache1 nonclustered_cache1 nonclustered_cache1_shard_autorowid; do run_sql "select count(*) from auto_incr_id.$table" check_contains 'count(*): 3' # insert should work diff --git a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql index a69f5bf4350eb..c291100c82d2c 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.specific_auto_inc-schema.sql @@ -1 +1,3 @@ -create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000; +create table specific_auto_inc ( + a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, + b int unique auto_increment) auto_increment=80000; diff --git a/br/tests/lightning_tool_1472/run.sh b/br/tests/lightning_tool_1472/run.sh index e8c2c5a8cfd08..71efe4a079a56 100755 --- a/br/tests/lightning_tool_1472/run.sh +++ b/br/tests/lightning_tool_1472/run.sh @@ -25,8 +25,8 @@ run_lightning run_sql 'insert into EE1472.pk values ();' run_sql 'select count(a), max(a) from EE1472.pk;' check_contains 'count(a): 3' -check_contains 'max(a): 6' +check_contains 'max(a): 5' run_sql 'insert into EE1472.notpk (a) values (3333);' run_sql 'select b from EE1472.notpk where a = 3333;' -check_contains 'b: 11' +check_contains 'b: 10' diff --git a/disttask/loaddata/dispatcher.go b/disttask/loaddata/dispatcher.go index 7a867dd5c3c8d..ba613d843cee1 100644 --- a/disttask/loaddata/dispatcher.go +++ b/disttask/loaddata/dispatcher.go @@ -77,7 +77,7 @@ func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gT } func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMeta, error) { - idAlloc := kv.NewPanickingAllocators(0) + idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc(), 0) tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo) if err != nil { return nil, err diff --git a/disttask/loaddata/scheduler.go b/disttask/loaddata/scheduler.go index e08213b2240d7..f2371ebd7989a 100644 --- a/disttask/loaddata/scheduler.go +++ b/disttask/loaddata/scheduler.go @@ -41,7 +41,7 @@ type ImportScheduler struct { func (s *ImportScheduler) InitSubtaskExecEnv(ctx context.Context) error { logutil.BgLogger().Info("InitSubtaskExecEnv", zap.Any("taskMeta", s.taskMeta)) - idAlloc := kv.NewPanickingAllocators(0) + idAlloc := kv.NewPanickingAllocators(s.taskMeta.Plan.TableInfo.SepAutoInc(), 0) tbl, err := tables.TableFromMeta(idAlloc, s.taskMeta.Plan.TableInfo) if err != nil { return err diff --git a/executor/importer/table_import.go b/executor/importer/table_import.go index b750657a4421e..c0945240a25f5 100644 --- a/executor/importer/table_import.go +++ b/executor/importer/table_import.go @@ -81,7 +81,7 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) { // NewTableImporter creates a new table importer. func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error) { - idAlloc := kv.NewPanickingAllocators(0) + idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc(), 0) tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta()) if err != nil { return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)