From ac6e1080e9bfaa311e9274c7fb39feb4f70b5555 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Sun, 7 Jan 2024 22:18:32 +0300 Subject: [PATCH] perf(otelbench): add traces capture --- cmd/chotel/main.go | 11 +- cmd/otelbench/promql.go | 199 +++++++++++++++++- dev/local/ch-bench-read/clickhouse.xml | 26 +++ dev/local/ch-bench-read/docker-compose.yml | 42 +++- .../ch-bench-read/grafana/datasources.yml | 10 +- dev/local/ch-bench-read/otelcol.yml | 23 ++ dev/local/ch-bench-read/oteldb.yml | 1 + dev/local/ch-bench-read/tempo.yml | 47 +++++ internal/chstorage/dial.go | 3 + 9 files changed, 345 insertions(+), 17 deletions(-) create mode 100644 dev/local/ch-bench-read/clickhouse.xml create mode 100644 dev/local/ch-bench-read/otelcol.yml create mode 100644 dev/local/ch-bench-read/tempo.yml diff --git a/cmd/chotel/main.go b/cmd/chotel/main.go index 8535bce9..536b5ea6 100644 --- a/cmd/chotel/main.go +++ b/cmd/chotel/main.go @@ -57,6 +57,7 @@ type App struct { otlpAddr string latest time.Time + rate time.Duration spansSaved metric.Int64Counter traceExporter *otlptrace.Exporter @@ -83,6 +84,14 @@ func NewApp(lg *zap.Logger, metrics *app.Metrics) (*App, error) { clickHousePassword: "", clickHouseDB: "default", otlpAddr: "otelcol:4317", + rate: time.Millisecond * 500, + } + if v := os.Getenv("CHOTEL_SEND_RATE"); v != "" { + d, err := time.ParseDuration(v) + if err != nil { + return nil, errors.Wrap(err, "parse CHOTEL_SEND_RATE") + } + a.rate = d } if v := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); v != "" { a.otlpAddr = strings.TrimPrefix(v, "http://") @@ -306,7 +315,7 @@ func (a *App) send(ctx context.Context, now time.Time) error { } func (a *App) runSender(ctx context.Context) error { - ticker := time.NewTicker(time.Millisecond * 500) + ticker := time.NewTicker(a.rate) defer ticker.Stop() // First immediate tick. diff --git a/cmd/otelbench/promql.go b/cmd/otelbench/promql.go index 918b02c7..beb2d474 100644 --- a/cmd/otelbench/promql.go +++ b/cmd/otelbench/promql.go @@ -5,29 +5,94 @@ import ( "encoding/json" "fmt" "io" + "net/http" "os" "strconv" + "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" "github.com/schollz/progressbar/v3" "github.com/spf13/cobra" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" "github.com/go-faster/oteldb/internal/promapi" "github.com/go-faster/oteldb/internal/promproxy" + "github.com/go-faster/oteldb/internal/tempoapi" ) type PromQL struct { - Addr string + Addr string + + TracesExporterAddr string + TempoAddr string + Input string RequestTimeout time.Duration - client *promapi.Client + client *promapi.Client + batchSpanProcessor sdktrace.SpanProcessor + tracerProvider *sdktrace.TracerProvider + tempo *tempoapi.Client + + traces []string } -func (p *PromQL) Setup() error { +func (p *PromQL) setupTracing(ctx context.Context) error { + exporter, err := otlptracegrpc.New(ctx) + if err != nil { + return errors.Wrap(err, "create exporter") + } + p.batchSpanProcessor = sdktrace.NewBatchSpanProcessor(exporter) + p.tracerProvider = sdktrace.NewTracerProvider( + sdktrace.WithResource(resource.NewSchemaless( + attribute.String("service.name", "otelbench.promql"), + )), + sdktrace.WithSpanProcessor(p.batchSpanProcessor), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + httpClient := &http.Client{ + Transport: newTempoTransport(http.DefaultTransport), + } + tempoClient, err := tempoapi.NewClient(p.TempoAddr, + tempoapi.WithClient(httpClient), + ) + if err != nil { + return errors.Wrap(err, "create tempo client") + } + p.tempo = tempoClient + return nil +} + +func (p *PromQL) Setup(ctx context.Context) error { + if err := p.setupTracing(ctx); err != nil { + return errors.Wrap(err, "setup tracing") + } + propagator := propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + httpClient := &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport, + otelhttp.WithTracerProvider(p.tracerProvider), + otelhttp.WithPropagators(propagator), + ), + } var err error - p.client, err = promapi.NewClient(p.Addr) + p.client, err = promapi.NewClient(p.Addr, + promapi.WithTracerProvider(p.tracerProvider), + promapi.WithClient(httpClient), + ) if err != nil { return errors.Wrap(err, "create client") } @@ -117,6 +182,90 @@ func (p *PromQL) each(ctx context.Context, fn func(ctx context.Context, q prompr return nil } +// tempoTransport sets Accept for some endpoints. +// +// FIXME(tdakkota): probably, we need to add an Accept header. +type tempoTransport struct { + next http.RoundTripper +} + +func newTempoTransport(next http.RoundTripper) http.RoundTripper { + return &tempoTransport{next: next} +} + +func (t *tempoTransport) RoundTrip(req *http.Request) (*http.Response, error) { + next := t.next + if next == nil { + next = http.DefaultTransport + } + if strings.Contains(req.URL.Path, "api/traces/") { + if req.Header.Get("Accept") == "" { + req.Header.Set("Accept", "application/protobuf") + } + } + resp, err := next.RoundTrip(req) + if err != nil { + return resp, err + } + return resp, nil +} + +func (p *PromQL) waitForTrace(ctx context.Context, traceID string) error { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + bo := backoff.NewConstantBackOff(time.Millisecond * 100) + res, err := backoff.RetryWithData(func() (v ptrace.Traces, err error) { + res, err := p.tempo.TraceByID(ctx, tempoapi.TraceByIDParams{TraceID: traceID}) + if err != nil { + return v, backoff.Permanent(err) + } + switch r := res.(type) { + case *tempoapi.TraceByIDNotFound: + return v, errors.Errorf("trace %q not found", traceID) + case *tempoapi.TraceByID: + var um ptrace.ProtoUnmarshaler + buf, err := io.ReadAll(r.Data) + if err != nil { + return v, backoff.Permanent(errors.Wrap(err, "read data")) + } + traces, err := um.UnmarshalTraces(buf) + if err != nil { + return v, backoff.Permanent(errors.Wrap(err, "unmarshal traces")) + } + services := make(map[string]int) + list := traces.ResourceSpans() + for i := 0; i < list.Len(); i++ { + rs := list.At(i) + attrValue, ok := rs.Resource().Attributes().Get("service.name") + if !ok { + return v, backoff.Permanent(errors.New("service name not found")) + } + services[attrValue.AsString()]++ + } + for _, svc := range []string{ + "otelbench.promql", + "go-faster.oteldb", + "clickhouse", + } { + if _, ok := services[svc]; !ok { + return v, errors.Errorf("service %q not found", svc) + } + } + return traces, nil + default: + return v, backoff.Permanent(errors.Errorf("unknown response type %T", res)) + } + }, backoff.WithContext(bo, ctx)) + if err != nil { + return errors.Wrap(err, "retry") + } + if res.SpanCount() < 1 { + return errors.Errorf("trace %q spans length is zero", traceID) + } + return nil +} + func (p *PromQL) Run(ctx context.Context) error { fmt.Println("sending", p.Input, "to", p.Addr) var total int64 @@ -128,7 +277,25 @@ func (p *PromQL) Run(ctx context.Context) error { } pb := progressbar.Default(total) start := time.Now() - if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) error { + tracer := p.tracerProvider.Tracer("promql") + if err := p.each(ctx, func(ctx context.Context, q promproxy.Query) (rerr error) { + ctx, span := tracer.Start(ctx, "Send", + trace.WithSpanKind(trace.SpanKindClient), + ) + defer func() { + if rerr != nil { + span.RecordError(rerr) + span.SetStatus(codes.Error, rerr.Error()) + } else { + span.SetStatus(codes.Ok, "") + } + span.End() + if err := p.batchSpanProcessor.ForceFlush(ctx); err != nil { + rerr = multierr.Append(rerr, errors.Wrap(err, "force flush")) + } + + p.traces = append(p.traces, span.SpanContext().TraceID().String()) + }() if err := p.send(ctx, q); err != nil { return errors.Wrap(err, "send") } @@ -144,6 +311,21 @@ func (p *PromQL) Run(ctx context.Context) error { return errors.Wrap(err, "finish progress bar") } fmt.Println("done in", time.Since(start).Round(time.Millisecond)) + fmt.Println("waiting for traces") + + pb = progressbar.Default(int64(len(p.traces))) + for _, traceID := range p.traces { + if err := p.waitForTrace(ctx, traceID); err != nil { + return errors.Wrap(err, "wait for trace") + } + if err := pb.Add(1); err != nil { + return errors.Wrap(err, "update progress bar") + } + } + if err := pb.Finish(); err != nil { + return errors.Wrap(err, "finish progress bar") + } + fmt.Println("done") return nil } @@ -153,15 +335,18 @@ func newPromQLCommand() *cobra.Command { Use: "promql", Short: "Run promql queries", RunE: func(cmd *cobra.Command, args []string) error { - if err := p.Setup(); err != nil { + ctx := cmd.Context() + if err := p.Setup(ctx); err != nil { return errors.Wrap(err, "setup") } - return p.Run(cmd.Context()) + return p.Run(ctx) }, } f := cmd.Flags() f.StringVar(&p.Addr, "addr", "http://localhost:9090", "Prometheus address") f.StringVarP(&p.Input, "input", "i", "queries.jsonl", "Input file") f.DurationVar(&p.RequestTimeout, "request-timeout", time.Second*10, "Request timeout") + f.StringVar(&p.TracesExporterAddr, "traces-exporter-addr", "http://127.0.0.1:4317", "Traces exporter OTLP endpoint") + f.StringVar(&p.TempoAddr, "tempo-addr", "http://127.0.0.1:3200", "Tempo endpoint") return cmd } diff --git a/dev/local/ch-bench-read/clickhouse.xml b/dev/local/ch-bench-read/clickhouse.xml new file mode 100644 index 00000000..ec9a249f --- /dev/null +++ b/dev/local/ch-bench-read/clickhouse.xml @@ -0,0 +1,26 @@ + + 0.0.0.0 + + information + true + + + + + /metrics + 8080 + true + true + true + + + + engine MergeTree + order by (start_time_us, trace_id) + ttl toDateTime(finish_time_us/1000000) + toIntervalMinute(15) + + system + opentelemetry_span_log
+ 10 +
+
diff --git a/dev/local/ch-bench-read/docker-compose.yml b/dev/local/ch-bench-read/docker-compose.yml index f79a6247..8efe8e6d 100644 --- a/dev/local/ch-bench-read/docker-compose.yml +++ b/dev/local/ch-bench-read/docker-compose.yml @@ -10,7 +10,7 @@ services: - "127.0.0.1:9000:9000" - "127.0.0.1:8123:8123" volumes: - - ../clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml + - ./clickhouse.xml:/etc/clickhouse-server/config.d/monitoring.xml healthcheck: test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:8123/ping'] interval: 1s @@ -30,7 +30,10 @@ services: - OTEL_LOG_LEVEL=info - OTEL_METRICS_EXPORTER=none - OTEL_LOGS_EXPORTER=none - - OTEL_TRACES_EXPORTER=none + - OTEL_TRACES_EXPORTER=otlp + - OTEL_TRACES_SAMPLER=parentbased_always_off + - OTEL_EXPORTER_OTLP_PROTOCOL=grpc + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 - OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.oteldb healthcheck: test: ['CMD', 'wget', '--spider', '-q', '127.0.0.1:13133/liveness'] @@ -43,6 +46,21 @@ services: depends_on: - clickhouse + # Exports traces from clickhouse internal table to otel. + chotel: + restart: always + build: + context: ../../../ + dockerfile: chotel.Dockerfile + environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317 + - OTEL_EXPORTER_OTLP_PROTOCOL=grpc + - OTEL_EXPORTER_OTLP_INSECURE=true + - OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.oteldb.chotel + - CHOTEL_SEND_RATE=50ms + depends_on: + - clickhouse + otelproxy: build: context: ../../../ @@ -57,6 +75,24 @@ services: - OTEL_LOGS_EXPORTER=none - OTEL_TRACES_EXPORTER=none - OTEL_RESOURCE_ATTRIBUTES=service.name=go-faster.otelproxy + tempo: + image: grafana/tempo:latest + command: [ "-config.file=/etc/tempo.yml" ] + ports: + - "127.0.0.1:3200:3200" + volumes: + - ./tempo.yml:/etc/tempo.yml + + # https://opentelemetry.io/docs/collector/installation/#docker-compose + otelcol: + image: ghcr.io/open-telemetry/opentelemetry-collector-releases/opentelemetry-collector-contrib:0.89.0 + volumes: + - ./otelcol.yml:/etc/otelcol-contrib/config.yaml + ports: + - "127.0.0.1:4317:4317" + command: + - '--config' + - '/etc/otelcol-contrib/config.yaml' grafana: image: "grafana/grafana:10.0.0" @@ -70,7 +106,7 @@ services: - GF_LOG_LEVEL=debug - GF_INSTALL_PLUGINS=grafana-clickhouse-datasource ports: - - "3000:3000" + - "127.0.0.1:3000:3000" volumes: - ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml - ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/default.yml diff --git a/dev/local/ch-bench-read/grafana/datasources.yml b/dev/local/ch-bench-read/grafana/datasources.yml index daafe5b5..25a8ee8b 100644 --- a/dev/local/ch-bench-read/grafana/datasources.yml +++ b/dev/local/ch-bench-read/grafana/datasources.yml @@ -1,16 +1,14 @@ apiVersion: 1 datasources: - - name: "TraceQL" + - name: Tempo type: tempo + uid: tempo + url: http://tempo:3200 access: proxy - orgId: 1 - url: http://oteldb:3200 - uid: tempo-oteldb + httpMethod: GET jsonData: httpMethod: GET - serviceMap: - datasourceUid: promoteldb - name: "PromQL" type: prometheus diff --git a/dev/local/ch-bench-read/otelcol.yml b/dev/local/ch-bench-read/otelcol.yml new file mode 100644 index 00000000..9709d1b9 --- /dev/null +++ b/dev/local/ch-bench-read/otelcol.yml @@ -0,0 +1,23 @@ +receivers: + otlp: + protocols: + grpc: + +exporters: + otlp: + endpoint: tempo:4317 + tls: + insecure: true + +extensions: + health_check: + pprof: + zpages: + +service: + extensions: [health_check, pprof, zpages] + pipelines: + traces: + receivers: [otlp] + processors: [] + exporters: [otlp] diff --git a/dev/local/ch-bench-read/oteldb.yml b/dev/local/ch-bench-read/oteldb.yml index 587a6bb7..f0205aa3 100644 --- a/dev/local/ch-bench-read/oteldb.yml +++ b/dev/local/ch-bench-read/oteldb.yml @@ -1,3 +1,4 @@ +dsn: clickhouse://clickhouse:9000 prometheus: bind: 0.0.0.0:9090 max_samples: 1_000_000 diff --git a/dev/local/ch-bench-read/tempo.yml b/dev/local/ch-bench-read/tempo.yml new file mode 100644 index 00000000..2eb2e887 --- /dev/null +++ b/dev/local/ch-bench-read/tempo.yml @@ -0,0 +1,47 @@ +server: + http_listen_port: 3200 + +query_frontend: + search: + duration_slo: 5s + throughput_bytes_slo: 1.073741824e+09 + trace_by_id: + duration_slo: 5s + +distributor: + receivers: + otlp: + protocols: + http: + grpc: + +ingester: + max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally + +compactor: + compaction: + block_retention: 1h # overall Tempo trace retention. set for demo purposes + +metrics_generator: + registry: + external_labels: + source: tempo + cluster: docker-compose + storage: + path: /tmp/tempo/generator/wal + remote_write: + - url: http://prometheus:9090/api/v1/write + send_exemplars: true + +storage: + trace: + backend: local # backend configuration to use + wal: + path: /tmp/tempo/wal # where to store the the wal locally + local: + path: /tmp/tempo/blocks + +overrides: + defaults: + metrics_generator: + processors: [service-graphs, span-metrics] # enables metrics generator \ No newline at end of file diff --git a/internal/chstorage/dial.go b/internal/chstorage/dial.go index 9ef5c4d2..f944d596 100644 --- a/internal/chstorage/dial.go +++ b/internal/chstorage/dial.go @@ -11,6 +11,7 @@ import ( "github.com/ClickHouse/ch-go/chpool" "github.com/cenkalti/backoff/v4" "github.com/go-faster/errors" + "github.com/go-faster/sdk/zctx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" @@ -50,6 +51,8 @@ func Dial(ctx context.Context, dsn string, opts DialOptions) (*chpool.Pool, erro return nil, errors.Wrap(err, "parse DSN") } + zctx.From(ctx).Info("DSN", zap.String("v", dsn)) + pass, _ := u.User.Password() chLogger := lg.Named("ch") {