Skip to content

Commit

Permalink
distsql: use a more accurate type of the context of distsql (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored and imalasong committed Apr 1, 2024
1 parent c1a10c6 commit 88dc449
Show file tree
Hide file tree
Showing 32 changed files with 463 additions and 244 deletions.
6 changes: 3 additions & 3 deletions pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64,
SetStartTS(startTS).
SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
SetKeepOrder(true).
SetFromSessionVars(c.SessionContext.GetSessionVars()).
SetFromSessionVars(c.SessionContext.GetDistSQLCtx()).
SetFromInfoSchema(c.SessionContext.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
Expand All @@ -284,7 +284,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64,
if err != nil {
return nil, err
}
return distsql.Select(ctx, c.SessionContext, kvReq, c.FieldTypes)
return distsql.Select(ctx, c.SessionContext.GetDistSQLCtx(), kvReq, c.FieldTypes)
}

func fetchTableScanResult(
Expand Down Expand Up @@ -353,7 +353,7 @@ func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*m
return nil, err
}
dagReq.Executors = append(dagReq.Executors, execPB)
distsql.SetEncodeType(sCtx, dagReq)
distsql.SetEncodeType(sCtx.GetDistSQLCtx(), dagReq)
return dagReq, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, hand
tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), tbl.Meta(), handleCols)
dagReq.Executors = append(dagReq.Executors, tblScanExec)
dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
distsql.SetEncodeType(ctx, dagReq)
distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
return dagReq, nil
}

Expand Down Expand Up @@ -528,7 +528,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
} else {
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder = b.SetHandleRanges(sctx.GetDistSQLCtx(), tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
Expand All @@ -547,7 +547,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
return nil, errors.Trace(err)
}

result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols))
result, err := distsql.Select(ctx.ddlJobCtx, sctx.GetDistSQLCtx(), kvReq, getColumnsTypes(handleCols))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/util",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/store/copr",
"//pkg/tablecodec",
Expand All @@ -36,6 +35,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/ranger",
"//pkg/util/topsql/stmtstats",
"//pkg/util/tracing",
"//pkg/util/trxevents",
"@com_github_pingcap_errors//:errors",
Expand All @@ -58,6 +58,7 @@ go_test(
timeout = "short",
srcs = [
"bench_test.go",
"context_test.go",
"distsql_test.go",
"main_test.go",
"request_builder_test.go",
Expand All @@ -68,7 +69,9 @@ go_test(
race = "on",
shard_count = 26,
deps = [
"//pkg/distsql/context",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/kv",
"//pkg/parser/charset",
"//pkg/parser/model",
Expand All @@ -84,6 +87,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/collate",
"//pkg/util/context",
"//pkg/util/disk",
"//pkg/util/execdetails",
"//pkg/util/memory",
Expand Down
12 changes: 11 additions & 1 deletion pkg/distsql/context/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,17 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/distsql/context",
visibility = ["//visibility:public"],
deps = [
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/kv",
"//pkg/sessionctx/variable",
"//pkg/parser/mysql",
"//pkg/util/execdetails",
"//pkg/util/memory",
"//pkg/util/nocopy",
"//pkg/util/sqlkiller",
"//pkg/util/tiflash",
"//pkg/util/topsql/stmtstats",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//tikvrpc",
],
)
76 changes: 69 additions & 7 deletions pkg/distsql/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,76 @@
package context

import (
"time"

"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/nocopy"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
)

// DistSQLContext gives the interface
type DistSQLContext interface {
// GetSessionVars gets the session variables.
GetSessionVars() *variable.SessionVars
// GetClient gets a kv.Client.
GetClient() kv.Client
// DistSQLContext provides all information needed by using functions in `distsql`
type DistSQLContext struct {
// TODO: provide a `Clone` to copy this struct.
// The life cycle of some fields in this struct cannot be extended. For example, some fields will be recycled before
// the next execution. They'll need to be handled specially.
_ nocopy.NoCopy

AppendWarning func(error)
InRestrictedSQL bool
Client kv.Client

EnabledRateLimitAction bool
EnableChunkRPC bool
OriginalSQL string
KVVars *tikvstore.Variables
KvExecCounter *stmtstats.KvExecCounter
SessionMemTracker *memory.Tracker

Location *time.Location
RuntimeStatsColl *execdetails.RuntimeStatsColl
SQLKiller *sqlkiller.SQLKiller
ErrCtx errctx.Context

// TiFlash related configurations
TiFlashReplicaRead tiflash.ReplicaRead
TiFlashMaxThreads int64
TiFlashMaxBytesBeforeExternalJoin int64
TiFlashMaxBytesBeforeExternalGroupBy int64
TiFlashMaxBytesBeforeExternalSort int64
TiFlashMaxQueryMemoryPerNode int64
TiFlashQuerySpillRatio float64

DistSQLConcurrency int
ReplicaReadType kv.ReplicaReadType
WeakConsistency bool
RCCheckTS bool
NotFillCache bool
TaskID uint64
Priority mysql.PriorityEnum
ResourceGroupTagger tikvrpc.ResourceGroupTagger
EnablePaging bool
MinPagingSize int
MaxPagingSize int
RequestSourceType string
ExplicitRequestSourceType string
StoreBatchSize int
ResourceGroupName string
LoadBasedReplicaReadThreshold time.Duration
RunawayChecker *resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64

ReplicaClosestReadThreshold int64
ConnectionID uint64
SessionAlias string

ExecDetails *execdetails.SyncExecDetails
}
46 changes: 46 additions & 0 deletions pkg/distsql/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql

import (
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
contextutil "github.com/pingcap/tidb/pkg/util/context"
)

// NewDistSQLContextForTest creates a new dist sql context for test
func NewDistSQLContextForTest() *distsqlctx.DistSQLContext {
return &distsqlctx.DistSQLContext{
AppendWarning: func(error) {},
TiFlashMaxThreads: variable.DefTiFlashMaxThreads,
TiFlashMaxBytesBeforeExternalJoin: variable.DefTiFlashMaxBytesBeforeExternalJoin,
TiFlashMaxBytesBeforeExternalGroupBy: variable.DefTiFlashMaxBytesBeforeExternalGroupBy,
TiFlashMaxBytesBeforeExternalSort: variable.DefTiFlashMaxBytesBeforeExternalSort,
TiFlashMaxQueryMemoryPerNode: variable.DefTiFlashMemQuotaQueryPerNode,
TiFlashQuerySpillRatio: variable.DefTiFlashQuerySpillRatio,

DistSQLConcurrency: variable.DefDistSQLScanConcurrency,
MinPagingSize: variable.DefMinPagingSize,
MaxPagingSize: variable.DefMaxPagingSize,
ResourceGroupName: "default",

ErrCtx: errctx.NewContext(contextutil.IgnoreWarn),
}
}

// DefaultDistSQLContext is an empty distsql context used for testing, which doesn't have a client and cannot be used to
// send requests.
var DefaultDistSQLContext = NewDistSQLContextForTest()
Loading

0 comments on commit 88dc449

Please sign in to comment.