Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: add retry mechanism for join #1643

Merged
merged 5 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"os"
"path"
"strings"
"time"

"github.com/pingcap/failpoint"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pkg/errors"
Expand All @@ -35,6 +37,9 @@ const (
privateDirMode = 0700
)

// listMemberRetryTimes is the retry times of list member.
var listMemberRetryTimes = 20

// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
Expand Down Expand Up @@ -137,31 +142,58 @@ func PrepareJoinCluster(cfg *Config) error {
return errors.New("missing data or join a duplicated pd")
}

var addResp *clientv3.MemberAddResponse

failpoint.Inject("add-member-failed", func() {
listMemberRetryTimes = 2
failpoint.Goto("LabelSkipAddMember")
})
// - A new PD joins an existing cluster.
// - A deleted PD joins to previous cluster.
addResp, err := etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
{
// First adds member through the API
addResp, err = etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
}
}
failpoint.Label("LabelSkipAddMember")

listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
var (
pds []string
listSucc bool
)

pds := []string{}
for _, memb := range listResp.Members {
n := memb.Name
if memb.ID == addResp.Member.ID {
n = cfg.Name
for i := 0; i < listMemberRetryTimes; i++ {
listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")

pds = []string{}
for _, memb := range listResp.Members {
n := memb.Name
if addResp != nil && memb.ID == addResp.Member.ID {
n = cfg.Name
listSucc = true
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))
}
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))

if listSucc {
break
}
time.Sleep(500 * time.Millisecond)
}
if !listSucc {
return errors.Errorf("join failed, adds the new member %s may failed", cfg.Name)
}

initialCluster = strings.Join(pds, ",")
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down
53 changes: 53 additions & 0 deletions tests/server/join/join_fail/join_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package join_fail_test

import (
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/tests"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&serverTestSuite{})

type serverTestSuite struct{}

func (s *serverTestSuite) SetUpSuite(c *C) {
server.EnableZap = true
}

func (s *serverTestSuite) TestFailedPDJoinInStep1(c *C) {
cluster, err := tests.NewTestCluster(1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

// Join the second PD.
c.Assert(failpoint.Enable("github.com/pingcap/pd/server/add-member-failed", `return`), IsNil)
_, err = cluster.Join()
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "join failed"), IsTrue)
c.Assert(failpoint.Disable("github.com/pingcap/pd/server/add-member-failed"), IsNil)
}