Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support forbiding cross dc read for pointGet #21840

Merged
merged 13 commits into from
Dec 22, 2020
30 changes: 21 additions & 9 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,8 @@ func (builder *RequestBuilder) verifyTxnScope() error {
}

for tableID := range visitTableID {
bundle, ok := builder.bundles[placement.GroupID(tableID)]
if !ok {
continue
}
dc, ok := placement.GetLeaderDCByBundle(bundle, placement.DCLabelKey)
if !ok {
continue
}
if dc != builder.txnScope {
valid := VerifyTxnScope(builder.txnScope, tableID, builder.bundles)
if !valid {
return fmt.Errorf("table %v can not be read by %v txn_scope", tableID, builder.txnScope)
}
}
Expand Down Expand Up @@ -522,6 +515,25 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra
return krs, nil
}

// VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation.
func VerifyTxnScope(txnScope string, physicalTableID int64, bundles map[string]*placement.Bundle) bool {
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
return true
}
bundle, ok := bundles[placement.GroupID(physicalTableID)]
if !ok {
return true
}
leaderDC, ok := placement.GetLeaderDCByBundle(bundle, placement.DCLabelKey)
if !ok {
return true
}
if leaderDC != txnScope {
return false
}
return true
}

func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
Expand Down
39 changes: 39 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@ package executor

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -128,6 +133,9 @@ func (e *PointGetExecutor) Open(context.Context) error {
} else {
e.snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS})
}
if err := e.verifyTxnScope(); err != nil {
return err
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
Expand Down Expand Up @@ -372,6 +380,37 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
return e.snapshot.Get(ctx, key)
}

func (e *PointGetExecutor) verifyTxnScope() error {
txnScope := e.txn.GetUnionStore().GetOption(kv.TxnScope).(string)
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
return nil
}
var tblID int64
var tblName string
var partName string
is := infoschema.GetInfoSchema(e.ctx)
if e.partInfo != nil {
tblID = e.partInfo.ID
tblInfo, _, partInfo := is.FindTableByPartitionID(tblID)
tblName = tblInfo.Meta().Name.String()
partName = partInfo.Name.String()
} else {
tblID = e.tblInfo.ID
tblInfo, _ := is.TableByID(tblID)
tblName = tblInfo.Meta().Name.String()
}
valid := distsql.VerifyTxnScope(txnScope, tblID, is.RuleBundles())
if valid {
return nil
}
if len(partName) > 0 {
return ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("table %v's partition %v can not be read by %v txn_scope", tblName, partName, txnScope))
}
return ddl.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(
fmt.Sprintf("table %v can not be read by %v txn_scope", tblName, txnScope))
}

// EncodeUniqueIndexKey encodes a unique index key.
func EncodeUniqueIndexKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
encodedIdxVals, err := EncodeUniqueIndexValuesForKey(ctx, tblInfo, idxInfo, idxVals)
Expand Down
13 changes: 13 additions & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8481,6 +8481,19 @@ PARTITION BY RANGE (c) (
// sql: "select /*+ USE_INDEX(t1, idx_d) */ d from t1 where c < 5 and d < 1;",
// expectErr: fmt.Errorf(".*can not be read by.*"),
//},
// FIXME: block by https://github.com/pingcap/tidb/issues/21847
//{
// name: "cross dc read to sh by holding bj, BatchPointGet",
// txnScope: "bj",
// sql: "select * from t1 where c in (1,2,3,4);",
// expectErr: fmt.Errorf(".*can not be read by.*"),
//},
{
name: "cross dc read to sh by holding bj, PointGet",
txnScope: "bj",
sql: "select * from t1 where c = 1",
expectErr: fmt.Errorf(".*can not be read by.*"),
},
{
name: "cross dc read to sh by holding bj, IndexLookUp",
txnScope: "bj",
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2422,7 +2422,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
}

// no need to get txn from txnFutureCh since txn should init with startTs
txn, err := s.store.BeginWithStartTS(oracle.GlobalTxnScope, startTS)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would let PointGet always use global as txnScope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe because PointGet always use max uint64 as start ts, so it doesn't matter.

txn, err := s.store.BeginWithStartTS(s.GetSessionVars().CheckAndGetTxnScope(), startTS)
if err != nil {
return err
}
Expand Down