diff --git a/DEPS.bzl b/DEPS.bzl index f640e181e9916..0fe1dc011d812 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7054,13 +7054,13 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "6aac9bafaa457d038aaa5e70e0a288ac5fcf19599f4f625de3bd90d76eb7c6ee", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240313022320-d59fea5757db", + sha256 = "9e86def6bedefa6095b20a2d90afa099ca301d49f6792eccb54883201df3063b", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240316105842-98a7df8f413d", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240313022320-d59fea5757db.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240316105842-98a7df8f413d.zip", ], ) go_repository( diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index 0f5b4a1ba9f5c..396698881de4b 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -180,9 +180,9 @@ func (*MemBuf) Staging() kv.StagingHandle { // If the changes are not published by `Release`, they will be discarded. func (*MemBuf) Cleanup(_ kv.StagingHandle) {} -// MayFlush implements the kv.MemBuffer interface. -func (*MemBuf) MayFlush() error { - return nil +// GetLocal implements the kv.MemBuffer interface. +func (mb *MemBuf) GetLocal(ctx context.Context, key []byte) ([]byte, error) { + return mb.Get(ctx, key) } // Size returns sum of keys and values length. @@ -272,6 +272,16 @@ func (*transaction) SetAssertion(_ []byte, _ ...kv.FlagsOp) error { return nil } +// IsPipelined implements the kv.Transaction interface. +func (*transaction) IsPipelined() bool { + return false +} + +// MayFlush implements the kv.Transaction interface. +func (*transaction) MayFlush() error { + return nil +} + type planCtxImpl struct { *Session *planctximpl.PlanCtxExtendedImpl diff --git a/go.mod b/go.mod index 766372ec094cd..f2be14e51e65e 100644 --- a/go.mod +++ b/go.mod @@ -107,7 +107,7 @@ require ( github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 github.com/tidwall/btree v1.7.0 - github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db + github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 5fb7da206caae..eceeb59a8e3f9 100644 --- a/go.sum +++ b/go.sum @@ -868,8 +868,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= -github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db h1:9OWldDvBvRMXThTjPKSskgu+WzLWvhP/Op3gS1YhiEg= -github.com/tikv/client-go/v2 v2.0.8-0.20240313022320-d59fea5757db/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ= +github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d h1:QDsglttywjxGkxf7pjInpKTudLwy3qr/sR9BCxBiLow= +github.com/tikv/client-go/v2 v2.0.8-0.20240316105842-98a7df8f413d/go.mod h1:9s6+YbGt0kW+9qTFDXuc5TkIpwpEf038S1UCa3utsSQ= github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e h1:kHXMmskVCNyH53u43I73Y5cmZ6yqqder/jGOiI7ylxs= github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= diff --git a/pkg/executor/delete.go b/pkg/executor/delete.go index 330f39192ec57..cbd328e14dafe 100644 --- a/pkg/executor/delete.go +++ b/pkg/executor/delete.go @@ -144,6 +144,11 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { rowCount++ } chk = chunk.Renew(chk, e.MaxChunkSize()) + if txn, _ := e.Ctx().Txn(false); txn != nil { + if err := txn.MayFlush(); err != nil { + return err + } + } } return nil @@ -222,6 +227,11 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { } } chk = exec.TryNewCacheChunk(e.Children(0)) + if txn, _ := e.Ctx().Txn(false); txn != nil { + if err := txn.MayFlush(); err != nil { + return err + } + } } return e.removeRowsInTblRowMap(tblRowMap) diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index 9b24962c8c1b4..28625264e65ee 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -88,6 +88,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { // If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword, // the to-be-insert rows will be check on duplicate keys and update to the new rows. if len(e.OnDuplicate) > 0 { + if ignoreErr && txn.IsPipelined() { + // P-DML doesn't support staging now. In a INSERT IGNORE ... ON DUPLICATE ... stmt, it's + // possible that 1 row succeeded and 1 row returns error but is ignored, thus the whole + // statement can successfully commit. See TestIssue50043 + // The second row is supposed to be cleanup by the staging mechanism, so forbid this + // case for P-DML temporarily. + return errors.New("cannot use pipelined mode with insert ignore and on duplicate update") + } err := e.batchUpdateDupRows(ctx, rows) if err != nil { return err @@ -120,7 +128,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { e.stats.CheckInsertTime += time.Since(start) } } - return nil + return txn.MayFlush() } func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) { diff --git a/pkg/executor/replace.go b/pkg/executor/replace.go index d2f30e4333c8c..687ddaf00a333 100644 --- a/pkg/executor/replace.go +++ b/pkg/executor/replace.go @@ -188,7 +188,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { } } e.memTracker.Consume(int64(txn.Size() - txnSize)) - return nil + return txn.MayFlush() } // Next implements the Executor Next interface. diff --git a/pkg/executor/update.go b/pkg/executor/update.go index 3b33644cfd8b9..6f3c2d43a8356 100644 --- a/pkg/executor/update.go +++ b/pkg/executor/update.go @@ -218,6 +218,9 @@ func (e *UpdateExec) exec(ctx context.Context, _ *expression.Schema, row, newDat } return err1 } + if txn, _ := e.Ctx().Txn(false); txn != nil { + return txn.MayFlush() + } return nil } diff --git a/pkg/kv/interface_mock_test.go b/pkg/kv/interface_mock_test.go index 83b6be1a662a4..9ea952cfb750f 100644 --- a/pkg/kv/interface_mock_test.go +++ b/pkg/kv/interface_mock_test.go @@ -182,6 +182,7 @@ func (t *mockTxn) CancelFairLocking(_ context.Context) error { return nil } func (t *mockTxn) DoneFairLocking(_ context.Context) error { return nil } func (t *mockTxn) IsInFairLockingMode() bool { return false } func (t *mockTxn) IsPipelined() bool { return false } +func (t *mockTxn) MayFlush() error { return nil } // newMockTxn new a mockTxn. func newMockTxn() Transaction { diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index bec0221e57f05..39c59f2dd5144 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -189,8 +189,11 @@ type MemBuffer interface { // RemoveFromBuffer removes the entry from the buffer. It's used for testing. RemoveFromBuffer(Key) - // MayFlush will be called in pipelined txn - MayFlush() error + // GetLocal checks if the key exists in the buffer in local memory. + GetLocal(context.Context, []byte) ([]byte, error) + + // BatchGet gets values from the memory buffer. + BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) } // FindKeysInStage returns all keys in the given stage that satisfies the given condition. @@ -281,6 +284,8 @@ type Transaction interface { UpdateMemBufferFlags(key []byte, flags ...FlagsOp) // IsPipelined returns whether the transaction is used for pipelined DML. IsPipelined() bool + // MayFlush flush the pipelined memdb if the keys or size exceeds threshold, no effect for standard DML. + MayFlush() error } // AssertionProto is an interface defined for the assertion protocol. diff --git a/pkg/session/session.go b/pkg/session/session.go index a2d86b0353c23..aa0e5909fa9ae 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -4326,7 +4326,9 @@ func (s *session) usePipelinedDmlOrWarn() bool { return false } if !(stmtCtx.InInsertStmt || stmtCtx.InDeleteStmt || stmtCtx.InUpdateStmt) { - stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode")) + if !stmtCtx.IsReadOnly { + stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode")) + } return false } if s.isInternal() { @@ -4341,6 +4343,10 @@ func (s *session) usePipelinedDmlOrWarn() bool { stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used in autocommit mode. Fallback to standard mode")) return false } + if s.GetSessionVars().ConstraintCheckInPlace { + // we enforce that pipelined DML must lazily check key. + return false + } // tidb_dml_type=bulk will invalidate the config pessimistic-auto-commit. // The behavior is as if the config is set to false. But we generate a warning for it. diff --git a/pkg/store/driver/txn/batch_getter.go b/pkg/store/driver/txn/batch_getter.go index bd988c64b6563..465ee450db866 100644 --- a/pkg/store/driver/txn/batch_getter.go +++ b/pkg/store/driver/txn/batch_getter.go @@ -31,7 +31,6 @@ type tikvBatchGetter struct { } func (b tikvBatchGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) { - // toTiDBKeys kvKeys := *(*[]kv.Key)(unsafe.Pointer(&keys)) vals, err := b.tidbBatchGetter.BatchGet(ctx, kvKeys) return vals, err @@ -66,6 +65,29 @@ func (b tikvBatchBufferGetter) Get(ctx context.Context, k []byte) ([]byte, error return val, err } +func (b tikvBatchBufferGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) { + bufferValues, err := b.tidbBuffer.BatchGet(ctx, keys) + if err != nil { + return nil, err + } + if b.tidbMiddleCache == nil { + return bufferValues, nil + } + for _, key := range keys { + if _, ok := bufferValues[string(key)]; !ok { + val, err := b.tidbMiddleCache.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + continue + } + return nil, err + } + bufferValues[string(key)] = val + } + } + return bufferValues, nil +} + func (b tikvBatchBufferGetter) Len() int { return b.tidbBuffer.Len() } @@ -74,6 +96,8 @@ func (b tikvBatchBufferGetter) Len() int { type BatchBufferGetter interface { Len() int Getter + // BatchGet gets a batch of values, keys are in bytes slice format. + BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) } // BatchGetter is the interface for BatchGet. diff --git a/pkg/store/driver/txn/batch_getter_test.go b/pkg/store/driver/txn/batch_getter_test.go index 1b07532be3062..a0341260040b7 100644 --- a/pkg/store/driver/txn/batch_getter_test.go +++ b/pkg/store/driver/txn/batch_getter_test.go @@ -16,6 +16,7 @@ package txn import ( "context" "testing" + "unsafe" "github.com/pingcap/tidb/pkg/kv" "github.com/stretchr/testify/require" @@ -41,7 +42,7 @@ func TestBufferBatchGetter(t *testing.T) { require.NoError(t, buffer.Set(ka, []byte("a2"))) require.NoError(t, buffer.Delete(kb)) - batchGetter := NewBufferBatchGetter(buffer, middle, snap) + batchGetter := NewBufferBatchGetter(&mockBufferBatchGetterStore{buffer}, middle, snap) result, err := batchGetter.BatchGet(context.Background(), []kv.Key{ka, kb, kc, kd}) require.NoError(t, err) require.Len(t, result, 3) @@ -105,3 +106,12 @@ func (s *mockBatchGetterStore) Set(k kv.Key, v []byte) error { func (s *mockBatchGetterStore) Delete(k kv.Key) error { return s.Set(k, []byte{}) } + +type mockBufferBatchGetterStore struct { + *mockBatchGetterStore +} + +func (s *mockBufferBatchGetterStore) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) { + kvKeys := *(*[]kv.Key)(unsafe.Pointer(&keys)) + return s.mockBatchGetterStore.BatchGet(ctx, kvKeys) +} diff --git a/pkg/store/driver/txn/txn_driver.go b/pkg/store/driver/txn/txn_driver.go index f8a2bbc3fadb3..9214225cf02fc 100644 --- a/pkg/store/driver/txn/txn_driver.go +++ b/pkg/store/driver/txn/txn_driver.go @@ -427,6 +427,15 @@ func (txn *tikvTxn) IsInFairLockingMode() bool { return txn.KVTxn.IsInAggressiveLockingMode() } +// MayFlush wraps the flush function and extract the error. +func (txn *tikvTxn) MayFlush() error { + if !txn.IsPipelined() { + return nil + } + _, err := txn.KVTxn.GetMemBuffer().Flush(false) + return txn.extractKeyErr(err) +} + // TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed. type TiDBKVFilter struct{} diff --git a/pkg/store/driver/txn/unionstore_driver.go b/pkg/store/driver/txn/unionstore_driver.go index e12e673ba8555..400bca8d921af 100644 --- a/pkg/store/driver/txn/unionstore_driver.go +++ b/pkg/store/driver/txn/unionstore_driver.go @@ -148,13 +148,15 @@ func (m *memBuffer) SnapshotGetter() kv.Getter { return newKVGetter(m.MemBuffer.SnapshotGetter()) } -// MayFlush implements kv.MemBuffer.MayFlush interface. -func (m *memBuffer) MayFlush() error { - if !m.isPipelinedDML { - return nil - } - _, err := m.MemBuffer.Flush(false) - return err +// GetLocal implements kv.MemBuffer interface +func (m *memBuffer) GetLocal(ctx context.Context, key []byte) ([]byte, error) { + data, err := m.MemBuffer.GetLocal(ctx, key) + return data, derr.ToTiDBErr(err) +} + +func (m *memBuffer) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) { + data, err := m.MemBuffer.BatchGet(ctx, keys) + return data, derr.ToTiDBErr(err) } type tikvGetter struct { diff --git a/pkg/store/mockstore/unistore/tikv/mvcc.go b/pkg/store/mockstore/unistore/tikv/mvcc.go index a7690bb557274..f4bab77bb1215 100644 --- a/pkg/store/mockstore/unistore/tikv/mvcc.go +++ b/pkg/store/mockstore/unistore/tikv/mvcc.go @@ -1493,12 +1493,27 @@ func (store *MVCCStore) ReadBufferFromLock(startTS uint64, keys ...[]byte) []*kv if lock.StartTS != startTS { continue } - val := getValueFromLock(&lock) - if len(val) > 0 { - pairs = append(pairs, &kvrpcpb.KvPair{ - Key: key, - Value: safeCopy(val), - }) + switch lock.Op { + case uint8(kvrpcpb.Op_Put): + val := getValueFromLock(&lock) + if val == nil { + panic("Op_Put has a nil value") + } + pairs = append( + pairs, &kvrpcpb.KvPair{ + Key: key, + Value: safeCopy(val), + }, + ) + case uint8(kvrpcpb.Op_Del): + pairs = append( + pairs, &kvrpcpb.KvPair{ + Key: key, + Value: []byte{}, + }, + ) + default: + panic("unexpected op. Optimistic txn should only contain put and delete locks") } } return pairs diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index 56a0c511bda1e..bde5c0e9624d4 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -196,6 +196,12 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu } } + if txn.IsPipelined() { + // For pipelined DML, disable the untouched optimization to avoid extra RPCs for MemBuffer.Get(). + // TODO: optimize this. + opt.Untouched = false + } + if opt.Untouched { txn, err1 := sctx.Txn(true) if err1 != nil { @@ -278,10 +284,10 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu if c.tblInfo.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV value, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() && !keyIsTempIdxKey { + } else if (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && !keyIsTempIdxKey { // For temp index keys, we can't get the temp value from memory buffer, even if the lazy check is enabled. // Otherwise, it may cause the temp index value to be overwritten, leading to data inconsistency. - value, err = txn.GetMemBuffer().Get(ctx, key) + value, err = txn.GetMemBuffer().GetLocal(ctx, key) } else { value, err = txn.Get(ctx, key) } @@ -298,7 +304,7 @@ func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValu // The index key value is not found or deleted. if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { val := idxVal - lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil + lazyCheck := (txn.IsPipelined() || sctx.GetSessionVars().LazyCheckKeyNotExists()) && err != nil if keyIsTempIdxKey { tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} val = tempVal.Encode(value) diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 7fb206b3c19cd..c388641c0b3e0 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -629,7 +629,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext colSize[col.ID] = int64(newLen - oldLen) } sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 0, 1, colSize) - return memBuffer.MayFlush() + return nil } func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opts ...table.CreateIdxOptFunc) error { @@ -1016,9 +1016,9 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts if t.meta.TempTableType != model.TempTableNone { // Always check key for temporary table because it does not write to TiKV _, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { + } else if sctx.GetSessionVars().LazyCheckKeyNotExists() || txn.IsPipelined() { var v []byte - v, err = txn.GetMemBuffer().Get(ctx, key) + v, err = txn.GetMemBuffer().GetLocal(ctx, key) if err != nil { setPresume = true } @@ -1123,9 +1123,6 @@ func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts colSize[col.ID] = int64(size) - 1 } sessVars.TxnCtx.UpdateDeltaForTable(t.physicalTableID, 1, 1, colSize) - if err = memBuffer.MayFlush(); err != nil { - return nil, err - } return recordID, nil } @@ -1403,10 +1400,7 @@ func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []typ colSize[col.ID] = -int64(size - 1) } ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.physicalTableID, -1, 1, colSize) - if err != nil { - return err - } - return memBuffer.MayFlush() + return err } func (t *TableCommon) addInsertBinlog(ctx table.MutateContext, h kv.Handle, row []types.Datum, colIDs []int64) error { diff --git a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go index fa61e23f70fa4..5920e41ac4f58 100644 --- a/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go +++ b/tests/realtikvtest/pipelineddmltest/pipelineddml_test.go @@ -304,27 +304,104 @@ func TestPipelinedDMLInsertRPC(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (a int, b int, unique index idx(b))") - res := tk.MustQuery("explain analyze insert ignore into t1 values (1,1), (2,2), (3,3), (4,4), (5,5)") - explain := getExplainResult(res) - require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) - // Test with bulk dml. - tk.MustExec("set session tidb_dml_type = bulk") - // Test normal insert. - tk.MustExec("truncate table t1") - res = tk.MustQuery("explain analyze insert into t1 values (1,1), (2,2), (3,3), (4,4), (5,5)") - explain = getExplainResult(res) - // TODO: try to optimize the rpc count, when use bulk dml, insert will send many BufferBatchGet rpc. - require.Regexp(t, "Insert.* insert:.*, rpc:{BufferBatchGet:{num_rpc:10, total_time:.*}}.*", explain) - // Test insert ignore. - tk.MustExec("truncate table t1") - res = tk.MustQuery("explain analyze insert ignore into t1 values (1,1), (2,2), (3,3), (4,4), (5,5)") - explain = getExplainResult(res) - // TODO: try to optimize the rpc count, when use bulk dml, insert ignore will send 5 BufferBatchGet and 1 BatchGet rpc. - // but without bulk dml, it will only use 1 BatchGet rpcs. - require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:5, total_time:.*}}}.*", explain) - require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + tables := []string{ + "create table _t1 (a int, b int)", // no index, auto generated handle + "create table _t1 (a int primary key, b int)", // clustered handle + "create table _t1 (a int, b int, unique index idx(b))", // unique index + "create table _t1 (a int primary key, b int, unique index idx(b))", // clustered handle + unique index + } + for _, table := range tables { + for _, tableSource := range []bool{true, false} { + hasPK := strings.Contains(table, "primary key") + hasUK := strings.Contains(table, "unique index") + tk.MustExec("drop table if exists t1, _t1") + var values string + if tableSource { + tk.MustExec("create table t1 (a int, b int)") + tk.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)") + values = " select * from t1" + } else { + values = " values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)" + } + tk.MustExec(table) + + // Test with standard dml. + tk.MustExec("set session tidb_dml_type = standard") + res := tk.MustQuery("explain analyze insert ignore into _t1" + values) + explain := getExplainResult(res) + if hasPK || hasUK { + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + } else { + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{BatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + } + + // Test with bulk dml. + tk.MustExec("set session tidb_dml_type = bulk") + + // Test normal insert. + tk.MustExec("truncate table _t1") + res = tk.MustQuery("explain analyze insert into _t1" + values) + explain = getExplainResult(res) + // no BufferBatchGet with lazy check + require.NotRegexp(t, "Insert.* insert:.*, rpc:{BufferBatchGet:{num_rpc:.*, total_time:.*}}.*", explain) + + // Test insert ignore. + tk.MustExec("truncate table _t1") + res = tk.MustQuery("explain analyze insert ignore into _t1" + values) + explain = getExplainResult(res) + // with bulk dml, it will 1 BatchGet and 1 BufferBatchGet RPCs in prefetch phase. + // but no need to prefetch when there are no unique indexes and no primary key. + if hasPK || hasUK { + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + } else { + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + } + // The ignore takes effect now. + res = tk.MustQuery("explain analyze insert ignore into _t1" + values) + explain = getExplainResult(res) + if hasPK || hasUK { + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + } else { + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + } + + // Test insert on duplicate key update. + res = tk.MustQuery("explain analyze insert into _t1 " + values + " on duplicate key update a = values(a) + 5") + explain = getExplainResult(res) + if hasUK { + // 2 rounds checks are required: read handles by unique keys and read rows by handles + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:2, total_time:.*}}}.*", explain) + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:2, total_time:.*}}}.*", explain) + } else if hasPK { + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + require.Regexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:1, total_time:.*}}}.*", explain) + } else { + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BufferBatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + require.NotRegexp(t, "Insert.* check_insert: {total_time: .* rpc:{.*BatchGet:{num_rpc:.*, total_time:.*}}}.*", explain) + } + + // Test replace into. replace checks in the same way with insert on duplicate key update. + // However, the format of explain result is little different. + res = tk.MustQuery("explain analyze replace into _t1" + values) + explain = getExplainResult(res) + if hasUK { + require.Regexp(t, "Insert.* rpc: {.*BufferBatchGet:{num_rpc:2, total_time:.*}}.*", explain) + require.Regexp(t, "Insert.* rpc: {.*BatchGet:{num_rpc:2, total_time:.*}}.*", explain) + } else if hasPK { + require.Regexp(t, "Insert.* rpc: {.*BufferBatchGet:{num_rpc:1, total_time:.*}}.*", explain) + require.Regexp(t, "Insert.* rpc: {.*BatchGet:{num_rpc:1, total_time:.*}}.*", explain) + } else { + require.NotRegexp(t, "Insert.* rpc: {.*BufferBatchGet:{num_rpc:.*, total_time:.*}}.*", explain) + require.NotRegexp(t, "Insert.* rpc: {.*BatchGet:{num_rpc:.*, total_time:.*}}.*", explain) + } + } + } } func getExplainResult(res *testkit.Result) string { @@ -503,7 +580,6 @@ func TestPipelinedDMLInsertMemoryTest(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - tk.MustExec("drop table if exists t1, _t1") tk.MustExec("create table t1 (a int, b int, c varchar(128), unique index idx(b))") tk.MustExec("create table _t1 like t1") @@ -562,7 +638,6 @@ func TestPipelinedDMLDisableRetry(t *testing.T) { tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") tk2.MustExec("use test") - tk1.MustExec("drop table if exists t1") tk1.MustExec("create table t1(a int primary key, b int)") tk1.MustExec("insert into t1 values(1, 1)") @@ -580,6 +655,30 @@ func TestPipelinedDMLDisableRetry(t *testing.T) { require.True(t, kv.ErrWriteConflict.Equal(err), fmt.Sprintf("error: %s", err)) } +func TestReplaceRowCheck(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("drop table if exists t1, _t1") + tk1.MustExec("create table t1(a int, b int)") + tk1.MustExec("create table _t1(a int primary key, b int)") + tk1.MustExec("insert into t1 values(1, 1), (2, 2), (1, 2), (2, 1)") + tk1.MustExec("set session tidb_dml_type = bulk") + tk1.MustExec("replace into _t1 select * from t1") + tk1.MustExec("admin check table _t1") + tk1.MustQuery("select a from _t1").Sort().Check(testkit.Rows("1", "2")) + + tk1.MustExec("truncate table _t1") + tk1.MustExec("insert ignore into _t1 select * from t1") + tk1.MustExec("admin check table _t1") + tk1.MustQuery("select a from _t1").Sort().Check(testkit.Rows("1", "2")) + + tk1.MustExec("truncate table _t1") + tk1.MustExec("insert into _t1 select * from t1 on duplicate key update b = values(b)") + tk1.MustExec("admin check table _t1") + tk1.MustQuery("select a from _t1").Sort().Check(testkit.Rows("1", "2")) +} + func TestDuplicateKeyErrorMessage(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -594,3 +693,48 @@ func TestDuplicateKeyErrorMessage(t *testing.T) { require.Error(t, err2) require.Equal(t, err1.Error(), err2.Error()) } + +func TestInsertIgnoreOnDuplicateKeyUpdate(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_dml_type = bulk") + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(a int, b int, unique index u1(a, b), unique index u2(a))") + tk.MustExec("insert into t1 values(0, 0), (1, 1)") + tk.MustExecToErr("insert ignore into t1 values (0, 2) ,(1, 3) on duplicate key update b = 5, a = 0") + // if the statement execute successful, the following check should pass. + // tk.MustQuery("select * from t1").Sort().Check(testkit.Rows("0 5", "1 1")) +} + +func TestConflictError(t *testing.T) { + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushKeys", `return(10)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBMinFlushSize", `return(128)`)) + require.Nil(t, failpoint.Enable("tikvclient/pipelinedMemDBForceFlushSizeThreshold", `return(128)`)) + defer func() { + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushKeys")) + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBMinFlushSize")) + require.Nil(t, failpoint.Disable("tikvclient/pipelinedMemDBForceFlushSizeThreshold")) + }() + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, _t1") + tk.MustExec("create table t1(a int primary key, b int)") + tk.MustExec("create table _t1(a int primary key, b int)") + var insert strings.Builder + insert.WriteString("insert into t1 values") + for i := 0; i < 100; i++ { + if i > 0 { + insert.WriteString(",") + } + insert.WriteString(fmt.Sprintf("(%d, %d)", i, i)) + } + tk.MustExec(insert.String()) + tk.MustExec("set session tidb_dml_type = bulk") + tk.MustExec("insert into _t1 select * from t1") + tk.MustExec("set session tidb_max_chunk_size = 32") + err := tk.ExecToErr("insert into _t1 select * from t1 order by rand()") + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "Duplicate entry"), err.Error()) +}