Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: handle the corner case that temp index is not exist but the normal index is exist (#51862) #56092

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -1817,3 +1818,42 @@ func TestMDLTruncateTable(t *testing.T) {
require.True(t, timetk2.After(timeMain))
require.True(t, timetk3.After(timeMain))
}

func TestInsertIgnore(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a smallint(6) DEFAULT '-13202', b varchar(221) NOT NULL DEFAULT 'duplicatevalue', " +
"c tinyint(1) NOT NULL DEFAULT '0', PRIMARY KEY (c, b));")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{}

onJobUpdatedExportedFunc := func(job *model.Job) {
switch job.SchemaState {
case model.StateDeleteOnly:
_, err := tk1.Exec("INSERT INTO t VALUES (-18585,'aaa',1), (-18585,'0',1), (-18585,'1',1), (-18585,'duplicatevalue',1);")
assert.NoError(t, err)
case model.StateWriteReorganization:
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
assert.NoError(t, err)
idx := tbl.Meta().FindIndexByName("idx")
if idx.BackfillState == model.BackfillStateReadyToMerge {
_, err := tk1.Exec("insert ignore into `t` values ( 234,'duplicatevalue',-2028 );")
assert.NoError(t, err)
return
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)

tk.MustExec("alter table t add unique index idx(b);")
tk.MustExec("admin check table t;")
}
87 changes: 54 additions & 33 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,29 @@
return false
}

func (e *InsertValues) handleDuplicateKey(ctx context.Context, txn kv.Transaction, uk *keyValueWithDupInfo, replace bool, r toBeCheckedRow) (bool, error) {
if !replace {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
return true, nil
}
_, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle)
if err != nil {
return false, err
}

Check warning on line 1112 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1111-L1112

Added lines #L1111 - L1112 were not covered by tests
if handle == nil {
return false, nil
}

Check warning on line 1115 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1114-L1115

Added lines #L1114 - L1115 were not covered by tests
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
return false, err
}

Check warning on line 1119 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1118-L1119

Added lines #L1118 - L1119 were not covered by tests
return false, nil
}

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum,
Expand Down Expand Up @@ -1177,53 +1200,51 @@
return err
}
}
skip := false
rowInserted := false
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
if err != nil && !kv.IsErrNotFound(err) {
return err
}

Check warning on line 1208 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1207-L1208

Added lines #L1207 - L1208 were not covered by tests
if err == nil {
if replace {
_, handle, err := tables.FetchDuplicatedHandle(
ctx,
uk.newKey,
true,
txn,
e.Table.Meta().ID,
uk.commonHandle,
)
if err != nil {
return err
}
if handle == nil {
continue
}
_, err = e.removeRow(ctx, txn, handle, r, true)
rowInserted, err = e.handleDuplicateKey(ctx, txn, uk, replace, r)
if err != nil {
return err
}

Check warning on line 1213 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1212-L1213

Added lines #L1212 - L1213 were not covered by tests
if rowInserted {
break
}
continue
}
if tablecodec.IsTempIndexKey(uk.newKey) {
tablecodec.TempIndexKey2IndexKey(uk.newKey)
_, err = txn.Get(ctx, uk.newKey)
if err != nil && !kv.IsErrNotFound(err) {
return err
}

Check warning on line 1224 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1223-L1224

Added lines #L1223 - L1224 were not covered by tests
if err == nil {
rowInserted, err = e.handleDuplicateKey(ctx, txn, uk, replace, r)
if err != nil {
return err
}
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
if rowInserted {
break
}
skip = true
break
}
} else if !kv.IsErrNotFound(err) {
return err
}
}

if rowInserted {
continue
}

// If row was checked with no duplicate keys,
// it should be add to values map for the further row check.
// There may be duplicate keys inside the insert statement.
if !skip {
e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
err = addRecord(ctx, rows[i])
if err != nil {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
err = addRecord(ctx, rows[i])
if err != nil {
return err

Check warning on line 1247 in executor/insert_common.go

View check run for this annotation

Codecov / codecov/patch

executor/insert_common.go#L1247

Added line #L1247 was not covered by tests
}
}
if e.stats != nil {
Expand Down
Loading