Skip to content

Commit

Permalink
feat(otelbench.promql): add query id
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 16, 2024
1 parent c9d358c commit ced08b6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
7 changes: 5 additions & 2 deletions cmd/otelbench/promql_analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ func (a PromQLAnalyze) Run() error {
}

for _, q := range report.Queries {
if q.ID != 0 {
fmt.Printf("[Q%d] ", q.ID)
}
if q.Query != "" {
fmt.Println("query:", q.Query)
fmt.Println(q.Query)
} else {
fmt.Println("matchers:", q.Matchers)
fmt.Println(q.Matchers)
}

formatNanos := func(nanos int64) string {
Expand Down
30 changes: 20 additions & 10 deletions cmd/otelbench/promql_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

type tracedQuery struct {
ID int
TraceID string
Query promproxy.Query
Duration time.Duration
Expand Down Expand Up @@ -214,9 +215,10 @@ func (p *PromQL) sendSeriesQuery(ctx context.Context, query promproxy.SeriesQuer
return nil
}

func (p *PromQL) sendAndRecord(ctx context.Context, q promproxy.Query) (rerr error) {
func (p *PromQL) sendAndRecord(ctx context.Context, id int, q promproxy.Query) (rerr error) {
start := time.Now()
tq := tracedQuery{
ID: id,
Query: q,
}
if p.Trace {
Expand Down Expand Up @@ -258,7 +260,7 @@ func (p *PromQL) send(ctx context.Context, q promproxy.Query) error {
}
}

func (p *PromQL) eachFromReport(ctx context.Context, f *os.File, fn func(ctx context.Context, q promproxy.Query) error) error {
func (p *PromQL) eachFromReport(ctx context.Context, f *os.File, fn func(ctx context.Context, id int, q promproxy.Query) error) error {
data, err := io.ReadAll(f)
if err != nil {
return errors.Wrap(err, "read")
Expand All @@ -271,6 +273,7 @@ func (p *PromQL) eachFromReport(ctx context.Context, f *os.File, fn func(ctx con
if err := report.UnmarshalJSON(jsonData); err != nil {
return errors.Wrap(err, "decode report")
}
var id int
for _, q := range report.Range {
if !q.Start.Set {
q.Start = report.Start
Expand All @@ -287,7 +290,8 @@ func (p *PromQL) eachFromReport(ctx context.Context, f *os.File, fn func(ctx con
if !p.end.IsZero() {
q.End.Value = p.end
}
if err := fn(ctx, promproxy.NewRangeQueryQuery(q)); err != nil {
id++
if err := fn(ctx, id, promproxy.NewRangeQueryQuery(q)); err != nil {
return errors.Wrap(err, "callback")
}
}
Expand All @@ -304,19 +308,21 @@ func (p *PromQL) eachFromReport(ctx context.Context, f *os.File, fn func(ctx con
if !p.end.IsZero() {
q.End.Value = p.end
}
if err := fn(ctx, promproxy.NewSeriesQueryQuery(q)); err != nil {
id++
if err := fn(ctx, id, promproxy.NewSeriesQueryQuery(q)); err != nil {
return errors.Wrap(err, "callback")
}
}
for _, q := range report.Instant {
if err := fn(ctx, promproxy.NewInstantQueryQuery(q)); err != nil {
id++
if err := fn(ctx, id, promproxy.NewInstantQueryQuery(q)); err != nil {
return errors.Wrap(err, "callback")
}
}
return nil
}

func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, q promproxy.Query) error) error {
func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, id int, q promproxy.Query) error) error {
f, err := os.Open(p.Input)
if err != nil {
return errors.Wrap(err, "read")
Expand All @@ -328,15 +334,17 @@ func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, q prompr
return p.eachFromReport(ctx, f, fn)
}
d := json.NewDecoder(f)
id := 0
for {
id++ // start from 1
var q promproxy.Query
if err := d.Decode(&q); err != nil {
if errors.Is(err, io.EOF) {
break
}
return errors.Wrap(err, "decode query")
}
if err := fn(ctx, q); err != nil {
if err := fn(ctx, id, q); err != nil {
return errors.Wrap(err, "callback")
}
}
Expand Down Expand Up @@ -385,6 +393,7 @@ func (p *PromQL) flushTraces(ctx context.Context) error {
func (p *PromQL) report(ctx context.Context, q tracedQuery) error {
// Produce query report.
reportEntry := PromQLReportQuery{
ID: q.ID,
DurationNanos: q.Duration.Nanoseconds(),
}
switch q.Query.Type {
Expand Down Expand Up @@ -510,6 +519,7 @@ type ClickhouseQueryReport struct {
}

type PromQLReportQuery struct {
ID int `yaml:"id,omitempty"`
Query string `yaml:"query,omitempty"`
Title string `yaml:"title,omitempty"`
Description string `yaml:"description,omitempty"`
Expand All @@ -531,7 +541,7 @@ func (p *PromQL) Run(ctx context.Context) error {
fmt.Println("end time override:", p.end.Format(time.RFC3339))
}
var total int
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) error {
if err := p.each(ctx, func(ctx context.Context, _ int, q promproxy.Query) error {
total += p.Count
total += p.Warmup
return nil
Expand All @@ -541,7 +551,7 @@ func (p *PromQL) Run(ctx context.Context) error {

pb := progressbar.Default(int64(total))
start := time.Now()
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) (rerr error) {
if err := p.each(ctx, func(ctx context.Context, id int, q promproxy.Query) (rerr error) {
// Warmup.
for i := 0; i < p.Warmup; i++ {
if err := p.send(ctx, q); err != nil {
Expand All @@ -553,7 +563,7 @@ func (p *PromQL) Run(ctx context.Context) error {
}
// Run.
for i := 0; i < p.Count; i++ {
if err := p.sendAndRecord(ctx, q); err != nil {
if err := p.sendAndRecord(ctx, id, q); err != nil {
return errors.Wrap(err, "send")
}
if err := pb.Add(1); err != nil {
Expand Down

0 comments on commit ced08b6

Please sign in to comment.