From f4466bd7f5ab699e9ab5ee7f67dd52985242eda9 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 15:38:22 +0800 Subject: [PATCH 1/8] fix: data race in value ping error --- speedtest/server.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/speedtest/server.go b/speedtest/server.go index 19a3c50..fd56160 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -291,14 +291,15 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context) (Servers, error) wg.Add(1) go func(gs *Server) { var latency []int64 + var errPing error if s.config.PingMode == TCP { - latency, err = gs.TCPPing(pCtx, 1, time.Millisecond, nil) + _, errPing = gs.TCPPing(pCtx, 1, time.Millisecond, nil) } else if s.config.PingMode == ICMP { - latency, err = gs.ICMPPing(pCtx, 4*time.Second, 1, time.Millisecond, nil) + _, errPing = gs.ICMPPing(pCtx, 4*time.Second, 1, time.Millisecond, nil) } else { - latency, err = gs.HTTPPing(pCtx, 1, time.Millisecond, nil) + _, errPing = gs.HTTPPing(context.Background(), 1, time.Millisecond, nil) } - if err != nil || len(latency) < 1 { + if errPing != nil || len(latency) < 1 { gs.Latency = PingTimeout } else { gs.Latency = time.Duration(latency[0]) * time.Nanosecond From 447349642c39f0ee3bdcd5104d71c8b007773d8b Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 15:53:49 +0800 Subject: [PATCH 2/8] fix(cli): data race in value packetloss --- speedtest.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/speedtest.go b/speedtest.go index 10a1b79..d3775c3 100644 --- a/speedtest.go +++ b/speedtest.go @@ -11,6 +11,7 @@ import ( "os" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -149,9 +150,12 @@ func main() { SourceInterface: *source, }) + blocker := sync.WaitGroup{} packetLossAnalyzerCtx, packetLossAnalyzerCancel := context.WithTimeout(context.Background(), time.Second*40) taskManager.Run("Packet Loss Analyzer", func(task *Task) { go func() { + blocker.Add(1) + defer blocker.Done() err = analyzer.RunWithContext(packetLossAnalyzerCtx, server.Host, func(packetLoss *transport.PLoss) { server.PacketLoss = *packetLoss }) @@ -211,6 +215,7 @@ func main() { time.Sleep(time.Second * 30) } packetLossAnalyzerCancel() + blocker.Wait() if !*jsonOutput { taskManager.Println(server.PacketLoss.String()) } From 78b05bffe495f038f5f90320bcc28ac63a1774f8 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 16:16:07 +0800 Subject: [PATCH 3/8] fix(api): data race in value repeat byte --- speedtest/data_manager.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 04546ac..57ab2ab 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -114,11 +114,13 @@ func (dm *DataManager) NewDataDirection(testType int) *TestDirection { } func NewDataManager() *DataManager { + r := bytes.Repeat([]byte{0xAA}, readChunkSize) // uniformly distributed sequence of bits ret := &DataManager{ nThread: runtime.NumCPU(), captureTime: time.Second * 15, rateCaptureFrequency: time.Millisecond * 50, Snapshot: &Snapshot{}, + repeatByte: &r, } ret.download = ret.NewDataDirection(typeDownload) ret.upload = ret.NewDataDirection(typeUpload) @@ -434,12 +436,6 @@ func (dc *DataChunk) UploadHandler(size int64) Chunk { dc.ContentLength = size dc.remainOrDiscardSize = size dc.dateType = typeUpload - - if dc.manager.repeatByte == nil { - r := bytes.Repeat([]byte{0xAA}, readChunkSize) // uniformly distributed sequence of bits - dc.manager.repeatByte = &r - } - dc.startTime = time.Now() return dc } From dd38f3048a64e84825feee924c25e93b306a2eef Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 16:33:24 +0800 Subject: [PATCH 4/8] fix(api): data race in value totalDataVolume --- speedtest/data_manager.go | 26 +++++++++++++++++--------- speedtest/data_manager_test.go | 2 +- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 57ab2ab..28114b8 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -171,6 +171,14 @@ func (dm *DataManager) RegisterDownloadHandler(fn func()) *TestDirection { return dm.download } +func (td *TestDirection) GetTotalDataVolume() int64 { + return atomic.LoadInt64(&td.totalDataVolume) +} + +func (td *TestDirection) AddTotalDataVolume(delta int64) int64 { + return atomic.AddInt64(&td.totalDataVolume, delta) +} + func (td *TestDirection) Start(cancel context.CancelFunc, mainRequestHandlerIndex int) { if len(td.fns) == 0 { panic("empty task stack") @@ -257,14 +265,14 @@ func (td *TestDirection) rateCapture() chan bool { for { select { case <-t.C: - newTotalDataVolume := td.totalDataVolume + newTotalDataVolume := td.GetTotalDataVolume() deltaDataVolume := newTotalDataVolume - prevTotalDataVolume prevTotalDataVolume = newTotalDataVolume if deltaDataVolume != 0 { td.RateSequence = append(td.RateSequence, deltaDataVolume) } // anyway we update the measuring instrument - globalAvg := (float64(td.totalDataVolume)) / float64(time.Since(sTime).Milliseconds()) * 1000 + globalAvg := (float64(td.GetTotalDataVolume())) / float64(time.Since(sTime).Milliseconds()) * 1000 if td.welford.Update(globalAvg, float64(deltaDataVolume)) { go td.closeFunc() } @@ -292,19 +300,19 @@ func (dm *DataManager) NewChunk() Chunk { } func (dm *DataManager) AddTotalDownload(value int64) { - atomic.AddInt64(&dm.download.totalDataVolume, value) + dm.download.AddTotalDataVolume(value) } func (dm *DataManager) AddTotalUpload(value int64) { - atomic.AddInt64(&dm.upload.totalDataVolume, value) + dm.upload.AddTotalDataVolume(value) } func (dm *DataManager) GetTotalDownload() int64 { - return dm.download.totalDataVolume + return dm.download.GetTotalDataVolume() } func (dm *DataManager) GetTotalUpload() int64 { - return dm.upload.totalDataVolume + return dm.upload.GetTotalDataVolume() } func (dm *DataManager) SetRateCaptureFrequency(duration time.Duration) Manager { @@ -339,7 +347,7 @@ func (dm *DataManager) Reset() { func (dm *DataManager) GetAvgDownloadRate() float64 { unit := float64(dm.captureTime / time.Millisecond) - return float64(dm.download.totalDataVolume*8/1000) / unit + return float64(dm.download.GetTotalDataVolume()*8/1000) / unit } func (dm *DataManager) GetEWMADownloadRate() float64 { @@ -351,7 +359,7 @@ func (dm *DataManager) GetEWMADownloadRate() float64 { func (dm *DataManager) GetAvgUploadRate() float64 { unit := float64(dm.captureTime / time.Millisecond) - return float64(dm.upload.totalDataVolume*8/1000) / unit + return float64(dm.upload.GetTotalDataVolume()*8/1000) / unit } func (dm *DataManager) GetEWMAUploadRate() float64 { @@ -414,7 +422,7 @@ func (dc *DataChunk) DownloadHandler(r io.Reader) error { rs := int64(readSize) dc.remainOrDiscardSize += rs - atomic.AddInt64(&dc.manager.download.totalDataVolume, rs) + dc.manager.download.AddTotalDataVolume(rs) if dc.err != nil { if dc.err == io.EOF { return nil diff --git a/speedtest/data_manager_test.go b/speedtest/data_manager_test.go index 6f59db6..dbd34d3 100644 --- a/speedtest/data_manager_test.go +++ b/speedtest/data_manager_test.go @@ -35,7 +35,7 @@ func TestDataManager_AddTotalDownload(t *testing.T) { }() } wg.Wait() - if dmp.download.totalDataVolume != 43521000000 { + if dmp.download.GetTotalDataVolume() != 43521000000 { t.Fatal() } } From 63debf92a05216e54456c494d997e272436853d4 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 17:16:53 +0800 Subject: [PATCH 5/8] fix(api): data race in value running --- speedtest/data_manager.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 28114b8..bbf8357 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -88,7 +88,8 @@ type DataManager struct { rateCaptureFrequency time.Duration nThread int - running bool + running bool + runningRW sync.RWMutex download *TestDirection upload *TestDirection @@ -210,7 +211,9 @@ func (td *TestDirection) Start(cancel context.CancelFunc, mainRequestHandlerInde once.Do(func() { stopCapture <- true close(stopCapture) + td.manager.runningRW.Lock() td.manager.running = false + td.manager.runningRW.Unlock() cancel() dbg.Println("FuncGroup: Stop") }) @@ -222,7 +225,10 @@ func (td *TestDirection) Start(cancel context.CancelFunc, mainRequestHandlerInde go func() { defer wg.Done() for { - if !td.manager.running { + td.manager.runningRW.RLock() + running := td.manager.running + td.manager.runningRW.RUnlock() + if !running { return } td.fns[mainRequestHandlerIndex]() @@ -415,7 +421,10 @@ func (dc *DataChunk) DownloadHandler(r io.Reader) error { defer blackHolePool.Put(bufP) readSize := 0 for { - if !dc.manager.running { + dc.manager.runningRW.RLock() + running := dc.manager.running + dc.manager.runningRW.RUnlock() + if !running { return nil } readSize, dc.err = r.Read(*bufP) @@ -457,7 +466,10 @@ func (dc *DataChunk) WriteTo(w io.Writer) (written int64, err error) { nw := 0 nr := readChunkSize for { - if !dc.manager.running || dc.remainOrDiscardSize <= 0 { + dc.manager.runningRW.RLock() + running := dc.manager.running + dc.manager.runningRW.RUnlock() + if !running || dc.remainOrDiscardSize <= 0 { dc.endTime = time.Now() return written, io.EOF } From 8beddc955e4205f965e259fad6f5d4cf08160227 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 17:32:34 +0800 Subject: [PATCH 6/8] fix lint --- speedtest.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speedtest.go b/speedtest.go index d3775c3..cad3ef9 100644 --- a/speedtest.go +++ b/speedtest.go @@ -153,8 +153,8 @@ func main() { blocker := sync.WaitGroup{} packetLossAnalyzerCtx, packetLossAnalyzerCancel := context.WithTimeout(context.Background(), time.Second*40) taskManager.Run("Packet Loss Analyzer", func(task *Task) { + blocker.Add(1) go func() { - blocker.Add(1) defer blocker.Done() err = analyzer.RunWithContext(packetLossAnalyzerCtx, server.Host, func(packetLoss *transport.PLoss) { server.PacketLoss = *packetLoss From dedf56c57a2d3193edb01359ae51c424e51a853b Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 17:37:22 +0800 Subject: [PATCH 7/8] fix(api): data race in value running --- speedtest/data_manager.go | 5 ++++- speedtest/server.go | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index bbf8357..7ae2a48 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -248,7 +248,10 @@ func (td *TestDirection) Start(cancel context.CancelFunc, mainRequestHandlerInde go func() { defer wg.Done() for { - if !td.manager.running { + td.manager.runningRW.RLock() + running := td.manager.running + td.manager.runningRW.RUnlock() + if !running { return } td.fns[t]() diff --git a/speedtest/server.go b/speedtest/server.go index fd56160..c48524f 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -293,11 +293,11 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context) (Servers, error) var latency []int64 var errPing error if s.config.PingMode == TCP { - _, errPing = gs.TCPPing(pCtx, 1, time.Millisecond, nil) + latency, errPing = gs.TCPPing(pCtx, 1, time.Millisecond, nil) } else if s.config.PingMode == ICMP { - _, errPing = gs.ICMPPing(pCtx, 4*time.Second, 1, time.Millisecond, nil) + latency, errPing = gs.ICMPPing(pCtx, 4*time.Second, 1, time.Millisecond, nil) } else { - _, errPing = gs.HTTPPing(context.Background(), 1, time.Millisecond, nil) + latency, errPing = gs.HTTPPing(context.Background(), 1, time.Millisecond, nil) } if errPing != nil || len(latency) < 1 { gs.Latency = PingTimeout From 9e78a17c3bf32b88c2d13929aee769f09f0bddb2 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Sat, 13 Jul 2024 17:41:35 +0800 Subject: [PATCH 8/8] revert context --- speedtest/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speedtest/server.go b/speedtest/server.go index c48524f..7ab64bb 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -297,7 +297,7 @@ func (s *Speedtest) FetchServerListContext(ctx context.Context) (Servers, error) } else if s.config.PingMode == ICMP { latency, errPing = gs.ICMPPing(pCtx, 4*time.Second, 1, time.Millisecond, nil) } else { - latency, errPing = gs.HTTPPing(context.Background(), 1, time.Millisecond, nil) + latency, errPing = gs.HTTPPing(pCtx, 1, time.Millisecond, nil) } if errPing != nil || len(latency) < 1 { gs.Latency = PingTimeout