Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, distsql: Support forbiding cross txnScope query all *Reader Executor #21650

Merged
merged 31 commits into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ add placement policy
func (s *testDBSuite1) TestGlobalTxnState(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
Expand Down Expand Up @@ -635,3 +637,89 @@ alter placement policy
testFunc(testcase.name, testcase.hook, testcase.expectErr)
}
}

func (s *testDBSuite1) TestCrossDCQuery(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec(`create table t1 (c int)
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
setBundle := func(parName, dc string) {
pid, err := tables.FindPartitionByName(tb.Meta(), parName)
c.Assert(err, IsNil)
groupID := placement.GroupID(pid)
oldBundle := &placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
GroupID: groupID,
Role: placement.Leader,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: placement.DCLabelKey,
Op: placement.In,
Values: []string{dc},
},
},
},
},
}
bundles[groupID] = placement.BuildPlacementCopyBundle(oldBundle, pid)
}
setBundle("p0", "sh")
setBundle("p1", "bj")

testcases := []struct {
name string
txnScope string
sql string
expectErr error
}{
{
name: "cross dc read to sh by holding bj",
txnScope: "bj",
sql: "select * from t1 where c < 6",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "cross dc read to global by holding bj",
txnScope: "bj",
sql: "select * from t1",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "read sh dc by holding sh",
txnScope: "sh",
sql: "select * from t1 where c < 6",
expectErr: nil,
},
{
name: "read sh dc by holding global",
txnScope: "global",
sql: "select * from t1 where c < 6",
expectErr: nil,
},
}
for _, testcase := range testcases {
_, err = tk.Exec(fmt.Sprintf("set @@txn_scope='%v'", testcase.txnScope))
c.Assert(err, IsNil)
err = tk.ExecToErr(testcase.sql)
if testcase.expectErr != nil {
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*can not be read by.*")
} else {
c.Assert(err, IsNil)
}
}
}
63 changes: 62 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
package distsql

import (
"fmt"
"math"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand All @@ -34,11 +37,19 @@ import (
// It is called before we issue a kv request by "Select".
type RequestBuilder struct {
kv.Request
err error
sv *variable.SessionVars
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
// txnScope indicates the value of txn_scope
txnScope string
bundles map[string]*placement.Bundle
err error
}

// Build builds a "kv.Request".
func (builder *RequestBuilder) Build() (*kv.Request, error) {
err := builder.verifyTxnScope()
if err != nil {
builder.err = err
}
return &builder.Request, builder.err
}

Expand Down Expand Up @@ -212,6 +223,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
// Concurrency may be set to 1 by SetDAGRequest
builder.Request.Concurrency = sv.DistSQLScanConcurrency()
}
builder.sv = sv
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
Expand All @@ -225,6 +237,12 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
return builder
}

// SetTxnScope sets "TxnScope" flag for "kv.Request".
func (builder *RequestBuilder) SetTxnScope(scope string) *RequestBuilder {
builder.txnScope = scope
return builder
}

// SetStreaming sets "Streaming" flag for "kv.Request".
func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
builder.Request.Streaming = streaming
Expand All @@ -245,6 +263,49 @@ func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder
return builder
}

// SetFromInfoSchema sets the following fields from infoSchema:
// "bundles"
func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *RequestBuilder {
if is == nil {
return builder
}
builder.bundles = is.RuleBundles()
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = oracle.GlobalTxnScope
}
if builder.txnScope == oracle.GlobalTxnScope || len(builder.bundles) < 1 {
return nil
}
visitTableID := make(map[int64]struct{})
for _, keyRange := range builder.Request.KeyRanges {
tableID := tablecodec.DecodeTableID(keyRange.StartKey)
if tableID > 0 {
visitTableID[tableID] = struct{}{}
} else {
return fmt.Errorf("requestBuilder can't decode tableID from keyRange")
winoros marked this conversation as resolved.
Show resolved Hide resolved
}
}

for tableID := range visitTableID {
bundle, ok := builder.bundles[placement.GroupID(tableID)]
if !ok {
continue
}
dc, ok := placement.GetLeaderDCByBundle(bundle, placement.DCLabelKey)
if !ok {
continue
}
if dc != builder.txnScope {
return fmt.Errorf("table %v can not be read by %v txn_scope", tableID, builder.txnScope)
}
}
return nil
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if !isCommonHandle {
Expand Down
15 changes: 15 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -3351,13 +3352,20 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
if err != nil {
return nil, err
}
txn, err := e.ctx.Txn(false)
if err != nil {
return nil, err
}
kvReq, err := reqBuilderWithRange.
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
// FIXME: add unit test to cover this case
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3962,3 +3970,10 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}
return e
}

func extractTxnScope(txn kv.Transaction) string {
if txn == nil || txn.GetUnionStore() == nil {
return oracle.GlobalTxnScope
}
return txn.GetUnionStore().GetOption(kv.TxnScope).(string)
}
15 changes: 15 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -324,6 +325,10 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand All @@ -333,6 +338,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
// FIXME: add unit test to cover this case
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -512,6 +520,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
tracker := memory.NewTracker(memory.LabelForIndexWorker, -1)
tracker.AttachTo(e.memTracker)
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand All @@ -521,6 +533,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(tracker).
// FIXME: add unit test to cover this case
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -197,7 +198,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
collExec := true
e.dagPBs[workID].CollectExecutionSummaries = &collExec
}

txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(keyRange).
SetDAGRequest(e.dagPBs[workID]).
Expand All @@ -207,6 +211,9 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetStreaming(e.partialStreamings[workID]).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
// FIXME: add unit test to cover this case
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -222,6 +223,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
txn, err := e.ctx.Txn(false)
if err != nil {
return nil, err
}
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand All @@ -232,6 +237,8 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,9 @@ func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
is = snap.(InfoSchema)
logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", sessVar.ConnectionID), zap.Int64("schemaVersion", is.SchemaMetaVersion()))
} else {
if sessVar.TxnCtx == nil || sessVar.TxnCtx.InfoSchema == nil {
return nil
}
is = sessVar.TxnCtx.InfoSchema.(InfoSchema)
}
return is
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2420,7 +2420,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
}

// no need to get txn from txnFutureCh since txn should init with startTs
txn, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, startTS)
txn, err := s.store.BeginWithStartTS(s.GetSessionVars().TxnScope, startTS)
if err != nil {
return err
}
Expand Down
Loading