Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix insert on duplicate and replace into with global index #53717

Merged
merged 6 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originI
return nil
}
// For distinct index key values, prevent deleting an unexpected index KV in original index.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle)
hdInVal, err := tablecodec.DecodeHandleInIndexValue(originIdxVal)
if err != nil {
return errors.Trace(err)
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
if err != nil {
return nil, err
}
if e.index.Meta().Global {
handle = kv.NewPartitionHandle(e.physicalID, handle)
}
idxVals, err := e.buildIndexedValues(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen)
if err != nil {
return nil, err
Expand Down Expand Up @@ -472,12 +475,11 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
// 1. unique-key is duplicate and the handle is equal, skip it.
// 2. unique-key is duplicate and the handle is not equal, data is not consistent, log it and skip it.
// 3. non-unique-key is duplicate, skip it.
isCommonHandle := e.table.Meta().IsCommonHandle
for i, key := range e.batchKeys {
val, found := values[string(key)]
if found {
if distinctFlags[i] {
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommonHandle)
handle, err1 := tablecodec.DecodeHandleInIndexValue(val)
if err1 != nil {
return err1
}
Expand Down Expand Up @@ -614,11 +616,7 @@ func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {

func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) {
e.idxValues.Range(func(h kv.Handle, _ any) bool {
if ph, ok := h.(kv.PartitionHandle); ok {
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(ph.PartitionID), ph.Handle))
} else {
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(e.table.RecordPrefix(), h))
}
e.batchKeys = append(e.batchKeys, tablecodec.EncodeRecordKey(e.table.RecordPrefix(), h))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Note for future reference)
Will this work with partitions? Is e.table the partition if non-global index, and the logical table if global index?
=> EncodeRecordKey now adds the proper PhysicalTableID (partition id) if partitioned handle.

return true
})
values, err := txn.BatchGet(context.Background(), e.batchKeys)
Expand Down
10 changes: 4 additions & 6 deletions pkg/executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import (
)

type keyValueWithDupInfo struct {
newKey kv.Key
dupErr error
commonHandle bool
newKey kv.Key
dupErr error
}

type toBeCheckedRow struct {
Expand Down Expand Up @@ -213,9 +212,8 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
return nil, err1
}
uniqueKeys = append(uniqueKeys, &keyValueWithDupInfo{
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
commonHandle: t.Meta().IsCommonHandle,
newKey: key,
dupErr: kv.ErrKeyExists.FastGenByArgs(colValStr, fmt.Sprintf("%s.%s", v.TableMeta().Name.String(), v.Meta().Name.String())),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if len(handleVal) == 0 {
continue
}
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(handleVal, e.tblInfo.IsCommonHandle)
handle, err1 := tablecodec.DecodeHandleInIndexValue(handleVal)
if err1 != nil {
return err1
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKey)]; found {
if tablecodec.IsTempIndexKey(uk.newKey) {
// If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue.
// If it is a temp index, the value cannot be decoded by DecodeHandleInIndexValue.
// Since this function is an optimization, we can skip prefetching the rows referenced by
// temp indexes.
continue
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
handle, err := tablecodec.DecodeHandleInIndexValue(val)
if err != nil {
return err
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transactio
}
return true, nil
}
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1373,7 +1373,11 @@ func (e *InsertValues) removeRow(
return true, nil
}

err = r.t.RemoveRecord(e.Ctx().GetTableCtx(), handle, oldRow)
if ph, ok := handle.(kv.PartitionHandle); ok {
err = e.Table.(table.PartitionedTable).GetPartition(ph.PartitionID).RemoveRecord(e.Ctx().GetTableCtx(), ph.Handle, oldRow)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Note for future reference.
We could fix it by handle partition handle in func (c *index) GenIndexKey, but since the phyTblID is already included in the index struct, so it is better to add it at the entry of removeRecord.

} else {
err = r.t.RemoveRecord(e.Ctx().GetTableCtx(), handle, oldRow)
}
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}

var iv kv.Handle
iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
iv, err = tablecodec.DecodeHandleInIndexValue(e.handleVal)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestReturnValues(t *testing.T) {
txnCtx := tk.Session().GetSessionVars().TxnCtx
val, ok := txnCtx.GetKeyInPessimisticLockCache(pk)
require.True(t, ok)
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, false)
handle, err := tablecodec.DecodeHandleInIndexValue(val)
require.NoError(t, err)
rowKey := tablecodec.EncodeRowKeyWithHandle(tid, handle)
_, ok = txnCtx.GetKeyInPessimisticLockCache(rowKey)
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (rowUnchanged, foundDupKey bool, err error) {
for _, uk := range r.uniqueKeys {
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID)
if err != nil {
return false, false, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,8 +1951,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return nil, err1
}
for idxKey, idxVal := range idxVals {
isCommonHd := isCommonHandle[idxKey]
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, isCommonHd)
h, err2 := tablecodec.DecodeHandleInIndexValue(idxVal)
if err2 != nil {
return nil, err2
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
val = tempVal.Encode(value)
}
needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h,
keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID)
keyIsTempIdxKey, c.tblInfo.ID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if keyIsTempIdxKey && !tempIdxVal.IsEmpty() {
value = tempIdxVal.Current().Value
}
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle)
handle, err := tablecodec.DecodeHandleInIndexValue(value)
if err != nil {
return nil, err
}
Expand All @@ -366,7 +366,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}

func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key,
h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) {
h kv.Handle, keyIsTempIdxKey bool, tblID int64) (needFlag bool, err error) {
var uniqueTempKey kv.Key
if keyIsTempIdxKey {
uniqueTempKey = key
Expand All @@ -375,7 +375,7 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t
} else {
return true, nil
}
foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon)
foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue
return err
}
if len(originVal) > 0 {
oh, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, c.tblInfo.IsCommonHandle)
oh, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return err
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction,
if len(tempKey) > 0 {
key = tempKey
}
foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle)
foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID)
if err != nil || !foundKey {
return false, nil, err
}
Expand All @@ -540,24 +540,24 @@ func (c *index) Exist(ec errctx.Context, loc *time.Location, txn kv.Transaction,

// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key.
func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool,
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
txn kv.Transaction, tableID int64) (foundKey bool, dupHandle kv.Handle, err error) {
if tablecodec.IsTempIndexKey(key) {
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, isCommon)
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID)
}
// The index key is not from temp index.
val, err := getKeyInTxn(ctx, txn, key)
if err != nil || len(val) == 0 {
return false, nil, err
}
if distinct {
h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon)
h, err := tablecodec.DecodeHandleInIndexValue(val)
return true, h, err
}
return true, nil, nil
}

func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool,
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
txn kv.Transaction, tableID int64) (foundKey bool, dupHandle kv.Handle, err error) {
tempRawVal, err := getKeyInTxn(ctx, txn, tempKey)
if err != nil {
return false, nil, err
Expand All @@ -570,7 +570,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
return false, nil, err
}
if distinct {
originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon)
originHandle, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return false, nil, err
}
Expand All @@ -591,7 +591,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
return false, nil, err
}
if distinct {
originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon)
originHandle, err := tablecodec.DecodeHandleInIndexValue(originVal)
if err != nil {
return false, nil, err
}
Expand All @@ -614,7 +614,7 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
}
// The value in temp index is not the delete marker.
if distinct {
h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon)
h, err := tablecodec.DecodeHandleInIndexValue(curElem.Value)
return true, h, err
}
return true, nil, nil
Expand Down
30 changes: 30 additions & 0 deletions pkg/tablecodec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package tablecodec

import (
"testing"
"time"

"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/benchdaily"
"github.com/pingcap/tidb/pkg/util/codec"
)

func BenchmarkEncodeRowKeyWithHandle(b *testing.B) {
Expand Down Expand Up @@ -55,6 +58,33 @@ func BenchmarkDecodeRowKey(b *testing.B) {
}
}

func BenchmarkDecodeIndexKeyIntHandle(b *testing.B) {
var idxVal []byte
// When handle values greater than 255, it will have a memory alloc.
idxVal = append(idxVal, EncodeHandleInUniqueIndexValue(kv.IntHandle(256), false)...)

for i := 0; i < b.N; i++ {
DecodeHandleInIndexValue(idxVal)
}
}

func BenchmarkDecodeIndexKeyCommonHandle(b *testing.B) {
var idxVal []byte
idxVal = append(idxVal, 0)
// index version
idxVal = append(idxVal, IndexVersionFlag)
idxVal = append(idxVal, byte(1))

// common handle
encoded, _ := codec.EncodeKey(time.UTC, nil, types.MakeDatums(1, 2)...)
h, _ := kv.NewCommonHandle(encoded)
idxVal = encodeCommonHandle(idxVal, h)

for i := 0; i < b.N; i++ {
DecodeHandleInIndexValue(idxVal)
}
}

func TestBenchDaily(t *testing.T) {
benchdaily.Run(
BenchmarkEncodeRowKeyWithHandle,
Expand Down
40 changes: 9 additions & 31 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func CutRowKeyPrefix(key kv.Key) []byte {
// EncodeRecordKey encodes the recordPrefix, row handle into a kv.Key.
func EncodeRecordKey(recordPrefix kv.Key, h kv.Handle) kv.Key {
buf := make([]byte, 0, len(recordPrefix)+h.Len())
if ph, ok := h.(kv.PartitionHandle); ok {
recordPrefix = GenTableRecordPrefix(ph.PartitionID)
}
buf = append(buf, recordPrefix...)
buf = append(buf, h.Encoded()...)
return buf
Expand Down Expand Up @@ -960,7 +963,7 @@ func DecodeIndexHandle(key, value []byte, colsLen int) (kv.Handle, error) {
if len(b) > 0 {
return decodeHandleInIndexKey(b)
} else if len(value) >= 8 {
return decodeHandleInIndexValue(value)
return DecodeHandleInIndexValue(value)
}
// Should never execute to here.
return nil, errors.Errorf("no handle in index key: %v, value: %v", key, value)
Expand All @@ -977,7 +980,11 @@ func decodeHandleInIndexKey(keySuffix []byte) (kv.Handle, error) {
return kv.NewCommonHandle(keySuffix)
}

func decodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) {
// DecodeHandleInIndexValue decodes handle in unqiue index value.
func DecodeHandleInIndexValue(value []byte) (handle kv.Handle, err error) {
if len(value) <= MaxOldEncodeValueLen {
return decodeIntHandleInIndexValue(value), nil
}
Comment on lines +985 to +987
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove these codes that look like they will be processed on line988?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This speeds up the function for simplest int handles(like _tidb_rowid).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance difference is 9ns/op compare 15ns/op

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

seg := SplitIndexValue(value)
if len(seg.IntHandle) != 0 {
handle = decodeIntHandleInIndexValue(seg.IntHandle)
Expand Down Expand Up @@ -1676,35 +1683,6 @@ func encodeCommonHandle(idxVal []byte, h kv.Handle) []byte {
return idxVal
}

// DecodeHandleInUniqueIndexValue decodes handle in data.
func DecodeHandleInUniqueIndexValue(data []byte, isCommonHandle bool) (kv.Handle, error) {
if !isCommonHandle {
dLen := len(data)
if dLen <= MaxOldEncodeValueLen {
return kv.IntHandle(int64(binary.BigEndian.Uint64(data))), nil
}
return kv.IntHandle(int64(binary.BigEndian.Uint64(data[dLen-int(data[0]):]))), nil
}
if getIndexVersion(data) == 1 {
seg := splitIndexValueForClusteredIndexVersion1(data)
h, err := kv.NewCommonHandle(seg.CommonHandle)
if err != nil {
return nil, err
}
return h, nil
}

tailLen := int(data[0])
data = data[:len(data)-tailLen]
handleLen := uint16(data[2])<<8 + uint16(data[3])
handleEndOff := 4 + handleLen
h, err := kv.NewCommonHandle(data[4:handleEndOff])
if err != nil {
return nil, err
}
return h, nil
}

func encodePartitionID(idxVal []byte, partitionID int64) []byte {
idxVal = append(idxVal, PartitionIDFlag)
idxVal = codec.EncodeInt(idxVal, partitionID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tablecodec/tablecodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func TestTempIndexValueCodec(t *testing.T) {
remain, err = newTempIdxVal.DecodeOne(val)
require.NoError(t, err)
require.Equal(t, 0, len(remain))
handle, err := DecodeHandleInUniqueIndexValue(newTempIdxVal.Value, false)
handle, err := DecodeHandleInIndexValue(newTempIdxVal.Value)
require.NoError(t, err)
require.Equal(t, handle.IntValue(), int64(100))
require.EqualValues(t, tempIdxVal, newTempIdxVal)
Expand Down
Loading