Infoschema, executor: add push down for tikv_region_peers #28211
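
This change lets the planner extract store_id and region_id predicates on information_schema.tikv_region_peers, and lets the executor turn them into targeted PD requests instead of a full region scan. Illustrative queries (the IDs are made up) that take the pushed-down path:

SELECT * FROM information_schema.tikv_region_peers WHERE store_id = 1;
SELECT * FROM information_schema.tikv_region_peers WHERE region_id IN (4, 5, 6);
SELECT * FROM information_schema.tikv_region_peers WHERE store_id = 1 AND region_id = 4;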

Closed

20 commits
679632a
executor: move tikv_region_peers from infoschema_reader.go to memtabl…
IcePigZDB Nov 29, 2021
93d24f0
store/helper: support get store regionsInfo
IcePigZDB Nov 29, 2021
edca445
planner: add test for TikvRegionPeersExtractor
IcePigZDB Nov 29, 2021
b2079bf
Merge branch 'master' into optikvregionpeers
IcePigZDB Nov 29, 2021
87093f2
executor: format
IcePigZDB Nov 29, 2021
954dc0b
Merge branch 'pingcap:master' into optikvregionpeers
IcePigZDB Nov 29, 2021
daba083
executor: remove useless failpoint in hot region history test
IcePigZDB Nov 30, 2021
5bf7c5a
Merge branch 'master' into optikvregionpeers
IcePigZDB Nov 30, 2021
3b4abc1
executor: remove redundant file
IcePigZDB Nov 30, 2021
4c97831
executor: close http for tikv_region_status test
IcePigZDB Nov 30, 2021
1b1d917
executor: fix tikv_region_peers storeRegions prefix bug
IcePigZDB Nov 30, 2021
43e81ce
Merge branch 'master' into optikvregionpeers
IcePigZDB Dec 8, 2021
3984644
Merge branch 'master' into optikvregionpeers
IcePigZDB Dec 10, 2021
41f93c3
Merge branch 'master' into optikvregionpeers
IcePigZDB Dec 13, 2021
a67dd8b
Merge branch 'master' into optikvregionpeers
IcePigZDB Dec 15, 2021
d5ad8a7
Update executor/memtable_reader.go
IcePigZDB Dec 20, 2021
33c8a18
executor: use map[int64]struct{} in tikvRegionPeersRetriever
IcePigZDB Dec 20, 2021
4b4fc04
extractor: add a test case for columns that are not extracted by Ti…
IcePigZDB Dec 20, 2021
55328ef
executor: fix typo in intersect case
IcePigZDB Dec 21, 2021
b8277f9
executor: remove debug code in retriever test
IcePigZDB Dec 21, 2021
9 changes: 8 additions & 1 deletion executor/builder.go
@@ -1607,6 +1607,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
timeRange: v.QueryTimeRange,
},
}
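// Route tikv_region_peers to a dedicated retriever so that the store_id /
// region_id predicates collected by the planner's TikvRegionPeersExtractor
// can be pushed down, instead of scanning all regions in memtableRetriever.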
case strings.ToLower(infoschema.TableTiKVRegionPeers):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &tikvRegionPeersRetriever{
extractor: v.Extractor.(*plannercore.TikvRegionPeersExtractor),
},
}
case strings.ToLower(infoschema.TableSchemata),
strings.ToLower(infoschema.TableStatistics),
strings.ToLower(infoschema.TableTiDBIndexes),
@@ -1628,7 +1636,6 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableProcesslist),
strings.ToLower(infoschema.ClusterTableProcesslist),
strings.ToLower(infoschema.TableTiKVRegionStatus),
strings.ToLower(infoschema.TableTiKVRegionPeers),
strings.ToLower(infoschema.TableTiDBHotRegions),
strings.ToLower(infoschema.TableSessionVar),
strings.ToLower(infoschema.TableConstraints),
59 changes: 0 additions & 59 deletions executor/infoschema_reader.go
@@ -141,8 +141,6 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
e.setDataFromUserPrivileges(sctx)
case infoschema.TableTiKVRegionStatus:
err = e.setDataForTiKVRegionStatus(sctx)
case infoschema.TableTiKVRegionPeers:
err = e.setDataForTikVRegionPeers(sctx)
case infoschema.TableTiDBHotRegions:
err = e.setDataForTiDBHotRegions(sctx)
case infoschema.TableConstraints:
@@ -1495,63 +1493,6 @@ func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *helper.RegionInfo,
e.rows = append(e.rows, row)
}

func (e *memtableRetriever) setDataForTikVRegionPeers(ctx sessionctx.Context) error {
tikvStore, ok := ctx.GetStore().(helper.Storage)
if !ok {
return errors.New("Information about TiKV region status can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
regionsInfo, err := tikvHelper.GetRegionsInfo()
if err != nil {
return err
}
for i := range regionsInfo.Regions {
e.setNewTiKVRegionPeersCols(&regionsInfo.Regions[i])
}
return nil
}

func (e *memtableRetriever) setNewTiKVRegionPeersCols(region *helper.RegionInfo) {
records := make([][]types.Datum, 0, len(region.Peers))
pendingPeerIDSet := set.NewInt64Set()
for _, peer := range region.PendingPeers {
pendingPeerIDSet.Insert(peer.ID)
}
downPeerMap := make(map[int64]int64, len(region.DownPeers))
for _, peerStat := range region.DownPeers {
downPeerMap[peerStat.Peer.ID] = peerStat.DownSec
}
for _, peer := range region.Peers {
row := make([]types.Datum, len(infoschema.TableTiKVRegionPeersCols))
row[0].SetInt64(region.ID)
row[1].SetInt64(peer.ID)
row[2].SetInt64(peer.StoreID)
if peer.IsLearner {
row[3].SetInt64(1)
} else {
row[3].SetInt64(0)
}
if peer.ID == region.Leader.ID {
row[4].SetInt64(1)
} else {
row[4].SetInt64(0)
}
if downSec, ok := downPeerMap[peer.ID]; ok {
row[5].SetString(downPeer, mysql.DefaultCollationName)
row[6].SetInt64(downSec)
} else if pendingPeerIDSet.Exist(peer.ID) {
row[5].SetString(pendingPeer, mysql.DefaultCollationName)
} else {
row[5].SetString(normalPeer, mysql.DefaultCollationName)
}
records = append(records, row)
}
e.rows = append(e.rows, records...)
}

const (
normalPeer = "NORMAL"
pendingPeer = "PENDING"
113 changes: 113 additions & 0 deletions executor/memtable_reader.go
@@ -997,3 +997,116 @@ func (e *hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo(
}
return row, nil
}

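// tikvRegionPeersRetriever retrieves rows for information_schema.tikv_region_peers.
// extractor carries the store_id / region_id predicates pushed down by the planner;
// retrieved guards against returning the same rows twice across retrieve() calls.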
type tikvRegionPeersRetriever struct {
dummyCloser
extractor *plannercore.TikvRegionPeersExtractor
retrieved bool
}

func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.retrieved {
return nil, nil
}
e.retrieved = true
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return nil, errors.New("Information about TiKV region peers can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}

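// No store_id or region_id predicate was pushed down: fall back to
// fetching information about all regions.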
if len(e.extractor.StoreIDs) < 1 && len(e.extractor.RegionIDs) < 1 {
regionsInfo, err := tikvHelper.GetRegionsInfo()
if err != nil {
return nil, err
}
return e.packTiKVRegionPeersRows(regionsInfo.Regions)
}

var regionsInfo, regionsInfoByStoreID, regionsInfoByRegionID []helper.RegionInfo
regionMap := make(map[int64]struct{})
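// Query the regions of each requested store. A region may have peers on
// several of the requested stores, so deduplicate regions via regionMap.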
if len(e.extractor.StoreIDs) > 0 {
for _, storeID := range e.extractor.StoreIDs {
regionsInfo, err := tikvHelper.GetStoreRegionsInfo(storeID)
if err != nil {
return nil, err
}
// skip regions already collected from another store
for _, region := range regionsInfo.Regions {
if _, ok := regionMap[region.ID]; !ok {
regionMap[region.ID] = struct{}{}
regionsInfoByStoreID = append(regionsInfoByStoreID, region)
}
}
}
if len(e.extractor.RegionIDs) < 1 {
return e.packTiKVRegionPeersRows(regionsInfoByStoreID)
}
}

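// Fetch each explicitly requested region by its ID.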
if len(e.extractor.RegionIDs) > 0 {
for _, regionID := range e.extractor.RegionIDs {
regionInfo, err := tikvHelper.GetRegionInfoByID(regionID)
if err != nil {
return nil, err
}
regionsInfoByRegionID = append(regionsInfoByRegionID, *regionInfo)
}
if len(e.extractor.StoreIDs) < 1 {
return e.packTiKVRegionPeersRows(regionsInfoByRegionID)
}
}

// Both store_id and region_id predicates are present: keep only the regions
// requested by ID that also live on one of the requested stores.
for _, region := range regionsInfoByRegionID {
if _, ok := regionMap[region.ID]; ok {
regionsInfo = append(regionsInfo, region)
}
}
return e.packTiKVRegionPeersRows(regionsInfo)
}

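// packTiKVRegionPeersRows builds one datum row per peer, in the column order of
// infoschema.TableTiKVRegionPeersCols: REGION_ID, PEER_ID, STORE_ID, IS_LEARNER,
// IS_LEADER, STATUS and DOWN_SECONDS.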
func (e *tikvRegionPeersRetriever) packTiKVRegionPeersRows(regionsInfo []helper.RegionInfo) ([][]types.Datum, error) {
var rows [][]types.Datum
for _, region := range regionsInfo {
records := make([][]types.Datum, 0, len(region.Peers))
pendingPeerIDSet := set.NewInt64Set()
for _, peer := range region.PendingPeers {
pendingPeerIDSet.Insert(peer.ID)
}
downPeerMap := make(map[int64]int64, len(region.DownPeers))
for _, peerStat := range region.DownPeers {
downPeerMap[peerStat.Peer.ID] = peerStat.DownSec
}
for _, peer := range region.Peers {
row := make([]types.Datum, len(infoschema.TableTiKVRegionPeersCols))
row[0].SetInt64(region.ID)
row[1].SetInt64(peer.ID)
row[2].SetInt64(peer.StoreID)
if peer.IsLearner {
row[3].SetInt64(1)
} else {
row[3].SetInt64(0)
}
if peer.ID == region.Leader.ID {
row[4].SetInt64(1)
} else {
row[4].SetInt64(0)
}
if downSec, ok := downPeerMap[peer.ID]; ok {
row[5].SetString(downPeer, mysql.DefaultCollationName)
row[6].SetInt64(downSec)
} else if pendingPeerIDSet.Exist(peer.ID) {
row[5].SetString(pendingPeer, mysql.DefaultCollationName)
} else {
row[5].SetString(normalPeer, mysql.DefaultCollationName)
}
records = append(records, row)
}
rows = append(rows, records...)
}
return rows, nil
}