Skip to content

Commit

Permalink
fix(otelbench.promql): correctly compute duration
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 10, 2024
1 parent d4042cf commit 661af3e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 45 deletions.
28 changes: 18 additions & 10 deletions cmd/otelbench/promql_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,27 @@ func (a PromQLAnalyze) Run() error {
} else {
fmt.Println("matchers:", q.Matchers)
}
fmt.Println(" sql:", len(q.Queries))
fmt.Println(" duration:", time.Duration(q.DurationNanos)*time.Nanosecond)

var memUsage, readBytes, readRows int64
for _, v := range q.Queries {
memUsage += v.MemoryUsage
readBytes += v.ReadBytes
readRows += v.ReadRows
formatNanos := func(nanos int64) string {
d := time.Duration(nanos) * time.Nanosecond
return d.Round(time.Millisecond / 20).String()
}
fmt.Println(" duration:", formatNanos(q.DurationNanos))

fmt.Println(" memory usage:", humanize.Bytes(uint64(memUsage)))
fmt.Println(" read bytes:", humanize.Bytes(uint64(readBytes)))
fmt.Println(" read rows:", fmtInt(int(readRows)))
if len(q.Queries) > 0 {
fmt.Println(" sql queries:", len(q.Queries))

var memUsage, readBytes, readRows int64
for _, v := range q.Queries {
memUsage += v.MemoryUsage
readBytes += v.ReadBytes
readRows += v.ReadRows
}

fmt.Println(" memory usage:", humanize.Bytes(uint64(memUsage)))
fmt.Println(" read bytes:", humanize.Bytes(uint64(readBytes)))
fmt.Println(" read rows:", fmtInt(int(readRows)))
}
}

return nil
Expand Down
86 changes: 51 additions & 35 deletions cmd/otelbench/promql_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.opentelemetry.io/otel/trace/noop"
"sigs.k8s.io/yaml"

"github.com/go-faster/oteldb/internal/logparser"
Expand All @@ -44,7 +44,8 @@ type tracedQuery struct {
}

type PromQL struct {
Addr string
Addr string
Trace bool

StartTime string
EndTime string
Expand All @@ -59,12 +60,12 @@ type PromQL struct {

client *promapi.Client
batchSpanProcessor sdktrace.SpanProcessor
tracerProvider *sdktrace.TracerProvider
tracerProvider trace.TracerProvider
tempo *tempoapi.Client
start time.Time
end time.Time

traces []tracedQuery
queries []tracedQuery
reports []PromQLReportQuery
}

Expand Down Expand Up @@ -93,6 +94,10 @@ func parseTime(s string) (time.Time, error) {
}

func (p *PromQL) setupTracing(ctx context.Context) error {
if !p.Trace {
p.tracerProvider = noop.NewTracerProvider()
return nil
}
exporter, err := otlptracegrpc.New(ctx)
if err != nil {
return errors.Wrap(err, "create exporter")
Expand Down Expand Up @@ -333,10 +338,38 @@ func (t *tempoTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return resp, nil
}

func (p *PromQL) waitForTrace(ctx context.Context, q tracedQuery) error {
func (p *PromQL) report(ctx context.Context, q tracedQuery) error {
// Produce query report.
reportEntry := PromQLReportQuery{
DurationNanos: q.Duration.Nanoseconds(),
}
switch q.Query.Type {
case promproxy.InstantQueryQuery:
reportEntry.Query = q.Query.InstantQuery.Query
reportEntry.Title = q.Query.InstantQuery.Title.Value
reportEntry.Description = q.Query.InstantQuery.Description.Value
case promproxy.RangeQueryQuery:
reportEntry.Query = q.Query.RangeQuery.Query
reportEntry.Title = q.Query.RangeQuery.Title.Value
reportEntry.Description = q.Query.RangeQuery.Description.Value
case promproxy.SeriesQueryQuery:
reportEntry.Matchers = q.Query.SeriesQuery.Matchers
reportEntry.Title = q.Query.SeriesQuery.Title.Value
reportEntry.Description = q.Query.SeriesQuery.Description.Value
default:
return errors.Errorf("unknown query type %q", q.Query.Type)
}

if !p.Trace {
p.reports = append(p.reports, reportEntry)
return nil
}

ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

if err := p.batchSpanProcessor.ForceFlush(ctx); err != nil {
return errors.Wrap(err, "flush")
}
bo := backoff.NewConstantBackOff(time.Millisecond * 100)
res, err := backoff.RetryWithData(func() (v ptrace.Traces, err error) {
res, err := p.tempo.TraceByID(ctx, tempoapi.TraceByIDParams{TraceID: q.TraceID})
Expand Down Expand Up @@ -387,25 +420,6 @@ func (p *PromQL) waitForTrace(ctx context.Context, q tracedQuery) error {
return errors.Errorf("trace %q spans length is zero", q.TraceID)
}

// Produce query report.
var reportEntry PromQLReportQuery
switch q.Query.Type {
case promproxy.InstantQueryQuery:
reportEntry.Query = q.Query.InstantQuery.Query
reportEntry.Title = q.Query.InstantQuery.Title.Value
reportEntry.Description = q.Query.InstantQuery.Description.Value
case promproxy.RangeQueryQuery:
reportEntry.Query = q.Query.RangeQuery.Query
reportEntry.Title = q.Query.RangeQuery.Title.Value
reportEntry.Description = q.Query.RangeQuery.Description.Value
case promproxy.SeriesQueryQuery:
reportEntry.Matchers = q.Query.SeriesQuery.Matchers
reportEntry.Title = q.Query.SeriesQuery.Title.Value
reportEntry.Description = q.Query.SeriesQuery.Description.Value
default:
return errors.Errorf("unknown query type %q", q.Query.Type)
}

// For each clickhouse query ID, save query.
rsl := res.ResourceSpans()
for i := 0; i < rsl.Len(); i++ {
Expand Down Expand Up @@ -434,7 +448,7 @@ func (p *PromQL) waitForTrace(ctx context.Context, q tracedQuery) error {
if memoryUsage, ok := attrs.Get("clickhouse.memory_usage"); ok {
reportQuery.MemoryUsage = memoryUsage.Int()
}
reportEntry.DurationNanos = span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Nanoseconds()
reportQuery.DurationNanos = span.EndTimestamp().AsTime().Sub(span.StartTimestamp().AsTime()).Nanoseconds()
reportEntry.Queries = append(reportEntry.Queries, reportQuery)
}
}
Expand Down Expand Up @@ -503,11 +517,7 @@ func (p *PromQL) Run(ctx context.Context) error {
span.SetStatus(codes.Ok, "")
}
span.End()
if err := p.batchSpanProcessor.ForceFlush(ctx); err != nil {
rerr = multierr.Append(rerr, errors.Wrap(err, "force flush"))
}

p.traces = append(p.traces, tracedQuery{
p.queries = append(p.queries, tracedQuery{
Query: q,
TraceID: span.SpanContext().TraceID().String(),
Duration: time.Since(queryStart),
Expand All @@ -528,11 +538,16 @@ func (p *PromQL) Run(ctx context.Context) error {
return errors.Wrap(err, "finish progress bar")
}
fmt.Println("done in", time.Since(start).Round(time.Millisecond))
fmt.Println("waiting for traces")

pb = progressbar.Default(int64(len(p.traces)))
for _, v := range p.traces {
if err := p.waitForTrace(ctx, v); err != nil {
if p.Trace {
fmt.Println("waiting for traces")
} else {
fmt.Println("saving")
}

pb = progressbar.Default(int64(len(p.queries)))
for _, v := range p.queries {
if err := p.report(ctx, v); err != nil {
return errors.Wrap(err, "wait for trace")
}
if err := pb.Add(1); err != nil {
Expand Down Expand Up @@ -595,5 +610,6 @@ func newPromQLBenchmarkCommand() *cobra.Command {
f.StringVar(&p.StartTime, "start", "", "Start time override (RFC3339 or unix timestamp)")
f.StringVar(&p.EndTime, "end", "", "End time override (RFC3339 or unix timestamp)")
f.BoolVar(&p.AllowEmpty, "allow-empty", true, "Allow empty results")
f.BoolVar(&p.Trace, "trace", false, "Trace queries")
return cmd
}

0 comments on commit 661af3e

Please sign in to comment.