diff --git a/cmd/otelbench/promql_bench.go b/cmd/otelbench/promql_bench.go index 65462e09..6ce6c8e6 100644 --- a/cmd/otelbench/promql_bench.go +++ b/cmd/otelbench/promql_bench.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "os" "path/filepath" @@ -29,6 +30,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" + "golang.org/x/sync/errgroup" "sigs.k8s.io/yaml" "github.com/go-faster/oteldb/internal/logparser" @@ -50,6 +52,11 @@ type PromQL struct { Count int Warmup int + // For concurrent benchmarks. + Jobs int + Duration time.Duration + Concurrent bool + StartTime string EndTime string AllowEmpty bool @@ -128,7 +135,9 @@ func (p *PromQL) setupTracing(ctx context.Context) error { return nil } -func (p *PromQL) Setup(ctx context.Context) error { +func (p *PromQL) Setup(cmd *cobra.Command) error { + ctx := cmd.Context() + var err error if p.start, err = parseTime(p.StartTime); err != nil { return errors.Wrap(err, "parse start time") @@ -136,6 +145,17 @@ func (p *PromQL) Setup(ctx context.Context) error { if p.end, err = parseTime(p.EndTime); err != nil { return errors.Wrap(err, "parse end time") } + + if p.Concurrent { + if p.Trace { + return errors.New("concurrent mode is not supported with tracing") + } + if p.client, err = promapi.NewClient(p.Addr); err != nil { + return errors.Wrap(err, "create client") + } + return nil + } + if err := p.setupTracing(ctx); err != nil { return errors.Wrap(err, "setup tracing") } @@ -532,6 +552,57 @@ type PromQLReport struct { Queries []PromQLReportQuery `json:"queries"` } +func (p *PromQL) runConcurrentBenchmark(ctx context.Context) error { + // Load queries. + var queries []promproxy.Query + if err := p.each(ctx, func(ctx context.Context, _ int, q promproxy.Query) error { + queries = append(queries, q) + return nil + }); err != nil { + return errors.Wrap(err, "load queries") + } + + // Spawn workers and execute random queries until time is up. + g, ctx := errgroup.WithContext(ctx) + tasks := make(chan promproxy.Query, p.Jobs) + for i := 0; i < p.Jobs; i++ { + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case q, ok := <-tasks: + if !ok { + return nil + } + if err := p.send(ctx, q); err != nil { + return errors.Wrap(err, "send") + } + } + } + }) + } + g.Go(func() error { + // Generate load. + // #nosec G404 + rnd := rand.New(rand.NewSource(1)) + defer close(tasks) + for { + q := queries[rnd.Intn(len(queries))] + select { + case <-ctx.Done(): + return ctx.Err() + case tasks <- q: + continue + } + } + }) + if err := g.Wait(); err != nil { + return errors.Wrap(err, "wait") + } + return nil +} + func (p *PromQL) Run(ctx context.Context) error { fmt.Println("sending promql queries from", p.Input, "to", p.Addr) if !p.start.IsZero() { @@ -540,6 +611,10 @@ func (p *PromQL) Run(ctx context.Context) error { if !p.end.IsZero() { fmt.Println("end time override:", p.end.Format(time.RFC3339)) } + if p.Concurrent { + return p.runConcurrentBenchmark(ctx) + } + var total int if err := p.each(ctx, func(ctx context.Context, _ int, q promproxy.Query) error { total += p.Count @@ -638,7 +713,7 @@ func newPromQLBenchmarkCommand() *cobra.Command { Short: "Run promql queries", RunE: func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - if err := p.Setup(ctx); err != nil { + if err := p.Setup(cmd); err != nil { return errors.Wrap(err, "setup") } return p.Run(ctx) @@ -654,9 +729,14 @@ func newPromQLBenchmarkCommand() *cobra.Command { f.StringVar(&p.StartTime, "start", "", "Start time override (RFC3339 or unix timestamp)") f.StringVar(&p.EndTime, "end", "", "End time override (RFC3339 or unix timestamp)") f.BoolVar(&p.AllowEmpty, "allow-empty", true, "Allow empty results") + f.BoolVar(&p.Trace, "trace", false, "Trace queries") - f.IntVar(&p.Count, "count", 1, "Number of times to run each query") - f.IntVar(&p.Warmup, "warmup", 0, "Number of warmup runs") + f.IntVar(&p.Count, "count", 1, "Number of times to run each query (only for sequential)") + f.IntVar(&p.Warmup, "warmup", 0, "Number of warmup runs (only for sequential)") + + f.IntVar(&p.Jobs, "jobs", 1, "Number of concurrent jobs (only for concurrent)") + f.DurationVarP(&p.Duration, "duration", "d", time.Minute*5, "Duration of benchmark (only for concurrent)") + f.BoolVarP(&p.Concurrent, "concurrent", "c", false, "Run queries concurrently") return cmd }