From 1a91492f858dcb251bdc8c9bd04bd0ec9103110e Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 3 Jan 2024 13:30:32 +0800 Subject: [PATCH] This is an automated cherry-pick of #49384 Signed-off-by: ti-chi-bot --- session/session.go | 27 ++++- .../realtikvtest/pessimistictest/BUILD.bazel | 47 ++++++++ .../pessimistictest/pessimistic_test.go | 106 ++++++++++++++++++ 3 files changed, 175 insertions(+), 5 deletions(-) create mode 100644 tests/realtikvtest/pessimistictest/BUILD.bazel diff --git a/session/session.go b/session/session.go index 6a04cfc0eb780..5c81cdaec5a9a 100644 --- a/session/session.go +++ b/session/session.go @@ -1024,22 +1024,39 @@ func (s *session) isTxnRetryableError(err error) bool { return kv.IsTxnRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err) } +func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) { + switch n := stmt.(type) { + case *ast.RollbackStmt, *ast.CommitStmt: + return true, nil + case *ast.ExecuteStmt: + ps, err := plannercore.GetPreparedStmt(n, vars) + if err != nil { + return false, err + } + return isEndTxnStmt(ps.PreparedAst.Stmt, vars) + } + return false, nil +} + func (s *session) checkTxnAborted(stmt sqlexec.Statement) error { +<<<<<<< HEAD:session/session.go var err error if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) > 0 { err = kv.ErrLockExpire } else { +======= + if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) == 0 { +>>>>>>> 37c7326c73e (session: allow end aborted txn via binary protocal (#49384)):pkg/session/session.go return nil } // If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`, // because they are used to finish the aborted transaction. - if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.CommitStmt); ok { - return nil - } - if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.RollbackStmt); ok { + if ok, err := isEndTxnStmt(stmt.(*executor.ExecStmt).StmtNode, s.sessionVars); err == nil && ok { return nil + } else if err != nil { + return err } - return err + return kv.ErrLockExpire } func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { diff --git a/tests/realtikvtest/pessimistictest/BUILD.bazel b/tests/realtikvtest/pessimistictest/BUILD.bazel new file mode 100644 index 0000000000000..e8fe76ff36556 --- /dev/null +++ b/tests/realtikvtest/pessimistictest/BUILD.bazel @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "pessimistictest_test", + timeout = "long", + srcs = [ + "main_test.go", + "pessimistic_test.go", + ], + flaky = True, + deps = [ + "//pkg/config", + "//pkg/domain", + "//pkg/errno", + "//pkg/expression", + "//pkg/kv", + "//pkg/parser", + "//pkg/parser/auth", + "//pkg/parser/model", + "//pkg/parser/mysql", + "//pkg/parser/terror", + "//pkg/planner/core", + "//pkg/session", + "//pkg/sessionctx/variable", + "//pkg/sessiontxn", + "//pkg/store/driver/error", + "//pkg/store/gcworker", + "//pkg/store/mockstore", + "//pkg/tablecodec", + "//pkg/testkit", + "//pkg/testkit/external", + "//pkg/types", + "//pkg/util/codec", + "//pkg/util/dbterror/exeerrors", + "//pkg/util/deadlockhistory", + "//pkg/util/sqlkiller", + "//tests/realtikvtest", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//txnkv/transaction", + ], +) diff --git a/tests/realtikvtest/pessimistictest/pessimistic_test.go b/tests/realtikvtest/pessimistictest/pessimistic_test.go index 9b91712f3e657..a8c4bba4d6a36 100644 --- a/tests/realtikvtest/pessimistictest/pessimistic_test.go +++ b/tests/realtikvtest/pessimistictest/pessimistic_test.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/deadlockhistory" "github.com/stretchr/testify/require" + tikvcfg "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -3426,3 +3427,108 @@ func TestBatchResolveLocks(t *testing.T) { // Check data consistency tk.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 1", "2 3", "3 13", "4 14", "5 15")) } +<<<<<<< HEAD +======= + +func TestIssue42937(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + tk3 := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_enable_async_commit = 0") + tk.MustExec("set @@tidb_enable_1pc = 0") + tk2.MustExec("use test") + tk2.MustExec("set @@tidb_enable_async_commit = 0") + tk2.MustExec("set @@tidb_enable_1pc = 0") + tk3.MustExec("use test") + + tk.MustExec("create table t(id int primary key, v int unique)") + tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)") + tk.MustExec("create table t2 (id int primary key, v int)") + tk.MustExec("insert into t2 values (1, 1), (2, 2)") + + require.NoError(t, failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`)) + require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return")) + defer func() { + require.NoError(t, failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback")) + require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit")) + }() + + tk.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") + tk2.MustExec("update t set v = v + 1 where id = 2") + + require.NoError(t, failpoint.Enable("tikvclient/twoPCShortLockTTL", "return")) + require.NoError(t, failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return")) + ch := mustExecAsync(tk, ` + with + c as (select /*+ MERGE() */ v from t2 where id = 1 or id = 2) + update c join t on c.v = t.id set t.v = t.v + 1`) + mustTimeout(t, ch, time.Millisecond*100) + + tk3.MustExec("update t2 set v = v + 2") + tk2.MustExec("commit") + <-ch + + tk.MustQuery("select id, v from t order by id").Check(testkit.Rows("1 10", "2 20", "3 31", "4 41")) + tk.MustExec("update t set v = 0 where id = 1") + + require.NoError(t, failpoint.Enable("tikvclient/beforeCommit", `1*return("delay(500)")`)) + defer func() { + require.NoError(t, failpoint.Disable("tikvclient/beforeCommit")) + }() + + ch = mustExecAsync(tk, "commit") + mustTimeout(t, ch, time.Millisecond*100) + + require.NoError(t, failpoint.Disable("tikvclient/twoPCShortLockTTL")) + require.NoError(t, failpoint.Disable("tikvclient/shortPessimisticLockTTL")) + + tk2.MustExec("insert into t values (5, 11)") + + mustRecv(t, ch) + tk.MustExec("admin check table t") + tk.MustQuery("select * from t order by id").Check(testkit.Rows( + "1 0", + "2 21", + "3 31", + "4 41", + "5 11", + )) +} + +func TestEndTxnOnLockExpire(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("prepare ps_commit from 'commit'") + tk.MustExec("prepare ps_rollback from 'rollback'") + + defer setLockTTL(300).restore() + defer tikvcfg.UpdateGlobal(func(conf *tikvcfg.Config) { + conf.MaxTxnTTL = 500 + })() + + for _, tt := range []struct { + name string + endTxnSQL string + }{ + {"CommitTxt", "commit"}, + {"CommitBin", "execute ps_commit"}, + {"RollbackTxt", "rollback"}, + {"RollbackBin", "execute ps_rollback"}, + } { + t.Run(tt.name, func(t *testing.T) { + tk.Exec("delete from t") + tk.Exec("insert into t values (1, 1)") + tk.Exec("begin pessimistic") + tk.Exec("update t set b = 10 where a = 1") + time.Sleep(time.Second) + tk.MustContainErrMsg("select * from t", "TTL manager has timed out") + tk.MustExec(tt.endTxnSQL) + }) + } +} +>>>>>>> 37c7326c73e (session: allow end aborted txn via binary protocal (#49384))