Skip to content

Commit

Permalink
txn: save RPCs for Pipelined-DML (#51652)
Browse files Browse the repository at this point in the history
close #51625
  • Loading branch information
ekexium authored Mar 16, 2024
1 parent e158c21 commit b3cb95d
Show file tree
Hide file tree
Showing 19 changed files with 315 additions and 68 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7054,13 +7054,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "6aac9bafaa457d038aaa5e70e0a288ac5fcf19599f4f625de3bd90d76eb7c6ee",
strip_prefix = "github.com/tikv/client-go/[email protected].20240313022320-d59fea5757db",
sha256 = "9e86def6bedefa6095b20a2d90afa099ca301d49f6792eccb54883201df3063b",
strip_prefix = "github.com/tikv/client-go/[email protected].20240316105842-98a7df8f413d",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip",
],
)
go_repository(
Expand Down
16 changes: 13 additions & 3 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ func (*MemBuf) Staging() kv.StagingHandle {
// If the changes are not published by `Release`, they will be discarded.
func (*MemBuf) Cleanup(_ kv.StagingHandle) {}

// MayFlush implements the kv.MemBuffer interface.
func (*MemBuf) MayFlush() error {
return nil
// GetLocal implements the kv.MemBuffer interface.
func (mb *MemBuf) GetLocal(ctx context.Context, key []byte) ([]byte, error) {
return mb.Get(ctx, key)
}

// Size returns sum of keys and values length.
Expand Down Expand Up @@ -272,6 +272,16 @@ func (*transaction) SetAssertion(_ []byte, _ ...kv.FlagsOp) error {
return nil
}

// IsPipelined implements the kv.Transaction interface.
func (*transaction) IsPipelined() bool {
return false
}

// MayFlush implements the kv.Transaction interface.
func (*transaction) MayFlush() error {
return nil
}

type planCtxImpl struct {
*Session
*planctximpl.PlanCtxExtendedImpl
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -868,8 +868,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db h1:9OWldDvBvRMXThTjPKSskgu+WzLWvhP/Op3gS1YhiEg=
github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ=
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d h1:QDsglttywjxGkxf7pjInpKTudLwy3qr/sR9BCxBiLow=
github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e h1:kHXMmskVCNyH53u43I73Y5cmZ6yqqder/jGOiI7ylxs=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
Expand Down
10 changes: 10 additions & 0 deletions pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
rowCount++
}
chk = chunk.Renew(chk, e.MaxChunkSize())
if txn, _ := e.Ctx().Txn(false); txn != nil {
if err := txn.MayFlush(); err != nil {
return err
}
}
}

return nil
Expand Down Expand Up @@ -222,6 +227,11 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
}
}
chk = exec.TryNewCacheChunk(e.Children(0))
if txn, _ := e.Ctx().Txn(false); txn != nil {
if err := txn.MayFlush(); err != nil {
return err
}
}
}

return e.removeRowsInTblRowMap(tblRowMap)
Expand Down
10 changes: 9 additions & 1 deletion pkg/executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
// If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword,
// the to-be-insert rows will be check on duplicate keys and update to the new rows.
if len(e.OnDuplicate) > 0 {
if ignoreErr && txn.IsPipelined() {
// P-DML doesn't support staging now. In a INSERT IGNORE ... ON DUPLICATE ... stmt, it's
// possible that 1 row succeeded and 1 row returns error but is ignored, thus the whole
// statement can successfully commit. See TestIssue50043
// The second row is supposed to be cleanup by the staging mechanism, so forbid this
// case for P-DML temporarily.
return errors.New("cannot use pipelined mode with insert ignore and on duplicate update")
}
err := e.batchUpdateDupRows(ctx, rows)
if err != nil {
return err
Expand Down Expand Up @@ -120,7 +128,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
e.stats.CheckInsertTime += time.Since(start)
}
}
return nil
return txn.MayFlush()
}

func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
}
e.memTracker.Consume(int64(txn.Size() - txnSize))
return nil
return txn.MayFlush()
}

// Next implements the Executor Next interface.
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat
}
return err1
}
if txn, _ := e.Ctx().Txn(false); txn != nil {
return txn.MayFlush()
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (t *mockTxn) CancelFairLocking(_ context.Context) error { return nil }
func (t *mockTxn) DoneFairLocking(_ context.Context) error { return nil }
func (t *mockTxn) IsInFairLockingMode() bool { return false }
func (t *mockTxn) IsPipelined() bool { return false }
func (t *mockTxn) MayFlush() error { return nil }

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,11 @@ type MemBuffer interface {
// RemoveFromBuffer removes the entry from the buffer. It's used for testing.
RemoveFromBuffer(Key)

// MayFlush will be called in pipelined txn
MayFlush() error
// GetLocal checks if the key exists in the buffer in local memory.
GetLocal(context.Context, []byte) ([]byte, error)

// BatchGet gets values from the memory buffer.
BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error)
}

// FindKeysInStage returns all keys in the given stage that satisfies the given condition.
Expand Down Expand Up @@ -281,6 +284,8 @@ type Transaction interface {
UpdateMemBufferFlags(key []byte, flags ...FlagsOp)
// IsPipelined returns whether the transaction is used for pipelined DML.
IsPipelined() bool
// MayFlush flush the pipelined memdb if the keys or size exceeds threshold, no effect for standard DML.
MayFlush() error
}

// AssertionProto is an interface defined for the assertion protocol.
Expand Down
8 changes: 7 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4326,7 +4326,9 @@ func (s *session) usePipelinedDmlOrWarn() bool {
return false
}
if !(stmtCtx.InInsertStmt || stmtCtx.InDeleteStmt || stmtCtx.InUpdateStmt) {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode"))
if !stmtCtx.IsReadOnly {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode"))
}
return false
}
if s.isInternal() {
Expand All @@ -4341,6 +4343,10 @@ func (s *session) usePipelinedDmlOrWarn() bool {
stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used in autocommit mode. Fallback to standard mode"))
return false
}
if s.GetSessionVars().ConstraintCheckInPlace {
// we enforce that pipelined DML must lazily check key.
return false
}

// tidb_dml_type=bulk will invalidate the config pessimistic-auto-commit.
// The behavior is as if the config is set to false. But we generate a warning for it.
Expand Down
26 changes: 25 additions & 1 deletion pkg/store/driver/txn/batch_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type tikvBatchGetter struct {
}

func (b tikvBatchGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
// toTiDBKeys
kvKeys := *(*[]kv.Key)(unsafe.Pointer(&keys))
vals, err := b.tidbBatchGetter.BatchGet(ctx, kvKeys)
return vals, err
Expand Down Expand Up @@ -66,6 +65,29 @@ func (b tikvBatchBufferGetter) Get(ctx context.Context, k []byte) ([]byte, error
return val, err
}

func (b tikvBatchBufferGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
bufferValues, err := b.tidbBuffer.BatchGet(ctx, keys)
if err != nil {
return nil, err
}
if b.tidbMiddleCache == nil {
return bufferValues, nil
}
for _, key := range keys {
if _, ok := bufferValues[string(key)]; !ok {
val, err := b.tidbMiddleCache.Get(ctx, key)
if err != nil {
if kv.IsErrNotFound(err) {
continue
}
return nil, err
}
bufferValues[string(key)] = val
}
}
return bufferValues, nil
}

func (b tikvBatchBufferGetter) Len() int {
return b.tidbBuffer.Len()
}
Expand All @@ -74,6 +96,8 @@ func (b tikvBatchBufferGetter) Len() int {
type BatchBufferGetter interface {
Len() int
Getter
// BatchGet gets a batch of values, keys are in bytes slice format.
BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error)
}

// BatchGetter is the interface for BatchGet.
Expand Down
12 changes: 11 additions & 1 deletion pkg/store/driver/txn/batch_getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package txn
import (
"context"
"testing"
"unsafe"

"github.com/pingcap/tidb/pkg/kv"
"github.com/stretchr/testify/require"
Expand All @@ -41,7 +42,7 @@ func TestBufferBatchGetter(t *testing.T) {
require.NoError(t, buffer.Set(ka, []byte("a2")))
require.NoError(t, buffer.Delete(kb))

batchGetter := NewBufferBatchGetter(buffer, middle, snap)
batchGetter := NewBufferBatchGetter(&mockBufferBatchGetterStore{buffer}, middle, snap)
result, err := batchGetter.BatchGet(context.Background(), []kv.Key{ka, kb, kc, kd})
require.NoError(t, err)
require.Len(t, result, 3)
Expand Down Expand Up @@ -105,3 +106,12 @@ func (s *mockBatchGetterStore) Set(k kv.Key, v []byte) error {
func (s *mockBatchGetterStore) Delete(k kv.Key) error {
return s.Set(k, []byte{})
}

type mockBufferBatchGetterStore struct {
*mockBatchGetterStore
}

func (s *mockBufferBatchGetterStore) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
kvKeys := *(*[]kv.Key)(unsafe.Pointer(&keys))
return s.mockBatchGetterStore.BatchGet(ctx, kvKeys)
}
9 changes: 9 additions & 0 deletions pkg/store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,15 @@ func (txn *tikvTxn) IsInFairLockingMode() bool {
return txn.KVTxn.IsInAggressiveLockingMode()
}

// MayFlush wraps the flush function and extract the error.
func (txn *tikvTxn) MayFlush() error {
if !txn.IsPipelined() {
return nil
}
_, err := txn.KVTxn.GetMemBuffer().Flush(false)
return txn.extractKeyErr(err)
}

// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed.
type TiDBKVFilter struct{}

Expand Down
16 changes: 9 additions & 7 deletions pkg/store/driver/txn/unionstore_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ func (m *memBuffer) SnapshotGetter() kv.Getter {
return newKVGetter(m.MemBuffer.SnapshotGetter())
}

// MayFlush implements kv.MemBuffer.MayFlush interface.
func (m *memBuffer) MayFlush() error {
if !m.isPipelinedDML {
return nil
}
_, err := m.MemBuffer.Flush(false)
return err
// GetLocal implements kv.MemBuffer interface
func (m *memBuffer) GetLocal(ctx context.Context, key []byte) ([]byte, error) {
data, err := m.MemBuffer.GetLocal(ctx, key)
return data, derr.ToTiDBErr(err)
}

func (m *memBuffer) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
data, err := m.MemBuffer.BatchGet(ctx, keys)
return data, derr.ToTiDBErr(err)
}

type tikvGetter struct {
Expand Down
27 changes: 21 additions & 6 deletions pkg/store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,12 +1493,27 @@ func (store *MVCCStore) ReadBufferFromLock(startTS uint64, keys ...[]byte) []*kv
if lock.StartTS != startTS {
continue
}
val := getValueFromLock(&lock)
if len(val) > 0 {
pairs = append(pairs, &kvrpcpb.KvPair{
Key: key,
Value: safeCopy(val),
})
switch lock.Op {
case uint8(kvrpcpb.Op_Put):
val := getValueFromLock(&lock)
if val == nil {
panic("Op_Put has a nil value")
}
pairs = append(
pairs, &kvrpcpb.KvPair{
Key: key,
Value: safeCopy(val),
},
)
case uint8(kvrpcpb.Op_Del):
pairs = append(
pairs, &kvrpcpb.KvPair{
Key: key,
Value: []byte{},
},
)
default:
panic("unexpected op. Optimistic txn should only contain put and delete locks")
}
}
return pairs
Expand Down
12 changes: 9 additions & 3 deletions pkg/table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
}
}

if txn.IsPipelined() {
// For pipelined DML, disable the untouched optimization to avoid extra RPCs for MemBuffer.Get().
// TODO: optimize this.
opt.Untouched = false
}

if opt.Untouched {
txn, err1 := sctx.Txn(true)
if err1 != nil {
Expand Down Expand Up @@ -278,10 +284,10 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
if c.tblInfo.TempTableType != model.TempTableNone {
// Always check key for temporary table because it does not write to TiKV
value, err = txn.Get(ctx, key)
} else if sctx.GetSessionVars().LazyCheckKeyNotExists() && !keyIsTempIdxKey {
} else if (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && !keyIsTempIdxKey {
// For temp index keys, we can't get the temp value from memory buffer, even if the lazy check is enabled.
// Otherwise, it may cause the temp index value to be overwritten, leading to data inconsistency.
value, err = txn.GetMemBuffer().Get(ctx, key)
value, err = txn.GetMemBuffer().GetLocal(ctx, key)
} else {
value, err = txn.Get(ctx, key)
}
Expand All @@ -298,7 +304,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu
// The index key value is not found or deleted.
if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) {
val := idxVal
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
lazyCheck := (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && err != nil
if keyIsTempIdxKey {
tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true}
val = tempVal.Encode(value)
Expand Down
Loading

0 comments on commit b3cb95d

Please sign in to comment.