Skip to content

Commit

Permalink
feat(otelbench.promql.bench): add --concurrent mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 18, 2024
1 parent 401f437 commit 63c2115
Showing 1 changed file with 84 additions and 4 deletions.
88 changes: 84 additions & 4 deletions cmd/otelbench/promql_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
Expand All @@ -29,6 +30,7 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/yaml"

"github.com/go-faster/oteldb/internal/logparser"
Expand All @@ -50,6 +52,11 @@ type PromQL struct {
Count int
Warmup int

// For concurrent benchmarks.
Jobs int
Duration time.Duration
Concurrent bool

StartTime string
EndTime string
AllowEmpty bool
Expand Down Expand Up @@ -128,14 +135,27 @@ func (p *PromQL) setupTracing(ctx context.Context) error {
return nil
}

func (p *PromQL) Setup(ctx context.Context) error {
func (p *PromQL) Setup(cmd *cobra.Command) error {
ctx := cmd.Context()

var err error
if p.start, err = parseTime(p.StartTime); err != nil {
return errors.Wrap(err, "parse start time")
}
if p.end, err = parseTime(p.EndTime); err != nil {
return errors.Wrap(err, "parse end time")
}

if p.Concurrent {
if p.Trace {
return errors.New("concurrent mode is not supported with tracing")
}
if p.client, err = promapi.NewClient(p.Addr); err != nil {
return errors.Wrap(err, "create client")
}
return nil
}

if err := p.setupTracing(ctx); err != nil {
return errors.Wrap(err, "setup tracing")
}
Expand Down Expand Up @@ -532,6 +552,57 @@ type PromQLReport struct {
Queries []PromQLReportQuery `json:"queries"`
}

func (p *PromQL) runConcurrentBenchmark(ctx context.Context) error {
// Load queries.
var queries []promproxy.Query
if err := p.each(ctx, func(ctx context.Context, _ int, q promproxy.Query) error {
queries = append(queries, q)
return nil
}); err != nil {
return errors.Wrap(err, "load queries")
}

// Spawn workers and execute random queries until time is up.
g, ctx := errgroup.WithContext(ctx)
tasks := make(chan promproxy.Query, p.Jobs)
for i := 0; i < p.Jobs; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case q, ok := <-tasks:
if !ok {
return nil
}
if err := p.send(ctx, q); err != nil {
return errors.Wrap(err, "send")
}
}
}
})
}
g.Go(func() error {
// Generate load.
// #nosec G404
rnd := rand.New(rand.NewSource(1))
defer close(tasks)
for {
q := queries[rnd.Intn(len(queries))]
select {
case <-ctx.Done():
return ctx.Err()
case tasks <- q:
continue
}
}
})
if err := g.Wait(); err != nil {
return errors.Wrap(err, "wait")
}
return nil
}

func (p *PromQL) Run(ctx context.Context) error {
fmt.Println("sending promql queries from", p.Input, "to", p.Addr)
if !p.start.IsZero() {
Expand All @@ -540,6 +611,10 @@ func (p *PromQL) Run(ctx context.Context) error {
if !p.end.IsZero() {
fmt.Println("end time override:", p.end.Format(time.RFC3339))
}
if p.Concurrent {
return p.runConcurrentBenchmark(ctx)
}

var total int
if err := p.each(ctx, func(ctx context.Context, _ int, q promproxy.Query) error {
total += p.Count
Expand Down Expand Up @@ -638,7 +713,7 @@ func newPromQLBenchmarkCommand() *cobra.Command {
Short: "Run promql queries",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
if err := p.Setup(ctx); err != nil {
if err := p.Setup(cmd); err != nil {
return errors.Wrap(err, "setup")
}
return p.Run(ctx)
Expand All @@ -654,9 +729,14 @@ func newPromQLBenchmarkCommand() *cobra.Command {
f.StringVar(&p.StartTime, "start", "", "Start time override (RFC3339 or unix timestamp)")
f.StringVar(&p.EndTime, "end", "", "End time override (RFC3339 or unix timestamp)")
f.BoolVar(&p.AllowEmpty, "allow-empty", true, "Allow empty results")

f.BoolVar(&p.Trace, "trace", false, "Trace queries")
f.IntVar(&p.Count, "count", 1, "Number of times to run each query")
f.IntVar(&p.Warmup, "warmup", 0, "Number of warmup runs")
f.IntVar(&p.Count, "count", 1, "Number of times to run each query (only for sequential)")
f.IntVar(&p.Warmup, "warmup", 0, "Number of warmup runs (only for sequential)")

f.IntVar(&p.Jobs, "jobs", 1, "Number of concurrent jobs (only for concurrent)")
f.DurationVarP(&p.Duration, "duration", "d", time.Minute*5, "Duration of benchmark (only for concurrent)")
f.BoolVarP(&p.Concurrent, "concurrent", "c", false, "Run queries concurrently")

return cmd
}

0 comments on commit 63c2115

Please sign in to comment.