Skip to content

Commit

Permalink
lightning: fix insert err after import for AUTO_ID_CACHE=1 and SHARD_…
Browse files Browse the repository at this point in the history
…ROW_ID_BITS (#52712) (#52722)

close #52654
  • Loading branch information
ti-chi-bot authored Apr 18, 2024
1 parent 505b8b0 commit eff0402
Show file tree
Hide file tree
Showing 19 changed files with 109 additions and 48 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_test(
name = "kv_test",
timeout = "short",
srcs = [
"allocator_test.go",
"base_test.go",
"session_internal_test.go",
"session_test.go",
Expand All @@ -57,7 +58,7 @@ go_test(
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 17,
shard_count = 18,
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
Expand Down
29 changes: 17 additions & 12 deletions br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,38 @@ import (
// panickingAllocator is an ID allocator which panics on all operations except Rebase
type panickingAllocator struct {
autoid.Allocator
base *int64
base atomic.Int64
ty autoid.AllocatorType
}

// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
// we use this to collect the max id(either _tidb_rowid or auto_increment id or auto_random) used
// during import, and we will use this info to do ALTER TABLE xxx AUTO_RANDOM_BASE or AUTO_INCREMENT
// on post-process phase.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
false,
&panickingAllocator{base: sharedBase, ty: autoid.RowIDAllocType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoIncrementType},
&panickingAllocator{base: sharedBase, ty: autoid.AutoRandomType},
)
// TODO: support save all bases in checkpoint.
func NewPanickingAllocators(sepAutoInc bool, base int64) autoid.Allocators {
allocs := make([]autoid.Allocator, 0, 3)
for _, t := range []autoid.AllocatorType{
autoid.RowIDAllocType,
autoid.AutoIncrementType,
autoid.AutoRandomType,
} {
pa := &panickingAllocator{ty: t}
pa.base.Store(base)
allocs = append(allocs, pa)
}
return autoid.NewAllocators(sepAutoInc, allocs...)
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
oldBase := alloc.base.Load()
if newBase <= oldBase {
break
}
if atomic.CompareAndSwapInt64(alloc.base, oldBase, newBase) {
if alloc.base.CompareAndSwap(oldBase, newBase) {
break
}
}
Expand All @@ -61,7 +66,7 @@ func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool

// Base implements the autoid.Allocator interface
func (alloc *panickingAllocator) Base() int64 {
return atomic.LoadInt64(alloc.base)
return alloc.base.Load()
}

func (alloc *panickingAllocator) GetType() autoid.AllocatorType {
Expand Down
35 changes: 35 additions & 0 deletions br/pkg/lightning/backend/kv/allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/pingcap/tidb/meta/autoid"
"github.com/stretchr/testify/require"
)

func TestAllocator(t *testing.T) {
alloc := NewPanickingAllocators(true, 0)
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 123, false))
// cannot revert back
require.NoError(t, alloc.Get(autoid.RowIDAllocType).Rebase(nil, 100, false))
require.NoError(t, alloc.Get(autoid.AutoIncrementType).Rebase(nil, 456, false))
require.NoError(t, alloc.Get(autoid.AutoRandomType).Rebase(nil, 789, false))

require.EqualValues(t, 123, alloc.Get(autoid.RowIDAllocType).Base())
require.EqualValues(t, 456, alloc.Get(autoid.AutoIncrementType).Base())
require.EqualValues(t, 789, alloc.Get(autoid.AutoRandomType).Base())
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestLogKVConvertFailed(t *testing.T) {
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
var tbl table.Table
tbl, err = tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
tbl, err = tables.TableFromMeta(NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

var baseKVEncoder *BaseKVEncoder
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestEncode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

logger := log.Logger{Logger: zap.NewNop()}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestDecode(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
decoder, err := lkv.NewTableKVDecoder(tbl, "`test`.`c1`", &encode.SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestDecodeIndex(t *testing.T) {
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestEncodeRowFormatV2(t *testing.T) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

rows := []types.Datum{
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEncodeTimestamp(t *testing.T) {
}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestEncodeTimestamp(t *testing.T) {

func TestEncodeDoubleAutoIncrement(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id double not null auto_increment, unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {
},
} {
tblInfo := mockTableInfo(t, testTblInfo.CreateStmt)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestEncodeMissingAutoValue(t *testing.T) {

func TestEncodeExpressionColumn(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id varchar(40) not null DEFAULT uuid(), unique key `u_id` (`id`));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Expand Down Expand Up @@ -503,7 +503,7 @@ func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo {

func TestDefaultAutoRandoms(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (id bigint unsigned NOT NULL auto_random primary key clustered, a varchar(100));")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestDefaultAutoRandoms(t *testing.T) {

func TestShardRowId(t *testing.T) {
tblInfo := mockTableInfo(t, "create table t (s varchar(16)) shard_row_id_bits = 3;")
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down Expand Up @@ -703,7 +703,7 @@ func SetUpTest(b *testing.B) *benchSQL2KVSuite {
tableInfo.State = model.StatePublic

// Construct the corresponding KV encoder.
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), tableInfo)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0), tableInfo)
require.NoError(b, err)
encoder, err := lkv.NewTableKVEncoder(&encode.EncodingConfig{
Table: tbl,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestBuildDupTask(t *testing.T) {
info, err := ddl.MockTableInfo(mock.NewContext(), node[0].(*ast.CreateTableStmt), 1)
require.NoError(t, err)
info.State = model.StatePublic
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(0), info)
tbl, err := tables.TableFromMeta(lkv.NewPanickingAllocators(info.SepAutoInc(), 0), info)
require.NoError(t, err)

// Test build duplicate detecting task.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
cols = append(cols, col)
}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)
backendObj := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
return &mysqlSuite{
Expand Down Expand Up @@ -242,7 +242,7 @@ func testStrictMode(t *testing.T) {
ft.SetCharset(charset.CharsetASCII)
col1 := &model.ColumnInfo{ID: 2, Name: model.NewCIStr("s1"), State: model.StatePublic, Offset: 1, FieldType: ft}
tblInfo := &model.TableInfo{ID: 1, Columns: []*model.ColumnInfo{col0, col1}, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(tblInfo.SepAutoInc(), 0), tblInfo)
require.NoError(t, err)

ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
if err != nil {
return 0.0, false, errors.Trace(err)
}
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(tableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo)
if err != nil {
return 0.0, false, errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
// TODO this is not right when AUTO_ID_CACHE=1 and have auto row id,
// the id allocators are separated in this case.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
Expand Down
24 changes: 14 additions & 10 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTableImporter(
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
idAlloc := kv.NewPanickingAllocators(tableInfo.Core.SepAutoInc(), cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
Expand Down Expand Up @@ -898,26 +898,30 @@ func (tr *TableImporter) postProcess(
if cp.Status < checkpoints.CheckpointStatusAlteredAutoInc {
tblInfo := tr.tableInfo.Core
var err error
// TODO why we have to rebase id for tidb backend??? remove it later.
if tblInfo.ContainsAutoRandomBits() {
ft := &common.GetAutoRandomColumn(tblInfo).FieldType
shardFmt := autoid.NewShardIDFormat(ft, tblInfo.AutoRandomBits, tblInfo.AutoRandomRangeBits)
maxCap := shardFmt.IncrementalBitsCapacity()
err = AlterAutoRandom(ctx, rc.db, tr.tableName, uint64(tr.alloc.Get(autoid.AutoRandomType).Base())+1, maxCap)
} else if common.TableHasAutoRowID(tblInfo) || tblInfo.GetAutoIncrementColInfo() != nil {
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)

if err == nil && isLocalBackend(rc.cfg) {
if isLocalBackend(rc.cfg) {
// for TiDB version >= 6.5.0, a table might have separate allocators for auto_increment column and _tidb_rowid,
// especially when a table has auto_increment non-clustered PK, it will use both allocators.
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.RowIDAllocType: tr.alloc.Get(autoid.RowIDAllocType).Base(),
autoid.AutoIncrementType: tr.alloc.Get(autoid.AutoIncrementType).Base(),
}, tr, tr.dbInfo.ID, tr.tableInfo.Core)
} else {
// only alter auto increment id iff table contains auto-increment column or generated handle.
// ALTER TABLE xxx AUTO_INCREMENT = yyy has a bad naming.
// if a table has implicit _tidb_rowid column & tbl.SepAutoID=false, then it works on _tidb_rowid
// allocator, even if the table has NO auto-increment column.
newBase := uint64(tr.alloc.Get(autoid.RowIDAllocType).Base()) + 1
err = AlterAutoIncrement(ctx, rc.db, tr.tableName, newBase)
}
}
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, err, checkpoints.CheckpointStatusAlteredAutoInc)
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() {
mockEngineWriter.EXPECT().IsSynced().Return(true).AnyTimes()
mockEngineWriter.EXPECT().Close(gomock.Any()).Return(mockChunkFlushStatus, nil).AnyTimes()

tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), s.tableInfo.Core)
tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0), s.tableInfo.Core)
require.NoError(s.T(), err)
_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
Expand Down Expand Up @@ -1468,7 +1468,7 @@ func (s *tableRestoreSuite) TestEstimate() {
controller := gomock.NewController(s.T())
defer controller.Finish()
mockEncBuilder := mock.NewMockEncodingBuilder(controller)
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(s.tableInfo.Core.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core)
require.NoError(s.T(), err)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
1778961125641936898 is 0001100010110000001000111011011111101011000111011110000000000010
bigger than the max increment part of sharded auto row id.
*/
CREATE TABLE nonclustered_cache1_shard_autorowid (
id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
v int,
PRIMARY KEY (id) NONCLUSTERED
) AUTO_ID_CACHE=1 SHARD_ROW_ID_BITS=4;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1778961125641936898,1
1778961125641936899,2
1778961125641936900,3
2 changes: 1 addition & 1 deletion br/tests/lightning_csv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function run_with() {
run_sql 'SELECT id FROM csv.empty_strings WHERE b <> ""'
check_not_contains 'id:'

for table in clustered nonclustered clustered_cache1 nonclustered_cache1; do
for table in clustered nonclustered clustered_cache1 nonclustered_cache1 nonclustered_cache1_shard_autorowid; do
run_sql "select count(*) from auto_incr_id.$table"
check_contains 'count(*): 3'
# insert should work
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
create table specific_auto_inc (a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */, b int unique auto_increment) auto_increment=80000;
create table specific_auto_inc (
a varchar(6) primary key /*T![clustered_index] NONCLUSTERED */,
b int unique auto_increment) auto_increment=80000;
4 changes: 2 additions & 2 deletions br/tests/lightning_tool_1472/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ run_lightning
run_sql 'insert into EE1472.pk values ();'
run_sql 'select count(a), max(a) from EE1472.pk;'
check_contains 'count(a): 3'
check_contains 'max(a): 6'
check_contains 'max(a): 5'

run_sql 'insert into EE1472.notpk (a) values (3333);'
run_sql 'select b from EE1472.notpk where a = 3333;'
check_contains 'b: 11'
check_contains 'b: 10'
2 changes: 1 addition & 1 deletion disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gT
}

func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) ([]*SubtaskMeta, error) {
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion disttask/loaddata/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ImportScheduler struct {
func (s *ImportScheduler) InitSubtaskExecEnv(ctx context.Context) error {
logutil.BgLogger().Info("InitSubtaskExecEnv", zap.Any("taskMeta", s.taskMeta))

idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(s.taskMeta.Plan.TableInfo.SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, s.taskMeta.Plan.TableInfo)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func prepareSortDir(e *LoadDataController, jobID int64) (string, error) {

// NewTableImporter creates a new table importer.
func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error) {
idAlloc := kv.NewPanickingAllocators(0)
idAlloc := kv.NewPanickingAllocators(e.Table.Meta().SepAutoInc(), 0)
tbl, err := tables.TableFromMeta(idAlloc, e.Table.Meta())
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", e.Table.Meta().Name)
Expand Down

0 comments on commit eff0402

Please sign in to comment.