Skip to content

Commit

Permalink
add retry on split client. (pingcap#247)
Browse files Browse the repository at this point in the history
* restore: add retry on split client.

* restore: don't fail when RegionError has no leader

* restore: use findLeaderErr instead of splitErrors

* *: use go.uber.org/multierr instead of coreos/multierror.

* restore: add failpoint tests on retry.

* *: make linter happy.

* *: remove probabilistic, and disable failpoint anyway.

* restore: add test on generic retryable error.

* *: fix CI.

* restore: use failpoint to inject leader.

* *: enable failpoint in testcover.

* *: accept some advise from @kenny.

* *: add a test point.

* Update pkg/restore/split_client.go

Co-authored-by: kennytm <[email protected]>

* restore: apply suggestions.

* Merge branch 'master' of https://github.com/pingcap/br into retry-split-master

* go.mod: fix version of failpoint.

* restore: remove some retry on batcher.

Co-authored-by: kennytm <[email protected]>
  • Loading branch information
YuJuncen and kennytm authored May 14, 2020
1 parent 943e502 commit c4fbcbd
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 47 deletions.
36 changes: 21 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ PACKAGES := go list ./...
PACKAGE_DIRECTORIES := $(PACKAGES) | sed 's/github.com\/pingcap\/br\/*//'
GOCHECKER := awk '{ print } END { if (NR > 0) { exit 1 } }'


BR_PKG := github.com/pingcap/br

LDFLAGS += -X "$(BR_PKG)/pkg/utils.BRReleaseVersion=$(shell git describe --tags --dirty)"
Expand All @@ -21,27 +22,26 @@ all: check test build
build:
GO111MODULE=on go build -ldflags '$(LDFLAGS)' ${RACEFLAG} -o bin/br

build_for_integration_test:
GO111MODULE=on go test -c -cover -covermode=count \
build_for_integration_test: failpoint-enable
(GO111MODULE=on go test -c -cover -covermode=count \
-coverpkg=$(BR_PKG)/... \
-o bin/br.test
# build key locker
GO111MODULE=on go build ${RACEFLAG} -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build ${RACEFLAG} -o bin/gc tests/br_z_gc_safepoint/*.go
# build rawkv client
GO111MODULE=on go build ${RACEFLAG} -o bin/rawkv tests/br_rawkv/*.go

test:
GO111MODULE=on go test ${RACEFLAG} -tags leak ./...

testcover: tools
-o bin/br.test && \
GO111MODULE=on go build ${RACEFLAG} -o bin/locker tests/br_key_locked/*.go && \
GO111MODULE=on go build ${RACEFLAG} -o bin/gc tests/br_z_gc_safepoint/*.go && \
GO111MODULE=on go build ${RACEFLAG} -o bin/rawkv tests/br_rawkv/*.go) || (make failpoint-disable && exit 1)
@make failpoint-disable

test: failpoint-enable
GO111MODULE=on go test ${RACEFLAG} -tags leak ./... || ( make failpoint-disable && exit 1 )
@make failpoint-disable

testcover: tools failpoint-enable
GO111MODULE=on tools/bin/overalls \
-project=$(BR_PKG) \
-covermode=count \
-ignore='.git,vendor,tests,_tools,docker' \
-debug \
-- -coverpkg=./...
-- -coverpkg=./... || ( make failpoint-disable && exit 1 )

integration_test: build build_for_integration_test
@which bin/tidb-server
Expand Down Expand Up @@ -103,4 +103,10 @@ tidy:
GO111MODULE=on go mod tidy
git diff --quiet go.mod go.sum

failpoint-enable: tools
tools/bin/failpoint-ctl enable

failpoint-disable: tools
tools/bin/failpoint-ctl disable

.PHONY: tools
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/uuid v1.1.1
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/pingcap/kvproto v0.0.0-20200509065137-6a4d5c264a8b
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657
Expand All @@ -26,6 +27,7 @@ require (
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
google.golang.org/api v0.15.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798 h1:6DMbRqPI1qzQ8N1xc3+nKY8IxSACd9VqQKkRVvbyoIg=
github.com/pingcap/failpoint v0.0.0-20200210140405-f8f9fb234798/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53 h1:8sC8OLinmaw24xLeeJlYBFvUBsOiOYBtNqTuVOTnynQ=
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d h1:rCmRK0lCRrHMUbS99BKFYhK9YxJDNw0xB033cQbYo0s=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
Expand Down
154 changes: 123 additions & 31 deletions pkg/restore/split_client.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/pd/v4/server/schedule/placement"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
splitRegionMaxRetryTime = 4
)

// SplitClient is an external client used by RegionSplitter.
type SplitClient interface {
// GetStore gets a store by a store id.
Expand Down Expand Up @@ -186,48 +195,131 @@ func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key
}, nil
}

func (c *pdClient) BatchSplitRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
var peer *metapb.Peer
if regionInfo.Leader != nil {
peer = regionInfo.Leader
} else {
if len(regionInfo.Region.Peers) == 0 {
return nil, errors.New("region does not have peer")
func splitRegionWithFailpoint(
ctx context.Context,
regionInfo *RegionInfo,
peer *metapb.Peer,
client tikvpb.TikvClient,
keys [][]byte,
) (*kvrpcpb.SplitRegionResponse, error) {
failpoint.Inject("not-leader-error", func(injectNewLeader failpoint.Value) {
log.Debug("failpoint not-leader-error injected.")
resp := &kvrpcpb.SplitRegionResponse{
RegionError: &errorpb.Error{
NotLeader: &errorpb.NotLeader{
RegionId: regionInfo.Region.Id,
},
},
}
peer = regionInfo.Region.Peers[0]
}

storeID := peer.GetStoreId()
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, err
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, err
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
if injectNewLeader.(bool) {
resp.RegionError.NotLeader.Leader = regionInfo.Leader
}
failpoint.Return(resp, nil)
})
failpoint.Inject("somewhat-retryable-error", func() {
log.Debug("failpoint somewhat-retryable-error injected.")
failpoint.Return(&kvrpcpb.SplitRegionResponse{
RegionError: &errorpb.Error{
ServerIsBusy: &errorpb.ServerIsBusy{},
},
}, nil)
})
return client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
Context: &kvrpcpb.Context{
RegionId: regionInfo.Region.Id,
RegionEpoch: regionInfo.Region.RegionEpoch,
Peer: peer,
},
SplitKeys: keys,
})
}

func (c *pdClient) sendSplitRegionRequest(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*kvrpcpb.SplitRegionResponse, error) {
var splitErrors error
for i := 0; i < splitRegionMaxRetryTime; i++ {
var peer *metapb.Peer
if regionInfo.Leader != nil {
peer = regionInfo.Leader
} else {
if len(regionInfo.Region.Peers) == 0 {
return nil, multierr.Append(splitErrors,
errors.Errorf("region[%d] doesn't have any peer", regionInfo.Region.GetId()))
}
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
opt := grpc.WithInsecure()
if c.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(c.tlsConf))
}
conn, err := grpc.Dial(store.GetAddress(), opt)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
defer conn.Close()
client := tikvpb.NewTikvClient(conn)
resp, err := splitRegionWithFailpoint(ctx, regionInfo, peer, client, keys)
if err != nil {
return nil, multierr.Append(splitErrors, err)
}
if resp.RegionError != nil {
splitErrors = multierr.Append(splitErrors,
errors.Errorf("split region failed: region=%v, err=%v",
regionInfo.Region, resp.RegionError))
if nl := resp.RegionError.NotLeader; nl != nil {
if leader := nl.GetLeader(); leader != nil {
regionInfo.Leader = leader
} else {
newRegionInfo, findLeaderErr := c.GetRegionByID(ctx, nl.RegionId)
if findLeaderErr != nil {
return nil, multierr.Append(splitErrors, findLeaderErr)
}
if !checkRegionEpoch(newRegionInfo, regionInfo) {
return nil, multierr.Append(splitErrors, ErrEpochNotMatch)
}
log.Info("find new leader", zap.Uint64("new leader", newRegionInfo.Leader.Id))
regionInfo = newRegionInfo
}
log.Info("split region meet not leader error, retrying",
zap.Int("retry times", i),
zap.Uint64("regionID", regionInfo.Region.Id),
zap.Any("new leader", regionInfo.Leader),
)
continue
}
// TODO: we don't handle RegionNotMatch and RegionNotFound here,
// because I think we don't have enough information to retry.
// But maybe we can handle them here by some information the error itself provides.
if resp.RegionError.ServerIsBusy != nil ||
resp.RegionError.StaleCommand != nil {
log.Warn("a error occurs on split region",
zap.Int("retry times", i),
zap.Uint64("regionID", regionInfo.Region.Id),
zap.String("error", resp.RegionError.Message),
zap.Any("error verbose", resp.RegionError),
)
continue
}
return nil, splitErrors
}
return resp, nil
}
return nil, splitErrors
}

func (c *pdClient) BatchSplitRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
resp, err := c.sendSplitRegionRequest(ctx, regionInfo, keys)
if err != nil {
return nil, err
}
if resp.RegionError != nil {
return nil, errors.Errorf("split region failed: region=%v, err=%v", regionInfo.Region, resp.RegionError)
}

regions := resp.GetRegions()
newRegionInfos := make([]*RegionInfo, 0, len(regions))
Expand Down
84 changes: 84 additions & 0 deletions tests/br_split_region_fail/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"
TABLE="usertable"
LOG="not-leader.log"
DB_COUNT=3

for i in $(seq $DB_COUNT); do
run_sql "CREATE DATABASE $DB${i};"
go-ycsb load mysql -P tests/$TEST_NAME/workload -p mysql.host=$TIDB_IP -p mysql.port=$TIDB_PORT -p mysql.user=root -p mysql.db=$DB${i}
done

for i in $(seq $DB_COUNT); do
row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done


# backup full
echo "backup start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4

rm -f $LOG

for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done


# restore full
echo "restore start..."

unset BR_LOG_TO_TERM
GO_FAILPOINTS="github.com/pingcap/br/pkg/restore/not-leader-error=1*return(true)->1*return(false);\
github.com/pingcap/br/pkg/restore/somewhat-retryable-error=3*return(true)" \
run_br restore full -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --ratelimit 1024 --log-file $LOG || true
BR_LOG_TO_TERM=1

grep "a error occurs on split region" $LOG && \
grep "split region meet not leader error" $LOG && \
grep "Full restore Success" $LOG && \
grep "find new leader" $LOG

if [ $? -ne 0 ]; then
echo "failed to retry on failpoint."
echo "full log:"
cat $LOG
exit 1
fi

for i in $(seq $DB_COUNT); do
row_count_new[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done

fail=false
for i in $(seq $DB_COUNT); do
if [ "${row_count_ori[i]}" != "${row_count_new[i]}" ];then
fail=true
echo "TEST: [$TEST_NAME] fail on database $DB${i}"
fi
echo "database $DB${i} [original] row count: ${row_count_ori[i]}, [after br] row count: ${row_count_new[i]}"
done

if $fail; then
echo "TEST: [$TEST_NAME] failed!"
exit 1
fi

echo "TEST $TEST_NAME passed."


12 changes: 12 additions & 0 deletions tests/br_split_region_fail/workload
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
recordcount=1000
operationcount=0
workload=core

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform
5 changes: 4 additions & 1 deletion tools/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
all: bin/goimports bin/govet bin/revive bin/overalls bin/golangci-lint
all: bin/goimports bin/govet bin/revive bin/overalls bin/golangci-lint bin/failpoint-ctl

bin/goimports:
go build -o $@ golang.org/x/tools/cmd/goimports
Expand All @@ -14,3 +14,6 @@ bin/overalls:

bin/golangci-lint:
go build -o $@ github.com/golangci/golangci-lint/cmd/golangci-lint

bin/failpoint-ctl:
go build -o $@ github.com/pingcap/failpoint/failpoint-ctl
1 change: 1 addition & 0 deletions tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-playground/overalls v0.0.0-20191218162659-7df9f728c018
github.com/golangci/golangci-lint v1.26.0
github.com/mgechev/revive v1.0.2
github.com/pingcap/failpoint v0.0.0-20200506114213-c17f16071c53
github.com/yookoala/realpath v1.0.0 // indirect
golang.org/x/tools v0.0.0-20200422205258-72e4a01eba43
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
Expand Down
Loading

0 comments on commit c4fbcbd

Please sign in to comment.