From afbef28e56009cbad856006bb5ea952459c64a44 Mon Sep 17 00:00:00 2001 From: hehechen Date: Tue, 27 Dec 2022 19:06:16 +0800 Subject: [PATCH] executor: TiFlash supports stale read (#40048) close pingcap/tidb#40047 --- DEPS.bzl | 16 +++++-- distsql/distsql.go | 4 +- executor/adapter.go | 4 ++ executor/builder.go | 5 +-- executor/executor_required_rows_test.go | 2 + executor/mpp_gather.go | 38 ++++++++++++----- executor/tiflashtest/BUILD.bazel | 1 + executor/tiflashtest/tiflash_test.go | 16 +++---- go.mod | 5 ++- go.sum | 10 +++-- kv/mpp.go | 34 ++++++++++----- planner/core/fragment.go | 57 +++++++++++++++++-------- planner/core/planbuilder.go | 4 -- sessionctx/stmtctx/stmtctx.go | 7 +++ sessionctx/variable/session.go | 21 --------- sessionctx/variable/session_test.go | 13 ++---- store/copr/mpp.go | 18 +++++--- 17 files changed, 149 insertions(+), 106 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index f82cf027ab500..686aef1f3c028 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2923,8 +2923,8 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=", - version = "v0.0.0-20221130022225-6c56ac56fe5f", + sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=", + version = "v0.0.0-20221213093948-9ccc6beaf0aa", ) go_repository( name = "com_github_pingcap_log", @@ -3523,12 +3523,20 @@ def go_deps(): sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=", version = "v0.0.0-20181126055449-889f96f722a2", ) + go_repository( + name = "com_github_tiancaiamao_gp", + build_file_proto_mode = "disable", + importpath = "github.com/tiancaiamao/gp", + sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=", + version = "v0.0.0-20221221095600-1a473d1f9b4b", + ) + go_repository( name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=", - version = "v2.0.3", + sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=", + version = "v2.0.4-0.20221226080148-018c59dbd837", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/distsql/distsql.go b/distsql/distsql.go index 1f18d084eb0b2..3c65205f3d331 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -38,11 +38,11 @@ import ( ) // DispatchMPPTasks dispatches all tasks and returns an iterator. -func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) { +func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID) (SelectResult, error) { ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx) _, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash] ctx = SetTiFlashMaxThreadsInContext(ctx, sctx) - resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs) + resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID) if resp == nil { return nil, errors.New("client returns nil response") } diff --git a/executor/adapter.go b/executor/adapter.go index 5e12cce1ccc69..5195ee4d1dae5 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -1407,6 +1407,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo sessVars.DurationParse = 0 // Clean the stale read flag when statement execution finish sessVars.StmtCtx.IsStaleness = false + // Clean the MPP query info + sessVars.StmtCtx.MPPQueryInfo.QueryID.Store(0) + sessVars.StmtCtx.MPPQueryInfo.QueryTS.Store(0) + sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPTaskID.Store(0) if sessVars.StmtCtx.ReadFromTableCache { metrics.ReadFromTableCacheCounter.Inc() diff --git a/executor/builder.go b/executor/builder.go index d4270397eecd0..c6baaf0932849 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3402,6 +3402,7 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe is: b.is, originalPlan: v.GetTablePlan(), startTS: startTs, + mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()}, } return gather } @@ -3409,10 +3410,6 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe // buildTableReader builds a table reader executor. It first build a no range table reader, // and then update it ranges from table scan plan. func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor { - if v.StoreType != kv.TiKV && b.isStaleness { - b.err = errors.New("stale requests require tikv backend") - return nil - } failpoint.Inject("checkUseMPP", func(val failpoint.Value) { if !b.ctx.GetSessionVars().InRestrictedSQL && val.(bool) != useMPPExecution(b.ctx, v) { if val.(bool) { diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go index cbca9914b5bc2..c3ac762050d24 100644 --- a/executor/executor_required_rows_test.go +++ b/executor/executor_required_rows_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/parser/ast" @@ -211,6 +212,7 @@ func defaultCtx() sessionctx.Context { ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, ctx.GetSessionVars().MemQuotaQuery) ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1) ctx.GetSessionVars().SnapshotTS = uint64(1) + domain.BindDomain(ctx, domain.NewMockDomain()) return ctx } diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 42526774dbdd5..eee019bd0de47 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -16,6 +16,7 @@ package executor import ( "context" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -38,6 +39,18 @@ func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader return ok } +func getMPPQueryID(ctx sessionctx.Context) uint64 { + mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo + mppQueryInfo.QueryID.CompareAndSwap(0, plannercore.AllocMPPQueryID()) + return mppQueryInfo.QueryID.Load() +} + +func getMPPQueryTS(ctx sessionctx.Context) uint64 { + mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo + mppQueryInfo.QueryTS.CompareAndSwap(0, uint64(time.Now().UnixNano())) + return mppQueryInfo.QueryTS.Load() +} + // MPPGather dispatch MPP tasks and read data from root tasks. type MPPGather struct { // following fields are construct needed @@ -45,6 +58,7 @@ type MPPGather struct { is infoschema.InfoSchema originalPlan plannercore.PhysicalPlan startTS uint64 + mppQueryID kv.MPPQueryID mppReqs []*kv.MPPDispatchRequest @@ -78,17 +92,19 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error { return errors.Trace(err) } logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs), - zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()), + zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID), + zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()), zap.String("plan", plannercore.ToString(pf.ExchangeSender))) req := &kv.MPPDispatchRequest{ - Data: pbData, - Meta: mppTask.Meta, - ID: mppTask.ID, - IsRoot: pf.IsRoot, - Timeout: 10, - SchemaVar: e.is.SchemaMetaVersion(), - StartTs: e.startTS, - State: kv.MppTaskReady, + Data: pbData, + Meta: mppTask.Meta, + ID: mppTask.ID, + IsRoot: pf.IsRoot, + Timeout: 10, + SchemaVar: e.is.SchemaMetaVersion(), + StartTs: e.startTS, + MppQueryID: mppTask.MppQueryID, + State: kv.MppTaskReady, } e.mppReqs = append(e.mppReqs, req) } @@ -109,7 +125,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { // TODO: Move the construct tasks logic to planner, so we can see the explain results. sender := e.originalPlan.(*plannercore.PhysicalExchangeSender) planIDs := collectPlanIDS(e.originalPlan, nil) - frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is) + frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.mppQueryID, sender, e.is) if err != nil { return errors.Trace(err) } @@ -124,7 +140,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) { failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs))) } }) - e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS) + e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID) if err != nil { return errors.Trace(err) } diff --git a/executor/tiflashtest/BUILD.bazel b/executor/tiflashtest/BUILD.bazel index 5223fa79cc2d9..c7678e569522d 100644 --- a/executor/tiflashtest/BUILD.bazel +++ b/executor/tiflashtest/BUILD.bazel @@ -16,6 +16,7 @@ go_test( "//executor", "//meta/autoid", "//parser/terror", + "//planner/core", "//store/mockstore", "//store/mockstore/unistore", "//testkit", diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index 222e1bfdaff0b..9898ecd89a8eb 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/parser/terror" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/pingcap/tidb/testkit" @@ -267,14 +268,9 @@ func TestMppExecution(t *testing.T) { tk.MustExec("begin") tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3")) tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4")) - txn, err := tk.Session().Txn(true) - require.NoError(t, err) - ts := txn.StartTS() - taskID := tk.Session().GetSessionVars().AllocMPPTaskID(ts) - require.Equal(t, int64(6), taskID) - tk.MustExec("commit") - taskID = tk.Session().GetSessionVars().AllocMPPTaskID(ts + 1) + taskID := plannercore.AllocMPPTaskID(tk.Session()) require.Equal(t, int64(1), taskID) + tk.MustExec("commit") failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`) // all the data is related to one store, so there are three tasks. @@ -1043,7 +1039,7 @@ func TestTiFlashPartitionTableBroadcastJoin(t *testing.T) { } } -func TestForbidTiflashDuringStaleRead(t *testing.T) { +func TestTiflashSupportStaleRead(t *testing.T) { store := testkit.CreateMockStore(t, withMockTiFlash(2)) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -1075,8 +1071,8 @@ func TestForbidTiflashDuringStaleRead(t *testing.T) { fmt.Fprintf(resBuff, "%s\n", row) } res = resBuff.String() - require.NotContains(t, res, "tiflash") - require.Contains(t, res, "tikv") + require.Contains(t, res, "tiflash") + require.NotContains(t, res, "tikv") } func TestForbidTiFlashIfExtraPhysTableIDIsNeeded(t *testing.T) { diff --git a/go.mod b/go.mod index 93ec1bbee740b..b7375dddbe3dc 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278 github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f + github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e @@ -89,7 +89,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3 + github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -219,6 +219,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect diff --git a/go.sum b/go.sum index 95ecd5822a589..2a6c8d4fc308e 100644 --- a/go.sum +++ b/go.sum @@ -781,8 +781,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ= -github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ= +github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s= -github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg= +github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo= +github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ= +github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837/go.mod h1:ptS8K+VBrEH2gIS3JxaiFSSLfDDyuS2xcdLozOtBWBw= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/kv/mpp.go b/kv/mpp.go index 2e398af595650..32e9186506067 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -27,12 +27,20 @@ type MPPTaskMeta interface { GetAddress() string } +// MPPQueryID means the global unique id of a mpp query. +type MPPQueryID struct { + QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule + LocalQueryID uint64 // unique mpp query id in local tidb memory. + ServerID uint64 +} + // MPPTask means the minimum execution unit of a mpp computation job. type MPPTask struct { - Meta MPPTaskMeta // on which store this task will execute - ID int64 // mppTaskID - StartTs uint64 - TableID int64 // physical table id + Meta MPPTaskMeta // on which store this task will execute + ID int64 // mppTaskID + StartTs uint64 + MppQueryID MPPQueryID + TableID int64 // physical table id PartitionTableIDs []int64 } @@ -40,8 +48,11 @@ type MPPTask struct { // ToPB generates the pb structure. func (t *MPPTask) ToPB() *mpp.TaskMeta { meta := &mpp.TaskMeta{ - StartTs: t.StartTs, - TaskId: t.ID, + StartTs: t.StartTs, + QueryTs: t.MppQueryID.QueryTs, + LocalQueryId: t.MppQueryID.LocalQueryID, + ServerId: t.MppQueryID.ServerID, + TaskId: t.ID, } if t.ID != -1 { meta.Address = t.Meta.GetAddress() @@ -70,10 +81,11 @@ type MPPDispatchRequest struct { IsRoot bool // root task returns data to tidb directly. Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed. // SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary. - SchemaVar int64 - StartTs uint64 - ID int64 // identify a single task - State MppTaskStates + SchemaVar int64 + StartTs uint64 + MppQueryID MPPQueryID + ID int64 // identify a single task + State MppTaskStates } // MPPClient accepts and processes mpp requests. @@ -83,7 +95,7 @@ type MPPClient interface { ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. - DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response + DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response } // MPPBuildTasksRequest request the stores allocation for a mpp plan fragment. diff --git a/planner/core/fragment.go b/planner/core/fragment.go index 917f4392d9f9e..7e86696ccc4d6 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -16,6 +16,7 @@ package core import ( "context" + "sync/atomic" "time" "unsafe" @@ -79,29 +80,45 @@ type tasksAndFrags struct { } type mppTaskGenerator struct { - ctx sessionctx.Context - startTS uint64 - is infoschema.InfoSchema - frags []*Fragment - cache map[int]tasksAndFrags + ctx sessionctx.Context + startTS uint64 + mppQueryID kv.MPPQueryID + is infoschema.InfoSchema + frags []*Fragment + cache map[int]tasksAndFrags } // GenerateRootMPPTasks generate all mpp tasks and return root ones. -func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) { +func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, mppQueryID kv.MPPQueryID, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) { g := &mppTaskGenerator{ - ctx: ctx, - startTS: startTs, - is: is, - cache: make(map[int]tasksAndFrags), + ctx: ctx, + startTS: startTs, + mppQueryID: mppQueryID, + is: is, + cache: make(map[int]tasksAndFrags), } return g.generateMPPTasks(sender) } +// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id when the query finished. +func AllocMPPTaskID(ctx sessionctx.Context) int64 { + mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo + return mppQueryInfo.AllocatedMPPTaskID.Add(1) +} + +var mppQueryID uint64 = 1 + +// AllocMPPQueryID allocates local query id for mpp queries. +func AllocMPPQueryID() uint64 { + return atomic.AddUint64(&mppQueryID, 1) +} + func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) { logutil.BgLogger().Info("Mpp will generate tasks", zap.String("plan", ToString(s))) tidbTask := &kv.MPPTask{ - StartTs: e.startTS, - ID: -1, + StartTs: e.startTS, + MppQueryID: e.mppQueryID, + ID: -1, } _, frags, err := e.generateMPPTasksForExchangeSender(s) if err != nil { @@ -132,10 +149,11 @@ func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask) _, ok := addressMap[addr] if !ok { mppTask := &kv.MPPTask{ - Meta: &mppAddr{addr: addr}, - ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), - StartTs: e.startTS, - TableID: -1, + Meta: &mppAddr{addr: addr}, + ID: AllocMPPTaskID(e.ctx), + MppQueryID: e.mppQueryID, + StartTs: e.startTS, + TableID: -1, } newTasks = append(newTasks, mppTask) addressMap[addr] = struct{}{} @@ -385,7 +403,12 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic tasks := make([]*kv.MPPTask, 0, len(metas)) for _, meta := range metas { - task := &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: ts.Table.ID, PartitionTableIDs: allPartitionsIDs} + task := &kv.MPPTask{Meta: meta, + ID: AllocMPPTaskID(e.ctx), + StartTs: e.startTS, + MppQueryID: e.mppQueryID, + TableID: ts.Table.ID, + PartitionTableIDs: allPartitionsIDs} tasks = append(tasks, task) } return tasks, nil diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d2c19c9b181a8..60a3eba28292f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1429,10 +1429,6 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i } available = removeIgnoredPaths(available, ignored, tblInfo) - if staleread.IsStmtStaleness(ctx) { - // skip tiflash if the statement is for stale read until tiflash support stale read - available = removeTiflashDuringStaleRead(available) - } // If we have got "FORCE" or "USE" index hint but got no available index, // we have to use table scan. diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 9a49c24851c08..799f2f8bdeea8 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -380,6 +380,13 @@ type StatementContext struct { HasFKCascades bool } + // MPPQueryInfo stores some id and timestamp of current MPP query statement. + MPPQueryInfo struct { + QueryID atomic2.Uint64 + QueryTS atomic2.Uint64 + AllocatedMPPTaskID atomic2.Int64 + } + // TableStats stores the visited runtime table stats by table id during query TableStats map[int64]interface{} // useChunkAlloc indicates whether statement use chunk alloc diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d3d30d9c7bcc5..977b34b1578a9 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -671,13 +671,6 @@ type SessionVars struct { value string } - // mppTaskIDAllocator is used to allocate mpp task id for a session. - mppTaskIDAllocator struct { - mu sync.Mutex - lastTS uint64 - taskID int64 - } - // Status stands for the session status. e.g. in transaction or not, auto commit is on or off, and so on. Status uint16 @@ -1451,20 +1444,6 @@ func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { return sc } -// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's -// startTs is different. -func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 { - s.mppTaskIDAllocator.mu.Lock() - defer s.mppTaskIDAllocator.mu.Unlock() - if s.mppTaskIDAllocator.lastTS == startTS { - s.mppTaskIDAllocator.taskID++ - return s.mppTaskIDAllocator.taskID - } - s.mppTaskIDAllocator.lastTS = startTS - s.mppTaskIDAllocator.taskID = 1 - return 1 -} - // IsMPPAllowed returns whether mpp execution is allowed. func (s *SessionVars) IsMPPAllowed() bool { return s.allowMPPExecution diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index b624005b7b512..5df5e187088d0 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -130,16 +130,9 @@ func TestSession(t *testing.T) { func TestAllocMPPID(t *testing.T) { ctx := mock.NewContext() - - seVar := ctx.GetSessionVars() - require.NotNil(t, seVar) - - require.Equal(t, int64(1), seVar.AllocMPPTaskID(1)) - require.Equal(t, int64(2), seVar.AllocMPPTaskID(1)) - require.Equal(t, int64(3), seVar.AllocMPPTaskID(1)) - require.Equal(t, int64(1), seVar.AllocMPPTaskID(2)) - require.Equal(t, int64(2), seVar.AllocMPPTaskID(2)) - require.Equal(t, int64(3), seVar.AllocMPPTaskID(2)) + require.Equal(t, int64(1), plannercore.AllocMPPTaskID(ctx)) + require.Equal(t, int64(2), plannercore.AllocMPPTaskID(ctx)) + require.Equal(t, int64(3), plannercore.AllocMPPTaskID(ctx)) } func TestSlowLogFormat(t *testing.T) { diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 02b66478958d4..c3225c40d1455 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -143,7 +143,8 @@ type mppIterator struct { tasks []*kv.MPPDispatchRequest finishCh chan struct{} - startTs uint64 + startTs uint64 + mppQueryID kv.MPPQueryID respChan chan *mppResponse @@ -220,7 +221,8 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req } // meta for current task. - taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, TaskId: req.ID, Address: req.Meta.GetAddress()} + taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, QueryTs: req.MppQueryID.QueryTs, LocalQueryId: req.MppQueryID.LocalQueryID, TaskId: req.ID, ServerId: req.MppQueryID.ServerID, + Address: req.Meta.GetAddress()} mppReq := &mpp.DispatchTaskRequest{ Meta: taskMeta, @@ -334,7 +336,7 @@ func (m *mppIterator) cancelMppTasks() { m.mu.Lock() defer m.mu.Unlock() killReq := &mpp.CancelTaskRequest{ - Meta: &mpp.TaskMeta{StartTs: m.startTs}, + Meta: &mpp.TaskMeta{StartTs: m.startTs, QueryTs: m.mppQueryID.QueryTs, LocalQueryId: m.mppQueryID.LocalQueryID, ServerId: m.mppQueryID.ServerID}, } disaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash @@ -374,8 +376,11 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques connReq := &mpp.EstablishMPPConnectionRequest{ SenderMeta: taskMeta, ReceiverMeta: &mpp.TaskMeta{ - StartTs: req.StartTs, - TaskId: -1, + StartTs: req.StartTs, + QueryTs: m.mppQueryID.QueryTs, + LocalQueryId: m.mppQueryID.LocalQueryID, + ServerId: m.mppQueryID.ServerID, + TaskId: -1, }, } @@ -528,7 +533,7 @@ func (m *mppIterator) Next(ctx context.Context) (kv.ResultSubset, error) { } // DispatchMPPTasks dispatches all the mpp task and waits for the responses. -func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool, startTs uint64) kv.Response { +func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, dispatchReqs []*kv.MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID kv.MPPQueryID) kv.Response { vars := variables.(*tikv.Variables) ctxChild, cancelFunc := context.WithCancel(ctx) iter := &mppIterator{ @@ -539,6 +544,7 @@ func (c *MPPClient) DispatchMPPTasks(ctx context.Context, variables interface{}, cancelFunc: cancelFunc, respChan: make(chan *mppResponse), startTs: startTs, + mppQueryID: mppQueryID, vars: vars, needTriggerFallback: needTriggerFallback, enableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),