Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, distsql: Support forbiding cross txnScope query all *Reader Executor #21650

Merged
merged 31 commits into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,8 @@ add placement policy
func (s *testDBSuite1) TestGlobalTxnState(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")

tk.Se.GetSessionVars().EnableAlterPlacement = true
defer func() {
Expand Down
62 changes: 61 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
package distsql

import (
"fmt"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand All @@ -34,11 +38,18 @@ import (
// It is called before we issue a kv request by "Select".
type RequestBuilder struct {
kv.Request
err error
// txnScope indicates the value of txn_scope
txnScope string
bundles map[string]*placement.Bundle
err error
}

// Build builds a "kv.Request".
func (builder *RequestBuilder) Build() (*kv.Request, error) {
err := builder.verifyTxnScope()
if err != nil {
builder.err = err
}
return &builder.Request, builder.err
}

Expand Down Expand Up @@ -225,6 +236,12 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
return builder
}

// SetTxnScope sets "TxnScope" flag for "kv.Request".
func (builder *RequestBuilder) SetTxnScope(scope string) *RequestBuilder {
builder.txnScope = scope
return builder
}

// SetStreaming sets "Streaming" flag for "kv.Request".
func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
builder.Request.Streaming = streaming
Expand All @@ -245,6 +262,49 @@ func (builder *RequestBuilder) SetTiDBServerID(serverID uint64) *RequestBuilder
return builder
}

// SetFromInfoSchema sets the following fields from infoSchema:
// "bundles"
func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *RequestBuilder {
if is == nil {
return builder
}
builder.bundles = is.RuleBundles()
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = oracle.GlobalTxnScope
}
if builder.txnScope == oracle.GlobalTxnScope || len(builder.bundles) < 1 {
return nil
}
visitTableID := make(map[int64]struct{})
for _, keyRange := range builder.Request.KeyRanges {
tableID := tablecodec.DecodeTableID(keyRange.StartKey)
if tableID > 0 {
visitTableID[tableID] = struct{}{}
} else {
return errors.New("requestBuilder can't decode tableID from keyRange")
}
}

for tableID := range visitTableID {
bundle, ok := builder.bundles[placement.GroupID(tableID)]
if !ok {
continue
}
dc, ok := placement.GetLeaderDCByBundle(bundle, placement.DCLabelKey)
if !ok {
continue
}
if dc != builder.txnScope {
return fmt.Errorf("table %v can not be read by %v txn_scope", tableID, builder.txnScope)
}
}
return nil
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if !isCommonHandle {
Expand Down
14 changes: 14 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -3351,13 +3352,19 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
if err != nil {
return nil, err
}
txn, err := e.ctx.Txn(false)
if err != nil {
return nil, err
}
kvReq, err := reqBuilderWithRange.
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3962,3 +3969,10 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}
return e
}

func extractTxnScope(txn kv.Transaction) string {
if txn == nil || txn.GetUnionStore() == nil {
return oracle.GlobalTxnScope
}
return txn.GetUnionStore().GetOption(kv.TxnScope).(string)
}
14 changes: 14 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -324,6 +325,10 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand All @@ -333,6 +338,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
// FIXME: add unit test to cover this case
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -512,6 +520,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
}
tracker := memory.NewTracker(memory.LabelForIndexWorker, -1)
tracker.AttachTo(e.memTracker)
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand All @@ -521,6 +533,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(tracker).
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -197,7 +198,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
collExec := true
e.dagPBs[workID].CollectExecutionSummaries = &collExec
}

txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(keyRange).
SetDAGRequest(e.dagPBs[workID]).
Expand All @@ -207,6 +211,8 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetStreaming(e.partialStreamings[workID]).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -222,6 +223,10 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
txn, err := e.ctx.Txn(false)
if err != nil {
return nil, err
}
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand All @@ -232,6 +237,8 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
SetTxnScope(extractTxnScope(txn)).
SetFromInfoSchema(infoschema.GetInfoSchema(e.ctx)).
Build()
if err != nil {
return nil, err
Expand Down
119 changes: 119 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -8408,6 +8410,123 @@ func (s *testIntegrationSuite) TestIssue12209(c *C) {
testkit.Rows("<nil>"))
}

func (s *testIntegrationSuite) TestCrossDCQuery(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec(`create table t1 (c int primary key, d int,e int,index idx_d(d),index idx_e(e))
PARTITION BY RANGE (c) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11)
);`)

tk.MustExec(`insert into t1 (c,d,e) values (1,1,1);`)
tk.MustExec(`insert into t1 (c,d,e) values (2,3,5);`)
tk.MustExec(`insert into t1 (c,d,e) values (3,5,7);`)

bundles := make(map[string]*placement.Bundle)
is := s.dom.InfoSchema()
is.MockBundles(bundles)

tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
setBundle := func(parName, dc string) {
pid, err := tables.FindPartitionByName(tb.Meta(), parName)
c.Assert(err, IsNil)
groupID := placement.GroupID(pid)
oldBundle := &placement.Bundle{
ID: groupID,
Rules: []*placement.Rule{
{
GroupID: groupID,
Role: placement.Leader,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: placement.DCLabelKey,
Op: placement.In,
Values: []string{dc},
},
},
},
},
}
bundles[groupID] = placement.BuildPlacementCopyBundle(oldBundle, pid)
}
setBundle("p0", "sh")
setBundle("p1", "bj")

testcases := []struct {
name string
txnScope string
sql string
expectErr error
}{
// FIXME: block by https://github.com/pingcap/tidb/issues/21872
//{
// name: "cross dc read to sh by holding bj, IndexReader",
// txnScope: "bj",
// sql: "select /*+ USE_INDEX(t1, idx_d) */ d from t1 where c < 5 and d < 1;",
// expectErr: fmt.Errorf(".*can not be read by.*"),
//},
{
name: "cross dc read to sh by holding bj, IndexLookUp",
txnScope: "bj",
sql: "select * from t1 use index (idx_d) where c < 5 and d < 5;",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "cross dc read to sh by holding bj, IndexMerge",
txnScope: "bj",
sql: "select /*+ USE_INDEX_MERGE(t1, idx_d, idx_e) */ * from t1 where c <5 and (d =5 or e=5);",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "cross dc read to sh by holding bj, TableReader",
txnScope: "bj",
sql: "select * from t1 where c < 6",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "cross dc read to global by holding bj",
txnScope: "bj",
sql: "select * from t1",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "read sh dc by holding sh",
txnScope: "sh",
sql: "select * from t1 where c < 6",
expectErr: nil,
},
{
name: "read sh dc by holding global",
txnScope: "global",
sql: "select * from t1 where c < 6",
expectErr: nil,
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
_, err = tk.Exec(fmt.Sprintf("set @@txn_scope='%v'", testcase.txnScope))
c.Assert(err, IsNil)
res, err := tk.Exec(testcase.sql)
_, resErr := session.GetRows4Test(context.Background(), tk.Se, res)
var checkErr error
if err != nil {
checkErr = err
} else {
checkErr = resErr
}
if testcase.expectErr != nil {
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Matches, ".*can not be read by.*")
} else {
c.Assert(checkErr, IsNil)
}
}
}

func (s *testIntegrationSerialSuite) TestCollationUnion(c *C) {
// For issue 19694.
tk := testkit.NewTestKit(c, s.store)
Expand Down
Loading