Skip to content

Commit

Permalink
This is an automated cherry-pick of #49384
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
zyguan authored and ti-chi-bot committed Jan 3, 2024
1 parent 9c3d4e3 commit 1a91492
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 5 deletions.
27 changes: 22 additions & 5 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,22 +1024,39 @@ func (s *session) isTxnRetryableError(err error) bool {
return kv.IsTxnRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err)
}

func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) {
switch n := stmt.(type) {
case *ast.RollbackStmt, *ast.CommitStmt:
return true, nil
case *ast.ExecuteStmt:
ps, err := plannercore.GetPreparedStmt(n, vars)
if err != nil {
return false, err
}
return isEndTxnStmt(ps.PreparedAst.Stmt, vars)
}
return false, nil
}

func (s *session) checkTxnAborted(stmt sqlexec.Statement) error {
<<<<<<< HEAD:session/session.go
var err error
if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) > 0 {
err = kv.ErrLockExpire
} else {
=======
if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) == 0 {
>>>>>>> 37c7326c73e (session: allow end aborted txn via binary protocal (#49384)):pkg/session/session.go
return nil
}
// If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`,
// because they are used to finish the aborted transaction.
if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.CommitStmt); ok {
return nil
}
if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.RollbackStmt); ok {
if ok, err := isEndTxnStmt(stmt.(*executor.ExecStmt).StmtNode, s.sessionVars); err == nil && ok {
return nil
} else if err != nil {
return err
}
return err
return kv.ErrLockExpire
}

func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
Expand Down
47 changes: 47 additions & 0 deletions tests/realtikvtest/pessimistictest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "pessimistictest_test",
timeout = "long",
srcs = [
"main_test.go",
"pessimistic_test.go",
],
flaky = True,
deps = [
"//pkg/config",
"//pkg/domain",
"//pkg/errno",
"//pkg/expression",
"//pkg/kv",
"//pkg/parser",
"//pkg/parser/auth",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/session",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/driver/error",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/external",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/deadlockhistory",
"//pkg/util/sqlkiller",
"//tests/realtikvtest",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//txnkv/transaction",
],
)
106 changes: 106 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/stretchr/testify/require"
tikvcfg "github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -3426,3 +3427,108 @@ func TestBatchResolveLocks(t *testing.T) {
// Check data consistency
tk.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 1", "2 3", "3 13", "4 14", "5 15"))
}
<<<<<<< HEAD
=======

func TestIssue42937(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk3 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_enable_async_commit = 0")
tk.MustExec("set @@tidb_enable_1pc = 0")
tk2.MustExec("use test")
tk2.MustExec("set @@tidb_enable_async_commit = 0")
tk2.MustExec("set @@tidb_enable_1pc = 0")
tk3.MustExec("use test")

tk.MustExec("create table t(id int primary key, v int unique)")
tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)")
tk.MustExec("create table t2 (id int primary key, v int)")
tk.MustExec("insert into t2 values (1, 1), (2, 2)")

require.NoError(t, failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
}()

tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk2.MustExec("update t set v = v + 1 where id = 2")

require.NoError(t, failpoint.Enable("tikvclient/twoPCShortLockTTL", "return"))
require.NoError(t, failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
ch := mustExecAsync(tk, `
with
c as (select /*+ MERGE() */ v from t2 where id = 1 or id = 2)
update c join t on c.v = t.id set t.v = t.v + 1`)
mustTimeout(t, ch, time.Millisecond*100)

tk3.MustExec("update t2 set v = v + 2")
tk2.MustExec("commit")
<-ch

tk.MustQuery("select id, v from t order by id").Check(testkit.Rows("1 10", "2 20", "3 31", "4 41"))
tk.MustExec("update t set v = 0 where id = 1")

require.NoError(t, failpoint.Enable("tikvclient/beforeCommit", `1*return("delay(500)")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/beforeCommit"))
}()

ch = mustExecAsync(tk, "commit")
mustTimeout(t, ch, time.Millisecond*100)

require.NoError(t, failpoint.Disable("tikvclient/twoPCShortLockTTL"))
require.NoError(t, failpoint.Disable("tikvclient/shortPessimisticLockTTL"))

tk2.MustExec("insert into t values (5, 11)")

mustRecv(t, ch)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by id").Check(testkit.Rows(
"1 0",
"2 21",
"3 31",
"4 41",
"5 11",
))
}

func TestEndTxnOnLockExpire(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("prepare ps_commit from 'commit'")
tk.MustExec("prepare ps_rollback from 'rollback'")

defer setLockTTL(300).restore()
defer tikvcfg.UpdateGlobal(func(conf *tikvcfg.Config) {
conf.MaxTxnTTL = 500
})()

for _, tt := range []struct {
name string
endTxnSQL string
}{
{"CommitTxt", "commit"},
{"CommitBin", "execute ps_commit"},
{"RollbackTxt", "rollback"},
{"RollbackBin", "execute ps_rollback"},
} {
t.Run(tt.name, func(t *testing.T) {
tk.Exec("delete from t")
tk.Exec("insert into t values (1, 1)")
tk.Exec("begin pessimistic")
tk.Exec("update t set b = 10 where a = 1")
time.Sleep(time.Second)
tk.MustContainErrMsg("select * from t", "TTL manager has timed out")
tk.MustExec(tt.endTxnSQL)
})
}
}
>>>>>>> 37c7326c73e (session: allow end aborted txn via binary protocal (#49384))

0 comments on commit 1a91492

Please sign in to comment.