Skip to content

Commit

Permalink
rafthttp: probe all raft transports
Browse files Browse the repository at this point in the history
This PR adds another probing routine to monitor the connection
for Raft message transports. Previously, we only monitored
snapshot transports.

In our production cluster, we found one TCP connection had >8-sec
latencies to a remote peer, but "etcd_network_peer_round_trip_time_seconds"
metrics shows <1-sec latency distribution, which means etcd server
was not sampling enough while such latency spikes happen
outside of snapshot pipeline connection.

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Oct 10, 2018
1 parent c096dc2 commit 3381ef1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 35 deletions.
21 changes: 15 additions & 6 deletions rafthttp/probing_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rafthttp
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/xiang90/probing"
)

Expand All @@ -28,7 +29,15 @@ var (
statusErrorInterval = 5 * time.Second
)

func addPeerToProber(p probing.Prober, id string, us []string) {
const (
// RoundTripperNameRaftMessage is the name of round-tripper that sends
// all other Raft messages, other than "snap.Message".
RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
)

func addPeerToProber(p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
hus := make([]string, len(us))
for i := range us {
hus[i] = us[i] + ProbingPrefix
Expand All @@ -40,26 +49,26 @@ func addPeerToProber(p probing.Prober, id string, us []string) {
if err != nil {
plog.Errorf("failed to add peer %s into prober", id)
} else {
go monitorProbingStatus(s, id)
go monitorProbingStatus(s, id, roundTripperName, rttSecProm)
}
}

func monitorProbingStatus(s probing.Status, id string) {
func monitorProbingStatus(s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
// set the first interval short to log error early.
interval := statusErrorInterval
for {
select {
case <-time.After(interval):
if !s.Health() {
plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
plog.Warningf("health check for peer %s could not connect: %v (prober %q)", id, s.Err(), roundTripperName)
interval = statusErrorInterval
} else {
interval = statusMonitoringInterval
}
if s.ClockDiff() > time.Second {
plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
plog.Warningf("the clock difference against peer %s is too high [%v > %v] (prober %q)", id, s.ClockDiff(), time.Second, roundTripperName)
}
rtts.WithLabelValues(id).Observe(s.SRTT().Seconds())
rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
case <-s.StopNotify():
return
}
Expand Down
23 changes: 15 additions & 8 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"

"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
Expand Down Expand Up @@ -121,7 +122,8 @@ type Transport struct {
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
peers map[types.ID]Peer // peers map

prober probing.Prober
pipelineProber probing.Prober
streamProber probing.Prober
}

func (t *Transport) Start() error {
Expand All @@ -136,7 +138,8 @@ func (t *Transport) Start() error {
}
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)
t.pipelineProber = probing.NewProber(t.pipelineRt)
t.streamProber = probing.NewProber(t.streamRt)
return nil
}

Expand Down Expand Up @@ -197,7 +200,8 @@ func (t *Transport) Stop() {
for _, p := range t.peers {
p.stop()
}
t.prober.RemoveAll()
t.pipelineProber.RemoveAll()
t.streamProber.RemoveAll()
if tr, ok := t.streamRt.(*http.Transport); ok {
tr.CloseIdleConnections()
}
Expand Down Expand Up @@ -276,8 +280,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
}
fs := t.LeaderStats.Follower(id.String())
t.peers[id] = startPeer(t, urls, id, fs)
addPeerToProber(t.prober, id.String(), us)

addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("added peer %s", id)
}

Expand All @@ -304,7 +308,8 @@ func (t *Transport) removePeer(id types.ID) {
}
delete(t.peers, id)
delete(t.LeaderStats.Followers, id.String())
t.prober.Remove(id.String())
t.pipelineProber.Remove(id.String())
t.streamProber.Remove(id.String())
plog.Infof("removed peer %s", id)
}

Expand All @@ -321,8 +326,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) {
}
t.peers[id].update(urls)

t.prober.Remove(id.String())
addPeerToProber(t.prober, id.String(), us)
t.pipelineProber.Remove(id.String())
addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts)
t.streamProber.Remove(id.String())
addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts)
plog.Infof("updated peer %s", id)
}

Expand Down
50 changes: 29 additions & 21 deletions rafthttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func TestTransportSend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
wmsgsIgnored := []raftpb.Message{
// bad local message
Expand Down Expand Up @@ -72,8 +74,10 @@ func TestTransportCutMend(t *testing.T) {
peer1 := newFakePeer()
peer2 := newFakePeer()
tr := &Transport{
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
ServerStats: ss,
peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}

tr.CutPeer(types.ID(1))
Expand All @@ -100,10 +104,11 @@ func TestTransportCutMend(t *testing.T) {
func TestTransportAdd(t *testing.T) {
ls := stats.NewLeaderStats("")
tr := &Transport{
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: ls,
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})

Expand All @@ -128,10 +133,11 @@ func TestTransportAdd(t *testing.T) {

func TestTransportRemove(t *testing.T) {
tr := &Transport{
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
LeaderStats: stats.NewLeaderStats(""),
streamRt: &roundTripperRecorder{},
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
tr.RemovePeer(types.ID(1))
Expand All @@ -145,8 +151,9 @@ func TestTransportRemove(t *testing.T) {
func TestTransportUpdate(t *testing.T) {
peer := newFakePeer()
tr := &Transport{
peers: map[types.ID]Peer{types.ID(1): peer},
prober: probing.NewProber(nil),
peers: map[types.ID]Peer{types.ID(1): peer},
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
u := "http://localhost:2380"
tr.UpdatePeer(types.ID(1), []string{u})
Expand All @@ -159,13 +166,14 @@ func TestTransportUpdate(t *testing.T) {
func TestTransportErrorc(t *testing.T) {
errorc := make(chan error, 1)
tr := &Transport{
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
prober: probing.NewProber(nil),
Raft: &fakeRaft{},
LeaderStats: stats.NewLeaderStats(""),
ErrorC: errorc,
streamRt: newRespRoundTripper(http.StatusForbidden, nil),
pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
peers: make(map[types.ID]Peer),
pipelineProber: probing.NewProber(nil),
streamProber: probing.NewProber(nil),
}
tr.AddPeer(1, []string{"http://localhost:2380"})
defer tr.Stop()
Expand Down

0 comments on commit 3381ef1

Please sign in to comment.