From 8d96f19b1a038f1a5245f04bda3b3c5944c3ba2a Mon Sep 17 00:00:00 2001 From: clambin Date: Sat, 14 Aug 2021 20:49:10 +0200 Subject: [PATCH] fix: Grafana dashboard shows negative packet loss --- .github/workflows/build.yml | 2 +- .github/workflows/test.yml | 2 +- assets/grafana/dashboards/pinger.json | 146 ++++++++++++-------------- pinger/collect.go | 18 ++-- pinger/pinger.go | 68 ++++++++---- pinger/pinger_test.go | 55 ++++++++-- pingtracker/pingtracker.go | 71 +++++++------ pingtracker/pingtracker_test.go | 75 ++++++++----- 8 files changed, 260 insertions(+), 177 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 260fac4..00e0fdb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,7 +5,7 @@ on: branches: - master - refactor - - metrics + - fix jobs: test: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 808ae83..dbfba65 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: branches-ignore: - master - refactor - - metrics + - fix pull_request_target: jobs: diff --git a/assets/grafana/dashboards/pinger.json b/assets/grafana/dashboards/pinger.json index 76ffa71..438ae69 100644 --- a/assets/grafana/dashboards/pinger.json +++ b/assets/grafana/dashboards/pinger.json @@ -8,6 +8,12 @@ "hide": true, "iconColor": "rgba(0, 211, 255, 1)", "name": "Annotations & Alerts", + "target": { + "limit": 100, + "matchAny": false, + "tags": [], + "type": "dashboard" + }, "type": "dashboard" } ] @@ -19,106 +25,95 @@ "links": [], "panels": [ { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, "datasource": "Prometheus", "description": "", "fieldConfig": { "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, "unit": "s" }, "overrides": [] }, - "fill": 1, - "fillGradient": 0, "gridPos": { "h": 11, "w": 12, "x": 0, "y": 0 }, - "hiddenSeries": false, "id": 4, - "legend": { - "alignAsTable": true, - "avg": true, - "current": true, - "max": true, - "min": true, - "rightSide": false, - "show": true, - "total": false, - "values": true - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", "options": { - "alertThreshold": true + "legend": { + "calcs": [ + "mean", + "lastNotNull", + "max", + "min" + ], + "displayMode": "table", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } }, - "percentage": false, "pluginVersion": "8.0.6", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, "targets": [ { - "exemplar": true, "expr": "sum by (host) (pinger_latency_seconds / pinger_packet_count)", "interval": "", "legendFormat": "{{host}}", "refId": "B" } ], - "thresholds": [], "timeFrom": null, - "timeRegions": [], "timeShift": null, "title": "Latency", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "$$hashKey": "object:83", - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "$$hashKey": "object:84", - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } + "type": "timeseries" }, { "datasource": "Prometheus", @@ -140,7 +135,7 @@ "tooltip": false, "viz": false }, - "lineInterpolation": "stepAfter", + "lineInterpolation": "stepBefore", "lineWidth": 1, "pointSize": 5, "scaleDistribution": { @@ -158,7 +153,6 @@ }, "links": [], "mappings": [], - "min": 0, "thresholds": { "mode": "absolute", "steps": [ @@ -202,7 +196,7 @@ "targets": [ { "exemplar": true, - "expr": "sum by (host) (increase(pinger_packet_loss_count[2m]))", + "expr": "sum by (host) (pinger_packet_loss_count)", "interval": "", "legendFormat": "{{host}}", "refId": "B" @@ -214,7 +208,7 @@ "type": "timeseries" } ], - "refresh": "10s", + "refresh": "5m", "schemaVersion": 30, "style": "dark", "tags": [], @@ -222,7 +216,7 @@ "list": [] }, "time": { - "from": "now-15m", + "from": "now-1h", "to": "now" }, "timepicker": { @@ -242,5 +236,5 @@ "timezone": "", "title": "Pinger", "uid": "WHj4zUggz", - "version": 5 + "version": 23 } \ No newline at end of file diff --git a/pinger/collect.go b/pinger/collect.go index ff54239..a48f7e7 100644 --- a/pinger/collect.go +++ b/pinger/collect.go @@ -6,21 +6,21 @@ import ( ) // Describe interface for Prometheus collector -func (pinger *Pinger) Describe(ch chan<- *prometheus.Desc) { - ch <- pinger.packetsMetric - ch <- pinger.lossMetric - ch <- pinger.latencyMetric +func (monitor *Monitor) Describe(ch chan<- *prometheus.Desc) { + ch <- monitor.packetsMetric + ch <- monitor.lossMetric + ch <- monitor.latencyMetric } // Collect interface for Prometheus collector -func (pinger *Pinger) Collect(ch chan<- prometheus.Metric) { - for host, tracker := range pinger.Trackers { +func (monitor *Monitor) Collect(ch chan<- prometheus.Metric) { + for host, tracker := range monitor.Trackers { count, loss, latency := tracker.Calculate() log.WithFields(log.Fields{"host": host, "count": count, "loss": loss, "latency": latency}).Debug() - ch <- prometheus.MustNewConstMetric(pinger.packetsMetric, prometheus.GaugeValue, float64(count), host) - ch <- prometheus.MustNewConstMetric(pinger.lossMetric, prometheus.GaugeValue, float64(loss), host) - ch <- prometheus.MustNewConstMetric(pinger.latencyMetric, prometheus.GaugeValue, latency.Seconds(), host) + ch <- prometheus.MustNewConstMetric(monitor.packetsMetric, prometheus.GaugeValue, float64(count), host) + ch <- prometheus.MustNewConstMetric(monitor.lossMetric, prometheus.GaugeValue, float64(loss), host) + ch <- prometheus.MustNewConstMetric(monitor.latencyMetric, prometheus.GaugeValue, latency.Seconds(), host) } } diff --git a/pinger/pinger.go b/pinger/pinger.go index 371fd93..21fbca0 100644 --- a/pinger/pinger.go +++ b/pinger/pinger.go @@ -14,62 +14,84 @@ import ( "time" ) -type Pinger struct { +// Monitor pings a number of hosts and measures latency & packet loss +type Monitor struct { + Pinger func(host string, ch chan PingResponse) (err error) Trackers map[string]*pingtracker.PingTracker + packets chan PingResponse packetsMetric *prometheus.Desc lossMetric *prometheus.Desc latencyMetric *prometheus.Desc } -// New creates a Pinger for the specified hosts -func New(hosts []string) (pinger *Pinger) { - pinger = &Pinger{ +type PingResponse struct { + Host string + SequenceNr int + Latency time.Duration +} + +// New creates a Monitor for the specified hosts +func New(hosts []string) (monitor *Monitor) { + monitor = &Monitor{ + Pinger: spawnedPinger, Trackers: make(map[string]*pingtracker.PingTracker), + packets: make(chan PingResponse), packetsMetric: prometheus.NewDesc( prometheus.BuildFQName("pinger", "", "packet_count"), - "Pinger total packet count", + "Monitor total packet count", []string{"host"}, nil, ), lossMetric: prometheus.NewDesc( prometheus.BuildFQName("pinger", "", "packet_loss_count"), - "Pinger total measured packet loss", + "Monitor total measured packet loss", []string{"host"}, nil, ), latencyMetric: prometheus.NewDesc( prometheus.BuildFQName("pinger", "", "latency_seconds"), - "Pinger latency in seconds", + "Monitor latency in seconds", []string{"host"}, nil, ), } for _, host := range hosts { - pinger.Trackers[host] = pingtracker.New() + monitor.Trackers[host] = pingtracker.New() } return } -// Run starts the pingers -func (pinger *Pinger) Run(ctx context.Context) { - for host, tracker := range pinger.Trackers { - log.WithField("host", host).Debug("starting tracker") - go func(host string, tracker *pingtracker.PingTracker) { - err := spawnedPinger(host, tracker) +// Run starts the pinger(s) +func (monitor *Monitor) Run(ctx context.Context) { + monitor.startPingers() + + for running := true; running; { + select { + case <-ctx.Done(): + running = false + case packet := <-monitor.packets: + monitor.Trackers[packet.Host].Track(packet.SequenceNr, packet.Latency) + } + } +} + +func (monitor *Monitor) startPingers() { + for host := range monitor.Trackers { + log.WithField("host", host).Debug("starting pinger") + go func(host string) { + err := monitor.Pinger(host, monitor.packets) if err != nil { - log.WithError(err).Error("failed to run tracker") + log.WithError(err).Fatal("failed to run pinger") } - }(host, tracker) + }(host) } - - <-ctx.Done() } // spawnedPinger spawns a ping process and reports to a specified PingTracker -func spawnedPinger(host string, tracker *pingtracker.PingTracker) (err error) { +func spawnedPinger(host string, ch chan PingResponse) (err error) { var ( cmd string pingOut io.ReadCloser @@ -104,9 +126,11 @@ func spawnedPinger(host string, tracker *pingtracker.PingTracker) (err error) { rtt, _ = strconv.ParseFloat(match[3], 64) latency = time.Duration(rtt*1000) * time.Microsecond - tracker.Track(seqNr, latency) - - // log.Debugf("%s: seqno=%d, latency=%v", host, seqNr, latency) + ch <- PingResponse{ + Host: host, + SequenceNr: seqNr, + Latency: latency, + } } } } diff --git a/pinger/pinger_test.go b/pinger/pinger_test.go index 09132db..12f3001 100644 --- a/pinger/pinger_test.go +++ b/pinger/pinger_test.go @@ -12,6 +12,42 @@ import ( "time" ) +func TestPinger_Run_Quick(t *testing.T) { + p := pinger.New([]string{"127.0.0.1"}) + p.Pinger = fakePinger + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go p.Run(ctx) + + var m prometheus.Metric + var ch chan prometheus.Metric + + // wait for 4 packets to arrive + assert.Eventually(t, func() bool { + ch = make(chan prometheus.Metric) + go p.Collect(ch) + m = <-ch + return metricName(m) == "pinger_packet_count" && getMetric(m).GetGauge().GetValue() == 4 + }, 500*time.Millisecond, 10*time.Millisecond) + + m = <-ch + assert.Equal(t, "pinger_packet_loss_count", metricName(m)) + assert.Equal(t, 1.0, getMetric(m).GetGauge().GetValue()) + m = <-ch + assert.Equal(t, "pinger_latency_seconds", metricName(m)) + assert.Equal(t, 4e-05, getMetric(m).GetGauge().GetValue()) +} + +// fakePinger sends packets rapidly, so we don't have to wait 5 seconds to get some meaningful data +func fakePinger(host string, ch chan pinger.PingResponse) (err error) { + ch <- pinger.PingResponse{Host: host, SequenceNr: 0, Latency: 10 * time.Microsecond} + ch <- pinger.PingResponse{Host: host, SequenceNr: 1, Latency: 10 * time.Microsecond} + ch <- pinger.PingResponse{Host: host, SequenceNr: 3, Latency: 10 * time.Microsecond} + ch <- pinger.PingResponse{Host: host, SequenceNr: 4, Latency: 10 * time.Microsecond} + return +} + func TestPinger_Run(t *testing.T) { log.SetLevel(log.DebugLevel) p := pinger.New([]string{"127.0.0.1"}) @@ -20,20 +56,23 @@ func TestPinger_Run(t *testing.T) { defer cancel() go p.Run(ctx) - time.Sleep(5 * time.Second) + var m prometheus.Metric + var ch chan prometheus.Metric - ch := make(chan prometheus.Metric) - go p.Collect(ch) + // wait for 1 packet to arrive + assert.Eventually(t, func() bool { + ch = make(chan prometheus.Metric) + go p.Collect(ch) + m = <-ch + return metricName(m) == "pinger_packet_count" && getMetric(m).GetGauge().GetValue() > 0 + }, 5*time.Second, 10*time.Millisecond) - m := <-ch - assert.Equal(t, "pinger_packet_count", metricName(m)) - assert.GreaterOrEqual(t, getMetric(m).GetGauge().GetValue(), 4.0) m = <-ch assert.Equal(t, "pinger_packet_loss_count", metricName(m)) - assert.LessOrEqual(t, getMetric(m).GetGauge().GetValue(), 3.0) m = <-ch assert.Equal(t, "pinger_latency_seconds", metricName(m)) - assert.Greater(t, getMetric(m).GetGauge().GetValue(), 0.00001) + assert.NotZero(t, getMetric(m).GetGauge().GetValue()) + } // metricName returns the metric name diff --git a/pingtracker/pingtracker.go b/pingtracker/pingtracker.go index 12cad66..b652166 100644 --- a/pingtracker/pingtracker.go +++ b/pingtracker/pingtracker.go @@ -1,6 +1,7 @@ package pingtracker import ( + log "github.com/sirupsen/logrus" "sort" "sync" "time" @@ -56,50 +57,48 @@ func (tracker *PingTracker) calculateLatency() (total time.Duration) { return } -func (tracker *PingTracker) calculateLoss() int { +func (tracker *PingTracker) calculateLoss() (gap int) { if len(tracker.seqNrs) == 0 { return 0 } // Sort all sequence numbers and remove duplicates tracker.seqNrs = unique(tracker.seqNrs) - // sequence numbers can wrap around! + // sequence numbers can roll over! // In this case, we'd get something like [ 0, 1, 2, 3, 65534, 65535 ] // Split into two lists [ 65534, 65535 ] and [ 0, 1, 2 ] using nextSeqNr as a boundary - // Process the higher list first (pre-wrap) and then the lower one (post-wrap) + // Process the higher list first (pre-rollover) and then the lower one (post-rollover) + index := 0 - for ; index < len(tracker.seqNrs); index++ { - if tracker.seqNrs[index] >= tracker.NextSeqNr-60 { // Allow up to 60 packets (1 min) of old packets - break - } - } - total := 0 - // pre-wrap - // skip to nextSeqNr - i := index - for ; i < len(tracker.seqNrs) && tracker.seqNrs[i] < tracker.NextSeqNr; i++ { + for index < len(tracker.seqNrs) && tracker.seqNrs[index] < tracker.NextSeqNr-60 { + index++ } - if i < len(tracker.seqNrs) { - total = tracker.seqNrs[i] - tracker.NextSeqNr - total += countGaps(tracker.seqNrs[i:]) - tracker.NextSeqNr = tracker.seqNrs[len(tracker.seqNrs)-1] + 1 - } - // post-wrap + + startingNextExpectedSequenceNumber := tracker.NextSeqNr + + // pre-rollover / no rollover + gap = tracker.processRange(tracker.seqNrs[index:]) + if index > 0 { tracker.NextSeqNr = 0 - total += tracker.seqNrs[0] // - tracker.NextSeqNr - total += countGaps(tracker.seqNrs[:index]) - tracker.NextSeqNr = tracker.seqNrs[index-1] + 1 + gap += tracker.processRange(tracker.seqNrs[:index]) + } + + if gap < 0 { + log.WithFields(log.Fields{ + "sequenceNrs": tracker.seqNrs, + "nextExpected": startingNextExpectedSequenceNumber, + }).Warning("negative gap found") } - return total + return } func unique(seqNrs []int) (result []int) { - uniqueSeqNrs := make(map[int]bool) + uniqueSeqNrs := make(map[int]struct{}) for _, seqNr := range seqNrs { if _, ok := uniqueSeqNrs[seqNr]; ok == false { - uniqueSeqNrs[seqNr] = true + uniqueSeqNrs[seqNr] = struct{}{} result = append(result, seqNr) } } @@ -107,14 +106,22 @@ func unique(seqNrs []int) (result []int) { return } -func countGaps(sequence []int) int { +func (tracker *PingTracker) processRange(sequence []int) (gap int) { count := len(sequence) - if count < 2 { - return 0 + if count == 0 { + panic("processRange: sequence range should not be empty") } - total := 0 - for i := 0; i < count-1; i++ { - total += sequence[i+1] - sequence[i] - 1 + + index := 0 + // skip older packets + for index < count && sequence[index] < tracker.NextSeqNr { + index++ } - return total + + for ; index < count; index++ { + gap += sequence[index] - tracker.NextSeqNr + tracker.NextSeqNr = sequence[index] + 1 + } + + return } diff --git a/pingtracker/pingtracker_test.go b/pingtracker/pingtracker_test.go index 677000a..df3ad13 100644 --- a/pingtracker/pingtracker_test.go +++ b/pingtracker/pingtracker_test.go @@ -23,59 +23,59 @@ var testCases = []struct { }{ // No data { - "No data received", - []Entry{}, - Outcome{0, 0, 0, 0 * time.Millisecond}, + description: "No data received", + input: []Entry{}, + output: Outcome{0, 0, 0, 0 * time.Millisecond}, }, // Packets may come in out of order { - "Packets may come in out of order", - []Entry{ + description: "Packets may come in out of order", + input: []Entry{ {0, 25 * time.Millisecond}, {2, 50 * time.Millisecond}, {1, 75 * time.Millisecond}, }, - Outcome{3, 3, 0, 150 * time.Millisecond}, + output: Outcome{3, 3, 0, 150 * time.Millisecond}, }, { - "Duplicate packets are ignored", - []Entry{ + description: "Duplicate packets are ignored", + input: []Entry{ {3, 50 * time.Millisecond}, {4, 50 * time.Millisecond}, {4, 50 * time.Millisecond}, {5, 50 * time.Millisecond}, }, - Outcome{4, 6, 0, 200 * time.Millisecond}, + output: Outcome{4, 6, 0, 200 * time.Millisecond}, }, { - "Lose one packet", - []Entry{ + description: "Lose one packet", + input: []Entry{ {6, 50 * time.Millisecond}, // lose 7 {8, 50 * time.Millisecond}, }, - Outcome{2, 9, 1, 100 * time.Millisecond}, + output: Outcome{2, 9, 1, 100 * time.Millisecond}, }, { - "Lose packets between calls to Calculate", - []Entry{ + description: "Lose packets between calls to Calculate", + input: []Entry{ // lose 9 {10, 50 * time.Millisecond}, {11, 50 * time.Millisecond}, {12, 50 * time.Millisecond}, }, - Outcome{3, 13, 1, 150 * time.Millisecond}, + output: Outcome{3, 13, 1, 150 * time.Millisecond}, }, { - "Fast forward to 30000", - []Entry{ + description: "Fast forward to 30000", + input: []Entry{ {29999, 50 * time.Millisecond}, }, - Outcome{1, 30000, 29999 - 13, 50 * time.Millisecond}, + output: Outcome{1, 30000, 29999 - 13, 50 * time.Millisecond}, }, { - "Support wraparound of sequence numbers", - []Entry{ + description: "Support rollover of sequence numbers", + input: []Entry{ // lose 30000 {30001, 50 * time.Millisecond}, {30002, 50 * time.Millisecond}, @@ -83,17 +83,36 @@ var testCases = []struct { {1, 50 * time.Millisecond}, {2, 50 * time.Millisecond}, }, - Outcome{4, 3, 2, 200 * time.Millisecond}, + output: Outcome{4, 3, 2, 200 * time.Millisecond}, }, { - "Recent (delayed) packets aren't interpreted as a wrap-around", - []Entry{ + description: "Recent (delayed) packets aren't interpreted as a rollover", + input: []Entry{ {0, 50 * time.Millisecond}, {2, 50 * time.Millisecond}, {3, 50 * time.Millisecond}, {4, 50 * time.Millisecond}, }, - Outcome{4, 5, 0, 200 * time.Millisecond}, + output: Outcome{4, 5, 0, 200 * time.Millisecond}, + }, + { + description: "fast-forward to 30000", + input: []Entry{{29999, 50 * time.Millisecond}}, + output: Outcome{1, 30000, 29994, 50 * time.Millisecond}, + }, + { + description: "delayed packets before rollover are ignored", + input: []Entry{ + {29998, 50 * time.Millisecond}, + {29999, 50 * time.Millisecond}, + {30000, 50 * time.Millisecond}, + {30002, 50 * time.Millisecond}, + {30001, 50 * time.Millisecond}, + {0, 50 * time.Millisecond}, + {1, 50 * time.Millisecond}, + {2, 50 * time.Millisecond}, + }, + output: Outcome{8, 3, 0, 400 * time.Millisecond}, }, } @@ -105,9 +124,9 @@ func TestPingTracker(t *testing.T) { tracker.Track(input.seqNr, input.latency) } count, loss, latency := tracker.Calculate() - assert.Equal(t, testCase.output.count, count, testCase.description) - assert.Equal(t, testCase.output.nextSeqNr, tracker.NextSeqNr, testCase.description) - assert.Equal(t, testCase.output.loss, loss, testCase.description) - assert.Equal(t, testCase.output.latency, latency, testCase.description) + assert.Equal(t, testCase.output.count, count, testCase.description+" (count)") + assert.Equal(t, testCase.output.nextSeqNr, tracker.NextSeqNr, testCase.description+" (next sequence nr)") + assert.Equal(t, testCase.output.loss, loss, testCase.description+" (loss)") + assert.Equal(t, testCase.output.latency, latency, testCase.description+" (latency)") } }