Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support the vector index #56409

Merged
merged 13 commits into from
Sep 30, 2024
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,11 @@ error = '''
Auto analyze is not effective for index '%-.192s', need analyze manually
'''

["ddl:9014"]
error = '''
TiFlash backfill index failed: %s
'''

["domain:8027"]
error = '''
Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ go_library(
"//pkg/util/execdetails",
"//pkg/util/filter",
"//pkg/util/gcutil",
"//pkg/util/generatedexpr",
"//pkg/util/generic",
"//pkg/util/hack",
"//pkg/util/intest",
Expand Down Expand Up @@ -322,6 +323,7 @@ go_test(
"//pkg/store/gcworker",
"//pkg/store/helper",
"//pkg/store/mockstore",
"//pkg/store/mockstore/unistore",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
Expand Down Expand Up @@ -349,6 +351,7 @@ go_test(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
26 changes: 23 additions & 3 deletions pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -68,6 +69,15 @@ var allTestCase = []testCancelJob{
{"alter table t add index idx_c2(c2)", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add index idx_c2(c2)", true, model.StateWriteOnly, true, true, nil},
{"alter table t add index idx_cx2(c2)", false, model.StatePublic, false, true, nil},
// Drop vector index
{"alter table t drop index v_idx_1", true, model.StatePublic, true, false, []string{"alter table t add vector index v_idx_1((VEC_L2_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_2", false, model.StateWriteOnly, true, false, []string{"alter table t add vector index v_idx_2((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_3", false, model.StateDeleteOnly, false, true, []string{"alter table t add vector index v_idx_3((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_4", false, model.StateDeleteReorganization, false, true, []string{"alter table t add vector index v_idx_4((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
// Add vector key
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateNone, true, false, nil},
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateWriteOnly, true, true, nil},
// Add column.
{"alter table t add column c4 bigint", true, model.StateNone, true, false, nil},
{"alter table t add column c4 bigint", true, model.StateDeleteOnly, true, true, nil},
Expand Down Expand Up @@ -204,7 +214,7 @@ func cancelSuccess(rs *testkit.Result) bool {
return strings.Contains(rs.Rows()[0][1].(string), "success")
}

func TestCancel(t *testing.T) {
func TestCancelVariousJobs(t *testing.T) {
var enterCnt, exitCnt atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) })
Expand All @@ -213,10 +223,18 @@ func TestCancel(t *testing.T) {
return enterCnt.Load() == exitCnt.Load()
}, 10*time.Second, 10*time.Millisecond)
}
store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond)
store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)

tiflash := infosync.NewMockTiFlash()
infosync.SetMockTiFlash(tiflash)
defer func() {
tiflash.Lock()
tiflash.StatusServer.Close()
tiflash.Unlock()
}()

// Prepare schema.
tk.MustExec("use test")
tk.MustExec("drop table if exists t_partition;")
Expand All @@ -231,14 +249,16 @@ func TestCancel(t *testing.T) {
partition p4 values less than (7096)
);`)
tk.MustExec(`create table t (
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
c1 int, c2 int, c3 int, c11 tinyint, v2 vector(3), index fk_c1(c1)
);`)
tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")

// Prepare data.
for i := 0; i <= 2048; i++ {
tk.MustExec(fmt.Sprintf("insert into t_partition values(%d, %d, %d)", i*3, i*2, i))
tk.MustExec(fmt.Sprintf("insert into t(c1, c2, c3) values(%d, %d, %d)", i*3, i*2, i))
}
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(2048)`)

// Change some configurations.
ddl.ReorgWaitTimeout = 10 * time.Millisecond
Expand Down
8 changes: 6 additions & 2 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,10 +1102,14 @@ func isColumnWithIndex(colName string, indices []*model.IndexInfo) bool {

func isColumnCanDropWithIndex(colName string, indices []*model.IndexInfo) error {
for _, indexInfo := range indices {
if indexInfo.Primary || len(indexInfo.Columns) > 1 {
if indexInfo.Primary || len(indexInfo.Columns) > 1 || indexInfo.VectorInfo != nil {
for _, col := range indexInfo.Columns {
if col.Name.L == colName {
return dbterror.ErrCantDropColWithIndex.GenWithStack("can't drop column %s with composite index covered or Primary Key covered now", colName)
errMsg := "with composite index covered or Primary Key covered now"
if indexInfo.VectorInfo != nil {
errMsg = "with Vector Key covered now"
}
return dbterror.ErrCantDropColWithIndex.GenWithStack("can't drop column %s "+errMsg, colName)
}
}
}
Expand Down
80 changes: 65 additions & 15 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
Expand Down Expand Up @@ -392,12 +393,13 @@ func findTableIDFromStore(t *meta.Mutator, schemaID int64, tableName string) (in
// BuildTableInfoFromAST builds model.TableInfo from a SQL statement.
// Note: TableID and PartitionID are left as uninitialized value.
func BuildTableInfoFromAST(ctx *metabuild.Context, s *ast.CreateTableStmt) (*model.TableInfo, error) {
return buildTableInfoWithCheck(ctx, s, mysql.DefaultCharset, "", nil)
// TODO: Support the vector index for this function.
return buildTableInfoWithCheck(ctx, nil, s, mysql.DefaultCharset, "", nil)
}

// buildTableInfoWithCheck builds model.TableInfo from a SQL statement.
// Note: TableID and PartitionIDs are left as uninitialized value.
func buildTableInfoWithCheck(ctx *metabuild.Context, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
func buildTableInfoWithCheck(ctx *metabuild.Context, store kv.Storage, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
tbInfo, err := BuildTableInfoWithStmt(ctx, s, dbCharset, dbCollate, placementPolicyRef)
if err != nil {
return nil, err
Expand All @@ -408,7 +410,7 @@ func buildTableInfoWithCheck(ctx *metabuild.Context, s *ast.CreateTableStmt, dbC
if err = checkTableInfoValidWithStmt(ctx, tbInfo, s); err != nil {
return nil, err
}
if err = checkTableInfoValidExtra(ctx.GetExprCtx().GetEvalCtx().ErrCtx(), tbInfo); err != nil {
if err = checkTableInfoValidExtra(ctx.GetExprCtx().GetEvalCtx().ErrCtx(), store, tbInfo); err != nil {
return nil, err
}
return tbInfo, nil
Expand Down Expand Up @@ -509,12 +511,47 @@ func checkGeneratedColumn(ctx *metabuild.Context, schemaName pmodel.CIStr, table
return nil
}

func checkVectorIndexIfNeedTiFlashReplica(store kv.Storage, tblInfo *model.TableInfo) error {
var hasVectorIndex bool
for _, idx := range tblInfo.Indices {
if idx.VectorInfo != nil {
hasVectorIndex = true
break
}
}
if !hasVectorIndex {
return nil
}
if store == nil {
return errors.New("the store is nil")
}

if tblInfo.TiFlashReplica == nil || tblInfo.TiFlashReplica.Count == 0 {
replicas, err := infoschema.GetTiFlashStoreCount(store)
if err != nil {
return errors.Trace(err)
}
if replicas == 0 {
return errors.Trace(dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported TiFlash store count is 0"))
}

// Always try to set to 1 as the default replica count.
defaultReplicas := uint64(1)
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: defaultReplicas,
LocationLabels: make([]string, 0),
}
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}

return errors.Trace(checkTableTypeForVectorIndex(tblInfo))
}

// checkTableInfoValidExtra is like checkTableInfoValid, but also assumes the
// table info comes from untrusted source and performs further checks such as
// name length and column count.
// (checkTableInfoValid is also used in repairing objects which don't perform
// these checks. Perhaps the two functions should be merged together regardless?)
func checkTableInfoValidExtra(ec errctx.Context, tbInfo *model.TableInfo) error {
func checkTableInfoValidExtra(ec errctx.Context, store kv.Storage, tbInfo *model.TableInfo) error {
if err := checkTooLongTable(tbInfo.Name); err != nil {
return err
}
Expand All @@ -537,6 +574,9 @@ func checkTableInfoValidExtra(ec errctx.Context, tbInfo *model.TableInfo) error
if err := checkGlobalIndexes(ec, tbInfo); err != nil {
return errors.Trace(err)
}
if err := checkVectorIndexIfNeedTiFlashReplica(store, tbInfo); err != nil {
return errors.Trace(err)
}

// FIXME: perform checkConstraintNames
if err := checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate); err != nil {
Expand Down Expand Up @@ -620,7 +660,8 @@ func checkColumnAttributes(colName string, tp *types.FieldType) error {
}

// BuildSessionTemporaryTableInfo builds model.TableInfo from a SQL statement.
func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, is infoschema.InfoSchema, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, store kv.Storage, is infoschema.InfoSchema, s *ast.CreateTableStmt,
dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
//build tableInfo
var tbInfo *model.TableInfo
Expand All @@ -638,7 +679,7 @@ func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, is infoschema.InfoSc
}
tbInfo, err = BuildTableInfoWithLike(ident, referTbl.Meta(), s)
} else {
tbInfo, err = buildTableInfoWithCheck(ctx, s, dbCharset, dbCollate, placementPolicyRef)
tbInfo, err = buildTableInfoWithCheck(ctx, store, s, dbCharset, dbCollate, placementPolicyRef)
}
return tbInfo, err
}
Expand Down Expand Up @@ -1167,10 +1208,13 @@ func BuildTableInfo(
}
foreignKeyID := tbInfo.MaxForeignKeyID
for _, constr := range constraints {
// Build hidden columns if necessary.
hiddenCols, err := buildHiddenColumnInfoWithCheck(ctx, constr.Keys, pmodel.NewCIStr(constr.Name), tbInfo, tblColumns)
if err != nil {
return nil, err
var hiddenCols []*model.ColumnInfo
if constr.Tp != ast.ConstraintVector {
// Build hidden columns if necessary.
hiddenCols, err = buildHiddenColumnInfoWithCheck(ctx, constr.Keys, pmodel.NewCIStr(constr.Name), tbInfo, tblColumns)
if err != nil {
return nil, err
}
}
for _, hiddenCol := range hiddenCols {
hiddenCol.State = model.StatePublic
Expand Down Expand Up @@ -1235,18 +1279,23 @@ func BuildTableInfo(
}

var (
indexName = constr.Name
primary, unique bool
indexName = constr.Name
primary, unique, vector bool
)

// Check if the index is primary or unique.
// Check if the index is primary, unique or vector.
switch constr.Tp {
case ast.ConstraintPrimaryKey:
primary = true
unique = true
indexName = mysql.PrimaryKeyName
case ast.ConstraintUniq, ast.ConstraintUniqKey, ast.ConstraintUniqIndex:
unique = true
case ast.ConstraintVector:
if constr.Option.Visibility == ast.IndexVisibilityInvisible {
return nil, dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("set vector index invisible")
}
vector = true
}

// check constraint
Expand Down Expand Up @@ -1315,10 +1364,11 @@ func BuildTableInfo(
// build index info.
idxInfo, err := BuildIndexInfo(
ctx,
tbInfo.Columns,
tbInfo,
pmodel.NewCIStr(indexName),
primary,
unique,
vector,
constr.Keys,
constr.Option,
model.StatePublic,
Expand Down Expand Up @@ -1483,7 +1533,7 @@ func addIndexForForeignKey(ctx *metabuild.Context, tbInfo *model.TableInfo) erro
Length: types.UnspecifiedLength,
})
}
idxInfo, err := BuildIndexInfo(ctx, tbInfo.Columns, idxName, false, false, keys, nil, model.StatePublic)
idxInfo, err := BuildIndexInfo(ctx, tbInfo, idxName, false, false, false, keys, nil, model.StatePublic)
if err != nil {
return errors.Trace(err)
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
Expand Down Expand Up @@ -1157,6 +1158,35 @@ func TestParallelAlterAddIndex(t *testing.T) {
testControlParallelExecSQL(t, tk, store, dom, "", sql1, sql2, f)
}

func TestParallelAlterAddVectorIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database test_db_state default charset utf8 default collate utf8_bin")
tk.MustExec("use test_db_state")
tk.MustExec("create table tt (a int, b vector, c vector(3), d vector(4));")
tk.MustExec("alter table tt set tiflash replica 2 location labels 'a','b';")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(1)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/"+
"ddl/MockCheckVectorIndexProcess"))
}()
tiflash := infosync.NewMockTiFlash()
infosync.SetMockTiFlash(tiflash)
defer func() {
tiflash.Lock()
tiflash.StatusServer.Close()
tiflash.Unlock()
}()
sql1 := "alter table tt add vector index vecIdx((vec_cosine_distance(c))) USING HNSW;"
sql2 := "alter table tt add vector index vecIdx1((vec_cosine_distance(c))) USING HNSW;"
f := func(err1, err2 error) {
require.NoError(t, err1)
require.EqualError(t, err2,
"[ddl:1061]DDL job rollback, error msg: vector index vecIdx function vec_cosine_distance already exist on column c")
}
testControlParallelExecSQL(t, tk, store, dom, "", sql1, sql2, f)
}

func TestParallelAlterAddExpressionIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1982,11 +1982,13 @@ func TestDropColumnWithCompositeIndex(t *testing.T) {
defer tk.MustExec("drop table if exists t_drop_column_with_comp_idx")
tk.MustExec("create index idx_bc on t_drop_column_with_comp_idx(b, c)")
tk.MustExec("create index idx_b on t_drop_column_with_comp_idx(b)")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b", "[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b",
"[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustQuery(query).Check(testkit.Rows("idx_b YES", "idx_bc YES"))
tk.MustExec("alter table t_drop_column_with_comp_idx alter index idx_bc invisible")
tk.MustExec("alter table t_drop_column_with_comp_idx alter index idx_b invisible")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b", "[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b",
"[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustQuery(query).Check(testkit.Rows("idx_b NO", "idx_bc NO"))
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName any
var partitionIDs []int64
ifExists := make([]bool, 1)
allIndexIDs := make([]int64, 1)
if err := job.DecodeArgs(&indexName, &ifExists[0], &allIndexIDs[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexName, &ifExists, &allIndexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
_, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs()
if err != nil {
return errors.Trace(err)
}
// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
if len(partitionIDs) == 0 {
Expand Down
Loading