Skip to content

Commit

Permalink
Add TiFlash test cases (pingcap#327)
Browse files Browse the repository at this point in the history
* *: add test case of tiflash
  • Loading branch information
YuJuncen authored Jun 2, 2020
1 parent 3e05ea6 commit a6643a8
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 38 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@ testcover: tools failpoint-enable
-debug \
-- -coverpkg=./... || ( make failpoint-disable && exit 1 )

integration_test: build build_for_integration_test
integration_test: bins build build_for_integration_test
tests/run.sh

bins:
@which bin/tidb-server
@which bin/tikv-server
@which bin/pd-server
@which bin/pd-ctl
@which bin/go-ycsb
@which bin/minio
@which bin/br
tests/run.sh
@which bin/tiflash
@which bin/libtiflash_proxy.so
if [ ! -d bin/flash_cluster_manager ]; then echo "flash_cluster_manager not exist"; exit 1; fi

tools:
@echo "install tools..."
Expand Down
6 changes: 5 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,13 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
keepAliveTimeout := 3
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
store.GetAddress(),
addr,
opt,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Expand Down
6 changes: 5 additions & 1 deletion pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func (ic *importClient) getImportClient(
if ic.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(ic.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.Dial(addr, opt)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
flagRateLimitUnit = "ratelimit-unit"
flagConcurrency = "concurrency"
flagChecksum = "checksum"
flagRemoveTiFlash = "remove-tiflash"
flagCheckRequirement = "check-requirements"
)

Expand Down Expand Up @@ -92,6 +93,7 @@ type Config struct {
// LogProgress is true means the progress bar is printed to the log instead of stdout.
LogProgress bool `json:"log-progress" toml:"log-progress"`

RemoveTiFlash bool `json:"remove-tiflash" toml:"remove-tiflash"`
CaseSensitive bool `json:"case-sensitive" toml:"case-sensitive"`
CheckRequirements bool `json:"check-requirements" toml:"check-requirements"`
Filter filter.Rules `json:"black-white-list" toml:"black-white-list"`
Expand All @@ -108,6 +110,8 @@ func DefineCommonFlags(flags *pflag.FlagSet) {

flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node")
flags.Bool(flagChecksum, true, "Run checksum at end of task")
flags.Bool(flagRemoveTiFlash, true,
"Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")

// Default concurrency is different for backup and restore.
// Leave it 0 and let them adjust the value.
Expand Down Expand Up @@ -193,6 +197,11 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
}
cfg.RateLimit = rateLimit * rateLimitUnit

cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash)
if err != nil {
return errors.Trace(err)
}

if dbFlag := flags.Lookup(flagDatabase); dbFlag != nil {
db := escapeFilterName(dbFlag.Value.String())
if len(db) == 0 {
Expand Down
52 changes: 27 additions & 25 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
cfg.RemoveTiFlash, err = flags.GetBool(flagRemoveTiFlash)
if err != nil {
return errors.Trace(err)
}
err = cfg.Config.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -195,20 +199,6 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return nil
}

placementRules, err := client.GetPlacementRules(cfg.PD)
if err != nil {
return err
}

err = client.RemoveTiFlashReplica(tables, newTables, placementRules)
if err != nil {
return err
}

defer func() {
_ = client.RecoverTiFlashReplica(tables)
}()

ranges, err := restore.ValidateFileRanges(files, rewriteRules)
if err != nil {
return err
Expand Down Expand Up @@ -255,19 +245,31 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
}

// Restore sst files in batch.
batchSize := int(cfg.Concurrency)
if batchSize < defaultRestoreConcurrency {
batchSize = defaultRestoreConcurrency
}
batchSize = utils.MinInt(batchSize, maxRestoreBatchSizeLimit)
batchSize := utils.ClampInt(int(cfg.Concurrency), defaultRestoreConcurrency, maxRestoreBatchSizeLimit)

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
rejectStoreMap := make(map[uint64]bool)
for _, store := range tiflashStores {
rejectStoreMap[store.GetId()] = true
if cfg.RemoveTiFlash {
placementRules, err := client.GetPlacementRules(cfg.PD)
if err != nil {
return err
}

err = client.RemoveTiFlashReplica(tables, newTables, placementRules)
if err != nil {
return err
}

defer func() {
_ = client.RecoverTiFlashReplica(tables)
}()

tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly)
if err != nil {
return errors.Trace(err)
}
for _, store := range tiflashStores {
rejectStoreMap[store.GetId()] = true
}
}

for {
Expand Down
37 changes: 32 additions & 5 deletions pkg/utils/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,38 @@

package utils

// MinInt choice smaller integer from its two arguments.
func MinInt(x, y int) int {
if x < y {
return x
import (
"github.com/pingcap/log"
"go.uber.org/zap"
)

// MinInt choice smallest integer from its arguments.
func MinInt(x int, xs ...int) int {
min := x
for _, n := range xs {
if n < min {
min = n
}
}
return min
}

// MaxInt choice biggest integer from its arguments.
func MaxInt(x int, xs ...int) int {
max := x
for _, n := range xs {
if n > max {
max = n
}
}
return max
}

// ClampInt restrict a value to a certain interval.
func ClampInt(n, min, max int) int {
if min > max {
log.Error("clamping integer with min > max", zap.Int("min", min), zap.Int("max", max))
}

return y
return MinInt(max, MaxInt(min, n))
}
16 changes: 16 additions & 0 deletions pkg/utils/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,21 @@ var _ = Suite(&testMathSuite{})
func (*testMathSuite) TestMinInt(c *C) {
c.Assert(MinInt(1, 2), Equals, 1)
c.Assert(MinInt(2, 1), Equals, 1)
c.Assert(MinInt(4, 2, 1, 3), Equals, 1)
c.Assert(MinInt(1, 1), Equals, 1)
}

func (*testMathSuite) TestMaxInt(c *C) {
c.Assert(MaxInt(1, 2), Equals, 2)
c.Assert(MaxInt(2, 1), Equals, 2)
c.Assert(MaxInt(4, 2, 1, 3), Equals, 4)
c.Assert(MaxInt(1, 1), Equals, 1)
}

func (*testMathSuite) TestClampInt(c *C) {
c.Assert(ClampInt(100, 1, 3), Equals, 3)
c.Assert(ClampInt(2, 1, 3), Equals, 2)
c.Assert(ClampInt(0, 1, 3), Equals, 1)
c.Assert(ClampInt(0, 1, 1), Equals, 1)
c.Assert(ClampInt(100, 1, 1), Equals, 1)
}
7 changes: 5 additions & 2 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ programs.

## Preparations

1. The following 6 executables must be copied or linked into these locations:
1. The following 7 executables must be copied or linked into these locations:
* `bin/tidb-server`
* `bin/tikv-server`
* `bin/pd-server`
* `bin/pd-ctl`
* `bin/go-ycsb`
* `bin/minio`
* `bin/tiflash`

The versions must be ≥2.1.0 as usual.

What's more, there must be dynamic link library for TiFlash, see make target `bin` to learn more. You can install most of dependencies by running `download_tools.sh`.

2. The following programs must be installed:

* `mysql` (the CLI client)
Expand All @@ -31,7 +34,7 @@ Make sure the path is `br/`
Run `make integration_test` to execute the integration tests. This command will

1. Build `br`
2. Check that all 6 required executables and `br` executable exist
2. Check that all 7 required executables and `br` executable exist
3. Execute `tests/run.sh`

If the first two steps are done before, you could also run `tests/run.sh` directly.
Expand Down
89 changes: 89 additions & 0 deletions tests/_utils/make_tiflash_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/bin/sh

cat > tests/config/tiflash-learner.toml <<eof
[rocksdb]
wal-dir = ""
[security]
ca-path = ""
cert-path = ""
key-path = ""
[server]
addr = "0.0.0.0:20170"
advertise-addr = "127.0.0.1:20170"
engine-addr = "127.0.0.1:3930"
status-addr = "$TIFLASH_STATUS"
[storage]
data-dir = "$TEST_DIR/tiflash/data"
eof

cat > tests/config/tiflash.toml <<eof
default_profile = "default"
display_name = "TiFlash"
http_port = 8125
listen_host = "0.0.0.0"
mark_cache_size = 5368709120
path = "$TEST_DIR/tiflash/data"
tcp_port = 9002
tmp_path = "/tmp/tiflash"
[application]
runAsDaemon = true
[flash]
service_addr = "127.0.0.1:3930"
tidb_status_addr = "127.0.0.1:10080"
[flash.proxy]
config = "$PWD/tests/config/tiflash-learner.toml"
log-file = "$TEST_DIR/tiflash-proxy.log"
[flash.flash_cluster]
cluster_manager_path = "$PWD/bin/flash_cluster_manager"
log = "$TEST_DIR/tiflash-manager.log"
master_ttl = 60
refresh_interval = 20
update_rule_interval = 5
[logger]
count = 20
level = "trace"
log = "$TEST_DIR/tiflash-stdout.log"
errorlog = "$TEST_DIR/tiflash-stderr.log"
size = "1000M"
[raft]
pd_addr = "127.0.0.1:2379"
[profiles]
[profiles.default]
load_balancing = "random"
max_memory_usage = 10000000000
use_uncompressed_cache = 0
[users]
[users.default]
password = ""
profile = "default"
quota = "default"
[users.default.networks]
ip = "::/0"
[users.readonly]
password = ""
profile = "readonly"
quota = "default"
[users.readonly.networks]
ip = "::/0"
[quotas]
[quotas.default]
[quotas.default.interval]
duration = 3600
errors = 0
execution_time = 0
queries = 0
read_rows = 0
result_rows = 0
eof
Loading

0 comments on commit a6643a8

Please sign in to comment.