Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: global index support admin check table | index #53156

Merged
merged 14 commits into from
May 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
@@ -2471,6 +2471,12 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
return nil
})
logSlowOperations(time.Since(oprStartTime), "cleanUpIndexBackfillDataInTxn", 3000)
failpoint.Inject("mockDMLExecution", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecution != nil {
MockDMLExecution()
}
})

return
}
6 changes: 6 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
@@ -449,6 +449,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo
if !e.isCommonHandle() {
fullColLen++
}
if e.index.Global {
fullColLen++
}
e.dagPB.OutputOffsets = make([]uint32, fullColLen)
for i := 0; i < fullColLen; i++ {
e.dagPB.OutputOffsets[i] = uint32(i)
@@ -468,6 +471,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo
if !e.isCommonHandle() {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
if e.index.Global {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}

e.checkIndexValue = &checkIndexValue{idxColTps: tps}

7 changes: 3 additions & 4 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
@@ -594,17 +594,16 @@ func (e *IndexLookUpExecutor) needPartitionHandle(tp getHandleType) (bool, error
outputOffsets := e.tableRequest.OutputOffsets
col = cols[outputOffsets[len(outputOffsets)-1]]

// For TableScan, need partitionHandle in `indexOrder` when e.keepOrder == true
needPartitionHandle = (e.index.Global || e.partitionTableMode) && e.keepOrder
// For TableScan, need partitionHandle in `indexOrder` when e.keepOrder == true or execute `admin check [table|index]` with global index
needPartitionHandle = ((e.index.Global || e.partitionTableMode) && e.keepOrder) || (e.index.Global && e.checkIndexValue != nil)
// no ExtraPidColID here, because TableScan shouldn't contain them.
hasExtraCol = col.ID == model.ExtraPhysTblID
}

// TODO: fix global index related bugs later
// There will be two needPartitionHandle != hasExtraCol situations.
// Only `needPartitionHandle` == true and `hasExtraCol` == false are not allowed.
// `ExtraPhysTblID` will be used in `SelectLock` when `needPartitionHandle` == false and `hasExtraCol` == true.
if needPartitionHandle && !hasExtraCol && !e.index.Global {
if needPartitionHandle && !hasExtraCol {
return needPartitionHandle, errors.Errorf("Internal error, needPartitionHandle != ret, tp(%d)", tp)
}
return needPartitionHandle, nil
7 changes: 6 additions & 1 deletion pkg/executor/test/admintest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -8,9 +8,11 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 18,
shard_count = 21,
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/util/callback",
"//pkg/domain",
"//pkg/errno",
"//pkg/executor",
@@ -22,6 +24,7 @@ go_test(
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/testkit/testutil",
@@ -32,6 +35,8 @@ go_test(
"//pkg/util/mock",
"//pkg/util/redact",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
239 changes: 239 additions & 0 deletions pkg/executor/test/admintest/admin_test.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/executor"
@@ -34,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/types"
@@ -42,6 +46,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
@@ -1773,3 +1778,237 @@ func TestAdminCheckTableErrorLocateForClusterIndex(t *testing.T) {
tk.MustExec("admin check table admin_test")
}
}

func TestAdminCheckGlobalIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")

// Make some corrupted index. Build the index information.
sctx := mock.NewContext()
sctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
require.True(t, idxInfo.Global)
idx := tbl.Indices()[0]
require.NotNil(t, idx)

// Reduce one row of table.
// Index count > table count, (2, 12, 0) is deleted.
txn, err := store.Begin()
require.NoError(t, err)
err = txn.Delete(tablecodec.EncodeRowKey(tblInfo.GetPartitionInfo().Definitions[0].ID, kv.IntHandle(4).Encoded()))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.ErrorContains(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 4, index-values:\"handle: 4, values: [KindInt64 2")

indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[0].ID, tblInfo, idxInfo)
// Remove corresponding index key/value.
// Admin check table will success.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(2)}, kv.IntHandle(4))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table admin_test")

indexOpr = tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[2].ID, tblInfo, idxInfo)

// Reduce one row of index.
// Index count < table count, (-1, -10, 2) is deleted.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(-1)}, kv.IntHandle(2))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")

// Add one row of index with inconsistent value.
// Index count = table count, but data is different.
txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(100)}, kv.IntHandle(2), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
if !enabled {
require.True(t, consistency.ErrAdminCheckInconsistentWithColInfo.Equal(err))
require.EqualError(t, err, "[executor:8134]data inconsistency in table: admin_test, index: uidx_a, col: a, handle: \"2\", index-values:\"KindInt64 100\" != record-values:\"KindInt64 -1\", compare err:<nil>")
} else {
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"handle: 2, values: [KindInt64 100]\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")
}
}
}

func TestAdminCheckGlobalIndexWithClusterIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)

getCommonHandle := func(row int) *kv.CommonHandle {
h, err := codec.EncodeKey(tk.Session().GetSessionVars().StmtCtx.TimeZone(), nil, types.MakeDatums(row)...)
require.NoError(t, err)
ch, err := kv.NewCommonHandle(h)
require.NoError(t, err)
return ch
}

var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a), primary key(c)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")

// Make some corrupted index. Build the index information.
sctx := mock.NewContext()
sctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
require.True(t, idxInfo.Global)
df := tblInfo.GetPartitionInfo().Definitions[0]

// Reduce one row of table.
// Index count > table count, (2, 12, 0) is deleted.
txn, err := store.Begin()
require.NoError(t, err)
txn.Delete(tablecodec.EncodeRowKey(df.ID, kv.IntHandle(0).Encoded()))
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.ErrorContains(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 0, index-values:\"handle: 0, values: [KindInt64 2")

indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[0].ID, tblInfo, idxInfo)
// Remove corresponding index key/value.
// Admin check table will success.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(2)}, getCommonHandle(0))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
tk.MustExec("admin check table admin_test")

indexOpr = tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[2].ID, tblInfo, idxInfo)
// Reduce one row of index.
// Index count < table count, (-1, -10, 2) is deleted.
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(-1)}, getCommonHandle(2))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")

// Add one row with inconsistent value.
// Index count = table count, but data is different.
txn, err = store.Begin()
require.NoError(t, err)
_, err = indexOpr.Create(tk.Session().GetTableCtx(), txn, []types.Datum{types.NewIntDatum(100)}, getCommonHandle(2), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
if !enabled {
require.True(t, consistency.ErrAdminCheckInconsistentWithColInfo.Equal(err))
require.EqualError(t, err, "[executor:8134]data inconsistency in table: admin_test, index: uidx_a, col: a, handle: \"2\", index-values:\"KindInt64 100\" != record-values:\"KindInt64 -1\", compare err:<nil>")
} else {
require.True(t, consistency.ErrAdminCheckInconsistent.Equal(err))
require.EqualError(t, err, "[admin:8223]data inconsistency in table: admin_test, index: uidx_a, handle: 2, index-values:\"handle: 2, values: [KindInt64 100]\" != record-values:\"handle: 2, values: [KindInt64 -1]\"")
}
}
}

func TestAdminCheckGlobalIndexDuringDDL(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
originalHook := dom.DDL().GetHook()
tk := testkit.NewTestKit(t, store)

var schemaMap = make(map[model.SchemaState]struct{})

hook := &callback.TestDDLCallback{Do: dom}
onJobUpdatedExportedFunc := func(job *model.Job) {
schemaMap[job.SchemaState] = struct{}{}
_, err := tk.Exec("admin check table admin_test")
assert.NoError(t, err)
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)

// check table after delete some index key/value pairs.
ddl.MockDMLExecution = func() {
_, err := tk.Exec("admin check table admin_test")
assert.NoError(t, err)
}

batchSize := 32
tk.MustExec(fmt.Sprintf("set global tidb_ddl_reorg_batch_size = %d", batchSize))

var enableFastCheck = []bool{false, true}
for _, enabled := range enableFastCheck {
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")

tk.MustExec("set tidb_enable_global_index = true")
tk.MustExec(fmt.Sprintf("set tidb_enable_fast_table_check = %v", enabled))

tk.MustExec("create table admin_test (a int, b int, c int, unique key uidx_a(a), primary key(c)) partition by hash(c) partitions 5")
tk.MustExec("insert admin_test values (-10, -20, 1), (-1, -10, 2), (1, 11, 3), (2, 12, 0), (5, 15, -1), (10, 20, -2), (20, 30, -3)")
for i := 1; i <= batchSize*2; i++ {
tk.MustExec(fmt.Sprintf("insert admin_test values (%d, %d, %d)", i*5+1, i, i*5+1))
}

dom.DDL().SetHook(hook)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecution", "1*return(true)->return(false)"))
tk.MustExec("alter table admin_test truncate partition p1")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecution"))
dom.DDL().SetHook(originalHook)

// Should have 3 different schema states, `none`, `deleteOnly`, `deleteReorg`
require.Len(t, schemaMap, 3)
for ss := range schemaMap {
delete(schemaMap, ss)
}
}
}
13 changes: 8 additions & 5 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
@@ -1481,7 +1481,7 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (base.P

func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName model.CIStr, tbl table.Table, idx *model.IndexInfo) (base.Plan, error) {
tblInfo := tbl.Meta()
physicalID, isPartition := getPhysicalID(tbl)
physicalID, isPartition := getPhysicalID(tbl, idx.Global)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

planner part LGTM

fullExprCols, _, err := expression.TableInfo2SchemaAndNames(b.ctx.GetExprCtx(), dbName, tblInfo)
if err != nil {
return nil, err
@@ -1555,6 +1555,9 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReader(_ context.Context, dbName m
}
}
}
if is.Index.Global {
ts.Columns, ts.schema, _ = AddExtraPhysTblIDColumn(b.ctx, ts.Columns, ts.schema)
}

cop := &CopTask{
indexPlan: is,
@@ -1591,9 +1594,9 @@ func getIndexColsSchema(tblInfo *model.TableInfo, idx *model.IndexInfo, allColSc
return schema
}

func getPhysicalID(t table.Table) (physicalID int64, isPartition bool) {
func getPhysicalID(t table.Table, isGlobalIndex bool) (physicalID int64, isPartition bool) {
tblInfo := t.Meta()
if tblInfo.GetPartitionInfo() != nil {
if !isGlobalIndex && tblInfo.GetPartitionInfo() != nil {
pid := t.(table.PhysicalTable).GetPhysicalID()
return pid, true
}
@@ -1681,8 +1684,8 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
}
}
indexInfos = append(indexInfos, idxInfo)
// For partition tables.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil {
// For partition tables except global index.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil && !idxInfo.Global {
for _, def := range pi.Definitions {
t := tbl.(table.PartitionedTable).GetPartition(def.ID)
reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, t, idxInfo)
4 changes: 4 additions & 0 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
@@ -1872,6 +1872,9 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
}
se.sessionVars.OptimizerUseInvisibleIndexes = s.sessionVars.OptimizerUseInvisibleIndexes

preSkipStats := s.sessionVars.SkipMissingPartitionStats
se.sessionVars.SkipMissingPartitionStats = s.sessionVars.SkipMissingPartitionStats

if execOption.SnapshotTS != 0 {
if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, strconv.FormatUint(execOption.SnapshotTS, 10)); err != nil {
return nil, nil, err
@@ -1913,6 +1916,7 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
}
se.sessionVars.PartitionPruneMode.Store(prePruneMode)
se.sessionVars.OptimizerUseInvisibleIndexes = false
se.sessionVars.SkipMissingPartitionStats = preSkipStats
se.sessionVars.InspectionTableCache = nil
se.sessionVars.MemTracker.Detach()
s.sysSessionPool().Put(tmp)
3 changes: 3 additions & 0 deletions tests/integrationtest/r/executor/sample.result
Original file line number Diff line number Diff line change
@@ -210,6 +210,9 @@ set @@global.tidb_scatter_region=default;
drop table if exists t;
create table t (a int, b varchar(255), primary key (a)) partition by hash(a) partitions 2;
insert into t values (1, '1'), (2, '2'), (3, '3');
select sleep(0.5);
sleep(0.5)
0
set @@tidb_partition_prune_mode='static';
select * from t tablesample regions() order by a;
a b
2 changes: 2 additions & 0 deletions tests/integrationtest/t/executor/sample.test
Original file line number Diff line number Diff line change
@@ -136,6 +136,8 @@ set @@global.tidb_scatter_region=default;
drop table if exists t;
create table t (a int, b varchar(255), primary key (a)) partition by hash(a) partitions 2;
insert into t values (1, '1'), (2, '2'), (3, '3');
# https://github.com/pingcap/tidb/issues/52897
select sleep(0.5);
set @@tidb_partition_prune_mode='static';
select * from t tablesample regions() order by a;
set @@tidb_partition_prune_mode='dynamic';