From cd268fd31a2aece52a2528cb10884fb1d165e885 Mon Sep 17 00:00:00 2001 From: banst Date: Sat, 31 Aug 2024 01:54:15 +0200 Subject: [PATCH] feat: init implementation --- .gitignore | 55 +++++++++++++++ cmd/icmperf/main.go | 41 +++++++++++ go.mod | 32 +++++++++ go.sum | 55 +++++++++++++++ pkg/cli/cli.go | 25 +++++++ pkg/model/model.go | 156 +++++++++++++++++++++++++++++++++++++++++ pkg/pinger/pinger.go | 101 ++++++++++++++++++++++++++ pkg/recorder/packet.go | 9 +++ pkg/recorder/record.go | 110 +++++++++++++++++++++++++++++ pkg/recorder/stats.go | 61 ++++++++++++++++ 10 files changed, 645 insertions(+) create mode 100644 cmd/icmperf/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/cli/cli.go create mode 100644 pkg/model/model.go create mode 100644 pkg/pinger/pinger.go create mode 100644 pkg/recorder/packet.go create mode 100644 pkg/recorder/record.go create mode 100644 pkg/recorder/stats.go diff --git a/.gitignore b/.gitignore index 6f72f89..48e16f9 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,58 @@ go.work.sum # env file .env + +### Go ### +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +### macOS ### +# General +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + +### macOS Patch ### +# iCloud generated files +*.icloud diff --git a/cmd/icmperf/main.go b/cmd/icmperf/main.go new file mode 100644 index 0000000..003f63b --- /dev/null +++ b/cmd/icmperf/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "context" + "net" + "time" + + "github.com/alecthomas/kong" + tea "github.com/charmbracelet/bubbletea" + "golang.org/x/net/icmp" + + "github.com/b4nst/icmperf/pkg/cli" + "github.com/b4nst/icmperf/pkg/model" + "github.com/b4nst/icmperf/pkg/pinger" + "github.com/b4nst/icmperf/pkg/recorder" +) + +func main() { + cli := cli.CLI{} + ktx := kong.Parse(&cli) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pinger := pinger.NewPinger(cli.BindAddr, cli.MTU, cli.Timeout) + record := recorder.NewRecord() + pinger.OnRecv(func(m *icmp.Message, t time.Time) error { + body := m.Body.(*icmp.Echo) + id := uint64(body.ID)<<32 | uint64(body.Seq) + record.PacketReceived(id, t) + return nil + }) + + m := model.NewModel(pinger, record, (*net.UDPAddr)(&cli.Target), cli.MTU, cli.Duration) + if err := pinger.Start(ctx); err != nil { + ktx.FatalIfErrorf(err) + } + + if _, err := tea.NewProgram(m).Run(); err != nil { + ktx.FatalIfErrorf(err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..01054ee --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module github.com/b4nst/icmperf + +go 1.21.6 + +require ( + github.com/alecthomas/kong v0.9.0 + github.com/charmbracelet/bubbles v0.19.0 + github.com/charmbracelet/bubbletea v1.1.0 + github.com/dustin/go-humanize v1.0.1 + github.com/emirpasic/gods v1.18.1 + golang.org/x/net v0.28.0 +) + +require ( + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/charmbracelet/harmonica v0.2.0 // indirect + github.com/charmbracelet/lipgloss v0.13.0 // indirect + github.com/charmbracelet/x/ansi v0.2.3 // indirect + github.com/charmbracelet/x/term v0.2.0 // indirect + github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/termenv v0.15.2 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..62364a4 --- /dev/null +++ b/go.sum @@ -0,0 +1,55 @@ +github.com/alecthomas/assert/v2 v2.6.0 h1:o3WJwILtexrEUk3cUVal3oiQY2tfgr/FHWiz/v2n4FU= +github.com/alecthomas/assert/v2 v2.6.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= +github.com/alecthomas/kong v0.9.0 h1:G5diXxc85KvoV2f0ZRVuMsi45IrBgx9zDNGNj165aPA= +github.com/alecthomas/kong v0.9.0/go.mod h1:Y47y5gKfHp1hDc7CH7OeXgLIpp+Q2m1Ni0L5s3bI8Os= +github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= +github.com/alecthomas/repr v0.4.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/charmbracelet/bubbles v0.19.0 h1:gKZkKXPP6GlDk6EcfujDK19PCQqRjaJZQ7QRERx1UF0= +github.com/charmbracelet/bubbles v0.19.0/go.mod h1:WILteEqZ+krG5c3ntGEMeG99nCupcuIk7V0/zOP0tOA= +github.com/charmbracelet/bubbletea v1.1.0 h1:FjAl9eAL3HBCHenhz/ZPjkKdScmaS5SK69JAK2YJK9c= +github.com/charmbracelet/bubbletea v1.1.0/go.mod h1:9Ogk0HrdbHolIKHdjfFpyXJmiCzGwy+FesYkZr7hYU4= +github.com/charmbracelet/harmonica v0.2.0 h1:8NxJWRWg/bzKqqEaaeFNipOu77YR5t8aSwG4pgaUBiQ= +github.com/charmbracelet/harmonica v0.2.0/go.mod h1:KSri/1RMQOZLbw7AHqgcBycp8pgJnQMYYT8QZRqZ1Ao= +github.com/charmbracelet/lipgloss v0.13.0 h1:4X3PPeoWEDCMvzDvGmTajSyYPcZM4+y8sCA/SsA3cjw= +github.com/charmbracelet/lipgloss v0.13.0/go.mod h1:nw4zy0SBX/F/eAO1cWdcvy6qnkDUxr8Lw7dvFrAIbbY= +github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqosGDEY= +github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= +github.com/charmbracelet/x/term v0.2.0 h1:cNB9Ot9q8I711MyZ7myUR5HFWL/lc3OpU8jZ4hwm0x0= +github.com/charmbracelet/x/term v0.2.0/go.mod h1:GVxgxAbjUrmpvIINHIQnJJKpMlHiZ4cktEQCN6GWyF0= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go new file mode 100644 index 0000000..ff168b5 --- /dev/null +++ b/pkg/cli/cli.go @@ -0,0 +1,25 @@ +package cli + +import ( + "net" + "time" +) + +type Target net.UDPAddr + +func (t *Target) UnmarshalText(text []byte) error { + ip, err := net.LookupIP(string(text)) + if err != nil { + return err + } + t.IP = ip[0] + return nil +} + +type CLI struct { + Target Target `arg:"" help:"The target host to ping."` + MTU int `help:"The maximum transmission unit of your interface." short:"m" default:"1500"` + Timeout time.Duration `help:"The timeout for each ping." short:"t" default:"5s"` + Duration time.Duration `help:"The duration of the test." short:"d" default:"30s"` + BindAddr string `help:"The address to bind the ICMP listener to." default:"0.0.0.0" short:"l"` +} diff --git a/pkg/model/model.go b/pkg/model/model.go new file mode 100644 index 0000000..760afb9 --- /dev/null +++ b/pkg/model/model.go @@ -0,0 +1,156 @@ +package model + +import ( + "net" + "strconv" + "strings" + "time" + + "github.com/charmbracelet/bubbles/progress" + "github.com/charmbracelet/bubbles/timer" + tea "github.com/charmbracelet/bubbletea" + "github.com/dustin/go-humanize" + + "github.com/b4nst/icmperf/pkg/pinger" + "github.com/b4nst/icmperf/pkg/recorder" +) + +var ( + maxWidth = 100 +) + +type Model struct { + pinger *pinger.Pinger + record *recorder.Record + peer *net.UDPAddr + payload []byte + stats *recorder.Stats + + duration time.Duration + timer timer.Model + progress progress.Model +} + +func NewModel(pinger *pinger.Pinger, record *recorder.Record, peer *net.UDPAddr, mtu int, duration time.Duration) *Model { + return &Model{ + pinger: pinger, + record: record, + peer: peer, + payload: make([]byte, mtu-28), + stats: nil, + + duration: duration, + timer: timer.NewWithInterval(duration, 200*time.Millisecond), + progress: progress.New(progress.WithFillCharacters('▱', ' '), progress.WithDefaultGradient()), + } +} + +type latencyTick time.Time +type pingTick time.Time +type statsMsg *recorder.Stats + +func (m *Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + cmds := []tea.Cmd{} + + switch msg := msg.(type) { + case tea.KeyMsg: + if msg.String() == "ctrl+c" { + cmds = append(cmds, tea.Quit) + } + case timer.TickMsg: + var tc, pc tea.Cmd + m.timer, tc = m.timer.Update(msg) + pc = m.progress.SetPercent(1.0 - m.timer.Timeout.Seconds()/m.duration.Seconds()) + cmds = append(cmds, tc, pc) + case timer.TimeoutMsg: + cmds = append(cmds, m.progress.SetPercent(1.0)) + cmds = append(cmds, m.statsCmd()) + case tea.WindowSizeMsg: + m.progress.Width = min(msg.Width-4, maxWidth) + case latencyTick: + if !m.timer.Timedout() { + cmds = append(cmds, m.pingLatency()) + } + case pingTick: + if !m.timer.Timedout() { + cmds = append(cmds, m.pingBandwidth()) + } + case statsMsg: + m.stats = msg + cmds = append(cmds, tea.Quit) + + case progress.FrameMsg: + progressModel, cmd := m.progress.Update(msg) + m.progress = progressModel.(progress.Model) + cmds = append(cmds, cmd) + } + + return m, tea.Batch(cmds...) +} + +func (m *Model) View() string { + view := strings.Builder{} + + view.WriteString("Probing peer at ") + view.WriteString(m.peer.IP.String()) + view.WriteString("\n\n") + + view.WriteString("Probe size: ") + view.WriteString(humanize.Bytes(uint64(len(m.payload)))) + view.WriteString(" ") + view.WriteString("ETC: ") + view.WriteString(m.timer.View()) + view.WriteString("\n") + + view.WriteString(m.progress.View()) + view.WriteString("\n\n") + + if m.stats != nil { + view.WriteString("Bandwidth: ") + view.WriteString(humanize.Bytes(uint64(m.stats.Bandwidth()))) + view.WriteString("/s ") + + view.WriteString("Latency: ") + view.WriteString(m.stats.Latency().Round(time.Millisecond).String()) + view.WriteString(" ") + + view.WriteString("Loss rate: ") + view.WriteString(strconv.FormatFloat(m.stats.PacketLoss()*100, 'f', 2, 64)) + view.WriteString("%\n") + } + + return view.String() +} + +func (m *Model) Init() tea.Cmd { + return tea.Batch(m.pingLatency(), m.pingBandwidth(), m.timer.Init()) +} + +func (m *Model) pingLatency() tea.Cmd { + return tea.Tick(1*time.Second, func(t time.Time) tea.Msg { + sendAndRecord(m.pinger, m.peer, []byte{}, m.record) + return latencyTick(t) + }) +} + +func (m *Model) pingBandwidth() tea.Cmd { + return tea.Tick(500*time.Millisecond, func(t time.Time) tea.Msg { + sendAndRecord(m.pinger, m.peer, m.payload, m.record) + return pingTick(t) + }) +} + +func (m *Model) statsCmd() tea.Cmd { + return func() tea.Msg { + return statsMsg(m.record.Stats()) + } +} + +func sendAndRecord(p *pinger.Pinger, peer *net.UDPAddr, payload []byte, r *recorder.Record) error { + id, t, err := p.Send(peer, payload) + if err != nil { + return err + } + r.PacketSent(id, len(payload), t) + return nil +} diff --git a/pkg/pinger/pinger.go b/pkg/pinger/pinger.go new file mode 100644 index 0000000..c99857d --- /dev/null +++ b/pkg/pinger/pinger.go @@ -0,0 +1,101 @@ +package pinger + +import ( + "context" + "net" + "os" + "sync/atomic" + "time" + + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" +) + +type Pinger struct { + conn *icmp.PacketConn + listenaddr string + timeout time.Duration + MTU int + seq atomic.Uint64 + id int + + recvCb func(*icmp.Message, time.Time) error +} + +func NewPinger(addr string, mtu int, timeout time.Duration) *Pinger { + return &Pinger{ + listenaddr: addr, + timeout: timeout, + MTU: mtu, + seq: atomic.Uint64{}, + id: os.Getpid() & 0xffff, + } +} + +func (p *Pinger) Start(ctx context.Context) error { + conn, err := icmp.ListenPacket("udp4", p.listenaddr) + if err != nil { + return err + } + p.conn = conn + go p.listen(ctx) + return nil +} + +func (p *Pinger) OnRecv(cb func(*icmp.Message, time.Time) error) { + p.recvCb = cb +} + +func (p *Pinger) listen(ctx context.Context) { + buf := make([]byte, p.MTU) + for { + if ctx.Err() != nil { + return + } + err := p.conn.SetReadDeadline(time.Now().Add(p.timeout)) + if err != nil { + // TODO: log error + continue + } + n, _, err := p.conn.ReadFrom(buf) + if err != nil { + // TODO: log error + continue + } + received := time.Now() + parsed, err := icmp.ParseMessage(1, buf[:n]) + if err != nil { + // TODO: log error + continue + } + if err := p.recvCb(parsed, received); err != nil { + // TODO: log error + continue + } + } +} + +func (p *Pinger) Send(peer *net.UDPAddr, payload []byte) (uint64, time.Time, error) { + body := &icmp.Echo{ + ID: p.id, + Seq: int(p.seq.Add(1)), + Data: payload, + } + id := uint64(body.ID)<<32 | uint64(body.Seq) + + message := icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: body, + } + messageBytes, err := message.Marshal(nil) + if err != nil { + return id, time.Now(), err + } + _, err = p.conn.WriteTo(messageBytes, peer) + return id, time.Now(), err +} + +func (p *Pinger) Close() error { + return p.conn.Close() +} diff --git a/pkg/recorder/packet.go b/pkg/recorder/packet.go new file mode 100644 index 0000000..8b9b288 --- /dev/null +++ b/pkg/recorder/packet.go @@ -0,0 +1,9 @@ +package recorder + +import "time" + +type Packet struct { + Size int + Sent time.Time + Received time.Time +} diff --git a/pkg/recorder/record.go b/pkg/recorder/record.go new file mode 100644 index 0000000..4d08db9 --- /dev/null +++ b/pkg/recorder/record.go @@ -0,0 +1,110 @@ +package recorder + +import ( + "sync" + "time" + + "github.com/emirpasic/gods/maps/treemap" + "github.com/emirpasic/gods/utils" +) + +type TransferMetric struct { + Received time.Time + Sent time.Time + Bytes int +} + +type Record struct { + inflight map[uint64]*TransferMetric + + latencies *treemap.Map + tms []*TransferMetric + + inlock sync.Mutex + recvlock sync.Mutex +} + +func NewRecord() *Record { + return &Record{ + inflight: make(map[uint64]*TransferMetric), + latencies: treemap.NewWith(utils.Int64Comparator), + tms: []*TransferMetric{}, + + inlock: sync.Mutex{}, + recvlock: sync.Mutex{}, + } +} + +func (s *Record) PacketSent(id uint64, size int, sent time.Time) { + s.inlock.Lock() + defer s.inlock.Unlock() + + s.inflight[id] = &TransferMetric{ + Bytes: size, + Sent: sent, + } +} + +func (s *Record) PacketReceived(id uint64, received time.Time) { + // Remove the packet from the inflight map + s.inlock.Lock() + tm, ok := s.inflight[id] + delete(s.inflight, id) + s.inlock.Unlock() + if !ok { + return + } + + s.recvlock.Lock() + defer s.recvlock.Unlock() + if tm.Bytes == 0 { + // Record Latency ping + s.latencies.Put(tm.Sent.UnixNano(), received.Sub(tm.Sent)) + } else { + // Record bandwidth ping + tm.Received = received + s.tms = append(s.tms, tm) + } +} + +func (s *Record) Stats() *Stats { + s.inlock.Lock() + defer s.inlock.Unlock() + s.recvlock.Lock() + defer s.recvlock.Unlock() + + totalLost := len(s.inflight) + totalPackets := totalLost + len(s.tms) + s.latencies.Size() + + // Calculate bandwidths + bws := map[int64]float64{} + for _, tm := range s.tms { + // Find the closest latency to the current bandwidth packet + k, v := s.latencies.Floor(tm.Sent.UnixNano()) + if v == nil { + k, v = s.latencies.Ceiling(tm.Sent.UnixNano()) + if k == nil { + // No latencies recorded, skip this bandwidth packet + continue + } + } + latency := v.(time.Duration) + bw := float64(tm.Bytes) / (tm.Received.Sub(tm.Sent) - latency).Seconds() + // Record the bandwidth if it is not NaN + if bw == bw { + bws[tm.Sent.UnixNano()] = bw + } + } + + latencies := map[int64]time.Duration{} + s.latencies.Each(func(key interface{}, value interface{}) { + latencies[key.(int64)] = value.(time.Duration) + }) + + return &Stats{ + Latencies: latencies, + Bandwidths: bws, + TotalSent: totalPackets, + TotalReceived: totalPackets - totalLost, + } +} diff --git a/pkg/recorder/stats.go b/pkg/recorder/stats.go new file mode 100644 index 0000000..ea4a688 --- /dev/null +++ b/pkg/recorder/stats.go @@ -0,0 +1,61 @@ +package recorder + +import ( + "sort" + "time" +) + +// Stats contains the statistics of a transfer. +type Stats struct { + // Latencies is a map latency per nanosecond. + Latencies map[int64]time.Duration + // Bandwidths is a map of bandwidth per nanosecond. + Bandwidths map[int64]float64 + // TotalSent is the total number of packets sent. + TotalSent int + // TotalReceived is the total number of packets received. + TotalReceived int +} + +// Bandwidth returns the bandwidth in bytes per second. +// 10% of the best and worst bandwidths are removed before calculating the mean. +func (s *Stats) Bandwidth() float64 { + bws := make([]float64, 0, len(s.Bandwidths)) + for _, v := range s.Bandwidths { + bws = append(bws, v) + } + sort.Float64s(bws) + offset := len(bws) / 10 + bws = bws[offset : len(bws)-offset] + + var total float64 + for _, bandwidth := range bws { + total += bandwidth + } + return total / float64(len(bws)) +} + +// Latency returns the average latency. +// 10% of the best and worst latencies are removed before calculating the mean. +func (s *Stats) Latency() time.Duration { + latencies := make([]time.Duration, 0, len(s.Latencies)) + for _, v := range s.Latencies { + latencies = append(latencies, v) + } + sort.Slice(latencies, func(i, j int) bool { + return latencies[i] < latencies[j] + }) + offset := len(latencies) / 10 + latencies = latencies[offset : len(latencies)-offset] + + var total time.Duration + for _, latency := range latencies { + total += latency + } + return total / time.Duration(len(latencies)) +} + +// PacketLoss returns the percentage of packets lost. +func (s *Stats) PacketLoss() float64 { + return float64(s.TotalSent-s.TotalReceived) / float64(s.TotalSent) +}