Skip to content

Commit

Permalink
perf(otelbench): add traces capture
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 7, 2024
1 parent 5c9e193 commit ac6e108
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 17 deletions.
11 changes: 10 additions & 1 deletion cmd/chotel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type App struct {
otlpAddr string

latest time.Time
rate time.Duration

spansSaved metric.Int64Counter
traceExporter *otlptrace.Exporter
Expand All @@ -83,6 +84,14 @@ func NewApp(lg *zap.Logger, metrics *app.Metrics) (*App, error) {
clickHousePassword: "",
clickHouseDB: "default",
otlpAddr: "otelcol:4317",
rate: time.Millisecond * 500,
}
if v := os.Getenv("CHOTEL_SEND_RATE"); v != "" {
d, err := time.ParseDuration(v)
if err != nil {
return nil, errors.Wrap(err, "parse CHOTEL_SEND_RATE")
}
a.rate = d
}
if v := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); v != "" {
a.otlpAddr = strings.TrimPrefix(v, "http://")
Expand Down Expand Up @@ -306,7 +315,7 @@ func (a *App) send(ctx context.Context, now time.Time) error {
}

func (a *App) runSender(ctx context.Context) error {
ticker := time.NewTicker(time.Millisecond * 500)
ticker := time.NewTicker(a.rate)
defer ticker.Stop()

// First immediate tick.
Expand Down
199 changes: 192 additions & 7 deletions cmd/otelbench/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,94 @@ import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/go-faster/errors"
"github.com/schollz/progressbar/v3"
"github.com/spf13/cobra"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"

"github.com/go-faster/oteldb/internal/promapi"
"github.com/go-faster/oteldb/internal/promproxy"
"github.com/go-faster/oteldb/internal/tempoapi"
)

type PromQL struct {
Addr string
Addr string

TracesExporterAddr string
TempoAddr string

Input string
RequestTimeout time.Duration

client *promapi.Client
client *promapi.Client
batchSpanProcessor sdktrace.SpanProcessor
tracerProvider *sdktrace.TracerProvider
tempo *tempoapi.Client

traces []string
}

func (p *PromQL) Setup() error {
func (p *PromQL) setupTracing(ctx context.Context) error {
exporter, err := otlptracegrpc.New(ctx)
if err != nil {
return errors.Wrap(err, "create exporter")
}
p.batchSpanProcessor = sdktrace.NewBatchSpanProcessor(exporter)
p.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithResource(resource.NewSchemaless(
attribute.String("service.name", "otelbench.promql"),
)),
sdktrace.WithSpanProcessor(p.batchSpanProcessor),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
httpClient := &http.Client{
Transport: newTempoTransport(http.DefaultTransport),
}
tempoClient, err := tempoapi.NewClient(p.TempoAddr,
tempoapi.WithClient(httpClient),
)
if err != nil {
return errors.Wrap(err, "create tempo client")
}
p.tempo = tempoClient
return nil
}

func (p *PromQL) Setup(ctx context.Context) error {
if err := p.setupTracing(ctx); err != nil {
return errors.Wrap(err, "setup tracing")
}
propagator := propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
httpClient := &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport,
otelhttp.WithTracerProvider(p.tracerProvider),
otelhttp.WithPropagators(propagator),
),
}
var err error
p.client, err = promapi.NewClient(p.Addr)
p.client, err = promapi.NewClient(p.Addr,
promapi.WithTracerProvider(p.tracerProvider),
promapi.WithClient(httpClient),
)
if err != nil {
return errors.Wrap(err, "create client")
}
Expand Down Expand Up @@ -117,6 +182,90 @@ func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, q prompr
return nil
}

// tempoTransport sets Accept for some endpoints.
//
// FIXME(tdakkota): probably, we need to add an Accept header.
type tempoTransport struct {
next http.RoundTripper
}

func newTempoTransport(next http.RoundTripper) http.RoundTripper {
return &tempoTransport{next: next}
}

func (t *tempoTransport) RoundTrip(req *http.Request) (*http.Response, error) {
next := t.next
if next == nil {
next = http.DefaultTransport
}
if strings.Contains(req.URL.Path, "api/traces/") {
if req.Header.Get("Accept") == "" {
req.Header.Set("Accept", "application/protobuf")
}
}
resp, err := next.RoundTrip(req)
if err != nil {
return resp, err
}
return resp, nil
}

func (p *PromQL) waitForTrace(ctx context.Context, traceID string) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

bo := backoff.NewConstantBackOff(time.Millisecond * 100)
res, err := backoff.RetryWithData(func() (v ptrace.Traces, err error) {
res, err := p.tempo.TraceByID(ctx, tempoapi.TraceByIDParams{TraceID: traceID})
if err != nil {
return v, backoff.Permanent(err)
}
switch r := res.(type) {
case *tempoapi.TraceByIDNotFound:
return v, errors.Errorf("trace %q not found", traceID)
case *tempoapi.TraceByID:
var um ptrace.ProtoUnmarshaler
buf, err := io.ReadAll(r.Data)
if err != nil {
return v, backoff.Permanent(errors.Wrap(err, "read data"))
}
traces, err := um.UnmarshalTraces(buf)
if err != nil {
return v, backoff.Permanent(errors.Wrap(err, "unmarshal traces"))
}
services := make(map[string]int)
list := traces.ResourceSpans()
for i := 0; i < list.Len(); i++ {
rs := list.At(i)
attrValue, ok := rs.Resource().Attributes().Get("service.name")
if !ok {
return v, backoff.Permanent(errors.New("service name not found"))
}
services[attrValue.AsString()]++
}
for _, svc := range []string{
"otelbench.promql",
"go-faster.oteldb",
"clickhouse",
} {
if _, ok := services[svc]; !ok {
return v, errors.Errorf("service %q not found", svc)
}
}
return traces, nil
default:
return v, backoff.Permanent(errors.Errorf("unknown response type %T", res))
}
}, backoff.WithContext(bo, ctx))
if err != nil {
return errors.Wrap(err, "retry")
}
if res.SpanCount() < 1 {
return errors.Errorf("trace %q spans length is zero", traceID)
}
return nil
}

func (p *PromQL) Run(ctx context.Context) error {
fmt.Println("sending", p.Input, "to", p.Addr)
var total int64
Expand All @@ -128,7 +277,25 @@ func (p *PromQL) Run(ctx context.Context) error {
}
pb := progressbar.Default(total)
start := time.Now()
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) error {
tracer := p.tracerProvider.Tracer("promql")
if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) (rerr error) {
ctx, span := tracer.Start(ctx, "Send",
trace.WithSpanKind(trace.SpanKindClient),
)
defer func() {
if rerr != nil {
span.RecordError(rerr)
span.SetStatus(codes.Error, rerr.Error())
} else {
span.SetStatus(codes.Ok, "")
}
span.End()
if err := p.batchSpanProcessor.ForceFlush(ctx); err != nil {
rerr = multierr.Append(rerr, errors.Wrap(err, "force flush"))
}

p.traces = append(p.traces, span.SpanContext().TraceID().String())
}()
if err := p.send(ctx, q); err != nil {
return errors.Wrap(err, "send")
}
Expand All @@ -144,6 +311,21 @@ func (p *PromQL) Run(ctx context.Context) error {
return errors.Wrap(err, "finish progress bar")
}
fmt.Println("done in", time.Since(start).Round(time.Millisecond))
fmt.Println("waiting for traces")

pb = progressbar.Default(int64(len(p.traces)))
for _, traceID := range p.traces {
if err := p.waitForTrace(ctx, traceID); err != nil {
return errors.Wrap(err, "wait for trace")
}
if err := pb.Add(1); err != nil {
return errors.Wrap(err, "update progress bar")
}
}
if err := pb.Finish(); err != nil {
return errors.Wrap(err, "finish progress bar")
}
fmt.Println("done")
return nil
}

Expand All @@ -153,15 +335,18 @@ func newPromQLCommand() *cobra.Command {
Use: "promql",
Short: "Run promql queries",
RunE: func(cmd *cobra.Command, args []string) error {
if err := p.Setup(); err != nil {
ctx := cmd.Context()
if err := p.Setup(ctx); err != nil {
return errors.Wrap(err, "setup")
}
return p.Run(cmd.Context())
return p.Run(ctx)
},
}
f := cmd.Flags()
f.StringVar(&p.Addr, "addr", "http://localhost:9090", "Prometheus address")
f.StringVarP(&p.Input, "input", "i", "queries.jsonl", "Input file")
f.DurationVar(&p.RequestTimeout, "request-timeout", time.Second*10, "Request timeout")
f.StringVar(&p.TracesExporterAddr, "traces-exporter-addr", "http://127.0.0.1:4317", "Traces exporter OTLP endpoint")
f.StringVar(&p.TempoAddr, "tempo-addr", "http://127.0.0.1:3200", "Tempo endpoint")
return cmd
}
26 changes: 26 additions & 0 deletions dev/local/ch-bench-read/clickhouse.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<clickhouse>
<listen_host>0.0.0.0</listen_host>
<logger>
<level>information</level>
<console>true</console>
<log remove="remove"/>
<errorlog remove="remove"/>
</logger>
<prometheus>
<endpoint>/metrics</endpoint>
<port>8080</port>
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
</prometheus>
<opentelemetry_span_log>
<engine>
engine MergeTree
order by (start_time_us, trace_id)
ttl toDateTime(finish_time_us/1000000) + toIntervalMinute(15)
</engine>
<database>system</database>
<table>opentelemetry_span_log</table>
<flush_interval_milliseconds>10</flush_interval_milliseconds>
</opentelemetry_span_log>
</clickhouse>
42 changes: 39 additions & 3 deletions dev/local/ch-bench-read/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- "127.0.0.1:9000:9000"
- "127.0.0.1:8123:8123"
volumes:
- ../clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml
- ./clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml
healthcheck:
test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:8123/ping']
interval: 1s
Expand All @@ -30,7 +30,10 @@ services:
- OTEL_LOG_LEVEL=info
- OTEL_METRICS_EXPORTER=none
- OTEL_LOGS_EXPORTER=none
- OTEL_TRACES_EXPORTER=none
- OTEL_TRACES_EXPORTER=otlp
- OTEL_TRACES_SAMPLER=parentbased_always_off
- OTEL_EXPORTER_OTLP_PROTOCOL=grpc
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
- OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.oteldb
healthcheck:
test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:13133/liveness']
Expand All @@ -43,6 +46,21 @@ services:
depends_on:
- clickhouse

# Exports traces from clickhouse internal table to otel.
chotel:
restart: always
build:
context: ../../../
dockerfile: chotel.Dockerfile
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
- OTEL_EXPORTER_OTLP_PROTOCOL=grpc
- OTEL_EXPORTER_OTLP_INSECURE=true
- OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.oteldb.chotel
- CHOTEL_SEND_RATE=50ms
depends_on:
- clickhouse

otelproxy:
build:
context: ../../../
Expand All @@ -57,6 +75,24 @@ services:
- OTEL_LOGS_EXPORTER=none
- OTEL_TRACES_EXPORTER=none
- OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.otelproxy
tempo:
image: grafana/tempo:latest
command: [ "-config.file=/etc/tempo.yml" ]
ports:
- "127.0.0.1:3200:3200"
volumes:
- ./tempo.yml:/etc/tempo.yml

# https://opentelemetry.io/docs/collector/installation/#docker-compose
otelcol:
image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.89.0
volumes:
- ./otelcol.yml:/etc/otelcol-contrib/config.yaml
ports:
- "127.0.0.1:4317:4317"
command:
- '--config'
- '/etc/otelcol-contrib/config.yaml'

grafana:
image: "grafana/grafana:10.0.0"
Expand All @@ -70,7 +106,7 @@ services:
- GF_LOG_LEVEL=debug
- GF_INSTALL_PLUGINS=grafana-clickhouse-datasource
ports:
- "3000:3000"
- "127.0.0.1:3000:3000"
volumes:
- ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
- ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/default.yml
Expand Down
Loading

0 comments on commit ac6e108

Please sign in to comment.