Skip to content

Commit

Permalink
Merge branch 'master' of http://github.com/tikv/pd into fix-move-peer
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed May 11, 2021
2 parents 7126e98 + bf0e891 commit f97e022
Show file tree
Hide file tree
Showing 41 changed files with 659 additions and 344 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ test-with-cover: install-go-tools dashboard-ui
done
@$(FAILPOINT_DISABLE)

check: install-go-tools check-all check-plugin errdoc check-missing-tests docker-build-test
check: install-go-tools check-all check-plugin errdoc check-testing-t docker-build-test

check-all: static lint tidy
@echo "checking"
Expand Down Expand Up @@ -200,8 +200,8 @@ docker-build-test:
fi
docker build --no-cache -t tikv/pd .

check-missing-tests:
./scripts/check-missing-tests.sh
check-testing-t:
./scripts/check-testing-t.sh

simulator: export GO111MODULE=on
simulator:
Expand Down
22 changes: 17 additions & 5 deletions pkg/autoscaling/calculation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package autoscaling

import (
"context"
"encoding/json"
"fmt"
"math"
Expand All @@ -32,11 +33,22 @@ func Test(t *testing.T) {

var _ = Suite(&calculationTestSuite{})

type calculationTestSuite struct{}
type calculationTestSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *calculationTestSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *calculationTestSuite) TearDownTest(c *C) {
s.cancel()
}

func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
// case1 indicates the tikv cluster with not any group existed
case1 := mockcluster.NewCluster(config.NewTestOptions())
case1 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case1.AddLabelsStore(1, 1, map[string]string{})
case1.AddLabelsStore(2, 1, map[string]string{
"foo": "bar",
Expand All @@ -46,7 +58,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
})

// case2 indicates the tikv cluster with 1 auto-scaling group existed
case2 := mockcluster.NewCluster(config.NewTestOptions())
case2 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case2.AddLabelsStore(1, 1, map[string]string{})
case2.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand All @@ -58,7 +70,7 @@ func (s *calculationTestSuite) TestGetScaledTiKVGroups(c *C) {
})

// case3 indicates the tikv cluster with other group existed
case3 := mockcluster.NewCluster(config.NewTestOptions())
case3 := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
case3.AddLabelsStore(1, 1, map[string]string{})
case3.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: "foo",
Expand Down Expand Up @@ -323,7 +335,7 @@ func (s *calculationTestSuite) TestStrategyChangeCount(c *C) {
}

// tikv cluster with 1 auto-scaling group existed
cluster := mockcluster.NewCluster(config.NewTestOptions())
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
cluster.AddLabelsStore(2, 1, map[string]string{
groupLabelKey: fmt.Sprintf("%s-%s-0", autoScalingGroupLabelKeyPrefix, TiKV.String()),
Expand Down
5 changes: 0 additions & 5 deletions pkg/autoscaling/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"net/url"
"strings"
"testing"
"time"

. "github.com/pingcap/check"
Expand All @@ -41,10 +40,6 @@ const (
instanceCount = 3
)

func TestPrometheus(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testPrometheusQuerierSuite{})

var podNameTemplate = map[ComponentType]string{
Expand Down
5 changes: 0 additions & 5 deletions pkg/encryption/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,12 @@
package encryption

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/tikv/pd/pkg/typeutil"
)

func TestConfig(t *testing.T) {
TestingT(t)
}

type testConfigSuite struct{}

var _ = Suite(&testConfigSuite{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/encryption/crypter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/encryptionpb"
)

func TestCrypter(t *testing.T) {
func Test(t *testing.T) {
TestingT(t)
}

Expand Down
5 changes: 0 additions & 5 deletions pkg/encryption/master_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@ package encryption
import (
"encoding/hex"
"io/ioutil"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/encryptionpb"
)

func TestMasterKey(t *testing.T) {
TestingT(t)
}

type testMasterKeySuite struct{}

var _ = Suite(&testMasterKeySuite{})
Expand Down
5 changes: 0 additions & 5 deletions pkg/encryption/region_crypter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@ package encryption
import (
"crypto/aes"
"crypto/cipher"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/metapb"
)

func TestRegionCrypter(t *testing.T) {
TestingT(t)
}

type testRegionCrypterSuite struct{}

var _ = Suite(&testRegionCrypterSuite{})
Expand Down
9 changes: 5 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package mockcluster

import (
"context"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -52,11 +53,11 @@ type Cluster struct {
}

// NewCluster creates a new Cluster
func NewCluster(opts *config.PersistOptions) *Cluster {
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(),
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
disabledFeatures: make(map[versioninfo.Feature]struct{}),
Expand Down Expand Up @@ -339,7 +340,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckRead(r)
items = mc.HotCache.CheckReadSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
Expand All @@ -366,7 +367,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckWrite(r)
items = mc.HotCache.CheckWriteSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
. "github.com/pingcap/check"
)

func TestComparison(t *testing.T) {
func TestTypeUtil(t *testing.T) {
TestingT(t)
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/typeutil/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@
package typeutil

import (
"testing"

. "github.com/pingcap/check"
)

func TestConversion(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testUint64BytesSuite{})

type testUint64BytesSuite struct{}
Expand Down
5 changes: 0 additions & 5 deletions pkg/typeutil/size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,10 @@ package typeutil

import (
"encoding/json"
"testing"

. "github.com/pingcap/check"
)

func TestSize(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testSizeSuite{})

type testSizeSuite struct {
Expand Down
10 changes: 10 additions & 0 deletions scripts/check-missing-tests.sh → scripts/check-testing-t.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,14 @@ if [ "$res" ]; then
exit 1
fi

# Check if there are duplicated `TestingT` in package.

res=$(grep -r --include=\*_test.go "TestingT(" . | cut -f1 | xargs -L 1 dirname | sort | uniq -d)

if [ "$res" ]; then
echo "following packages may have duplicated TestingT:"
echo "$res"
exit 1
fi

exit 0
25 changes: 6 additions & 19 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
c.storage = storage
c.id = id
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat()
c.hotStat = statistics.NewHotStat(c.ctx)
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
Expand Down Expand Up @@ -546,8 +546,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
c.CheckRWStatus(region)
c.RUnlock()

// Save to storage if meta is updated.
Expand Down Expand Up @@ -623,7 +622,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}

if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
if !saveKV && !saveCache && !isNew {
return nil
}

Expand Down Expand Up @@ -682,13 +681,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

for _, writeItem := range writeItems {
c.hotStat.Update(writeItem)
}
for _, readItem := range readItems {
c.hotStat.Update(readItem)
}
c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if
Expand Down Expand Up @@ -1475,14 +1467,9 @@ func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
return c.hotStat.RegionStats(statistics.WriteFlow, c.GetOpts().GetHotRegionCacheHitsThreshold())
}

// CheckWriteStatus checks the write status, returns whether need update statistics and item.
func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckWrite(region)
}

// CheckReadStatus checks the read status, returns whether need update statistics and item.
func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckRead(region)
// CheckRWStatus checks the read/write status.
func (c *RaftCluster) CheckRWStatus(region *core.RegionInfo) {
c.hotStat.CheckRWAsync(region)
}

// TODO: remove me.
Expand Down
8 changes: 8 additions & 0 deletions server/core/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,11 @@ func StringToKeyType(input string) KeyType {
panic("invalid key type: " + input)
}
}

// FlowStat indicates the stats of the flow
type FlowStat interface {
GetKeysWritten() uint64
GetBytesWritten() uint64
GetBytesRead() uint64
GetKeysRead() uint64
}
50 changes: 50 additions & 0 deletions server/core/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,53 @@ func CountInJointState(peers ...*metapb.Peer) int {
}
return count
}

// PeerInfo provides peer information
type PeerInfo struct {
*metapb.Peer
writtenBytes uint64
writtenKeys uint64
readBytes uint64
readKeys uint64
}

// NewPeerInfo creates PeerInfo
func NewPeerInfo(meta *metapb.Peer, writtenBytes, writtenKeys, readBytes, readKeys uint64) *PeerInfo {
return &PeerInfo{
Peer: meta,
writtenBytes: writtenBytes,
writtenKeys: writtenKeys,
readBytes: readBytes,
readKeys: readKeys,
}
}

// GetKeysWritten provides peer written keys
func (p *PeerInfo) GetKeysWritten() uint64 {
return p.writtenKeys
}

// GetBytesWritten provides peer written bytes
func (p *PeerInfo) GetBytesWritten() uint64 {
return p.writtenBytes
}

// GetBytesRead provides peer read bytes
func (p *PeerInfo) GetBytesRead() uint64 {
return p.readBytes
}

// GetKeysRead provides read keys
func (p *PeerInfo) GetKeysRead() uint64 {
return p.readKeys
}

// GetStoreID provides located storeID
func (p *PeerInfo) GetStoreID() uint64 {
return p.GetStoreId()
}

// GetPeerID provides peer id
func (p *PeerInfo) GetPeerID() uint64 {
return p.GetId()
}
Loading

0 comments on commit f97e022

Please sign in to comment.