Skip to content

Commit

Permalink
gossip: add digest/delta metrics (#154)
Browse files Browse the repository at this point in the history
Adds gossip metrics for incoming/outgoing digest and delta entries.
  • Loading branch information
andydunstall authored Aug 15, 2024
1 parent fa28e5d commit 689648d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 4 deletions.
14 changes: 11 additions & 3 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,8 @@ func (g *Gossip) gossip(node NodeMetadata) error {
break
}
bufLen = buf.Len()

g.metrics.DigestEntriesOutbound.Inc()
}

udpAddr, err := net.ResolveUDPAddr("udp", node.Addr)
Expand Down Expand Up @@ -438,13 +440,17 @@ func (g *Gossip) join(addr string) (string, error) {
return "", fmt.Errorf("encode: %w", err)
}

delta := g.state.LocalDelta()
if err := encoder.Encode(g.state.LocalDelta()); err != nil {
return "", fmt.Errorf("encode: %w", err)
}
g.metrics.DeltaEntriesOutbound.Add(float64(delta.EntriesTotal()))

if err := encoder.Encode(g.state.Digest()); err != nil {
digest := g.state.Digest()
if err := encoder.Encode(digest); err != nil {
return "", fmt.Errorf("encode: %w", err)
}
g.metrics.DigestEntriesOutbound.Add(float64(len(digest)))

if err := w.Flush(); err != nil {
return "", fmt.Errorf("flush: %w", err)
Expand All @@ -457,10 +463,10 @@ func (g *Gossip) join(addr string) (string, error) {
return "", fmt.Errorf("decode: %w", err)
}

var delta delta
if err := decoder.Decode(&delta); err != nil {
return "", fmt.Errorf("decode: %w", err)
}
g.metrics.DeltaEntriesInbound.Add(float64(delta.EntriesTotal()))

g.state.ApplyDelta(delta)

Expand Down Expand Up @@ -509,9 +515,11 @@ func (g *Gossip) leave(addr string) error {
return fmt.Errorf("encode: %w", err)
}

if err := encoder.Encode(g.state.LocalDelta()); err != nil {
delta := g.state.LocalDelta()
if err := encoder.Encode(delta); err != nil {
return fmt.Errorf("encode: %w", err)
}
g.metrics.DeltaEntriesOutbound.Add(float64(delta.EntriesTotal()))

if err := w.Flush(); err != nil {
return fmt.Errorf("flush: %w", err)
Expand Down
11 changes: 10 additions & 1 deletion pkg/gossip/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ func (l *streamListener) join(r io.Reader, w *bufio.Writer) error {
if err := decoder.Decode(&delta); err != nil {
return fmt.Errorf("decode: %w", err)
}
l.metrics.DeltaEntriesInbound.Add(float64(delta.EntriesTotal()))
var digest digest
if err := decoder.Decode(&digest); err != nil {
return fmt.Errorf("decode: %w", err)
}
l.metrics.DigestEntriesInbound.Add(float64(len(digest)))

// Apply unknown state from the delta.
l.state.ApplyDelta(delta)
Expand All @@ -156,6 +158,7 @@ func (l *streamListener) join(r io.Reader, w *bufio.Writer) error {
if err := encoder.Encode(delta); err != nil {
return fmt.Errorf("encode: %w", err)
}
l.metrics.DeltaEntriesOutbound.Add(float64(delta.EntriesTotal()))

if err := w.Flush(); err != nil {
return fmt.Errorf("flush: %w", err)
Expand All @@ -174,6 +177,7 @@ func (l *streamListener) leave(r io.Reader, w *bufio.Writer) error {
if err := decoder.Decode(&delta); err != nil {
return fmt.Errorf("decode: %w", err)
}
l.metrics.DeltaEntriesInbound.Add(float64(delta.EntriesTotal()))

// Apply unknown state from the delta.
l.state.ApplyDelta(delta)
Expand Down Expand Up @@ -285,6 +289,7 @@ func (l *packetListener) digest(b []byte) error {
if err != nil {
return fmt.Errorf("decode: %w", err)
}
l.metrics.DigestEntriesInbound.Add(float64(len(digest)))

// Discover any unknown nodes from the digest.
l.state.ApplyDigest(digest)
Expand All @@ -293,16 +298,19 @@ func (l *packetListener) digest(b []byte) error {
if err := l.sendDelta(delta, header.Addr); err != nil {
return fmt.Errorf("send delta: %w", err)
}
l.metrics.DeltaEntriesOutbound.Add(float64(delta.EntriesTotal()))

// If the digest was a request, send our own digest response.
if header.Request {
digest := l.state.Digest()
if err := l.sendDigest(
l.state.Digest(),
digest,
header.Addr,
false,
); err != nil {
return fmt.Errorf("send digest: %w", err)
}
l.metrics.DigestEntriesOutbound.Add(float64(len(digest)))
}

return nil
Expand All @@ -313,6 +321,7 @@ func (l *packetListener) delta(b []byte) error {
if err != nil {
return fmt.Errorf("decode: %w", err)
}
l.metrics.DeltaEntriesInbound.Add(float64(delta.EntriesTotal()))

l.failureDetector.Report(header.NodeID)

Expand Down
48 changes: 48 additions & 0 deletions pkg/gossip/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ type Metrics struct {
// connection.
PacketBytesOutbound prometheus.Counter

// DigestEntriesInbound is the total number of incoming digest entries.
DigestEntriesInbound prometheus.Counter

// DeltaEntriesInbound is the total number of incoming delta entries.
DeltaEntriesInbound prometheus.Counter

// DigestEntriesOutbound is the total number of outgoing digest entries.
DigestEntriesOutbound prometheus.Counter

// DeltaEntriesOutbound is the total number of outgoing delta entries.
DeltaEntriesOutbound prometheus.Counter

// Entries is the number of entries labelled by node_id, deleted and
// internal.
Entries *prometheus.GaugeVec
Expand Down Expand Up @@ -82,6 +94,38 @@ func newMetrics() *Metrics {
Help: "Total number of written bytes via a packet connection",
},
),
DigestEntriesInbound: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "piko",
Subsystem: "gossip",
Name: "digest_entries_inbound_total",
Help: "Total number of inbound digest entries",
},
),
DeltaEntriesInbound: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "piko",
Subsystem: "gossip",
Name: "delta_entries_inbound_total",
Help: "Total number of inbound digest entries",
},
),
DigestEntriesOutbound: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "piko",
Subsystem: "gossip",
Name: "digest_entries_outbound_total",
Help: "Total number of outbound digest entries",
},
),
DeltaEntriesOutbound: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "piko",
Subsystem: "gossip",
Name: "delta_entries_outbound_total",
Help: "Total number of outbound delta entries",
},
),
Entries: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "piko",
Expand All @@ -102,6 +146,10 @@ func (m *Metrics) Register(reg *prometheus.Registry) {
m.ConnectionsOutbound,
m.StreamBytesOutbound,
m.PacketBytesOutbound,
m.DigestEntriesInbound,
m.DeltaEntriesInbound,
m.DigestEntriesOutbound,
m.DeltaEntriesOutbound,
m.Entries,
)
}
8 changes: 8 additions & 0 deletions pkg/gossip/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ type deltaEntry struct {

type delta []deltaEntry

func (d delta) EntriesTotal() int {
total := 0
for _, e := range d {
total += len(e.Entries)
}
return total
}

type nodeState struct {
NodeMetadata

Expand Down

0 comments on commit 689648d

Please sign in to comment.