[GOAL2-731] Better slow peers disconnection logic #15
@@ -83,12 +83,21 @@ const MaxInt = int((^uint(0)) >> 1) | |
// connectionActivityMonitorInterval is the interval at which we check | ||
// if any of the connected peers have been idle for a long while and | ||
// need to be disconnected. | ||
const connectionActivityMonitorInterval = time.Minute * 3 | ||
const connectionActivityMonitorInterval = 3 * time.Minute | ||
|
||
// maxPeerInactivityDuration is the maximum allowed duration for a | ||
// peer to remain completely idle (i.e. no inbound or outbound communication), before | ||
// we discard the connection. | ||
const maxPeerInactivityDuration = time.Minute * 5 | ||
const maxPeerInactivityDuration = 5 * time.Minute | ||
|
||
// maxMessageQueueDuration is the maximum amount of time a message is allowed to be waiting | ||
// in the various queues before being sent. Once that deadline has been reached, sending the message | ||
// is pointless, as it's too stale to be of any value. | ||
const maxMessageQueueDuration = 25 * time.Second | ||
|
||
// slowWritingPeerMonitorInterval is the interval at which we peek on the connected peers to | ||
// verify that their current outgoing message is not being blocked for too long. | ||
const slowWritingPeerMonitorInterval = 5 * time.Second | ||
|
||
var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections) | ||
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections) | ||
|
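For context, here is a minimal sketch of how the idle-connection constants above could drive the periodic check described in the comments; the peer type and the loop around it are simplified stand-ins, not the actual wsPeer/checkPeersConnectivity code.

```go
package main

import (
	"fmt"
	"time"
)

const (
	connectionActivityMonitorInterval = 3 * time.Minute
	maxPeerInactivityDuration         = 5 * time.Minute
)

// fakePeer is a simplified stand-in for wsPeer, tracking only the last
// time any inbound or outbound traffic was observed on the connection.
type fakePeer struct {
	addr         string
	lastActivity time.Time
}

// idlePeers returns the peers whose connections have been completely idle
// for longer than maxPeerInactivityDuration as of `now`.
func idlePeers(peers []*fakePeer, now time.Time) []*fakePeer {
	var idle []*fakePeer
	for _, p := range peers {
		if now.Sub(p.lastActivity) > maxPeerInactivityDuration {
			idle = append(idle, p)
		}
	}
	return idle
}

func main() {
	now := time.Now()
	peers := []*fakePeer{
		{addr: "peer-a", lastActivity: now.Add(-1 * time.Minute)},
		{addr: "peer-b", lastActivity: now.Add(-10 * time.Minute)},
	}
	// In the real network this check runs on a ticker firing every
	// connectionActivityMonitorInterval; here we just run it once.
	for _, p := range idlePeers(peers, now) {
		fmt.Printf("would disconnect idle peer %s\n", p.addr)
	}
}
```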
@@ -99,10 +108,12 @@ var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_ne | |
var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) | ||
var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) | ||
var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) | ||
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to some peer"}) | ||
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) | ||
var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"}) | ||
|
||
var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"}) | ||
var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"}) | ||
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were dropped due to a full broadcast queue"}) | ||
|
||
var minPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_min_ping_seconds", Description: "Network round trip time to fastest peer in seconds."}) | ||
var meanPing = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peer_mean_ping_seconds", Description: "Network round trip time to average peer in seconds."}) | ||
|
@@ -294,14 +305,20 @@ type WebsocketNetwork struct { | |
|
||
// once we detect that we have a misconfigured UseForwardedForAddress, we set this and write a warning message. | ||
misconfiguredUseForwardedForAddress bool | ||
|
||
// outgoingMessagesBufferSize is the size used for outgoing messages. | ||
outgoingMessagesBufferSize int | ||
|
||
// slowWritingPeerMonitorInterval defines the interval between two consecutive tests for slow peer writing | ||
slowWritingPeerMonitorInterval time.Duration | ||
} | ||
|
||
type broadcastRequest struct { | ||
tag Tag | ||
data []byte | ||
except *wsPeer | ||
done chan struct{} | ||
start time.Time | ||
tag Tag | ||
data []byte | ||
except *wsPeer | ||
done chan struct{} | ||
enqueueTime time.Time | ||
} | ||
|
||
// Address returns a string and whether that is a 'final' address or guessed. | ||
|
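To illustrate the rename from start to enqueueTime: the producer stamps the timestamp once when the broadcast is requested, and every later stage derives the message's age from that single field. The type below is a trimmed-down stand-in for broadcastRequest, not the real struct.

```go
package main

import (
	"fmt"
	"time"
)

const maxMessageQueueDuration = 25 * time.Second

// request is a trimmed-down stand-in for broadcastRequest.
type request struct {
	tag         string
	data        []byte
	enqueueTime time.Time // stamped once, when the broadcast is requested
}

// stale reports whether the request has waited in queues for longer than
// maxMessageQueueDuration and is therefore not worth sending anymore.
func (r request) stale(now time.Time) bool {
	return now.Sub(r.enqueueTime) > maxMessageQueueDuration
}

func main() {
	r := request{tag: "TX", data: []byte("payload"), enqueueTime: time.Now().Add(-30 * time.Second)}
	fmt.Println("stale:", r.stale(time.Now())) // true: it sat in the queues too long
}
```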
@@ -335,7 +352,7 @@ func (wn *WebsocketNetwork) PublicAddress() string { | |
// if wait is true then the call blocks until the packet has actually been sent to all neighbors. | ||
// TODO: add `priority` argument so that we don't have to guess it based on tag | ||
func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error { | ||
request := broadcastRequest{tag: tag, data: data, start: time.Now()} | ||
request := broadcastRequest{tag: tag, data: data, enqueueTime: time.Now()} | ||
if except != nil { | ||
request.except = except.(*wsPeer) | ||
} | ||
|
@@ -373,6 +390,7 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat | |
default: | ||
wn.log.Debugf("broadcast queue full") | ||
// broadcastQueue full, and we're not going to wait for it. | ||
networkBroadcastQueueFull.Inc(nil) | ||
return errBcastQFull | ||
} | ||
} | ||
|
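The queue-full path above is the standard non-blocking channel send; a self-contained sketch of the pattern, with a plain integer counter standing in for the networkBroadcastQueueFull metric.

```go
package main

import (
	"errors"
	"fmt"
)

var errBcastQFull = errors.New("broadcast queue full")

// enqueueNonBlocking tries to hand a message to the broadcast queue without
// blocking; if the queue is full it bumps the drop counter and reports an error.
func enqueueNonBlocking(queue chan<- []byte, msg []byte, queueFullCount *int) error {
	select {
	case queue <- msg:
		return nil
	default:
		*queueFullCount++
		return errBcastQFull
	}
}

func main() {
	queue := make(chan []byte, 1)
	drops := 0
	fmt.Println(enqueueNonBlocking(queue, []byte("a"), &drops)) // <nil>
	fmt.Println(enqueueNonBlocking(queue, []byte("b"), &drops)) // broadcast queue full
	fmt.Println("drops:", drops)                                // 1
}
```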
@@ -499,13 +517,23 @@ func (wn *WebsocketNetwork) setup() { | |
wn.server.IdleTimeout = httpServerIdleTimeout | ||
wn.server.MaxHeaderBytes = httpServerMaxHeaderBytes | ||
wn.ctx, wn.ctxCancel = context.WithCancel(context.Background()) | ||
wn.broadcastQueueHighPrio = make(chan broadcastRequest, 1000) | ||
// roughly estimate the number of messages that could be sent over the lifespan of a single round. | ||
wn.outgoingMessagesBufferSize = int(config.Consensus[protocol.ConsensusCurrentVersion].NumProposers*2 + | ||
config.Consensus[protocol.ConsensusCurrentVersion].SoftCommitteeSize + | ||
config.Consensus[protocol.ConsensusCurrentVersion].CertCommitteeSize + | ||
config.Consensus[protocol.ConsensusCurrentVersion].NextCommitteeSize + | ||
config.Consensus[protocol.ConsensusCurrentVersion].LateCommitteeSize) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a small nit, I would change "single round" to "single period" (and say that this is the total number of messages sent at once). I don't think it makes a big difference here as it's a heuristic, but I would also add We also pipeline (relaying) all of these votes from the next round and the next period, so it's possible that this number should be 3x as big (as in, we might pipeline 3 periods' worth of votes). On the other hand, this is a pretty unlikely situation and means that the network is experiencing extreme congestion. I think with the current committee sizes, the sum of all our committee sizes is about 20000 messages, which would make 3x about 60000 messages (so with 0.5KB votes this is 30MB). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll increase the size of the buffer by RedoCommitteeSize+ DownCommitteeSize. |
||
|
||
wn.broadcastQueueHighPrio = make(chan broadcastRequest, wn.outgoingMessagesBufferSize) | ||
wn.broadcastQueueBulk = make(chan broadcastRequest, 100) | ||
wn.meshUpdateRequests = make(chan meshRequest, 5) | ||
wn.readyChan = make(chan struct{}) | ||
wn.tryConnectAddrs = make(map[string]int64) | ||
wn.eventualReadyDelay = time.Minute | ||
wn.prioTracker = newPrioTracker(wn) | ||
if wn.slowWritingPeerMonitorInterval == 0 { | ||
wn.slowWritingPeerMonitorInterval = slowWritingPeerMonitorInterval | ||
} | ||
|
||
readBufferLen := wn.config.IncomingConnectionsLimit + wn.config.GossipFanout | ||
if readBufferLen < 100 { | ||
|
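As a rough illustration of the buffer-sizing heuristic above: the consensusParams type below mirrors only the fields the sum reads, and the numbers in main are placeholders rather than the real consensus parameters.

```go
package main

import "fmt"

// consensusParams is a stand-in for the fields of the consensus configuration
// that the sizing heuristic reads; the values used below are placeholders.
type consensusParams struct {
	NumProposers      uint64
	SoftCommitteeSize uint64
	CertCommitteeSize uint64
	NextCommitteeSize uint64
	LateCommitteeSize uint64
}

// outgoingMessagesBufferSize roughly estimates how many messages could be
// produced over a single round: two per proposer plus one per committee vote.
func outgoingMessagesBufferSize(p consensusParams) int {
	return int(p.NumProposers*2 +
		p.SoftCommitteeSize +
		p.CertCommitteeSize +
		p.NextCommitteeSize +
		p.LateCommitteeSize)
}

func main() {
	// Placeholder numbers purely for illustration.
	p := consensusParams{
		NumProposers:      20,
		SoftCommitteeSize: 2990,
		CertCommitteeSize: 1500,
		NextCommitteeSize: 5000,
		LateCommitteeSize: 5000,
	}
	fmt.Println("buffer size:", outgoingMessagesBufferSize(p))
}
```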
@@ -838,7 +866,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt | |
prioChallenge: challenge, | ||
} | ||
peer.TelemetryGUID = otherTelemetryGUID | ||
peer.init(wn.config) | ||
peer.init(wn.config, wn.outgoingMessagesBufferSize) | ||
wn.addPeer(peer) | ||
localAddr, _ := wn.Address() | ||
wn.log.With("event", "ConnectedIn").With("remote", otherPublicAddr).With("local", localAddr).Infof("Accepted incoming connection from peer %s", otherPublicAddr) | ||
|
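peer.init now receives the network-wide buffer estimate, presumably to size the peer's outgoing queues. A sketch of that idea follows; the two-queue layout and field names are assumptions about wsPeer, not taken from this diff.

```go
package main

import "fmt"

// sketchPeer is a cut-down stand-in for wsPeer, keeping only the outgoing queues.
type sketchPeer struct {
	sendBufferHighPrio chan []byte
	sendBufferBulk     chan []byte
}

// init sizes the per-peer send buffers from the network-wide estimate, so a
// single slow peer can absorb roughly a round's worth of traffic before its
// writes start getting dropped.
func (p *sketchPeer) init(outgoingMessagesBufferSize int) {
	p.sendBufferHighPrio = make(chan []byte, outgoingMessagesBufferSize)
	p.sendBufferBulk = make(chan []byte, outgoingMessagesBufferSize)
}

func main() {
	p := &sketchPeer{}
	p.init(14490) // illustrative value only
	fmt.Println(cap(p.sendBufferHighPrio), cap(p.sendBufferBulk))
}
```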
@@ -913,6 +941,23 @@ func (wn *WebsocketNetwork) checkPeersConnectivity() { | |
} | ||
} | ||
|
||
// checkSlowWritingPeers checks the timestamp of each peer's current outgoing message. | ||
// If that timestamp is too old, it means that the transmission of that message | ||
// is taking longer than desired. In that case, the peer is disconnected, allowing it to reconnect | ||
// to a faster network endpoint. | ||
func (wn *WebsocketNetwork) checkSlowWritingPeers() { | ||
wn.peersLock.Lock() | ||
defer wn.peersLock.Unlock() | ||
currentTime := time.Now() | ||
for _, peer := range wn.peers { | ||
if peer.CheckSlowWritingPeer(currentTime) { | ||
wn.wg.Add(1) | ||
go wn.disconnectThread(peer, disconnectSlowConn) | ||
networkSlowPeerDrops.Inc(nil) | ||
} | ||
} | ||
} | ||
|
||
func (wn *WebsocketNetwork) sendFilterMessage(msg IncomingMessage) { | ||
digest := generateMessageDigest(msg.Tag, msg.Data) | ||
//wn.log.Debugf("send filter %s(%d) %v", msg.Tag, len(msg.Data), digest) | ||
|
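The wsPeer half of this check (CheckSlowWritingPeer) is not part of this hunk; the sketch below shows one plausible shape for it, assuming the peer records the enqueue time of the message it is currently writing. Names and details are illustrative, not the actual wsPeer implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const maxMessageQueueDuration = 25 * time.Second

// slowPeer is a simplified stand-in for wsPeer. currentWriteEnqueueTime holds
// the enqueue time (unix nanoseconds) of the message currently being written,
// or 0 when no write is in flight.
type slowPeer struct {
	addr                    string
	currentWriteEnqueueTime int64
}

// startWrite records that a message enqueued at t is now being transmitted.
func (p *slowPeer) startWrite(t time.Time) {
	atomic.StoreInt64(&p.currentWriteEnqueueTime, t.UnixNano())
}

// finishWrite clears the in-flight marker once the message is fully sent.
func (p *slowPeer) finishWrite() {
	atomic.StoreInt64(&p.currentWriteEnqueueTime, 0)
}

// checkSlowWritingPeer reports whether the current outgoing message has been
// in flight for longer than maxMessageQueueDuration.
func (p *slowPeer) checkSlowWritingPeer(now time.Time) bool {
	enqueued := atomic.LoadInt64(&p.currentWriteEnqueueTime)
	if enqueued == 0 {
		return false // nothing being written right now
	}
	return now.Sub(time.Unix(0, enqueued)) > maxMessageQueueDuration
}

func main() {
	p := &slowPeer{addr: "peer-a"}
	p.startWrite(time.Now().Add(-40 * time.Second))
	fmt.Println(p.checkSlowWritingPeer(time.Now())) // true: write stuck too long
	p.finishWrite()
	fmt.Println(p.checkSlowWritingPeer(time.Now())) // false
}
```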
@@ -922,8 +967,12 @@ func (wn *WebsocketNetwork) sendFilterMessage(msg IncomingMessage) { | |
func (wn *WebsocketNetwork) broadcastThread() { | ||
defer wn.wg.Done() | ||
var peers []*wsPeer | ||
slowWritingPeerCheckTicker := time.NewTicker(wn.slowWritingPeerMonitorInterval) | ||
defer slowWritingPeerCheckTicker.Stop() | ||
for { | ||
// broadcast from high prio channel as long as we can | ||
// we want to try and keep this as a single case select with a default, since go compiles a single-case | ||
// select with a default into a more efficient non-blocking receive, instead of compiling it to the general-purpose selectgo | ||
select { | ||
case request := <-wn.broadcastQueueHighPrio: | ||
wn.innerBroadcast(request, true, &peers) | ||
|
@@ -935,6 +984,9 @@ func (wn *WebsocketNetwork) broadcastThread() { | |
select { | ||
case request := <-wn.broadcastQueueHighPrio: | ||
wn.innerBroadcast(request, true, &peers) | ||
case <-slowWritingPeerCheckTicker.C: | ||
wn.checkSlowWritingPeers() | ||
continue | ||
case request := <-wn.broadcastQueueBulk: | ||
wn.innerBroadcast(request, false, &peers) | ||
case <-wn.ctx.Done(): | ||
|
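The two-stage select used by broadcastThread, shown in isolation: a single-case select with a default gives a cheap non-blocking drain of the high-priority queue, and only when that queue is empty does the loop fall back to the general select that also services the slow-writing-peer ticker. Channel names and payloads below are simplified.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pump drains highPrio as fast as possible, only falling back to the general
// select (bulk traffic, periodic slow-peer check, shutdown) when highPrio is empty.
func pump(ctx context.Context, highPrio, bulk <-chan string, checkTicker *time.Ticker) {
	for {
		// Single-case select with a default compiles to a cheap
		// non-blocking receive, so high-priority traffic is preferred.
		select {
		case msg := <-highPrio:
			fmt.Println("high prio:", msg)
			continue
		default:
		}

		select {
		case msg := <-highPrio:
			fmt.Println("high prio:", msg)
		case <-checkTicker.C:
			fmt.Println("checking for slow-writing peers")
		case msg := <-bulk:
			fmt.Println("bulk:", msg)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	highPrio := make(chan string, 2)
	bulk := make(chan string, 2)
	highPrio <- "vote"
	bulk <- "transaction gossip"
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	pump(ctx, highPrio, bulk, ticker)
}
```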
@@ -957,8 +1009,16 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) []*wsPeer { | |
|
||
// prio is set if the broadcast is a high-priority broadcast. | ||
func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, ppeers *[]*wsPeer) { | ||
broadcastQueueTime := time.Now().Sub(request.start) | ||
if request.done != nil { | ||
defer close(request.done) | ||
} | ||
|
||
broadcastQueueTime := time.Now().Sub(request.enqueueTime) | ||
|
||
networkBroadcastQueueMicros.AddUint64(uint64(broadcastQueueTime.Nanoseconds()/1000), nil) | ||
if broadcastQueueTime > maxMessageQueueDuration { | ||
networkBroadcastsDropped.Inc(nil) | ||
return | ||
} | ||
|
||
start := time.Now() | ||
tbytes := []byte(request.tag) | ||
|
@@ -975,37 +1035,27 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, | |
peers := *ppeers | ||
|
||
// first send to all the easy outbound peers who don't block, get them started. | ||
sentMessageCount := 0 | ||
for pi, peer := range peers { | ||
if wn.config.BroadcastConnectionsLimit >= 0 && pi >= wn.config.BroadcastConnectionsLimit { | ||
if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { | ||
break | ||
} | ||
if peer == request.except { | ||
peers[pi] = nil | ||
continue | ||
} | ||
ok := peer.writeNonBlock(mbytes, prio, digest) | ||
ok := peer.writeNonBlock(mbytes, prio, digest, request.enqueueTime) | ||
if ok { | ||
peers[pi] = nil | ||
sentMessageCount++ | ||
continue | ||
} | ||
if prio { | ||
// couldn't send a high prio message; give up | ||
wn.log.Infof("dropping peer for being too slow to send to: %s, %d enqueued", peer.rootURL, len(peer.sendBufferHighPrio)) | ||
wn.removePeer(peer, disconnectTooSlow) | ||
peer.Close() | ||
networkSlowPeerDrops.Inc(nil) | ||
} else { | ||
networkBroadcastsDropped.Inc(nil) | ||
} | ||
networkPeerBroadcastDropped.Inc(nil) | ||
} | ||
|
||
dt := time.Now().Sub(start) | ||
networkBroadcasts.Inc(nil) | ||
networkBroadcastSendMicros.AddUint64(uint64(dt.Nanoseconds()/1000), nil) | ||
|
||
if request.done != nil { | ||
close(request.done) | ||
} | ||
} | ||
|
||
// NumPeers returns number of peers we connect to (all peers incoming and outbound). | ||
|
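A condensed sketch of the revised per-peer loop: a stale request is dropped outright, the BroadcastConnectionsLimit cap now counts messages actually sent rather than peers visited, and a peer that cannot accept the write is only counted as a per-peer drop instead of being disconnected on the spot (disconnection is left to the slow-writing monitor). The peer type and counters below are simplified stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

const maxMessageQueueDuration = 25 * time.Second

// peer is a simplified stand-in for wsPeer; tryWrite models a non-blocking write.
type peer struct {
	name     string
	sendable bool
}

func (p *peer) tryWrite(msg []byte, enqueueTime time.Time) bool { return p.sendable }

// broadcast sends msg to at most limit peers (limit < 0 means unlimited).
// It returns how many peers accepted the write and how many dropped it.
func broadcast(peers []*peer, msg []byte, enqueueTime time.Time, limit int) (sent, dropped int) {
	if time.Since(enqueueTime) > maxMessageQueueDuration {
		// The whole request is stale; don't bother any peer with it.
		return 0, 0
	}
	for _, p := range peers {
		if limit >= 0 && sent >= limit {
			break
		}
		if p.tryWrite(msg, enqueueTime) {
			sent++ // counts toward the broadcast connections cap
			continue
		}
		dropped++ // per-peer drop; the slow-writing monitor decides on disconnects
	}
	return sent, dropped
}

func main() {
	peers := []*peer{{"a", true}, {"b", false}, {"c", true}}
	sent, dropped := broadcast(peers, []byte("vote"), time.Now(), -1)
	fmt.Printf("sent=%d dropped=%d\n", sent, dropped) // sent=2 dropped=1
}
```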
@@ -1434,7 +1484,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { | |
} | ||
peer := &wsPeer{wsPeerCore: wsPeerCore{net: wn, rootURL: addr}, conn: conn, outgoing: true, incomingMsgFilter: wn.incomingMsgFilter} | ||
peer.TelemetryGUID = otherTelemetryGUID | ||
peer.init(wn.config) | ||
peer.init(wn.config, wn.outgoingMessagesBufferSize) | ||
wn.addPeer(peer) | ||
localAddr, _ := wn.Address() | ||
wn.log.With("event", "ConnectedOut").With("remote", addr).With("local", localAddr).Infof("Made outgoing connection to peer %v", addr) | ||
|
@@ -1452,7 +1502,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { | |
resp := wn.prioScheme.MakePrioResponse(challenge) | ||
if resp != nil { | ||
mbytes := append([]byte(protocol.NetPrioResponseTag), resp...) | ||
sent := peer.writeNonBlock(mbytes, true, crypto.Digest{}) | ||
sent := peer.writeNonBlock(mbytes, true, crypto.Digest{}, time.Now()) | ||
if !sent { | ||
wn.log.With("remote", addr).With("local", localAddr).Warnf("could not send priority response to %v", addr) | ||
} | ||
|
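writeNonBlock now threads the original enqueue time through to the peer's send queue. The sketch below shows the general shape this could take; the queued-message type and buffer handling are illustrative, not the actual wsPeer internals.

```go
package main

import (
	"fmt"
	"time"
)

// queuedMessage is an illustrative per-peer send-queue entry: the payload plus
// the time the broadcast was originally requested, so later stages can tell
// how stale the message already is.
type queuedMessage struct {
	data        []byte
	enqueueTime time.Time
}

// writeNonBlock tries to place the message on the peer's buffered send queue
// without blocking; the enqueue time travels with the message.
func writeNonBlock(sendQueue chan<- queuedMessage, data []byte, enqueueTime time.Time) bool {
	select {
	case sendQueue <- queuedMessage{data: data, enqueueTime: enqueueTime}:
		return true
	default:
		return false // queue full; the caller records a per-peer drop
	}
}

func main() {
	q := make(chan queuedMessage, 1)
	fmt.Println(writeNonBlock(q, []byte("prio response"), time.Now())) // true
	fmt.Println(writeNonBlock(q, []byte("vote"), time.Now()))          // false: buffer full
	msg := <-q
	fmt.Println("waited:", time.Since(msg.enqueueTime))
}
```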
Could we have separate metrics for drops of high-priority messages and low-priority messages? It seems that high-priority drops would be much more alarming than low-priority drops (a lot of low-priority drops means that we might have a ping-pong script bug; a lot of high-priority drops means that the network could be about to stall).
That's a good idea. I'll defer this to a separate PR. Opened a JIRA issue to track this:
https://algorand.atlassian.net/browse/GOAL2-790