Skip to content

Commit

Permalink
*: fix insert on duplicate and replace into with global index (#5…
Browse files Browse the repository at this point in the history
…3717)

close #53711, close #53750
  • Loading branch information
Defined2014 authored Jun 3, 2024
1 parent 44c9096 commit f1ec74b
Show file tree
Hide file tree
Showing 20 changed files with 113 additions and 73 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originI
return nil
}
// For distinct index key values, prevent deleting an unexpected index KV in original index.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle)
hdInVal, err := tablecodec.DecodeHandleInIndexValue(originIdxVal)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
if err != nil {
return nil, err
}
if e.index.Meta().Global {
handle = kv.NewPartitionHandle(e.physicalID, handle)
}
idxVals, err := e.buildIndexedValues(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen)
if err != nil {
return nil, err
Expand Down Expand Up @@ -472,12 +475,11 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
// 1. unique-key is duplicate and the handle is equal, skip it.
// 2. unique-key is duplicate and the handle is not equal, data is not consistent, log it and skip it.
// 3. non-unique-key is duplicate, skip it.
isCommonHandle := e.table.Meta().IsCommonHandle
for i, key := range e.batchKeys {
val, found := values[string(key)]
if found {
if distinctFlags[i] {
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommonHandle)
handle, err1 := tablecodec.DecodeHandleInIndexValue(val)
if err1 != nil {
return err1
}
Expand Down Expand Up @@ -614,11 +616,7 @@ func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {

func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) {
e.idxValues.Range(func(h kv.Handle, _ any) bool {
if ph, ok := h.(kv.PartitionHandle); ok {
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(ph.PartitionID), ph.Handle))
} else {
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(e.table.RecordPrefix(), h))
}
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(e.table.RecordPrefix(), h))
return true
})
values, err := txn.BatchGet(context.Background(), e.batchKeys)
Expand Down
10 changes: 4 additions & 6 deletions pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import (
)

type keyValueWithDupInfo struct {
newKey kv.Key
dupErr error
commonHandle bool
newKey kv.Key
dupErr error
}

type toBeCheckedRow struct {
Expand Down Expand Up @@ -213,9 +212,8 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
return nil, err1
}
uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
commonHandle: t.Meta().IsCommonHandle,
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if len(handleVal) == 0 {
continue
}
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(handleVal, e.tblInfo.IsCommonHandle)
handle, err1 := tablecodec.DecodeHandleInIndexValue(handleVal)
if err1 != nil {
return err1
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKey)]; found {
if tablecodec.IsTempIndexKey(uk.newKey) {
// If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue.
// If it is a temp index, the value cannot be decoded by DecodeHandleInIndexValue.
// Since this function is an optimization, we can skip prefetching the rows referenced by
// temp indexes.
continue
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
handle, err := tablecodec.DecodeHandleInIndexValue(val)
if err != nil {
return err
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transactio
}
return true, nil
}
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1373,7 +1373,11 @@ func (e *InsertValues) removeRow(
return true, nil
}

err = r.t.RemoveRecord(e.Ctx().GetTableCtx(), handle, oldRow)
if ph, ok := handle.(kv.PartitionHandle); ok {
err = e.Table.(table.PartitionedTable).GetPartition(ph.PartitionID).RemoveRecord(e.Ctx().GetTableCtx(), ph.Handle, oldRow)
} else {
err = r.t.RemoveRecord(e.Ctx().GetTableCtx(), handle, oldRow)
}
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}

var iv kv.Handle
iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
iv, err = tablecodec.DecodeHandleInIndexValue(e.handleVal)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestReturnValues(t *testing.T) {
txnCtx := tk.Session().GetSessionVars().TxnCtx
val, ok := txnCtx.GetKeyInPessimisticLockCache(pk)
require.True(t, ok)
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, false)
handle, err := tablecodec.DecodeHandleInIndexValue(val)
require.NoError(t, err)
rowKey := tablecodec.EncodeRowKeyWithHandle(tid, handle)
_, ok = txnCtx.GetKeyInPessimisticLockCache(rowKey)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (rowUnchanged, foundDupKey bool, err error) {
for _, uk := range r.uniqueKeys {
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return false, false, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,8 +1951,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return nil, err1
}
for idxKey, idxVal := range idxVals {
isCommonHd := isCommonHandle[idxKey]
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, isCommonHd)
h, err2 := tablecodec.DecodeHandleInIndexValue(idxVal)
if err2 != nil {
return nil, err2
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
val = tempVal.Encode(value)
}
needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h,
keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID)
keyIsTempIdxKey, c.tblInfo.ID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if keyIsTempIdxKey && !tempIdxVal.IsEmpty() {
value = tempIdxVal.Current().Value
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
handle, err := tablecodec.DecodeHandleInIndexValue(value)
if err != nil {
return nil, err
}
Expand All @@ -366,7 +366,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}

func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key,
h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) {
h kv.Handle, keyIsTempIdxKey bool, tblID int64) (needFlag bool, err error) {
var uniqueTempKey kv.Key
if keyIsTempIdxKey {
uniqueTempKey = key
Expand All @@ -375,7 +375,7 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t
} else {
return true, nil
}
foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon)
foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
return err
}
if len(originVal) > 0 {
oh, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, c.tblInfo.IsCommonHandle)
oh, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction,
if len(tempKey) > 0 {
key = tempKey
}
foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle)
foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID)
if err != nil || !foundKey {
return false, nil, err
}
Expand All @@ -540,24 +540,24 @@ func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction,

// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key.
func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool,
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
txn kv.Transaction, tableID int64) (foundKey bool, dupHandle kv.Handle, err error) {
if tablecodec.IsTempIndexKey(key) {
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, isCommon)
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID)
}
// The index key is not from temp index.
val, err := getKeyInTxn(ctx, txn, key)
if err != nil || len(val) == 0 {
return false, nil, err
}
if distinct {
h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon)
h, err := tablecodec.DecodeHandleInIndexValue(val)
return true, h, err
}
return true, nil, nil
}

func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool,
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
txn kv.Transaction, tableID int64) (foundKey bool, dupHandle kv.Handle, err error) {
tempRawVal, err := getKeyInTxn(ctx, txn, tempKey)
if err != nil {
return false, nil, err
Expand All @@ -570,7 +570,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
return false, nil, err
}
if distinct {
originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon)
originHandle, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return false, nil, err
}
Expand All @@ -591,7 +591,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
return false, nil, err
}
if distinct {
originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon)
originHandle, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return false, nil, err
}
Expand All @@ -614,7 +614,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
}
// The value in temp index is not the delete marker.
if distinct {
h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon)
h, err := tablecodec.DecodeHandleInIndexValue(curElem.Value)
return true, h, err
}
return true, nil, nil
Expand Down
30 changes: 30 additions & 0 deletions pkg/tablecodec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package tablecodec

import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/benchdaily"
"github.com/pingcap/tidb/pkg/util/codec"
)

func BenchmarkEncodeRowKeyWithHandle(b *testing.B) {
Expand Down Expand Up @@ -55,6 +58,33 @@ func BenchmarkDecodeRowKey(b *testing.B) {
}
}

func BenchmarkDecodeIndexKeyIntHandle(b *testing.B) {
var idxVal []byte
// When handle values greater than 255, it will have a memory alloc.
idxVal = append(idxVal, EncodeHandleInUniqueIndexValue(kv.IntHandle(256), false)...)

for i := 0; i < b.N; i++ {
DecodeHandleInIndexValue(idxVal)
}
}

func BenchmarkDecodeIndexKeyCommonHandle(b *testing.B) {
var idxVal []byte
idxVal = append(idxVal, 0)
// index version
idxVal = append(idxVal, IndexVersionFlag)
idxVal = append(idxVal, byte(1))

// common handle
encoded, _ := codec.EncodeKey(time.UTC, nil, types.MakeDatums(1, 2)...)
h, _ := kv.NewCommonHandle(encoded)
idxVal = encodeCommonHandle(idxVal, h)

for i := 0; i < b.N; i++ {
DecodeHandleInIndexValue(idxVal)
}
}

func TestBenchDaily(t *testing.T) {
benchdaily.Run(
BenchmarkEncodeRowKeyWithHandle,
Expand Down
40 changes: 9 additions & 31 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func CutRowKeyPrefix(key kv.Key) []byte {
// EncodeRecordKey encodes the recordPrefix, row handle into a kv.Key.
func EncodeRecordKey(recordPrefix kv.Key, h kv.Handle) kv.Key {
buf := make([]byte, 0, len(recordPrefix)+h.Len())
if ph, ok := h.(kv.PartitionHandle); ok {
recordPrefix = GenTableRecordPrefix(ph.PartitionID)
}
buf = append(buf, recordPrefix...)
buf = append(buf, h.Encoded()...)
return buf
Expand Down Expand Up @@ -960,7 +963,7 @@ func DecodeIndexHandle(key, value []byte, colsLen int) (kv.Handle, error) {
if len(b) > 0 {
return decodeHandleInIndexKey(b)
} else if len(value) >= 8 {
return decodeHandleInIndexValue(value)
return DecodeHandleInIndexValue(value)
}
// Should never execute to here.
return nil, errors.Errorf("no handle in index key: %v, value: %v", key, value)
Expand All @@ -977,7 +980,11 @@ func decodeHandleInIndexKey(keySuffix []byte) (kv.Handle, error) {
return kv.NewCommonHandle(keySuffix)
}

func decodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) {
// DecodeHandleInIndexValue decodes handle in unqiue index value.
func DecodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) {
if len(value) <= MaxOldEncodeValueLen {
return decodeIntHandleInIndexValue(value), nil
}
seg := SplitIndexValue(value)
if len(seg.IntHandle) != 0 {
handle = decodeIntHandleInIndexValue(seg.IntHandle)
Expand Down Expand Up @@ -1676,35 +1683,6 @@ func encodeCommonHandle(idxVal []byte, h kv.Handle) []byte {
return idxVal
}

// DecodeHandleInUniqueIndexValue decodes handle in data.
func DecodeHandleInUniqueIndexValue(data []byte, isCommonHandle bool) (kv.Handle, error) {
if !isCommonHandle {
dLen := len(data)
if dLen <= MaxOldEncodeValueLen {
return kv.IntHandle(int64(binary.BigEndian.Uint64(data))), nil
}
return kv.IntHandle(int64(binary.BigEndian.Uint64(data[dLen-int(data[0]):]))), nil
}
if getIndexVersion(data) == 1 {
seg := splitIndexValueForClusteredIndexVersion1(data)
h, err := kv.NewCommonHandle(seg.CommonHandle)
if err != nil {
return nil, err
}
return h, nil
}

tailLen := int(data[0])
data = data[:len(data)-tailLen]
handleLen := uint16(data[2])<<8 + uint16(data[3])
handleEndOff := 4 + handleLen
h, err := kv.NewCommonHandle(data[4:handleEndOff])
if err != nil {
return nil, err
}
return h, nil
}

func encodePartitionID(idxVal []byte, partitionID int64) []byte {
idxVal = append(idxVal, PartitionIDFlag)
idxVal = codec.EncodeInt(idxVal, partitionID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tablecodec/tablecodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func TestTempIndexValueCodec(t *testing.T) {
remain, err = newTempIdxVal.DecodeOne(val)
require.NoError(t, err)
require.Equal(t, 0, len(remain))
handle, err := DecodeHandleInUniqueIndexValue(newTempIdxVal.Value, false)
handle, err := DecodeHandleInIndexValue(newTempIdxVal.Value)
require.NoError(t, err)
require.Equal(t, handle.IntValue(), int64(100))
require.EqualValues(t, tempIdxVal, newTempIdxVal)
Expand Down
Loading

0 comments on commit f1ec74b

Please sign in to comment.