Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upload: process counterflow messages #38

Merged
merged 12 commits into from
Jul 14, 2021
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/m-lab/ndt7-client-go

go 1.13
go 1.16

require (
github.com/apex/log v1.9.0 // indirect
Expand All @@ -18,7 +18,7 @@ require (
github.com/m-lab/uuid v0.0.0-20191115203855-549727171666 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 // indirect
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJ
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/m-lab/access v0.0.3 h1:ixaAZNvN/ggOj1/FOZQhw4r31QA/WlApn0HH71LSHgQ=
github.com/m-lab/access v0.0.3/go.mod h1:gZ7YN3SeMTZYeRv5EFaLdG+XVI/F/X4njM1G1BfwuE4=
Expand Down Expand Up @@ -450,7 +448,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
Expand Down
45 changes: 33 additions & 12 deletions internal/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package upload

import (
"context"
"encoding/json"
"errors"
"math/rand"
"time"
Expand Down Expand Up @@ -36,28 +37,41 @@ var makePreparedMessage = func(size int) (*websocket.PreparedMessage, error) {
// errNonTextMessage indicates we've got a non textual message
var errNonTextMessage = errors.New("Received non textual message")

// ignoreIncoming ignores any incoming message. The error is typically ignored
// as this code runs in its own goroutine, yet it's useful for testing.
func ignoreIncoming(conn websocketx.Conn) error {
// readcounterflow reads counter flow message. Errors are reported via errCh.
func readcounterflow(ctx context.Context, conn websocketx.Conn, ch chan<- spec.Measurement,
errCh chan<- error) {
conn.SetReadLimit(params.MaxMessageSize)
for {
for ctx.Err() == nil {
// Implementation note: this guarantees that the websocket engine
// is processing messages. Here we're using as timeout the timeout
// for the whole upload, so that we know that this goroutine is
// active for most of the time we care about, even in the case in
// which the server is not sending us any messages.
err := conn.SetReadDeadline(time.Now().Add(params.UploadTimeout))
if err != nil {
return err
errCh <- err
return
}
mtype, _, err := conn.ReadMessage()
mtype, mdata, err := conn.ReadMessage()
if err != nil {
return err
errCh <- err
return
}
if mtype != websocket.TextMessage {
return errNonTextMessage
errCh <- errNonTextMessage
return
}
var measurement spec.Measurement
if err := json.Unmarshal(mdata, &measurement); err != nil {
errCh <- err
return
}
measurement.Origin = spec.OriginServer
measurement.Test = spec.TestUpload
ch <- measurement
}
// Signal we've finished reading counterflow messages.
errCh <- nil
}

// emit emits an event during the upload.
Expand All @@ -81,14 +95,12 @@ func emit(ch chan<- spec.Measurement, elapsed time.Duration, numBytes int64) {
// Note that upload closes the out channel.
func upload(ctx context.Context, conn websocketx.Conn, out chan<- int64) error {
defer close(out)
wholectx, cancel := context.WithTimeout(ctx, params.UploadTimeout)
defer cancel()
preparedMessage, err := makePreparedMessage(params.BulkMessageSize)
if err != nil {
return err
}
var total int64
for wholectx.Err() == nil {
for ctx.Err() == nil {
err := conn.SetWriteDeadline(time.Now().Add(params.IOTimeout))
if err != nil {
return err
Expand Down Expand Up @@ -122,7 +134,11 @@ func uploadAsync(ctx context.Context, conn websocketx.Conn) <-chan int64 {
func Run(ctx context.Context, conn websocketx.Conn, ch chan<- spec.Measurement) error {
defer close(ch)
defer conn.Close()
go ignoreIncoming(conn)
ctx, cancel := context.WithTimeout(ctx, params.UploadTimeout)
defer cancel()
errCh := make(chan error)
defer close(errCh)
go readcounterflow(ctx, conn, ch, errCh)
start := time.Now()
prev := start
for tot := range uploadAsync(ctx, conn) {
Expand All @@ -132,5 +148,10 @@ func Run(ctx context.Context, conn websocketx.Conn, ch chan<- spec.Measurement)
prev = now
}
}

err := <-errCh
if err != nil {
return err
}
return nil
}
86 changes: 60 additions & 26 deletions internal/upload/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package upload

import (
"context"
"encoding/json"
"errors"
"testing"
"time"
Expand All @@ -17,37 +18,20 @@ func TestNormal(t *testing.T) {
context.Background(), time.Duration(time.Second),
)
defer cancel()
conn := mocks.Conn{}
conn := mocks.Conn{
MessageByteArray: []byte("{}"),
ReadMessageType: websocket.TextMessage,
}
go func() {
err := Run(ctx, &conn, outch)
if err != nil {
t.Fatal(err)
}
}()
prev := spec.Measurement{
AppInfo: &spec.AppInfo{},
}
tot := 0
for m := range outch {
// Drain the channel and count the number of Measurements read.
for _ = range outch {
tot++
if m.Origin != spec.OriginClient {
t.Fatal("The origin is wrong")
}
if m.Test != spec.TestUpload {
t.Fatal("The test is wrong")
}
if m.AppInfo == nil {
t.Fatal("m.AppInfo is nil")
}
if m.AppInfo.ElapsedTime <= prev.AppInfo.ElapsedTime {
t.Fatal("Time is not increasing")
}
// Note: it can stay constant when we're servicing
// a TCP timeout longer than the update interval
if m.AppInfo.NumBytes < prev.AppInfo.NumBytes {
t.Fatal("Number of bytes is decreasing")
}
prev = m
}
if tot <= 0 {
t.Fatal("Expected at least one message")
Expand All @@ -59,7 +43,10 @@ func TestSetReadDeadlineError(t *testing.T) {
conn := mocks.Conn{
SetReadDeadlineResult: mockedErr,
}
err := ignoreIncoming(&conn)
ch := make(chan spec.Measurement, 128)
errCh := make(chan error)
go readcounterflow(context.Background(), &conn, ch, errCh)
err := <-errCh
if err != mockedErr {
t.Fatal("Not the error we expected")
}
Expand All @@ -70,7 +57,11 @@ func TestReadMessageError(t *testing.T) {
conn := mocks.Conn{
ReadMessageResult: mockedErr,
}
err := ignoreIncoming(&conn)
ch := make(chan spec.Measurement, 128)
errCh := make(chan error)
defer close(errCh)
go readcounterflow(context.Background(), &conn, ch, errCh)
err := <-errCh
if err != mockedErr {
t.Fatal("Not the error we expected")
}
Expand All @@ -81,12 +72,55 @@ func TestReadNonTextMessageError(t *testing.T) {
ReadMessageType: websocket.BinaryMessage,
MessageByteArray: []byte("abcdef"),
}
err := ignoreIncoming(&conn)
ch := make(chan spec.Measurement, 128)
errCh := make(chan error)
defer close(errCh)
go readcounterflow(context.Background(), &conn, ch, errCh)
err := <-errCh
if err != errNonTextMessage {
t.Fatal("Not the error we expected")
}
}

func TestReadNonJSONError(t *testing.T) {
conn := mocks.Conn{
ReadMessageType: websocket.TextMessage,
MessageByteArray: []byte("{"),
}
ch := make(chan spec.Measurement, 128)
errCh := make(chan error)
defer close(errCh)
go readcounterflow(context.Background(), &conn, ch, errCh)
err := <-errCh
var syntaxError *json.SyntaxError
if !errors.As(err, &syntaxError) {
t.Fatal("Not the error we expected")
}
}

func TestReadGoodMessage(t *testing.T) {
conn := mocks.Conn{
ReadMessageType: websocket.TextMessage,
MessageByteArray: []byte("{}"),
}
ch := make(chan spec.Measurement, 128)
var count int64
ctx, cancel := context.WithCancel(context.Background())
go func() {
for range ch {
count++
cancel()
}
}()
errCh := make(chan error)
defer close(errCh)
go readcounterflow(ctx, &conn, ch, errCh)
err := <-errCh
if err != nil {
t.Fatal(err)
}
}

func TestMakePreparedMessageError(t *testing.T) {
mockedErr := errors.New("mocked error")
outch := make(chan int64)
Expand Down