Skip to content

Commit

Permalink
feat(logqlbench): add LogQL benchmark tool
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Jun 20, 2024
1 parent e9a947c commit a8580d9
Show file tree
Hide file tree
Showing 9 changed files with 828 additions and 4 deletions.
14 changes: 14 additions & 0 deletions cmd/otelbench/logql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package main

import "github.com/spf13/cobra"

func newLogQLCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "logql",
Short: "Suite for LogQL benchmarks",
}
cmd.AddCommand(
newLogQLBenchmarkCommand(),
)
return cmd
}
43 changes: 43 additions & 0 deletions cmd/otelbench/logql_bench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"time"

"github.com/go-faster/errors"
"github.com/spf13/cobra"

"github.com/go-faster/oteldb/cmd/otelbench/logqlbench"
)

func newLogQLBenchmarkCommand() *cobra.Command {
p := &logqlbench.LogQLBenchmark{}
cmd := &cobra.Command{
Use: "bench",
Aliases: []string{"benchmark"},
Short: "Run LogQL queries",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
if err := p.Setup(cmd); err != nil {
return errors.Wrap(err, "setup")
}
return p.Run(ctx)
},
}
f := cmd.Flags()
f.StringVar(&p.Addr, "addr", "http://127.0.0.1:3100", "Loki address")
f.StringVarP(&p.Input, "input", "i", "logql.yml", "Input file")
f.StringVarP(&p.Output, "output", "o", "report.yml", "Output report file")
f.DurationVar(&p.RequestTimeout, "request-timeout", time.Second*10, "Request timeout")
f.StringVar(&p.TracesExporterAddr, "traces-exporter-addr", "http://127.0.0.1:4317", "Traces exporter OTLP endpoint")
f.StringVar(&p.TempoAddr, "tempo-addr", "http://127.0.0.1:3200", "Tempo endpoint")
f.StringVar(&p.StartTime, "start", "", "Start time override (RFC3339 or unix timestamp)")
f.StringVar(&p.EndTime, "end", "", "End time override (RFC3339 or unix timestamp)")
f.BoolVar(&p.AllowEmpty, "allow-empty", true, "Allow empty results")

f.BoolVar(&p.Trace, "trace", false, "Trace queries")
f.IntVar(&p.Count, "count", 1, "Number of times to run each query (only for sequential)")
f.IntVar(&p.Warmup, "warmup", 0, "Number of warmup runs (only for sequential)")

return cmd
}
193 changes: 193 additions & 0 deletions cmd/otelbench/logqlbench/logqlbench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Package logqlbench defines utilities to benchmark LogQL queries.
package logqlbench

import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"slices"
"strings"
"sync"
"time"

"github.com/go-faster/errors"
yamlx "github.com/go-faster/yaml"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/go-faster/oteldb/internal/lokiapi"
"github.com/go-faster/oteldb/internal/lokihandler"
"github.com/go-faster/oteldb/internal/tempoapi"
)

type LogQLBenchmark struct {
Addr string
Count int
Warmup int
Trace bool

StartTime string
EndTime string
AllowEmpty bool

TracesExporterAddr string
TempoAddr string

Input string
Output string
RequestTimeout time.Duration

batchSpanProcessor sdktrace.SpanProcessor
tracerProvider trace.TracerProvider
tempo *tempoapi.Client

client *lokiapi.Client
start time.Time
end time.Time

queries []tracedQuery
queriesMux sync.Mutex

reports []LogQLReportQuery
tracer trace.Tracer
}

// Setup setups benchmark using given flags.
func (p *LogQLBenchmark) Setup(cmd *cobra.Command) error {
ctx := cmd.Context()

var err error
if p.start, err = lokihandler.ParseTimestamp(p.StartTime, time.Time{}); err != nil {
return errors.Wrap(err, "parse start time")
}
if p.end, err = lokihandler.ParseTimestamp(p.EndTime, time.Time{}); err != nil {
return errors.Wrap(err, "parse end time")
}

if err := p.setupTracing(ctx); err != nil {
return errors.Wrap(err, "setup tracing")
}
propagator := propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
httpClient := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport,
otelhttp.WithTracerProvider(p.tracerProvider),
otelhttp.WithPropagators(propagator),
),
}
p.client, err = lokiapi.NewClient(p.Addr,
lokiapi.WithTracerProvider(p.tracerProvider),
lokiapi.WithClient(httpClient),
)
if err != nil {
return errors.Wrap(err, "create client")
}
return nil
}

// Run starts the benchmark.
func (p *LogQLBenchmark) Run(ctx context.Context) error {
fmt.Println("sending LogQL queries from", p.Input, "to", p.Addr)
if !p.start.IsZero() {
fmt.Println("start time override:", p.start.Format(time.RFC3339))
}
if !p.end.IsZero() {
fmt.Println("end time override:", p.end.Format(time.RFC3339))
}

var total int
if err := p.each(ctx, func(ctx context.Context, _ int, q Query) error {
total += p.Count
total += p.Warmup
return nil
}); err != nil {
return errors.Wrap(err, "count total")
}

pb := progressbar.Default(int64(total))
start := time.Now()
if err := p.each(ctx, func(ctx context.Context, id int, q Query) (rerr error) {
// Warmup.
for i := 0; i < p.Warmup; i++ {
if err := p.send(ctx, q); err != nil {
return errors.Wrap(err, "send")
}
if err := pb.Add(1); err != nil {
return errors.Wrap(err, "update progress bar")
}
}
// Run.
for i := 0; i < p.Count; i++ {
if err := p.sendAndRecord(ctx, id, q); err != nil {
return errors.Wrap(err, "send")
}
if err := pb.Add(1); err != nil {
return errors.Wrap(err, "update progress bar")
}
}
return nil
}); err != nil {
_ = pb.Exit()
return errors.Wrap(err, "send queries")
}
if err := pb.Finish(); err != nil {
return errors.Wrap(err, "finish progress bar")
}
fmt.Println("done in", time.Since(start).Round(time.Millisecond))

if p.Trace {
fmt.Println("waiting for traces")
if err := p.flushTraces(ctx); err != nil {
return errors.Wrap(err, "flush traces")
}
} else {
fmt.Println("saving")
}

pb = progressbar.Default(int64(len(p.queries)))
for _, v := range p.queries {
if err := p.report(ctx, v); err != nil {
return errors.Wrap(err, "wait for trace")
}
if err := pb.Add(1); err != nil {
return errors.Wrap(err, "update progress bar")
}
}
if err := pb.Finish(); err != nil {
return errors.Wrap(err, "finish progress bar")
}

report := LogQLReport{
Queries: p.reports,
}
slices.SortFunc(report.Queries, func(a, b LogQLReportQuery) int {
if a.DurationNanos == b.DurationNanos {
return strings.Compare(a.Query, b.Query)
}
if a.DurationNanos > b.DurationNanos {
return -1
}
return 1
})
buf := new(bytes.Buffer)
enc := yamlx.NewEncoder(buf)
enc.SetIndent(2)
if err := enc.Encode(report); err != nil {
return errors.Wrap(err, "encode report")
}
reportData := buf.Bytes()

if err := os.WriteFile(p.Output, reportData, 0o644); err != nil {
return errors.Wrap(err, "write report")
}
fmt.Println("done")
return nil
}
105 changes: 105 additions & 0 deletions cmd/otelbench/logqlbench/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package logqlbench

import (
"context"
"os"
"time"

"github.com/go-faster/errors"
"github.com/go-faster/yaml"

"github.com/go-faster/oteldb/internal/lokihandler"
)

// ConfigQuery defines LogQL query parameters.
type ConfigQuery struct {
Title string `yaml:"title,omitempty"`
Description string `yaml:"description,omitempty"`
Start string `yaml:"start,omitempty"`
End string `yaml:"end,omitempty"`
Step time.Duration `yaml:"step,omitempty"`
Query string `yaml:"query,omitempty"`
Match []string `yaml:"match,omitempty"`
}

// Query is a benchmarked query.
type Query struct {
Type string

Title string
Description string
Start time.Time
End time.Time
Step time.Duration
Query string
Match []string
}

// Input defines queries config.
type Input struct {
Instant []ConfigQuery `yaml:"instant"`
Range []ConfigQuery `yaml:"range"`
Series []ConfigQuery `yaml:"series"`
}

func (p *LogQLBenchmark) each(ctx context.Context, fn func(ctx context.Context, id int, q Query) error) error {
data, err := os.ReadFile(p.Input)
if err != nil {
return errors.Wrap(err, "read input")
}

var input Input
if err := yaml.Unmarshal(data, &input); err != nil {
return errors.Wrap(err, "unmarshal input")
}

mapQuery := func(typ string, cq ConfigQuery) (Query, error) {
q := Query{
Type: typ,
Title: cq.Title,
Description: cq.Description,
Step: cq.Step,
Query: cq.Query,
}

var err error
q.Start, err = lokihandler.ParseTimestamp(cq.Start, p.start)
if err != nil {
return q, errors.Wrap(err, "parse start")
}
q.End, err = lokihandler.ParseTimestamp(cq.End, p.end)
if err != nil {
return q, errors.Wrap(err, "parse end")
}
return q, nil
}

for idx, cq := range input.Instant {
q, err := mapQuery("instant", cq)
if err != nil {
return err
}
if err := fn(ctx, idx+1, q); err != nil {
return errors.Wrap(err, "callback")
}
}
for idx, cq := range input.Range {
q, err := mapQuery("range", cq)
if err != nil {
return err
}
if err := fn(ctx, idx+1, q); err != nil {
return errors.Wrap(err, "callback")
}
}
for idx, cq := range input.Series {
q, err := mapQuery("series", cq)
if err != nil {
return err
}
if err := fn(ctx, idx+1, q); err != nil {
return errors.Wrap(err, "callback")
}
}
return nil
}
26 changes: 26 additions & 0 deletions cmd/otelbench/logqlbench/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package logqlbench

type LogQLReport struct {
Queries []LogQLReportQuery `json:"queries"`
}

type LogQLReportQuery struct {
ID int `yaml:"id,omitempty"`
Query string `yaml:"query,omitempty"`
Title string `yaml:"title,omitempty"`
Description string `yaml:"description,omitempty"`
DurationNanos int64 `yaml:"duration_nanos,omitempty"`
Matchers []string `yaml:"matchers,omitempty"`
Queries []ClickhouseQueryReport `yaml:"queries,omitempty"`
}

type ClickhouseQueryReport struct {
DurationNanos int64 `yaml:"duration_nanos,omitempty"`
Query string `yaml:"query,omitempty"`
ReadBytes int64 `yaml:"read_bytes,omitempty"`
ReadRows int64 `yaml:"read_rows,omitempty"`
MemoryUsage int64 `yaml:"memory_usage,omitempty"`

ReceivedBytes int64 `yaml:"recevied_bytes,omitempty"`
ReceivedRows int64 `yaml:"recevied_rows,omitempty"`
}
Loading

0 comments on commit a8580d9

Please sign in to comment.