[Hammer] Moved more common functionality into loadtest package (#449)
This is a rough approximation of what a library version of the hammer
could look like. The code remaining in the outer `main` package is now
the tlog-tiles-specific part; the code in the `loadtest` package should
be configurable for use with Static CT.
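As a hedged sketch of what that reuse might look like, the snippet below wires up the extracted package using only the identifiers visible in this diff (`NewHammerAnalyser`, `HammerOpts`, `NewHammer`, `Run`, `SeqLeafChan`, `ErrChan`). The function name, worker counts, and the tracker/fetcher/writer/generator parameters are placeholders for whatever a Static CT front end would supply, and the `internal/` import path means this only compiles from inside the trillian-tessera module.

```go
package main

import (
	"context"

	"github.com/transparency-dev/trillian-tessera/client"
	"github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest"
)

// runHammer is a hypothetical harness showing how a non-tlog-tiles front end
// (e.g. Static CT) might drive the extracted loadtest package. The caller
// supplies the log-specific pieces: a checkpoint tracker, an entry-bundle
// fetcher, a leaf writer and a leaf generator.
func runHammer(ctx context.Context, tracker *client.LogStateTracker,
	fetch client.EntryBundleFetcherFunc, write loadtest.LeafWriter, gen func() []byte) {
	// The analyser owns the channels the workers report into.
	ha := loadtest.NewHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size })
	ha.Run(ctx)

	// Worker counts and rate limits here are illustrative values, not defaults.
	opts := loadtest.HammerOpts{
		MaxReadOpsPerSecond:  256,
		MaxWriteOpsPerSecond: 64,
		NumReadersRandom:     8,
		NumReadersFull:       4,
		NumWriters:           8,
	}
	hammer := loadtest.NewHammer(tracker, fetch, write, gen, ha.SeqLeafChan, ha.ErrChan, opts)
	hammer.Run(ctx)

	<-ctx.Done()
}
```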
mhutchinson authored Jan 22, 2025
1 parent 20a5dcf commit 5c626d7
Showing 7 changed files with 436 additions and 328 deletions.
285 changes: 11 additions & 274 deletions internal/hammer/hammer.go
@@ -18,7 +18,6 @@ package main
import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"math/rand/v2"
@@ -28,7 +27,6 @@ import (
"sync"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
"github.com/transparency-dev/trillian-tessera/client"
"github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest"
"golang.org/x/mod/sumdb/note"
@@ -121,12 +119,18 @@ func main() {
klog.Exitf("Failed to get initial state of the log: %v", err)
}

ha := newHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size })
go ha.updateStatsLoop(ctx)
go ha.errorLoop(ctx)
ha := loadtest.NewHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size })
ha.Run(ctx)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize, *dupChance)
hammer := NewHammer(&tracker, f.ReadEntryBundle, w, gen, ha.seqLeafChan, ha.errChan)
opts := loadtest.HammerOpts{
MaxReadOpsPerSecond: *maxReadOpsPerSecond,
MaxWriteOpsPerSecond: *maxWriteOpsPerSecond,
NumReadersRandom: *numReadersRandom,
NumReadersFull: *numReadersFull,
NumWriters: *numWriters,
}
hammer := loadtest.NewHammer(&tracker, f.ReadEntryBundle, w, gen, ha.SeqLeafChan, ha.ErrChan, opts)

exitCode := 0
if *leafWriteGoal > 0 {
@@ -169,206 +173,14 @@ func main() {
hammer.Run(ctx)

if *showUI {
c := newController(hammer, ha)
c := loadtest.NewController(hammer, ha)
c.Run(ctx)
} else {
<-ctx.Done()
}
os.Exit(exitCode)
}

func NewHammer(tracker *client.LogStateTracker, f client.EntryBundleFetcherFunc, w loadtest.LeafWriter, gen func() []byte, seqLeafChan chan<- loadtest.LeafTime, errChan chan<- error) *Hammer {
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)

randomReaders := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLeafReader(tracker, f, loadtest.RandomNextLeaf(), readThrottle.tokenChan, errChan)
})
fullReaders := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLeafReader(tracker, f, loadtest.MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := loadtest.NewWorkerPool(func() loadtest.Worker {
return loadtest.NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan)
})

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
}
}

// Hammer is responsible for coordinating the operations against the log in the form
// of write and read operations. The work of analysing the results of hammering should
// live outside of this class.
type Hammer struct {
randomReaders loadtest.WorkerPool
fullReaders loadtest.WorkerPool
writers loadtest.WorkerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
}

func (h *Hammer) Run(ctx context.Context) {
// Kick off readers & writers
for i := 0; i < *numReadersRandom; i++ {
h.randomReaders.Grow(ctx)
}
for i := 0; i < *numReadersFull; i++ {
h.fullReaders.Grow(ctx)
}
for i := 0; i < *numWriters; i++ {
h.writers.Grow(ctx)
}

go h.readThrottle.Run(ctx)
go h.writeThrottle.Run(ctx)

go h.updateCheckpointLoop(ctx)
}

func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
tick := time.NewTicker(500 * time.Millisecond)
for {
select {
case <-ctx.Done():
return
case <-tick.C:
size := h.tracker.LatestConsistent.Size
_, _, _, err := h.tracker.Update(ctx)
if err != nil {
klog.Warning(err)
inconsistentErr := client.ErrInconsistency{}
if errors.As(err, &inconsistentErr) {
klog.Fatalf("Last Good Checkpoint:\n%s\n\nFirst Bad Checkpoint:\n%s\n\n%v", string(inconsistentErr.SmallerRaw), string(inconsistentErr.LargerRaw), inconsistentErr)
}
}
newSize := h.tracker.LatestConsistent.Size
if newSize > size {
klog.V(1).Infof("Updated checkpoint from %d to %d", size, newSize)
}
}
}
}

func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser {
leafSampleChan := make(chan loadtest.LeafTime, 100)
errChan := make(chan error, 20)
return &HammerAnalyser{
treeSizeFn: treeSizeFn,
seqLeafChan: leafSampleChan,
errChan: errChan,
integrationTime: movingaverage.Concurrent(movingaverage.New(30)),
queueTime: movingaverage.Concurrent(movingaverage.New(30)),
}
}

// HammerAnalyser is responsible for measuring and interpreting the result of hammering.
type HammerAnalyser struct {
treeSizeFn func() uint64
seqLeafChan chan loadtest.LeafTime
errChan chan error

queueTime *movingaverage.ConcurrentMovingAverage
integrationTime *movingaverage.ConcurrentMovingAverage
}

func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
tick := time.NewTicker(100 * time.Millisecond)
size := a.treeSizeFn()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := a.treeSizeFn()
if newSize <= size {
continue
}
now := time.Now()
totalLatency := time.Duration(0)
queueLatency := time.Duration(0)
numLeaves := 0
var sample *loadtest.LeafTime
ReadLoop:
for {
if sample == nil {
select {
case l, ok := <-a.seqLeafChan:
if !ok {
break ReadLoop
}
sample = &l
default:
break ReadLoop
}
}
// Stop considering leaf times once we've caught up with those that cross
// either the current checkpoint or "now":
// - leaves with indices beyond the tree size we're considering are not integrated yet, so we can't calculate their TTI
// - leaves which were queued before "now" but not assigned by "now" should also be ignored, as they don't fall into this epoch (and would contribute a negative latency if they were included).
if sample.Index >= newSize || sample.AssignedAt.After(now) {
break
}
queueLatency += sample.AssignedAt.Sub(sample.QueuedAt)
// totalLatency is skewed towards being higher than perhaps it may technically be by:
// - the tick interval of this goroutine,
// - the tick interval of the goroutine which updates the LogStateTracker,
// - any latency in writes to the log becoming visible for reads.
// But it's probably good enough for now.
totalLatency += now.Sub(sample.QueuedAt)

numLeaves++
sample = nil
}
if numLeaves > 0 {
a.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
a.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}
}

func (a *HammerAnalyser) errorLoop(ctx context.Context) {
tick := time.NewTicker(time.Second)
pbCount := 0
lastErr := ""
lastErrCount := 0
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-tick.C:
if pbCount > 0 {
klog.Warningf("%d requests received pushback from log", pbCount)
pbCount = 0
}
if lastErrCount > 0 {
klog.Warningf("(%d x) %s", lastErrCount, lastErr)
lastErrCount = 0

}
case err := <-a.errChan:
if errors.Is(err, loadtest.ErrRetry) {
pbCount++
continue
}
es := err.Error()
if es != lastErr && lastErrCount > 0 {
klog.Warningf("(%d x) %s", lastErrCount, lastErr)
lastErr = es
lastErrCount = 0
continue
}
lastErrCount++
}
}
}

// newLeafGenerator returns a function that generates values to append to a log.
// The leaves are constructed to be at least minLeafSize bytes long.
// The generator can be used by concurrent threads.
@@ -410,81 +222,6 @@ func newLeafGenerator(startSize uint64, minLeafSize int, dupChance float64) func
}
}

func NewThrottle(opsPerSecond int) *Throttle {
return &Throttle{
opsPerSecond: opsPerSecond,
tokenChan: make(chan bool, opsPerSecond),
}
}

type Throttle struct {
tokenChan chan bool
mu sync.Mutex
opsPerSecond int

oversupply int
}

func (t *Throttle) Increase() {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
delta := float64(tokenCount) * 0.1
if delta < 1 {
delta = 1
}
t.opsPerSecond = tokenCount + int(delta)
}

func (t *Throttle) Decrease() {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
if tokenCount <= 1 {
return
}
delta := float64(tokenCount) * 0.1
if delta < 1 {
delta = 1
}
t.opsPerSecond = tokenCount - int(delta)
}

func (t *Throttle) Run(ctx context.Context) {
interval := time.Second
ticker := time.NewTicker(interval)
for {
select {
case <-ctx.Done(): //context cancelled
return
case <-ticker.C:
ctx, cancel := context.WithTimeout(ctx, interval)
t.supplyTokens(ctx)
cancel()
}
}
}

func (t *Throttle) supplyTokens(ctx context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
tokenCount := t.opsPerSecond
for i := 0; i < t.opsPerSecond; i++ {
select {
case t.tokenChan <- true:
tokenCount--
case <-ctx.Done():
t.oversupply = tokenCount
return
}
}
t.oversupply = 0
}

func (t *Throttle) String() string {
return fmt.Sprintf("Current max: %d/s. Oversupply in last second: %d", t.opsPerSecond, t.oversupply)
}

// multiStringFlag allows a flag to be specified multiple times on the command
// line, and stores all of these values.
type multiStringFlag []string
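The removed `Throttle` above only shows the supply side: `Run` ticks once a second and `supplyTokens` refills `tokenChan`, recording any oversupply. The consumption side lives in the `loadtest` workers, which are handed `readThrottle.tokenChan` / `writeThrottle.tokenChan` in `NewHammer`. Below is a minimal sketch of that pattern, assuming each worker simply blocks on a token before every operation; the real reader/writer loops are not shown in this diff.

```go
package main

import "context"

// throttledLoop is an illustrative worker loop: it waits for a token before
// each operation, so sustained throughput is capped by how fast the Throttle
// refills the channel (at most opsPerSecond tokens per second).
func throttledLoop(ctx context.Context, tokens <-chan bool, op func(context.Context) error, errs chan<- error) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-tokens:
			if err := op(ctx); err != nil {
				errs <- err
			}
		}
	}
}
```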
50 changes: 0 additions & 50 deletions internal/hammer/hammer_test.go
@@ -15,12 +15,7 @@
package main

import (
"context"
"sync"
"testing"
"time"

"github.com/transparency-dev/trillian-tessera/internal/hammer/loadtest"
)

func TestLeafGenerator(t *testing.T) {
@@ -40,48 +35,3 @@ func TestLeafGenerator(t *testing.T) {
}
}
}

func TestHammerAnalyser_Stats(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var treeSize treeSizeState
ha := newHammerAnalyser(treeSize.getSize)

go ha.updateStatsLoop(ctx)

time.Sleep(100 * time.Millisecond)

baseTime := time.Now().Add(-1 * time.Minute)
for i := 0; i < 10; i++ {
ha.seqLeafChan <- loadtest.LeafTime{
Index: uint64(i),
QueuedAt: baseTime,
AssignedAt: baseTime.Add(time.Duration(i) * time.Second),
}
}
treeSize.setSize(10)
time.Sleep(500 * time.Millisecond)

avg := ha.queueTime.Avg()
if want := float64(4500); avg != want {
t.Errorf("integration time avg: got != want (%f != %f)", avg, want)
}
}

type treeSizeState struct {
size uint64
mux sync.RWMutex
}

func (s *treeSizeState) getSize() uint64 {
s.mux.RLock()
defer s.mux.RUnlock()
return s.size
}

func (s *treeSizeState) setSize(size uint64) {
s.mux.Lock()
defer s.mux.Unlock()
s.size = size
}
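For reference, the `4500` expected by the removed `TestHammerAnalyser_Stats` falls straight out of its fixture: leaf `i` is assigned `i` seconds after it was queued, so the mean queue latency over indices 0–9 is (0+1+…+9)/10 = 4.5 s, i.e. 4500 ms. A trivial standalone check of that arithmetic:

```go
package main

import "fmt"

func main() {
	// In the removed test fixture, leaf i has a queue latency of i seconds.
	totalMillis := 0
	for i := 0; i < 10; i++ {
		totalMillis += i * 1000
	}
	fmt.Println(float64(totalMillis) / 10) // 4500
}
```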