telemetry: Count and report the number of duplicate proposals and MsgDigestSkipTag messages received #4605

Merged: 2 commits, Sep 30, 2022
10 changes: 10 additions & 0 deletions agreement/proposalStore.go
@@ -18,8 +18,16 @@ package agreement

 import (
     "fmt"
+
+    "github.com/algorand/go-algorand/util/metrics"
 )

+var proposalAlreadyFilledCounter = metrics.MakeCounter(
+    metrics.MetricName{Name: "algod_agreement_proposal_already_filled", Description: "Number of times a duplicate proposal payload was received before validation"})
+
+var proposalAlreadyAssembledCounter = metrics.MakeCounter(
+    metrics.MetricName{Name: "algod_agreement_proposal_already_assembled", Description: "Number of times a duplicate proposal payload was received after validation"})
+
 // A blockAssembler contains the proposal data associated with some
 // proposal-value.
 //
@@ -52,10 +60,12 @@ type blockAssembler struct {
 // an error if the pipelining operation is redundant.
 func (a blockAssembler) pipeline(p unauthenticatedProposal) (blockAssembler, error) {
     if a.Assembled {
+        proposalAlreadyAssembledCounter.Inc(nil)
         return a, fmt.Errorf("blockAssembler.pipeline: already assembled")
     }

     if a.Filled {
+        proposalAlreadyFilledCounter.Inc(nil)
         return a, fmt.Errorf("blockAssembler.pipeline: already filled")
     }

2 changes: 2 additions & 0 deletions logging/telemetryspec/event.go
@@ -300,6 +300,8 @@ type PeerConnectionDetails struct {
     Endpoint string `json:",omitempty"`
     // MessageDelay is the average relative message delay. Not being used for incoming connection.
     MessageDelay int64 `json:",omitempty"`
+    // DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before.
+    DuplicateFilterCount int64
Contributor:

When are we resetting it?

Contributor Author:

This is reset when the peer connection is closed. The counters that get sent to telemetry today are monotonically increasing, so it is the analyzer's job to graph the rate at whatever granularity they choose. For this particular event, you would also need to spot the DisconnectPeer event between PeerConnections events to know when a reset occurred.
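
As a rough illustration of the analyzer side, here is a minimal sketch of turning cumulative per-peer samples into per-interval increases while accounting for resets. The `Sample` type, its `Reset` flag (standing in for "a DisconnectPeer event was seen since the previous sample"), and the `Deltas` function are hypothetical names for this example, not part of go-algorand or its telemetry tooling.

```go
package main

// Sample is one hypothetical observation of a cumulative per-peer counter,
// taken from a periodic telemetry event. Reset marks that a disconnect was
// observed between the previous sample and this one (an assumption about
// how the analyzer tracks disconnects).
type Sample struct {
	Count int64
	Reset bool
}

// Deltas converts cumulative counter samples into per-interval increases.
// If a reset was flagged, or the counter went backwards, the counter is
// assumed to have restarted from zero, so the new value is the increase.
func Deltas(samples []Sample) []int64 {
	if len(samples) < 2 {
		return nil
	}
	out := make([]int64, 0, len(samples)-1)
	for i := 1; i < len(samples); i++ {
		prev, cur := samples[i-1], samples[i]
		if cur.Reset || cur.Count < prev.Count {
			out = append(out, cur.Count)
			continue
		}
		out = append(out, cur.Count-prev.Count)
	}
	return out
}
```

Dividing each increase by the sampling interval gives a rate. The explicit reset flag matters because a peer that reconnects and quickly exceeds its old count would otherwise look like a small increase rather than a restart.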

Contributor @winder (Sep 30, 2022):

The PeerConnectionDetails seems like a weird spot to put this. We connect once so how would there ever be duplicate messages?

Never mind, I seem to have mistaken this for a different peer connection event, which is sent once when someone connects.

Contributor Author @cce (Sep 30, 2022):

The idea is for this counter to be maintained for each wsPeer (as well as globally as a metrics.Counter) and reported here along with other per-peer stats like MessageDelay.
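
The "per peer plus global" idea can be sketched roughly as follows. This is a simplified stand-in, not the wsPeer code: `peerStats`, `recordDuplicateFilter`, `snapshot`, and `globalTotal` are hypothetical names, and a plain atomic integer stands in for the metrics.Counter.

```go
package main

import "sync/atomic"

// globalDuplicateFilters stands in for the process-wide counter
// (a metrics.Counter in the PR; a plain int64 here for illustration).
var globalDuplicateFilters int64

// peerStats is a hypothetical stand-in for the per-peer state.
type peerStats struct {
	duplicateFilterCount int64
}

// recordDuplicateFilter bumps both the global total and this peer's count.
func (p *peerStats) recordDuplicateFilter() {
	atomic.AddInt64(&globalDuplicateFilters, 1)
	atomic.AddInt64(&p.duplicateFilterCount, 1)
}

// snapshot reads the per-peer count atomically, for periodic reporting.
func (p *peerStats) snapshot() int64 {
	return atomic.LoadInt64(&p.duplicateFilterCount)
}

// globalTotal reads the process-wide count atomically.
func globalTotal() int64 {
	return atomic.LoadInt64(&globalDuplicateFilters)
}
```

The global total gives a quick network-wide signal, while the per-peer value shows which connections contribute to it.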

}

// CatchpointGenerationEvent event
7 changes: 4 additions & 3 deletions network/wsNetwork.go
@@ -1749,9 +1749,10 @@ func (wn *WebsocketNetwork) sendPeerConnectionsTelemetryStatus() {
     var connectionDetails telemetryspec.PeersConnectionDetails
     for _, peer := range peers {
         connDetail := telemetryspec.PeerConnectionDetails{
-            ConnectionDuration: uint(now.Sub(peer.createTime).Seconds()),
-            TelemetryGUID:      peer.TelemetryGUID,
-            InstanceName:       peer.InstanceName,
+            ConnectionDuration:   uint(now.Sub(peer.createTime).Seconds()),
+            TelemetryGUID:        peer.TelemetryGUID,
+            InstanceName:         peer.InstanceName,
+            DuplicateFilterCount: peer.duplicateFilterCount,
         }
         if peer.outgoing {
             connDetail.Address = justHost(peer.conn.RemoteAddr().String())
13 changes: 12 additions & 1 deletion network/wsPeer.go
@@ -75,6 +75,7 @@ var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name

 var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal)
 var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal)
+var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
 var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
 var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)

@@ -184,6 +185,9 @@ type wsPeer struct {

     incomingMsgFilter *messageFilter
     outgoingMsgFilter *messageFilter
+    // duplicateFilterCount counts how many times the remote peer has sent us a message hash
+    // to filter that it had already sent before.
+    duplicateFilterCount int64

     processed chan struct{}

@@ -576,7 +580,14 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) {
     var digest crypto.Digest
     copy(digest[:], msg.Data)
     //wp.net.log.Debugf("add filter %v", digest)
-    wp.outgoingMsgFilter.CheckDigest(digest, true, true)
+    has := wp.outgoingMsgFilter.CheckDigest(digest, true, true)
+    if has {
+        // Count that this peer has sent us duplicate filter messages: this means it received the same
+        // large message concurrently from several peers, and then sent the filter message to us after
+        // each large message finished transferring.
+        duplicateNetworkFilterReceivedTotal.Inc(nil)

What is this existing duplicateNetworkFilterReceivedTotal? I wonder if that existing metric would have been useful.

Contributor Author:

This is a top-level metric across all peers, so you don't have to parse the PeerConnections events to get a quick measure of how often this is happening in the network. It also makes the count available to Prometheus like our other counters.

Contributor Author:

I added duplicateNetworkFilterReceivedTotal ... do you mean one of the other existing metrics?


Sorry, I was referring to outgoingNetworkMessageFilteredOutTotal.


also DuplicateNetworkMessageReceivedTotal

Contributor Author @cce (Sep 30, 2022):

Yeah, I found duplicateNetworkMessageReceivedTotal while writing this. It counts the number of times you receive a "de-dupe-safe" tag message more than once, where dedupSafe() is defined as vote (AV) and transaction (TX) messages. These message types are both smaller than the 5000-byte limit used for the filtering I'm counting with proposals, and they already have their own counter. My new counter is very similar but covers >5000-byte "skipped" messages like proposals.

outgoingNetworkMessageFilteredOutTotal counts the number of times the filter successfully prevented a duplicate proposal from being sent to a peer, so it is complementary to my new counter, which basically counts the number of times it didn't work (because the peer did not send the skip/filter message in time, before the payload started sending).
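
The relationship between these two counters can be sketched with a simplified, single-goroutine model. Everything here is hypothetical: the `peer` type, its fields, and `digestOf` are illustration-only names, SHA-256 is an assumed stand-in for the real digest function, and a plain map replaces the actual messageFilter.

```go
package main

import "crypto/sha256"

// digest identifies a message by hash (SHA-256 is an assumption here).
type digest [sha256.Size]byte

// digestOf hashes a payload into a digest.
func digestOf(payload []byte) digest {
	return digest(sha256.Sum256(payload))
}

// peer is a hypothetical model of one connection's outgoing filter state.
type peer struct {
	skip             map[digest]struct{} // digests the remote asked us not to send
	filteredOut      int                 // sends suppressed: the skip request arrived in time
	duplicateFilters int                 // skip requests for an already registered digest
}

func newPeer() *peer {
	return &peer{skip: make(map[digest]struct{})}
}

// handleSkip registers a skip request from the remote peer. A repeat of a
// known digest means the remote received the same payload more than once.
func (p *peer) handleSkip(d digest) {
	if _, seen := p.skip[d]; seen {
		p.duplicateFilters++
		return
	}
	p.skip[d] = struct{}{}
}

// shouldSend reports whether the payload should still be sent to this peer.
func (p *peer) shouldSend(payload []byte) bool {
	if _, skip := p.skip[digestOf(payload)]; skip {
		p.filteredOut++
		return false
	}
	return true
}
```

In this model `filteredOut` grows when the filter saves a transfer, and `duplicateFilters` grows when a second skip request arrives for the same digest, which suggests the remote had already received that payload from more than one source.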

+        atomic.AddInt64(&wp.duplicateFilterCount, 1)
+    }
 }

 func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason {
2 changes: 2 additions & 0 deletions util/metrics/metrics.go
@@ -49,6 +49,8 @@ var (
     DuplicateNetworkMessageReceivedTotal = MetricName{Name: "algod_network_duplicate_message_received_total", Description: "Total number of duplicate messages that were received from the network"}
     // DuplicateNetworkMessageReceivedBytesTotal The total number ,in bytes, of the duplicate messages that were received from the network
     DuplicateNetworkMessageReceivedBytesTotal = MetricName{Name: "algod_network_duplicate_message_received_bytes_total", Description: "The total number ,in bytes, of the duplicate messages that were received from the network"}
+    // DuplicateNetworkFilterReceivedTotal Total number of duplicate filter messages (tag MsgDigestSkipTag) that were received from the network
+    DuplicateNetworkFilterReceivedTotal = MetricName{Name: "algod_network_duplicate_filter_received_total", Description: "Total number of duplicate filter messages that were received from the network"}
     // OutgoingNetworkMessageFilteredOutTotal Total number of messages that were not sent per peer request
     OutgoingNetworkMessageFilteredOutTotal = MetricName{Name: "algod_outgoing_network_message_filtered_out_total", Description: "Total number of messages that were not sent per peer request"}
     // OutgoingNetworkMessageFilteredOutBytesTotal Total number of bytes saved by not sending messages that were asked not to be sent by peer