Skip to content

Commit

Permalink
use a mock clock in bandwidth tests (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann authored Aug 16, 2022
1 parent d14ac01 commit 0b79495
Showing 1 changed file with 43 additions and 92 deletions.
135 changes: 43 additions & 92 deletions core/metrics/bandwidth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@ package metrics

import (
"fmt"
"math"
"runtime"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-flow-metrics"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
)

var cl = clock.NewMock()

func init() {
flow.SetClock(cl)
}

func BenchmarkBandwidthCounter(b *testing.B) {
b.StopTimer()
b.ResetTimer()
Expand Down Expand Up @@ -49,48 +58,29 @@ func round(bwc *BandwidthCounter, b *testing.B) {
b.StopTimer()
}

// Allow 1% errors for bw calculations.
const acceptableError = 0.01

func TestBandwidthCounter(t *testing.T) {
bwc := NewBandwidthCounter()
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(200)
for i := 0; i < 100; i++ {
p := peer.ID(fmt.Sprintf("peer-%d", i))
for j := 0; j < 2; j++ {
proto := protocol.ID(fmt.Sprintf("proto-%d", j))
go func() {
defer wg.Done()
for i := 0; i < 40; i++ {
for i := 0; i < 100; i++ {
p := peer.ID(fmt.Sprintf("peer-%d", i))
for j := 0; j < 2; j++ {
proto := protocol.ID(fmt.Sprintf("proto-%d", j))

// make sure the bandwidth counters are active
bwc.LogSentMessage(100)
bwc.LogRecvMessage(50)
bwc.LogSentMessageStream(100, proto, p)
bwc.LogRecvMessageStream(50, proto, p)

<-start

t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()

for i := 0; i < 39; i++ {
bwc.LogSentMessage(100)
bwc.LogRecvMessage(50)
bwc.LogSentMessageStream(100, proto, p)
bwc.LogRecvMessageStream(50, proto, p)
<-t.C
}
}()
// <-start
}
}
cl.Add(100 * time.Millisecond)
}

assertProtocols := func(check func(Stats)) {
byProtocol := bwc.GetBandwidthByProtocol()
if len(byProtocol) != 2 {
t.Errorf("expected 2 protocols, got %d", len(byProtocol))
}
require.Len(t, byProtocol, 2, "expected 2 protocols")
for i := 0; i < 2; i++ {
p := protocol.ID(fmt.Sprintf("proto-%d", i))
for _, stats := range [...]Stats{bwc.GetBandwidthForProtocol(p), byProtocol[p]} {
Expand All @@ -101,9 +91,7 @@ func TestBandwidthCounter(t *testing.T) {

assertPeers := func(check func(Stats)) {
byPeer := bwc.GetBandwidthByPeer()
if len(byPeer) != 100 {
t.Errorf("expected 100 peers, got %d", len(byPeer))
}
require.Len(t, byPeer, 100, "expected 100 peers")
for i := 0; i < 100; i++ {
p := peer.ID(fmt.Sprintf("peer-%d", i))
for _, stats := range [...]Stats{bwc.GetBandwidthForPeer(p), byPeer[p]} {
Expand All @@ -112,34 +100,22 @@ func TestBandwidthCounter(t *testing.T) {
}
}

time.Sleep(time.Second)
close(start)

wg.Wait()
time.Sleep(1 * time.Second)

assertPeers(func(stats Stats) {
assertEq(t, 8000, stats.TotalOut)
assertEq(t, 4000, stats.TotalIn)
require.Equal(t, int64(8000), stats.TotalOut)
require.Equal(t, int64(4000), stats.TotalIn)
})

assertProtocols(func(stats Stats) {
assertEq(t, 400000, stats.TotalOut)
assertEq(t, 200000, stats.TotalIn)
require.Equal(t, int64(400000), stats.TotalOut)
require.Equal(t, int64(200000), stats.TotalIn)
})

{
stats := bwc.GetBandwidthTotals()
assertEq(t, 800000, stats.TotalOut)
assertEq(t, 400000, stats.TotalIn)
}
stats := bwc.GetBandwidthTotals()
require.Equal(t, int64(800000), stats.TotalOut)
require.Equal(t, int64(400000), stats.TotalIn)
}

func TestResetBandwidthCounter(t *testing.T) {
if runtime.GOOS != "linux" {
// Specifically, it fails on MacOS because we need a high precision timer.
t.Skip("this test is highly timing dependent and only passes reliably on Linux")
}
bwc := NewBandwidthCounter()

p := peer.ID("peer-0")
Expand All @@ -151,69 +127,44 @@ func TestResetBandwidthCounter(t *testing.T) {
bwc.LogSentMessageStream(100, proto, p)
bwc.LogRecvMessageStream(50, proto, p)

time.Sleep(1*time.Second + time.Millisecond)
time.Sleep(200 * time.Millisecond) // make sure the meters are registered with the sweeper
cl.Add(time.Second)

bwc.LogSentMessage(42)
bwc.LogRecvMessage(24)
bwc.LogSentMessageStream(100, proto, p)
bwc.LogRecvMessageStream(50, proto, p)

time.Sleep(1*time.Second + time.Millisecond)
cl.Add(time.Second)

{
stats := bwc.GetBandwidthTotals()
assertEq(t, 84, stats.TotalOut)
assertEq(t, 48, stats.TotalIn)
require.Equal(t, int64(84), stats.TotalOut)
require.Equal(t, int64(48), stats.TotalIn)
}

{
stats := bwc.GetBandwidthByProtocol()
assertApproxEq(t, 1, float64(len(stats)))
require.Len(t, stats, 1)
stat := stats[proto]
assertApproxEq(t, 100, stat.RateOut)
assertApproxEq(t, 50, stat.RateIn)
require.Equal(t, float64(100), stat.RateOut)
require.Equal(t, float64(50), stat.RateIn)
}

{
stats := bwc.GetBandwidthByPeer()
assertApproxEq(t, 1, float64(len(stats)))
require.Len(t, stats, 1)
stat := stats[p]
assertApproxEq(t, 100, stat.RateOut)
assertApproxEq(t, 50, stat.RateIn)
require.Equal(t, float64(100), stat.RateOut)
require.Equal(t, float64(50), stat.RateIn)
}

bwc.Reset()
{
stats := bwc.GetBandwidthTotals()
assertEq(t, 0, stats.TotalOut)
assertEq(t, 0, stats.TotalIn)
}

{
byProtocol := bwc.GetBandwidthByProtocol()
if len(byProtocol) != 0 {
t.Errorf("expected 0 protocols, got %d", len(byProtocol))
}
}

{
byPeer := bwc.GetBandwidthByPeer()
if len(byPeer) != 0 {
t.Errorf("expected 0 peers, got %d", len(byPeer))
}
}
}

func assertEq(t *testing.T, expected, actual int64) {
if expected != actual {
t.Errorf("expected %d, got %d", expected, actual)
}
}

func assertApproxEq(t *testing.T, expected, actual float64) {
t.Helper()
margin := expected * acceptableError
if !(math.Abs(expected-actual) <= margin) {
t.Errorf("expected %f (±%f), got %f", expected, margin, actual)
require.Zero(t, stats.TotalOut)
require.Zero(t, stats.TotalIn)
require.Empty(t, bwc.GetBandwidthByProtocol(), "expected 0 protocols")
require.Empty(t, bwc.GetBandwidthByPeer(), "expected 0 peers")
}
}

0 comments on commit 0b79495

Please sign in to comment.