Skip to content

Commit

Permalink
ddl: fix the batch check for unique index (pingcap#40672)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jan 29, 2023
1 parent 0170ac3 commit c9b9a67
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 18 deletions.
10 changes: 9 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,10 +1585,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
return nil
})
logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000)

failpoint.Inject("mockDMLExecution", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecution != nil {
MockDMLExecution()
}
})
return
}

// MockDMLExecution is only used for test.
var MockDMLExecution func()

func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
if reorgInfo.mergingTmpIdx {
logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
Expand Down
63 changes: 46 additions & 17 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,12 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR
return errors.Trace(err)
}

// 1. unique-key/primary-key is duplicate and the handle is equal, skip it.
// 2. unique-key/primary-key is duplicate and the handle is not equal, return duplicate error.
// 3. non-unique-key is duplicate, skip it.
for i, key := range w.originIdxKeys {
if val, found := batchVals[string(key)]; found {
if idxRecords[i].distinct && !bytes.Equal(val, idxRecords[i].vals) {
return kv.ErrKeyExists
}
if !idxRecords[i].delete {
idxRecords[i].skip = true
} else {
// Prevent deleting an unexpected index KV.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(val, w.table.Meta().IsCommonHandle)
if err != nil {
return errors.Trace(err)
}
if !idxRecords[i].handle.Equal(hdInVal) {
idxRecords[i].skip = true
}
// Found a value in the original index key.
err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
if err != nil {
return errors.Trace(err)
}
} else if idxRecords[i].distinct {
// The keys in w.batchCheckKeys also maybe duplicate,
Expand All @@ -78,6 +65,48 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR
return nil
}

func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originIdxVal []byte, tblInfo table.Table) error {
if !tmpRec.delete {
if tmpRec.distinct && !bytes.Equal(originIdxVal, tmpRec.vals) {
return kv.ErrKeyExists
}
// The key has been found in the original index, skip merging it.
tmpRec.skip = true
return nil
}
// Delete operation.
distinct := tablecodec.IndexKVIsUnique(originIdxVal)
if !distinct {
// For non-distinct key, it is consist of a null value and the handle.
// Same as the non-unique indexes, replay the delete operation on non-distinct keys.
return nil
}
// For distinct index key values, prevent deleting an unexpected index KV in original index.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle)
if err != nil {
return errors.Trace(err)
}
if !tmpRec.handle.Equal(hdInVal) {
// The inequality means multiple modifications happened in the same key.
// We use the handle in origin index value to check if the row exists.
rowKey := tablecodec.EncodeRecordKey(tblInfo.RecordPrefix(), hdInVal)
_, err := txn.Get(context.Background(), rowKey)
if err != nil {
if kv.IsErrNotFound(err) {
// The row is deleted, so we can merge the delete operation to the origin index.
tmpRec.skip = false
return nil
}
// Unexpected errors.
return errors.Trace(err)
}
// Don't delete the index key if the row exists.
tmpRec.skip = true
return nil
}
return nil
}

// temporaryIndexRecord is the record information of an index.
type temporaryIndexRecord struct {
vals []byte
Expand Down
65 changes: 65 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -415,6 +416,70 @@ func TestAddIndexMergeDeleteUniqueOnWriteOnly(t *testing.T) {
tk.MustExec("admin check table t;")
}

func TestAddIndexMergeDeleteNullUnique(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, a int default 0);")
tk.MustExec("insert into t values (1, 1), (2, null);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

ddl.MockDMLExecution = func() {
_, err := tk1.Exec("delete from t where id = 2;")
assert.NoError(t, err)
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)"))
tk.MustExec("alter table t add unique index idx(a);")
tk.MustQuery("select count(1) from t;").Check(testkit.Rows("1"))
tk.MustExec("admin check table t;")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution"))
}

func TestAddIndexMergeDoubleDelete(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, a int default 0);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if t.Failed() {
return
}
switch job.SchemaState {
case model.StateWriteOnly:
_, err := tk1.Exec("insert into t values (1, 1);")
assert.NoError(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

ddl.MockDMLExecution = func() {
_, err := tk1.Exec("delete from t where id = 1;")
assert.NoError(t, err)
_, err = tk1.Exec("insert into t values (2, 1);")
assert.NoError(t, err)
_, err = tk1.Exec("delete from t where id = 2;")
assert.NoError(t, err)
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)"))
tk.MustExec("alter table t add unique index idx(a);")
tk.MustQuery("select count(1) from t;").Check(testkit.Rows("0"))
tk.MustExec("admin check table t;")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution"))
}

func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit c9b9a67

Please sign in to comment.