Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump client: increase retry time, and refine some code #158

Merged
merged 31 commits into from
Jan 10, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"os"

_ "github.com/go-sql-driver/mysql"
"github.com/ngaut/log"
"github.com/pingcap/tidb-tools/pkg/check"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/utils"
log "github.com/sirupsen/logrus"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"strings"
"sync"

"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
log "github.com/sirupsen/logrus"
)

// TableInstance record a table instance
Expand Down
2 changes: 1 addition & 1 deletion pkg/diff/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
package diff

import (
"log"
"strconv"

"github.com/ngaut/log"
"github.com/pingcap/parser/model"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/diff/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package diff
import (
"math/rand"

"github.com/ngaut/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/types"
log "github.com/sirupsen/logrus"
)

func equalStrings(str1, str2 []string) bool {
Expand Down
19 changes: 11 additions & 8 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie
}

// Get returns a key/value matchs the given key
func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
func (e *Client) Get(ctx context.Context, key string) (value []byte, revision int64, err error) {
key = keyWithPrefix(e.rootPath, key)
resp, err := e.client.KV.Get(ctx, key)
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}

if len(resp.Kvs) == 0 {
return nil, errors.NotFoundf("key %s in etcd", key)
return nil, -1, errors.NotFoundf("key %s in etcd", key)
}

return resp.Kvs[0].Value, nil
return resp.Kvs[0].Value, resp.Header.Revision, nil
}

// Update updates a key/value.
Expand Down Expand Up @@ -156,15 +156,15 @@ func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl
}

// List returns the trie struct that constructed by the key/value with same prefix
func (e *Client) List(ctx context.Context, key string) (*Node, error) {
func (e *Client) List(ctx context.Context, key string) (node *Node, revision int64, err error) {
key = keyWithPrefix(e.rootPath, key)
if !strings.HasSuffix(key, "/") {
key += "/"
}

resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}

root := new(Node)
Expand All @@ -180,7 +180,7 @@ func (e *Client) List(ctx context.Context, key string) (*Node, error) {
tailNode.Value = kv.Value
}

return root, nil
return root, resp.Header.Revision, nil
IANTHEREAL marked this conversation as resolved.
Show resolved Hide resolved
}

// Delete deletes the key/values with matching prefix or key
Expand All @@ -200,7 +200,10 @@ func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error
}

// Watch watchs the events of key with prefix.
func (e *Client) Watch(ctx context.Context, prefix string) clientv3.WatchChan {
func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clientv3.WatchChan {
if revision > 0 {
return e.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
}
return e.client.Watch(ctx, prefix, clientv3.WithPrefix())
}

Expand Down
26 changes: 19 additions & 7 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
)
Expand Down Expand Up @@ -71,7 +72,7 @@ func (t *testEtcdSuite) TestCreateWithTTL(c *C) {
c.Assert(err, IsNil)

time.Sleep(2 * time.Second)
_, err = etcdCli.Get(ctx, key)
_, _, err = etcdCli.Get(ctx, key)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(errors.IsNotFound(err), IsTrue)
}

Expand Down Expand Up @@ -99,19 +100,25 @@ func (t *testEtcdSuite) TestUpdate(c *C) {
err = etcdCli.Create(ctx, key, obj1, opts)
c.Assert(err, IsNil)

res, revision1, err := etcdCli.Get(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(res), Equals, obj1)

time.Sleep(time.Second)

err = etcdCli.Update(ctx, key, obj2, 3)
c.Assert(err, IsNil)

time.Sleep(2 * time.Second)

res, err := etcdCli.Get(ctx, key)
// the new revision should greater than the old
res, revision2, err := etcdCli.Get(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(res), Equals, obj2)
c.Assert(revision2, check.Greater, revision1)

time.Sleep(2 * time.Second)
res, err = etcdCli.Get(ctx, key)
res, _, err = etcdCli.Get(ctx, key)
c.Assert(errors.IsNotFound(err), IsTrue)
}

Expand Down Expand Up @@ -142,12 +149,17 @@ func (t *testEtcdSuite) TestList(c *C) {
err = etcdCli.Create(ctx, k11, k11, nil)
c.Assert(err, IsNil)

root, err := etcdCli.List(ctx, key)
root, revision1, err := etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(string(root.Childs["level1"].Value), Equals, k1)
c.Assert(string(root.Childs["level1"].Childs["level1"].Value), Equals, k11)
c.Assert(string(root.Childs["level2"].Value), Equals, k2)
c.Assert(string(root.Childs["level3"].Value), Equals, k3)

// the revision of list should equal to the latest update's revision
_, revision2, err := etcdCli.Get(ctx, k11)
c.Assert(err, IsNil)
c.Assert(revision1, Equals, revision2)
}

func (t *testEtcdSuite) TestDelete(c *C) {
Expand All @@ -158,21 +170,21 @@ func (t *testEtcdSuite) TestDelete(c *C) {
c.Assert(err, IsNil)
}

root, err := etcdCli.List(ctx, key)
root, _, err := etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 2)

err = etcdCli.Delete(ctx, keys[1], false)
c.Assert(err, IsNil)

root, err = etcdCli.List(ctx, key)
root, _, err = etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 1)

err = etcdCli.Delete(ctx, key, true)
c.Assert(err, IsNil)

root, err = etcdCli.List(ctx, key)
root, _, err = etcdCli.List(ctx, key)
c.Assert(err, IsNil)
c.Assert(root.Childs, HasLen, 0)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/juju/errors"
"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
)

Expand Down
6 changes: 3 additions & 3 deletions tidb-binlog/binlogctl/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func queryNodesByKind(urls string, kind string) error {
return errors.Trace(err)
}

nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
Expand All @@ -60,7 +60,7 @@ func updateNodeState(urls, kind, nodeID, state string) error {
return errors.Trace(err)
}

nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func applyAction(urls, kind, nodeID string, action string) error {
return errors.Trace(err)
}

nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
if err != nil {
return errors.Trace(err)
}
Expand Down
31 changes: 16 additions & 15 deletions tidb-binlog/node/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,36 +38,37 @@ func (r *EtcdRegistry) prefixed(p ...string) string {
}

// Node returns the nodeStatus that matchs nodeID in the etcd
func (r *EtcdRegistry) Node(pctx context.Context, prefix, nodeID string) (*Status, error) {
func (r *EtcdRegistry) Node(pctx context.Context, prefix, nodeID string) (status *Status, revision int64, err error) {
ctx, cancel := context.WithTimeout(pctx, r.reqTimeout)
defer cancel()

data, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
data, revision, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}

status := &Status{}
if err = json.Unmarshal(data, &status); err != nil {
return nil, errors.Annotatef(err, "Invalid nodeID(%s)", nodeID)
return nil, -1, errors.Annotatef(err, "Invalid nodeID(%s)", nodeID)
}
return status, nil
return status, revision, nil
}

// Nodes retruns all the nodeStatuses in the etcd
func (r *EtcdRegistry) Nodes(pctx context.Context, prefix string) ([]*Status, error) {
func (r *EtcdRegistry) Nodes(pctx context.Context, prefix string) (status []*Status, revision int64, err error) {
ctx, cancel := context.WithTimeout(pctx, r.reqTimeout)
defer cancel()

resp, err := r.client.List(ctx, r.prefixed(prefix))
resp, revision, err := r.client.List(ctx, r.prefixed(prefix))
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}
status, err := NodesStatusFromEtcdNode(resp)

status, err = NodesStatusFromEtcdNode(resp)
if err != nil {
return nil, errors.Trace(err)
return nil, -1, errors.Trace(err)
}
return status, nil

return status, revision, nil
}

// UpdateNode update the node information.
Expand All @@ -88,7 +89,7 @@ func (r *EtcdRegistry) UpdateNode(pctx context.Context, prefix string, status *S
}

func (r *EtcdRegistry) checkNodeExists(ctx context.Context, prefix, nodeID string) (bool, error) {
_, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
_, _, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
if err != nil {
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -119,8 +120,8 @@ func (r *EtcdRegistry) createNode(ctx context.Context, prefix string, status *St
}

// WatchNode watchs node's event
func (r *EtcdRegistry) WatchNode(pctx context.Context, prefix string) clientv3.WatchChan {
return r.client.Watch(pctx, prefix)
func (r *EtcdRegistry) WatchNode(pctx context.Context, prefix string, revision int64) clientv3.WatchChan {
return r.client.Watch(pctx, prefix, revision)
}

func nodeStatusFromEtcdNode(id string, node *etcd.Node) (*Status, error) {
Expand Down
Loading