diff --git a/checker/main.go b/checker/main.go
index 4802a4c2b..4f89d93a8 100644
--- a/checker/main.go
+++ b/checker/main.go
@@ -20,10 +20,10 @@ import (
     "os"
 
     _ "github.com/go-sql-driver/mysql"
-    "github.com/ngaut/log"
     "github.com/pingcap/tidb-tools/pkg/check"
     "github.com/pingcap/tidb-tools/pkg/dbutil"
     "github.com/pingcap/tidb-tools/pkg/utils"
+    log "github.com/sirupsen/logrus"
 )
 
 var (
diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go
index 764dd22cd..c17bf9d07 100644
--- a/pkg/diff/diff.go
+++ b/pkg/diff/diff.go
@@ -22,10 +22,10 @@ import (
     "strings"
     "sync"
 
-    "github.com/ngaut/log"
     "github.com/pingcap/errors"
     "github.com/pingcap/parser/model"
     "github.com/pingcap/tidb-tools/pkg/dbutil"
+    log "github.com/sirupsen/logrus"
 )
 
 // TableInstance record a table instance
diff --git a/pkg/diff/merge.go b/pkg/diff/merge.go
index 7c2a540f1..610ec8656 100644
--- a/pkg/diff/merge.go
+++ b/pkg/diff/merge.go
@@ -14,9 +14,9 @@
 package diff
 
 import (
+    "log"
     "strconv"
 
-    "github.com/ngaut/log"
     "github.com/pingcap/parser/model"
 )
diff --git a/pkg/diff/util.go b/pkg/diff/util.go
index fb92b7f26..aa1bab544 100644
--- a/pkg/diff/util.go
+++ b/pkg/diff/util.go
@@ -16,10 +16,10 @@ package diff
 import (
     "math/rand"
 
-    "github.com/ngaut/log"
     "github.com/pingcap/parser/model"
     "github.com/pingcap/tidb-tools/pkg/dbutil"
     "github.com/pingcap/tidb/types"
+    log "github.com/sirupsen/logrus"
 )
 
 func equalStrings(str1, str2 []string) bool {
diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go
index beb866c2d..2df814ce2 100644
--- a/pkg/etcd/etcd.go
+++ b/pkg/etcd/etcd.go
@@ -89,18 +89,18 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie
 }
 
 // Get returns a key/value matchs the given key
-func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
+func (e *Client) Get(ctx context.Context, key string) (value []byte, revision int64, err error) {
     key = keyWithPrefix(e.rootPath, key)
     resp, err := e.client.KV.Get(ctx, key)
     if err != nil {
-        return nil, errors.Trace(err)
+        return nil, -1, errors.Trace(err)
     }
 
     if len(resp.Kvs) == 0 {
-        return nil, errors.NotFoundf("key %s in etcd", key)
+        return nil, -1, errors.NotFoundf("key %s in etcd", key)
     }
 
-    return resp.Kvs[0].Value, nil
+    return resp.Kvs[0].Value, resp.Header.Revision, nil
 }
 
 // Update updates a key/value.
@@ -156,7 +156,7 @@ func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl
 }
 
 // List returns the trie struct that constructed by the key/value with same prefix
-func (e *Client) List(ctx context.Context, key string) (*Node, error) {
+func (e *Client) List(ctx context.Context, key string) (node *Node, revision int64, err error) {
     key = keyWithPrefix(e.rootPath, key)
     if !strings.HasSuffix(key, "/") {
         key += "/"
@@ -164,7 +164,7 @@ func (e *Client) List(ctx context.Context, key string) (*Node, error) {
 
     resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix())
     if err != nil {
-        return nil, errors.Trace(err)
+        return nil, -1, errors.Trace(err)
     }
 
     root := new(Node)
@@ -180,7 +180,7 @@ func (e *Client) List(ctx context.Context, key string) (*Node, error) {
         tailNode.Value = kv.Value
     }
 
-    return root, nil
+    return root, resp.Header.Revision, nil
 }
 
 // Delete deletes the key/values with matching prefix or key
@@ -200,7 +200,10 @@ func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error
 }
 
 // Watch watchs the events of key with prefix.
-func (e *Client) Watch(ctx context.Context, prefix string) clientv3.WatchChan {
+func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clientv3.WatchChan {
+    if revision > 0 {
+        return e.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
+    }
     return e.client.Watch(ctx, prefix, clientv3.WithPrefix())
 }
diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go
index bae15a365..fcaf0418f 100644
--- a/pkg/etcd/etcd_test.go
+++ b/pkg/etcd/etcd_test.go
@@ -20,6 +20,7 @@ import (
 
     "github.com/coreos/etcd/clientv3"
     "github.com/coreos/etcd/integration"
+    "github.com/pingcap/check"
     . "github.com/pingcap/check"
     "github.com/pingcap/errors"
 )
@@ -71,7 +72,7 @@ func (t *testEtcdSuite) TestCreateWithTTL(c *C) {
     c.Assert(err, IsNil)
 
     time.Sleep(2 * time.Second)
-    _, err = etcdCli.Get(ctx, key)
+    _, _, err = etcdCli.Get(ctx, key)
     c.Assert(errors.IsNotFound(err), IsTrue)
 }
 
@@ -99,6 +100,10 @@ func (t *testEtcdSuite) TestUpdate(c *C) {
     err = etcdCli.Create(ctx, key, obj1, opts)
     c.Assert(err, IsNil)
 
+    res, revision1, err := etcdCli.Get(ctx, key)
+    c.Assert(err, IsNil)
+    c.Assert(string(res), Equals, obj1)
+
     time.Sleep(time.Second)
 
     err = etcdCli.Update(ctx, key, obj2, 3)
@@ -106,12 +111,14 @@ func (t *testEtcdSuite) TestUpdate(c *C) {
 
     time.Sleep(2 * time.Second)
 
-    res, err := etcdCli.Get(ctx, key)
+    // the new revision should be greater than the old one
+    res, revision2, err := etcdCli.Get(ctx, key)
     c.Assert(err, IsNil)
     c.Assert(string(res), Equals, obj2)
+    c.Assert(revision2, check.Greater, revision1)
 
     time.Sleep(2 * time.Second)
-    res, err = etcdCli.Get(ctx, key)
+    res, _, err = etcdCli.Get(ctx, key)
     c.Assert(errors.IsNotFound(err), IsTrue)
 }
 
@@ -142,12 +149,17 @@ func (t *testEtcdSuite) TestList(c *C) {
     err = etcdCli.Create(ctx, k11, k11, nil)
     c.Assert(err, IsNil)
 
-    root, err := etcdCli.List(ctx, key)
+    root, revision1, err := etcdCli.List(ctx, key)
     c.Assert(err, IsNil)
     c.Assert(string(root.Childs["level1"].Value), Equals, k1)
     c.Assert(string(root.Childs["level1"].Childs["level1"].Value), Equals, k11)
     c.Assert(string(root.Childs["level2"].Value), Equals, k2)
     c.Assert(string(root.Childs["level3"].Value), Equals, k3)
+
+    // the revision of List should be equal to the latest update's revision
+    _, revision2, err := etcdCli.Get(ctx, k11)
+    c.Assert(err, IsNil)
+    c.Assert(revision1, Equals, revision2)
 }
 
 func (t *testEtcdSuite) TestDelete(c *C) {
@@ -158,21 +170,21 @@ func (t *testEtcdSuite) TestDelete(c *C) {
         c.Assert(err, IsNil)
     }
 
-    root, err := etcdCli.List(ctx, key)
+    root, _, err := etcdCli.List(ctx, key)
     c.Assert(err, IsNil)
     c.Assert(root.Childs, HasLen, 2)
 
     err = etcdCli.Delete(ctx, keys[1], false)
     c.Assert(err, IsNil)
 
-    root, err = etcdCli.List(ctx, key)
+    root, _, err = etcdCli.List(ctx, key)
     c.Assert(err, IsNil)
     c.Assert(root.Childs, HasLen, 1)
 
     err = etcdCli.Delete(ctx, key, true)
     c.Assert(err, IsNil)
 
-    root, err = etcdCli.List(ctx, key)
+    root, _, err = etcdCli.List(ctx, key)
     c.Assert(err, IsNil)
     c.Assert(root.Childs, HasLen, 0)
 }
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
index d581a1142..30b7f6f58 100644
--- a/pkg/watcher/watcher.go
+++ b/pkg/watcher/watcher.go
@@ -7,7 +7,7 @@ import (
     "sync"
     "time"
 
-    "github.com/juju/errors"
+    "github.com/pingcap/errors"
     "github.com/siddontang/go/sync2"
 )
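Note on the etcd wrapper change above: read calls now report the store revision they were served at, and `Watch` can resume from one. A minimal sketch (not part of the patch — `watchFrom` and its error handling are illustrative, and it assumes a client whose `rootPath` is empty so `List` and `Watch` address the same key space) of the read-then-watch pattern this enables:

```go
package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb-tools/pkg/etcd"
)

// watchFrom takes the snapshot revision returned by List and hands it to
// Watch, so no update that lands between the two calls can be missed.
func watchFrom(ctx context.Context, cli *etcd.Client, prefix string) {
	// List returns the key tree plus the etcd store revision it was read at.
	root, rev, err := cli.List(ctx, prefix)
	if err != nil {
		fmt.Println("list failed:", err)
		return
	}
	fmt.Println("snapshot children:", len(root.Childs), "at revision", rev)

	// resume the event stream from the snapshot revision
	for resp := range cli.Watch(ctx, prefix, rev) {
		if err := resp.Err(); err != nil {
			fmt.Println("watch error:", err)
			return
		}
		for _, ev := range resp.Events {
			fmt.Printf("%s %s = %s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}
```

etcd delivers events with revision >= the requested one, so a caller that has already applied the snapshot may prefer to resume from `rev+1`; the patch itself keeps the simpler convention of passing the revision it read at.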
diff --git a/tidb-binlog/binlogctl/nodes.go b/tidb-binlog/binlogctl/nodes.go
index db0cbc033..034d67929 100644
--- a/tidb-binlog/binlogctl/nodes.go
+++ b/tidb-binlog/binlogctl/nodes.go
@@ -37,7 +37,7 @@ func queryNodesByKind(urls string,
     kind string) error {
         return errors.Trace(err)
     }
 
-    nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
+    nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
     if err != nil {
         return errors.Trace(err)
     }
@@ -60,7 +60,7 @@ func updateNodeState(urls, kind, nodeID, state string) error {
         return errors.Trace(err)
     }
 
-    nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
+    nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
     if err != nil {
         return errors.Trace(err)
     }
@@ -101,7 +101,7 @@ func applyAction(urls, kind, nodeID string, action string) error {
         return errors.Trace(err)
     }
 
-    nodes, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
+    nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
     if err != nil {
         return errors.Trace(err)
     }
diff --git a/tidb-binlog/node/registry.go b/tidb-binlog/node/registry.go
index 9637e9c41..aeb0828ab 100644
--- a/tidb-binlog/node/registry.go
+++ b/tidb-binlog/node/registry.go
@@ -38,36 +38,37 @@ func (r *EtcdRegistry) prefixed(p ...string) string {
 }
 
 // Node returns the nodeStatus that matchs nodeID in the etcd
-func (r *EtcdRegistry) Node(pctx context.Context, prefix, nodeID string) (*Status, error) {
+func (r *EtcdRegistry) Node(pctx context.Context, prefix, nodeID string) (status *Status, revision int64, err error) {
     ctx, cancel := context.WithTimeout(pctx, r.reqTimeout)
     defer cancel()
 
-    data, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
+    data, revision, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
     if err != nil {
-        return nil, errors.Trace(err)
+        return nil, -1, errors.Trace(err)
     }
 
-    status := &Status{}
     if err = json.Unmarshal(data, &status); err != nil {
-        return nil, errors.Annotatef(err, "Invalid nodeID(%s)", nodeID)
+        return nil, -1, errors.Annotatef(err, "Invalid nodeID(%s)", nodeID)
     }
-    return status, nil
+    return status, revision, nil
 }
 
 // Nodes retruns all the nodeStatuses in the etcd
-func (r *EtcdRegistry) Nodes(pctx context.Context, prefix string) ([]*Status, error) {
+func (r *EtcdRegistry) Nodes(pctx context.Context, prefix string) (status []*Status, revision int64, err error) {
     ctx, cancel := context.WithTimeout(pctx, r.reqTimeout)
     defer cancel()
 
-    resp, err := r.client.List(ctx, r.prefixed(prefix))
+    resp, revision, err := r.client.List(ctx, r.prefixed(prefix))
     if err != nil {
-        return nil, errors.Trace(err)
+        return nil, -1, errors.Trace(err)
     }
-    status, err := NodesStatusFromEtcdNode(resp)
+
+    status, err = NodesStatusFromEtcdNode(resp)
     if err != nil {
-        return nil, errors.Trace(err)
+        return nil, -1, errors.Trace(err)
     }
-    return status, nil
+
+    return status, revision, nil
 }
 
 // UpdateNode update the node information.
@@ -88,7 +89,7 @@ func (r *EtcdRegistry) UpdateNode(pctx context.Context, prefix string, status *S
 }
 
 func (r *EtcdRegistry) checkNodeExists(ctx context.Context, prefix, nodeID string) (bool, error) {
-    _, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
+    _, _, err := r.client.Get(ctx, r.prefixed(prefix, nodeID))
     if err != nil {
         if errors.IsNotFound(err) {
             return false, nil
@@ -119,8 +120,8 @@ func (r *EtcdRegistry) createNode(ctx context.Context, prefix string, status *St
 }
 
 // WatchNode watchs node's event
-func (r *EtcdRegistry) WatchNode(pctx context.Context, prefix string) clientv3.WatchChan {
-    return r.client.Watch(pctx, prefix)
+func (r *EtcdRegistry) WatchNode(pctx context.Context, prefix string, revision int64) clientv3.WatchChan {
+    return r.client.Watch(pctx, prefix, revision)
 }
 
 func nodeStatusFromEtcdNode(id string, node *etcd.Node) (*Status, error) {
diff --git a/tidb-binlog/pump_client/client.go b/tidb-binlog/pump_client/client.go
index ed573a557..e98b83ecf 100644
--- a/tidb-binlog/pump_client/client.go
+++ b/tidb-binlog/pump_client/client.go
@@ -30,26 +30,22 @@ import (
     "github.com/pingcap/tidb-tools/tidb-binlog/node"
     pb "github.com/pingcap/tipb/go-binlog"
     log "github.com/sirupsen/logrus"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/status"
 )
 
 const (
     // DefaultEtcdTimeout is the default timeout config for etcd.
     DefaultEtcdTimeout = 5 * time.Second
 
-    // DefaultAllRetryTime is the default retry time for all pumps, should greter than RetryTime.
-    DefaultAllRetryTime = 20
-
-    // RetryTime is the retry time for each pump.
-    RetryTime = 5
+    // DefaultRetryTime is the default retry time for each pump.
+    DefaultRetryTime = 10
 
     // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
     DefaultBinlogWriteTimeout = 15 * time.Second
 
     // CheckInterval is the default interval for check unavaliable pumps.
     CheckInterval = 30 * time.Second
-
-    // RetryInterval is the default interval of retrying to write binlog.
-    RetryInterval = 100 * time.Millisecond
 )
 
 var (
@@ -59,8 +55,11 @@ var (
     // ErrNoAvaliablePump means no avaliable pump to write binlog.
     ErrNoAvaliablePump = errors.New("no avaliable pump to write binlog")
 
-    // CommitBinlogMaxRetryTime is the max retry duration time for write commit binlog.
-    CommitBinlogMaxRetryTime = 10 * time.Minute
+    // CommitBinlogTimeout is the max retry duration for writing commit/rollback binlog.
+    CommitBinlogTimeout = 10 * time.Minute
+
+    // RetryInterval is the interval of retrying to write binlog.
+    RetryInterval = 100 * time.Millisecond
 )
 
 // PumpInfos saves pumps' infomations in pumps client.
@@ -106,7 +105,7 @@ type PumpsClient struct {
     // Selector will select a suitable pump.
     Selector PumpSelector
 
-    // the max retry time if write binlog failed.
+    // the max retry time if writing binlog failed; obsolete now.
     RetryTime int
 
     // BinlogWriteTimeout is the max time binlog can use to write to pump.
@@ -117,6 +116,8 @@ type PumpsClient struct {
 
     // binlog socket file path, for compatible with kafka version pump.
     binlogSocket string
+
+    nodePath string
 }
 
 // NewPumpsClient returns a PumpsClient.
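The registry plumbs the same revision through: `Nodes` now reports the revision of the listing, and `WatchNode` resumes from it. A rough sketch (not part of the patch; `syncPumps` is an illustrative name, and a real caller re-creates the watcher on error rather than returning, as `watchStatus` does later in this patch) of the bootstrap contract that `NewPumpsClient` and `watchStatus` follow:

```go
package main

import (
	"context"
	"fmt"
	"path"

	"github.com/pingcap/tidb-tools/tidb-binlog/node"
)

// syncPumps reads the node list once, remembers the snapshot revision, and
// then watches from that revision so no status change can slip in between
// the two steps. Error handling is simplified for illustration.
func syncPumps(ctx context.Context, r *node.EtcdRegistry) error {
	prefix := node.NodePrefix[node.PumpNode]

	statuses, revision, err := r.Nodes(ctx, prefix)
	if err != nil {
		return err
	}
	fmt.Println("found", len(statuses), "pumps at revision", revision)

	nodePath := path.Join(node.DefaultRootPath, prefix)
	for resp := range r.WatchNode(ctx, nodePath, revision) {
		if resp.Err() != nil {
			return resp.Err()
		}
		// remember the last handled revision so a re-watch can resume here
		revision = resp.Header.Revision
		fmt.Println("observed", len(resp.Events), "events at revision", revision)
	}
	return nil
}
```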
@@ -153,19 +154,23 @@ func NewPumpsClient(etcdURLs string, timeout time.Duration, securityOpt pd.Secur
         EtcdRegistry:       node.NewEtcdRegistry(cli, DefaultEtcdTimeout),
         Pumps:              NewPumpInfos(),
         Selector:           NewSelector(Range),
-        RetryTime:          DefaultAllRetryTime,
         BinlogWriteTimeout: timeout,
         Security:           security,
+        nodePath:           path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode]),
     }
 
-    err = newPumpsClient.getPumpStatus(ctx)
+    revision, err := newPumpsClient.getPumpStatus(ctx)
     if err != nil {
         return nil, errors.Trace(err)
     }
+
+    if len(newPumpsClient.Pumps.Pumps) == 0 {
+        return nil, errors.New("no pump found in pd")
+    }
 
     newPumpsClient.Selector.SetPumps(copyPumps(newPumpsClient.Pumps.AvaliablePumps))
 
     newPumpsClient.wg.Add(2)
-    go newPumpsClient.watchStatus()
+    go newPumpsClient.watchStatus(revision)
     go newPumpsClient.detect()
 
     return newPumpsClient, nil
@@ -198,7 +203,6 @@ func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, s
         ClusterID:          clusterID,
         Pumps:              NewPumpInfos(),
         Selector:           NewSelector(LocalUnix),
-        RetryTime:          DefaultAllRetryTime,
         BinlogWriteTimeout: timeout,
         Security:           security,
         binlogSocket:       binlogSocket,
@@ -220,10 +224,10 @@ func (c *PumpsClient) getLocalPumpStatus(pctx context.Context) {
 }
 
 // getPumpStatus gets all the pumps status in the etcd.
-func (c *PumpsClient) getPumpStatus(pctx context.Context) error {
-    nodesStatus, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode])
+func (c *PumpsClient) getPumpStatus(pctx context.Context) (revision int64, err error) {
+    nodesStatus, revision, err := c.EtcdRegistry.Nodes(pctx, node.NodePrefix[node.PumpNode])
     if err != nil {
-        return errors.Trace(err)
+        return -1, errors.Trace(err)
     }
 
     for _, status := range nodesStatus {
@@ -231,22 +235,20 @@ func (c *PumpsClient) getPumpStatus(pctx context.Context) error {
         c.addPump(NewPumpStatus(status, c.Security), false)
     }
 
-    return nil
+    return revision, nil
 }
 
-// WriteBinlog writes binlog to a situable pump.
+// WriteBinlog writes binlog to a suitable pump. Note: it never returns an error for commit/rollback binlog.
 func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
-    pump := c.Selector.Select(binlog)
-    if pump == nil {
-        if binlog.Tp == pb.BinlogType_Prewrite {
-            return ErrNoAvaliablePump
+    var choosePump *PumpStatus
+    meetError := false
+    defer func() {
+        if meetError {
+            c.checkPumpAvaliable()
         }
-        // never return error for commit/rollback binlog
-        return nil
-    }
-
-    Logger.Debugf("[pumps client] write binlog choose pump %s", pump.NodeID)
+        c.Selector.Feedback(binlog.StartTs, binlog.Tp, choosePump)
+    }()
 
     commitData, err := binlog.Marshal()
     if err != nil {
@@ -255,33 +257,39 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
     req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: commitData}
 
     retryTime := 0
-    startTime := time.Now()
+    var pump *PumpStatus
     var resp *pb.WriteBinlogResp
+    startTime := time.Now()
+
+    c.Pumps.RLock()
+    pumpNum := len(c.Pumps.AvaliablePumps)
+    c.Pumps.RUnlock()
 
     for {
+        if pump == nil || binlog.Tp == pb.BinlogType_Prewrite {
+            pump = c.Selector.Select(binlog, retryTime)
+        }
         if pump == nil {
-            if binlog.Tp == pb.BinlogType_Prewrite {
-                return ErrNoAvaliablePump
-            }
-
-            // never return error for commit/rollback binlog
-            return nil
+            err = ErrNoAvaliablePump
+            break
         }
 
-        resp, err = pump.writeBinlog(req, c.BinlogWriteTimeout)
+        resp, err = pump.WriteBinlog(req, c.BinlogWriteTimeout)
         if err == nil && resp.Errmsg != "" {
             err = errors.New(resp.Errmsg)
         }
 
         if err == nil {
+            choosePump = pump
             return nil
         }
-        Logger.Errorf("[pumps client] write binlog error %v", err)
+
+        meetError = true
+        Logger.Warnf("[pumps client] write binlog to pump %s (type: %s, start ts: %d, commit ts: %d, length: %d) error %v", pump.NodeID, binlog.Tp, binlog.StartTs, binlog.CommitTs, len(commitData), err)
 
         if binlog.Tp != pb.BinlogType_Prewrite {
             // only use one pump to write commit/rollback binlog, util write success or blocked for ten minutes. And will not return error to tidb.
-            if time.Since(startTime) > CommitBinlogMaxRetryTime {
-                Logger.Warnf("[pumps client] write commit binlog %d failed, error %v", binlog.CommitTs, err)
-                return nil
+            if time.Since(startTime) > CommitBinlogTimeout {
+                break
             }
         } else {
             if !isRetryableError(err) {
@@ -289,62 +297,114 @@ func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error {
                 return err
             }
 
-            // every pump can retry 5 times, if retry 5 times and still failed, set this pump unavaliable, and choose a new pump.
-            if (retryTime+1)%RetryTime == 0 {
-                c.setPumpAvaliable(pump, false)
-                pump = c.Selector.Next(binlog, retryTime/5+1)
-                Logger.Debugf("[pumps client] avaliable pumps: %v, write binlog choose pump %v", c.Pumps.AvaliablePumps, pump)
+            // make sure every available pump has already been retried.
+            if time.Since(startTime) > c.BinlogWriteTimeout && retryTime > pumpNum {
+                break
             }
 
-            retryTime++
-            if retryTime > c.RetryTime {
-                break
+            if isConnUnAvliable(err) {
+                // this kind of error indicates that the grpc connection is unavailable; re-creating the connection may let the next write succeed.
+                pump.ResetGrpcClient()
             }
+
+            retryTime++
+        }
+
+        if binlog.Tp != pb.BinlogType_Prewrite {
+            time.Sleep(RetryInterval * 10)
+        } else {
+            time.Sleep(RetryInterval)
+        }
+    }
+
+    Logger.Info("[pumps client] write binlog to available pumps all failed, will try unavailable pumps")
+    pump, err1 := c.backoffWriteBinlog(req, binlog.Tp, binlog.StartTs)
+    if err1 == nil {
+        return nil
+    }
+    choosePump = pump
+
+    return errors.Errorf("write binlog failed, the last error %v", err)
+}
+
+func (c *PumpsClient) backoffWriteBinlog(req *pb.WriteBinlogReq, binlogType pb.BinlogType, startTS int64) (pump *PumpStatus, err error) {
+    if binlogType != pb.BinlogType_Prewrite {
+        // never return error for commit/rollback binlog.
+        return nil, nil
+    }
+
+    unAvaliablePumps := make([]*PumpStatus, 0, 3)
+    c.Pumps.RLock()
+    for _, pump := range c.Pumps.UnAvaliablePumps {
+        unAvaliablePumps = append(unAvaliablePumps, pump)
+    }
+    c.Pumps.RUnlock()
+
+    var resp *pb.WriteBinlogResp
+    // retry sending the binlog to the unavailable pumps.
+    for _, pump := range unAvaliablePumps {
+        if !pump.IsUsable() {
+            continue
         }
 
-        time.Sleep(RetryInterval)
+        pump.ResetGrpcClient()
+
+        resp, err = pump.WriteBinlog(req, c.BinlogWriteTimeout)
+        if err == nil {
+            if resp.Errmsg != "" {
+                err = errors.New(resp.Errmsg)
+            } else {
+                // if this pump writes the binlog successfully, mark it available again.
+                Logger.Debugf("[pumps client] write binlog to unavailable pump %s success, set this pump to available", pump.NodeID)
+                c.setPumpAvaliable(pump, true)
+                return pump, nil
+            }
+        }
     }
 
-    return err
+    return nil, errors.New("write binlog to unavailable pump failed")
+}
+
+func (c *PumpsClient) checkPumpAvaliable() {
+    c.Pumps.RLock()
+    allPumps := copyPumps(c.Pumps.Pumps)
+    c.Pumps.RUnlock()
+
+    for _, pump := range allPumps {
+        if !pump.IsUsable() {
+            c.setPumpAvaliable(pump, false)
+        }
+    }
 }
 
 // setPumpAvaliable set pump's isAvaliable, and modify UnAvaliablePumps or AvaliablePumps.
 func (c *PumpsClient) setPumpAvaliable(pump *PumpStatus, avaliable bool) {
-    pump.IsAvaliable = avaliable
-    if pump.IsAvaliable {
-        err := pump.createGrpcClient(c.Security)
-        if err != nil {
-            Logger.Errorf("[pumps client] create grpc client for pump %s failed, error: %v", pump.NodeID, err)
-            pump.IsAvaliable = false
-            return
-        }
+    c.Pumps.Lock()
+    defer c.Pumps.Unlock()
 
-        c.Pumps.Lock()
+    pump.Reset()
+
+    if avaliable {
         delete(c.Pumps.UnAvaliablePumps, pump.NodeID)
         if _, ok := c.Pumps.Pumps[pump.NodeID]; ok {
             c.Pumps.AvaliablePumps[pump.NodeID] = pump
         }
-        c.Pumps.Unlock()
     } else {
-        c.Pumps.Lock()
         delete(c.Pumps.AvaliablePumps, pump.NodeID)
         if _, ok := c.Pumps.Pumps[pump.NodeID]; ok {
             c.Pumps.UnAvaliablePumps[pump.NodeID] = pump
         }
-        c.Pumps.Unlock()
     }
 
-    c.Pumps.RLock()
     c.Selector.SetPumps(copyPumps(c.Pumps.AvaliablePumps))
-    c.Pumps.RUnlock()
 }
 
 // addPump add a new pump.
 func (c *PumpsClient) addPump(pump *PumpStatus, updateSelector bool) {
     c.Pumps.Lock()
 
-    if pump.State == node.Online {
+    if pump.IsUsable() {
         c.Pumps.AvaliablePumps[pump.NodeID] = pump
     } else {
         c.Pumps.UnAvaliablePumps[pump.NodeID] = pump
@@ -367,7 +427,7 @@ func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliab
         if status.State == node.Online {
             avaliableChanged = true
             avaliable = true
-        } else if pump.Status.State == node.Online {
+        } else if pump.IsUsable() {
             avaliableChanged = true
             avaliable = false
         }
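With the retry rework above, the caller-facing contract is: a prewrite write can fail (after exhausting the available pumps and then the unavailable ones via `backoffWriteBinlog`), while commit/rollback writes are retried internally for up to `CommitBinlogTimeout` and never surface an error. A hedged usage sketch — the `pumpcli` alias and `writeTxnBinlog` are illustrative, not part of the patch:

```go
package main

import (
	pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
	pb "github.com/pingcap/tipb/go-binlog"
)

// writeTxnBinlog writes the prewrite/commit binlog pair for one transaction.
func writeTxnBinlog(c *pumpcli.PumpsClient, startTS, commitTS int64) error {
	prewrite := &pb.Binlog{Tp: pb.BinlogType_Prewrite, StartTs: startTS}
	if err := c.WriteBinlog(prewrite); err != nil {
		// no pump accepted the prewrite binlog: abort the transaction
		return err
	}

	commit := &pb.Binlog{Tp: pb.BinlogType_Commit, StartTs: startTS, CommitTs: commitTS}
	// Feedback (called inside WriteBinlog via defer) routes this to the pump
	// that took the prewrite; a failure here is logged, not returned.
	return c.WriteBinlog(commit)
}
```

Because `Feedback` runs in a `defer`, the startTS-to-pump mapping is updated exactly once per `WriteBinlog` call, whether the write succeeded or not.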
@@ -379,11 +439,11 @@ func (c *PumpsClient) updatePump(status *node.Status) (pump *PumpStatus, avaliab
     return
 }
 
-// removePump removes a pump.
+// removePump removes a pump, used when pump is offline.
 func (c *PumpsClient) removePump(nodeID string) {
     c.Pumps.Lock()
     if pump, ok := c.Pumps.Pumps[nodeID]; ok {
-        pump.closeGrpcClient()
+        pump.Reset()
     }
     delete(c.Pumps.Pumps, nodeID)
     delete(c.Pumps.UnAvaliablePumps, nodeID)
@@ -401,16 +461,25 @@ func (c *PumpsClient) exist(nodeID string) bool {
 }
 
 // watchStatus watchs pump's status in etcd.
-func (c *PumpsClient) watchStatus() {
+func (c *PumpsClient) watchStatus(revision int64) {
     defer c.wg.Done()
-    rootPath := path.Join(node.DefaultRootPath, node.NodePrefix[node.PumpNode])
-    rch := c.EtcdRegistry.WatchNode(c.ctx, rootPath)
+    rch := c.EtcdRegistry.WatchNode(c.ctx, c.nodePath, revision)
+
     for {
         select {
         case <-c.ctx.Done():
             Logger.Info("[pumps client] watch status finished")
             return
         case wresp := <-rch:
+            if wresp.Err() != nil {
+                // meet an error, create the watcher again from the last handled revision.
+                Logger.Warnf("[pumps client] watch status meet error %v", wresp.Err())
+                rch = c.EtcdRegistry.WatchNode(c.ctx, c.nodePath, revision)
+                continue
+            }
+
+            revision = wresp.Header.Revision
+
             for _, ev := range wresp.Events {
                 status := &node.Status{}
                 err := json.Unmarshal(ev.Kv.Value, &status)
@@ -446,13 +515,18 @@ func (c *PumpsClient) watchStatus() {
 
 // detect send detect binlog to pumps with online state in UnAvaliablePumps,
 func (c *PumpsClient) detect() {
-    defer c.wg.Done()
+    checkTick := time.NewTicker(CheckInterval)
+    defer func() {
+        checkTick.Stop()
+        c.wg.Done()
+    }()
+
     for {
         select {
         case <-c.ctx.Done():
             Logger.Infof("[pumps client] heartbeat finished")
             return
-        default:
+        case <-checkTick.C:
             // send detect binlog to pump, if this pump can return response without error
             // means this pump is avaliable.
             needCheckPumps := make([]*PumpStatus, 0, len(c.Pumps.UnAvaliablePumps))
@@ -460,35 +534,25 @@ func (c *PumpsClient) detect() {
             req := &pb.WriteBinlogReq{ClusterID: c.ClusterID, Payload: nil}
             c.Pumps.RLock()
             for _, pump := range c.Pumps.UnAvaliablePumps {
-                if pump.Status.State == node.Online {
+                if pump.IsUsable() {
                     needCheckPumps = append(needCheckPumps, pump)
                 }
             }
             c.Pumps.RUnlock()
 
             for _, pump := range needCheckPumps {
-                err := pump.createGrpcClient(c.Security)
-                if err != nil {
-                    Logger.Errorf("[pumps client] create grpc client for pump %s failed, error %v", pump.NodeID, errors.Trace(err))
-                    continue
-                }
-                if pump.Client == nil {
-                    continue
-                }
-
-                _, err = pump.writeBinlog(req, c.BinlogWriteTimeout)
+                _, err := pump.WriteBinlog(req, c.BinlogWriteTimeout)
                 if err == nil {
+                    Logger.Debugf("[pumps client] write detect binlog to unavailable pump %s success", pump.NodeID)
                     checkPassPumps = append(checkPassPumps, pump)
                 } else {
-                    Logger.Errorf("[pumps client] write detect binlog to pump %s error %v", pump.NodeID, err)
+                    Logger.Debugf("[pumps client] write detect binlog to pump %s error %v", pump.NodeID, err)
                 }
             }
 
             for _, pump := range checkPassPumps {
                 c.setPumpAvaliable(pump, true)
             }
-
-            time.Sleep(CheckInterval)
         }
     }
 }
@@ -513,6 +577,18 @@ func isRetryableError(err error) bool {
     return true
 }
 
+func isConnUnAvliable(err error) bool {
+    // Unavailable indicates the service is currently unavailable.
+    // This is most likely a transient condition and may be corrected
+    // by retrying with a backoff.
+    // https://github.com/grpc/grpc-go/blob/76cc50721c5fde18bae10a36f4c202f5f2f95bb7/codes/codes.go#L139
+    if status.Code(err) == codes.Unavailable {
+        return true
+    }
+
+    return false
+}
+
 func copyPumps(pumps map[string]*PumpStatus) []*PumpStatus {
     ps := make([]*PumpStatus, 0, len(pumps))
     for _, pump := range pumps {
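`isConnUnAvliable` keys off the standard gRPC status code. A self-contained illustration of how `status.Code` classifies errors, including `nil` and non-gRPC errors (the `classify` helper is only for demonstration):

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify sorts errors the way isConnUnAvliable does: codes.Unavailable
// marks a dead or reconnecting connection, which is worth a client reset,
// while other codes are left to the normal retry path.
func classify(err error) string {
	switch status.Code(err) {
	case codes.OK:
		return "no error"
	case codes.Unavailable:
		return "connection unavailable, reset the grpc client"
	default:
		return "other failure, retry on another pump"
	}
}

func main() {
	fmt.Println(classify(nil))                                     // status.Code(nil) is codes.OK
	fmt.Println(classify(status.Error(codes.Unavailable, "down"))) // the reset case
	fmt.Println(classify(errors.New("boom")))                      // non-gRPC errors map to codes.Unknown
}
```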
diff --git a/tidb-binlog/pump_client/client_test.go b/tidb-binlog/pump_client/client_test.go
index 93228f22f..18ac79e5c 100644
--- a/tidb-binlog/pump_client/client_test.go
+++ b/tidb-binlog/pump_client/client_test.go
@@ -60,7 +60,6 @@ func (*testClientSuite) testSelector(c *C, algorithm string) {
     pumpsClient := &PumpsClient{
         Pumps:              NewPumpInfos(),
         Selector:           NewSelector(algorithm),
-        RetryTime:          DefaultAllRetryTime,
         BinlogWriteTimeout: DefaultBinlogWriteTimeout,
     }
 
@@ -107,13 +106,14 @@ func (*testClientSuite) testSelector(c *C, algorithm string) {
     tCase.setNodeID = []string{"pump0", "", "pump0", "pump1", "", "pump2"}
     tCase.setAvliable = []bool{true, false, false, true, false, true}
     tCase.choosePumps = []*PumpStatus{pumpsClient.Pumps.Pumps["pump0"], pumpsClient.Pumps.Pumps["pump0"], nil,
-        pumpsClient.Pumps.Pumps["pump1"], pumpsClient.Pumps.Pumps["pump1"], pumpsClient.Pumps.Pumps["pump1"]}
+        nil, pumpsClient.Pumps.Pumps["pump1"], pumpsClient.Pumps.Pumps["pump1"]}
 
     for i, nodeID := range tCase.setNodeID {
         if nodeID != "" {
             pumpsClient.setPumpAvaliable(pumpsClient.Pumps.Pumps[nodeID], tCase.setAvliable[i])
         }
-        pump := pumpsClient.Selector.Select(tCase.binlogs[i])
+        pump := pumpsClient.Selector.Select(tCase.binlogs[i], 0)
+        pumpsClient.Selector.Feedback(tCase.binlogs[i].StartTs, tCase.binlogs[i].Tp, pump)
         c.Assert(pump, Equals, tCase.choosePumps[i])
     }
 
@@ -127,19 +127,18 @@ func (*testClientSuite) testSelector(c *C, algorithm string) {
             StartTs: int64(j),
         }
 
-        pump1 := pumpsClient.Selector.Select(prewriteBinlog)
+        pump1 := pumpsClient.Selector.Select(prewriteBinlog, 0)
         if j%2 == 0 {
-            pump1 = pumpsClient.Selector.Next(prewriteBinlog, 0)
+            pump1 = pumpsClient.Selector.Select(prewriteBinlog, 1)
         }
+        pumpsClient.Selector.Feedback(prewriteBinlog.StartTs, prewriteBinlog.Tp, pump1)
 
         pumpsClient.setPumpAvaliable(pump1, false)
-        pump2 := pumpsClient.Selector.Select(commitBinlog)
-        c.Assert(pump2.IsAvaliable, Equals, false)
+        pump2 := pumpsClient.Selector.Select(commitBinlog, 0)
+        pumpsClient.Selector.Feedback(commitBinlog.StartTs, commitBinlog.Tp, pump2)
         // prewrite binlog and commit binlog with same start ts should choose same pump
         c.Assert(pump1.NodeID, Equals, pump2.NodeID)
-
         pumpsClient.setPumpAvaliable(pump1, true)
-        c.Assert(pump2.IsAvaliable, Equals, true)
     }
 }
 
@@ -157,6 +156,10 @@ func (t *testClientSuite) TestWriteBinlog(c *C) {
         },
     }
 
+    // make the test faster
+    RetryInterval = 100 * time.Millisecond
+    CommitBinlogTimeout = time.Second
+
    for _, cfg := range pumpServerConfig {
        pumpServer, err := createMockPumpServer(cfg.addr, cfg.serverMode)
        c.Assert(err, IsNil)
@@ -209,7 +212,6 @@ func (t *testClientSuite) TestWriteBinlog(c *C) {
 
     // test when pump is down
     pumpServer.Close()
-    CommitBinlogMaxRetryTime = time.Second
 
     // write commit binlog failed will not return error
     err = pumpClient.WriteBinlog(commitBinlog)
     c.Assert(err, IsNil)
@@ -281,8 +283,7 @@ func mockPumpsClient(client pb.PumpClient) *PumpsClient {
             NodeID: nodeID1,
             State:  node.Online,
         },
-        IsAvaliable: true,
-        Client:      client,
+        Client: client,
     }
 
     // add a pump without grpc client
@@ -292,7 +293,6 @@ func mockPumpsClient(client pb.PumpClient) *PumpsClient {
             NodeID: nodeID2,
             State:  node.Online,
         },
-        IsAvaliable: true,
     }
 
     pumpInfos := NewPumpInfos()
@@ -302,12 +302,10 @@ func mockPumpsClient(client pb.PumpClient) *PumpsClient {
     pumpInfos.AvaliablePumps[nodeID2] = pump2
 
     pCli := &PumpsClient{
-        ClusterID: 1,
-        Pumps:     pumpInfos,
-        Selector:  NewSelector(Range),
-        // have two pump, so use 2 * testRetryTime
-        RetryTime:          2 * testRetryTime,
-        BinlogWriteTimeout: 15 * time.Second,
+        ClusterID:          1,
+        Pumps:              pumpInfos,
+        Selector:           NewSelector(Range),
+        BinlogWriteTimeout: time.Second,
     }
 
     pCli.Selector.SetPumps([]*PumpStatus{pump1, pump2})
diff --git a/tidb-binlog/pump_client/pump.go b/tidb-binlog/pump_client/pump.go
index bd45b9c66..2377f7870 100644
--- a/tidb-binlog/pump_client/pump.go
+++ b/tidb-binlog/pump_client/pump.go
@@ -17,6 +17,8 @@ import (
     "context"
     "crypto/tls"
     "net"
+    "sync"
+    "sync/atomic"
     "time"
 
     "github.com/pingcap/errors"
@@ -29,6 +31,9 @@ import (
 var (
     // localPump is used to write local pump through unix socket connection.
     localPump = "localPump"
+
+    // if pump failed more than defaultMaxErrNums times, this pump can be treated as unavailable.
+    defaultMaxErrNums int64 = 10
 )
 
 // PumpStatus saves pump's status.
@@ -46,38 +51,34 @@ type PumpStatus struct {
        Offline: this pump is offline, and can't provide write binlog service forever.
     */
 
+    sync.RWMutex
+
     node.Status
 
-    // the pump is avaliable or not
+    // whether the pump is available; obsolete now
     IsAvaliable bool
 
+    security *tls.Config
+
     grpcConn *grpc.ClientConn
 
     // the client of this pump
     Client pb.PumpClient
+
+    ErrNum int64
 }
 
 // NewPumpStatus returns a new PumpStatus according to node's status.
 func NewPumpStatus(status *node.Status, security *tls.Config) *PumpStatus {
     pumpStatus := &PumpStatus{}
     pumpStatus.Status = *status
-    pumpStatus.IsAvaliable = (status.State == node.Online)
-
-    if status.State != node.Online {
-        return pumpStatus
-    }
-
-    err := pumpStatus.createGrpcClient(security)
-    if err != nil {
-        Logger.Errorf("[pumps client] create grpc client for %s failed, error %v", status.NodeID, err)
-        pumpStatus.IsAvaliable = false
-    }
+    pumpStatus.security = security
 
     return pumpStatus
 }
 
 // createGrpcClient create grpc client for online pump.
-func (p *PumpStatus) createGrpcClient(security *tls.Config) error {
+func (p *PumpStatus) createGrpcClient() error {
     // release the old connection, and create a new one
     if p.grpcConn != nil {
         p.grpcConn.Close()
@@ -93,15 +94,16 @@ func (p *PumpStatus) createGrpcClient(security *tls.Config) error {
             return net.DialTimeout("tcp", addr, timeout)
         })
     }
-    Logger.Debugf("[pumps client] create gcpc client at %s", p.Addr)
+    Logger.Debugf("[pumps client] create grpc client at %s", p.Addr)
     var clientConn *grpc.ClientConn
     var err error
-    if security != nil {
-        clientConn, err = grpc.Dial(p.Addr, dialerOpt, grpc.WithTransportCredentials(credentials.NewTLS(security)))
+    if p.security != nil {
+        clientConn, err = grpc.Dial(p.Addr, dialerOpt, grpc.WithTransportCredentials(credentials.NewTLS(p.security)))
     } else {
         clientConn, err = grpc.Dial(p.Addr, dialerOpt, grpc.WithInsecure())
     }
     if err != nil {
+        atomic.AddInt64(&p.ErrNum, 1)
         return err
     }
 
@@ -111,22 +113,68 @@ func (p *PumpStatus) createGrpcClient(security *tls.Config) error {
     return nil
 }
 
-// closeGrpcClient closes the pump's grpc connection.
-func (p *PumpStatus) closeGrpcClient() {
+// ResetGrpcClient closes the pump's grpc connection.
+func (p *PumpStatus) ResetGrpcClient() {
+    p.Lock()
+    defer p.Unlock()
+
     if p.grpcConn != nil {
         p.grpcConn.Close()
         p.Client = nil
     }
 }
 
-func (p *PumpStatus) writeBinlog(req *pb.WriteBinlogReq, timeout time.Duration) (*pb.WriteBinlogResp, error) {
-    if p.Client == nil {
-        return nil, errors.Errorf("pump %s don't have avaliable pump client", p.NodeID)
+// Reset resets the pump's grpc conn and err num.
+func (p *PumpStatus) Reset() {
+    p.ResetGrpcClient()
+    atomic.StoreInt64(&p.ErrNum, 0)
+}
+
+// WriteBinlog writes binlog through the grpc client.
+func (p *PumpStatus) WriteBinlog(req *pb.WriteBinlogReq, timeout time.Duration) (*pb.WriteBinlogResp, error) {
+    p.RLock()
+    client := p.Client
+    p.RUnlock()
+
+    if client == nil {
+        p.Lock()
+
+        if p.Client != nil {
+            client = p.Client
+        } else {
+            err := p.createGrpcClient()
+            if err != nil {
+                p.Unlock()
+                return nil, errors.Errorf("create grpc connection for pump %s failed, error %v", p.NodeID, err)
+            }
+            client = p.Client
+        }
+
+        p.Unlock()
     }
 
     ctx, cancel := context.WithTimeout(context.Background(), timeout)
-    resp, err := p.Client.WriteBinlog(ctx, req)
+    resp, err := client.WriteBinlog(ctx, req)
     cancel()
 
+    if err != nil {
+        atomic.AddInt64(&p.ErrNum, 1)
+    } else {
+        atomic.StoreInt64(&p.ErrNum, 0)
+    }
+
     return resp, err
 }
+
+// IsUsable returns true if pump is usable.
+func (p *PumpStatus) IsUsable() bool {
+    if p.Status.State != node.Online {
+        return false
+    }
+
+    if atomic.LoadInt64(&p.ErrNum) > defaultMaxErrNums {
+        return false
+    }
+
+    return true
+}
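pump.go now creates the gRPC client lazily inside `WriteBinlog`, guarded by an RWMutex with a re-check after upgrading to the write lock, and tracks consecutive failures in an atomic counter that `IsUsable` consults. A stripped-down sketch of that concurrency pattern with toy types (`lazyConn`, `dial`, and the threshold are illustrative, not the repo's):

```go
package main

import (
	"sync"
	"sync/atomic"
)

type connection struct{ addr string }

func dial(addr string) (*connection, error) { return &connection{addr: addr}, nil }

// lazyConn builds its connection on first use and counts errors atomically.
type lazyConn struct {
	sync.RWMutex
	conn   *connection
	errNum int64
}

func (l *lazyConn) get(addr string) (*connection, error) {
	l.RLock()
	c := l.conn
	l.RUnlock()
	if c != nil {
		return c, nil
	}

	l.Lock()
	defer l.Unlock()
	// re-check: another goroutine may have dialed while we waited
	if l.conn == nil {
		conn, err := dial(addr)
		if err != nil {
			atomic.AddInt64(&l.errNum, 1)
			return nil, err
		}
		l.conn = conn
	}
	return l.conn, nil
}

// usable mirrors PumpStatus.IsUsable: too many consecutive errors mark the
// peer unusable until a successful call resets the counter.
func (l *lazyConn) usable(maxErr int64) bool {
	return atomic.LoadInt64(&l.errNum) <= maxErr
}
```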
diff --git a/tidb-binlog/pump_client/selector.go b/tidb-binlog/pump_client/selector.go
index 5a7f7257c..bdebb5145 100644
--- a/tidb-binlog/pump_client/selector.go
+++ b/tidb-binlog/pump_client/selector.go
@@ -40,11 +40,11 @@ type PumpSelector interface {
     // SetPumps set pumps to be selected.
     SetPumps([]*PumpStatus)
 
-    // Select returns a situable pump.
-    Select(*pb.Binlog) *PumpStatus
+    // Select returns a suitable pump. Note: call this function only once for commit/rollback binlog.
+    Select(binlog *pb.Binlog, retryTime int) *PumpStatus
 
-    // returns the next pump.
-    Next(*pb.Binlog, int) *PumpStatus
+    // Feedback sets the corresponding relation between startTS and the chosen pump.
+    Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)
 }
 
 // HashSelector select a pump by hash.
@@ -83,46 +83,39 @@ func (h *HashSelector) SetPumps(pumps []*PumpStatus) {
 }
 
 // Select implement PumpSelector.Select.
-func (h *HashSelector) Select(binlog *pb.Binlog) *PumpStatus {
+func (h *HashSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus {
     // TODO: use status' label to match situale pump.
     h.Lock()
     defer h.Unlock()
 
-    if pump, ok := h.TsMap[binlog.StartTs]; ok {
+    if binlog.Tp != pb.BinlogType_Prewrite {
         // binlog is commit binlog or rollback binlog, choose the same pump by start ts map.
-        delete(h.TsMap, binlog.StartTs)
-        return pump
-    }
+        if pump, ok := h.TsMap[binlog.StartTs]; ok {
+            return pump
+        }
 
-    if len(h.Pumps) == 0 {
+        // this should never happen
+        Logger.Warnf("[pumps client] %s binlog with start ts %d has no matched prewrite binlog", binlog.Tp, binlog.StartTs)
         return nil
     }
 
-    if binlog.Tp == pb.BinlogType_Prewrite {
-        pump := h.Pumps[hashTs(binlog.StartTs)%len(h.Pumps)]
-        h.TsMap[binlog.StartTs] = pump
-        return pump
+    if len(h.Pumps) == 0 {
+        return nil
     }
 
-    // can't find pump in ts map, or unkow binlog type, choose a new one.
-    return h.Pumps[hashTs(binlog.StartTs)%len(h.Pumps)]
+    pump := h.Pumps[(hashTs(binlog.StartTs)+int(retryTime))%len(h.Pumps)]
+    return pump
 }
 
-// Next implement PumpSelector.Next. Only for Prewrite binlog.
-func (h *HashSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus {
+// Feedback implements PumpSelector.Feedback
+func (h *HashSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus) {
     h.Lock()
-    defer h.Unlock()
-
-    if len(h.Pumps) == 0 {
-        return nil
-    }
-
-    nextPump := h.Pumps[(hashTs(binlog.StartTs)+int(retryTime))%len(h.Pumps)]
-    if binlog.Tp == pb.BinlogType_Prewrite {
-        h.TsMap[binlog.StartTs] = nextPump
+    if binlogType != pb.BinlogType_Prewrite {
+        delete(h.TsMap, startTS)
+    } else {
+        h.TsMap[startTS] = pump
     }
-
-    return nextPump
+    h.Unlock()
 }
 
 // RangeSelector select a pump by range.
@@ -166,20 +159,20 @@ func (r *RangeSelector) SetPumps(pumps []*PumpStatus) {
 }
 
 // Select implement PumpSelector.Select.
-func (r *RangeSelector) Select(binlog *pb.Binlog) *PumpStatus {
-    // TODO: use status' label to match situale pump.
+func (r *RangeSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus {
+    // TODO: use status' label to match suitable pump.
     r.Lock()
-    defer func() {
-        if r.Offset == len(r.Pumps) {
-            r.Offset = 0
-        }
-        r.Unlock()
-    }()
+    defer r.Unlock()
 
-    if pump, ok := r.TsMap[binlog.StartTs]; ok {
+    if binlog.Tp != pb.BinlogType_Prewrite {
         // binlog is commit binlog or rollback binlog, choose the same pump by start ts map.
-        delete(r.TsMap, binlog.StartTs)
-        return pump
+        if pump, ok := r.TsMap[binlog.StartTs]; ok {
+            return pump
+        }
+
+        // this should never happen
+        Logger.Warnf("[pumps client] %s binlog with start ts %d has no matched prewrite binlog", binlog.Tp, binlog.StartTs)
+        return nil
     }
 
     if len(r.Pumps) == 0 {
@@ -190,41 +183,21 @@ func (r *RangeSelector) Select(binlog *pb.Binlog) *PumpStatus {
         r.Offset = 0
     }
 
-    if binlog.Tp == pb.BinlogType_Prewrite {
-        pump := r.Pumps[r.Offset]
-        r.TsMap[binlog.StartTs] = pump
-        r.Offset++
-        return pump
-    }
+    pump := r.Pumps[r.Offset]
 
-    // can't find pump in ts map, or the pump is not avaliable, choose a new one.
-    return r.Pumps[r.Offset]
+    r.Offset++
+    return pump
 }
 
-// Next implement PumpSelector.Next. Only for Prewrite binlog.
-func (r *RangeSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus {
+// Feedback implements PumpSelector.Feedback
+func (r *RangeSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus) {
     r.Lock()
-    defer func() {
-        if len(r.Pumps) != 0 {
-            r.Offset = (r.Offset + 1) % len(r.Pumps)
-        }
-        r.Unlock()
-    }()
-
-    if len(r.Pumps) == 0 {
-        return nil
-    }
-
-    if r.Offset >= len(r.Pumps) {
-        r.Offset = 0
-    }
-
-    nextPump := r.Pumps[r.Offset]
-    if binlog.Tp == pb.BinlogType_Prewrite {
-        r.TsMap[binlog.StartTs] = nextPump
+    if binlogType != pb.BinlogType_Prewrite {
+        delete(r.TsMap, startTS)
+    } else {
+        r.TsMap[startTS] = pump
     }
-
-    return nextPump
+    r.Unlock()
 }
 
 // LocalUnixSelector will always select the local pump, used for compatible with kafka version tidb-binlog.
@@ -252,19 +225,16 @@ func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus) {
 }
 
 // Select implement PumpSelector.Select.
-func (u *LocalUnixSelector) Select(binlog *pb.Binlog) *PumpStatus {
+func (u *LocalUnixSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus {
     u.RLock()
     defer u.RUnlock()
 
     return u.Pump
 }
 
-// Next implement PumpSelector.Next. Only for Prewrite binlog.
-func (u *LocalUnixSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus {
-    u.RLock()
-    defer u.RUnlock()
-
-    return u.Pump
+// Feedback implements PumpSelector.Feedback
+func (u *LocalUnixSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus) {
+    return
 }
 
 // ScoreSelector select a pump by pump's score.
@@ -281,15 +251,14 @@ func (s *ScoreSelector) SetPumps(pumps []*PumpStatus) {
 }
 
 // Select implement PumpSelector.Select.
-func (s *ScoreSelector) Select(binlog *pb.Binlog) *PumpStatus {
+func (s *ScoreSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus {
     // TODO
     return nil
 }
 
-// Next implement PumpSelector.Next. Only for Prewrite binlog.
-func (s *ScoreSelector) Next(binlog *pb.Binlog, retryTime int) *PumpStatus {
+// Feedback implements PumpSelector.Feedback
+func (s *ScoreSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus) {
     // TODO
-    return nil
 }
 
 // NewSelector returns a PumpSelector according to the algorithm.
@@ -305,7 +274,7 @@ func NewSelector(algorithm string) PumpSelector {
     case LocalUnix:
         selector = NewLocalUnixSelector()
     default:
-        Logger.Warnf("unknow algorithm %s, use range as default", algorithm)
+        Logger.Warnf("[pumps client] unknown algorithm %s, use range as default", algorithm)
         selector = NewRangeSelector()
     }
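The selector interface now splits choosing from recording: `Select` picks a pump, with `retryTime` shifting the choice when a prewrite write is retried, and `Feedback` records or clears the startTS-to-pump mapping so a later commit/rollback binlog with the same startTS reaches the same pump. A toy, self-contained demonstration of the retry rotation in the hash selector — `pick` is a stand-in for the repo's unexported `hashTs` arithmetic, and the timestamp is a placeholder:

```go
package main

import "fmt"

// pick mirrors HashSelector's new arithmetic: adding the retry count to the
// hash moves each retry of a prewrite binlog onto the next pump.
func pick(pumps []string, startTS int64, retryTime int) string {
	hash := int(startTS % int64(len(pumps))) // stand-in for hashTs
	return pumps[(hash+retryTime)%len(pumps)]
}

func main() {
	pumps := []string{"pump0", "pump1", "pump2"}
	const startTS = 400036290571534337

	for retry := 0; retry < 4; retry++ {
		fmt.Printf("retry %d -> %s\n", retry, pick(pumps, startTS, retry))
	}
	// with 3 pumps the retries cycle pump-by-pump and wrap around after 3
}
```

The mapping side of the contract is straightforward: `Feedback(startTS, Prewrite, pump)` stores the entry, and `Feedback(startTS, Commit/Rollback, …)` deletes it, which is why `WriteBinlog` calls `Feedback` exactly once per write via its `defer`.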