Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tempo-query: improve performance of FindTraces #4159

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [ENHANCEMENT] Speed up tempo-query trace search by fetching traces with parallel queries [#4159](https://github.com/grafana/tempo/pull/4159) (@pavolloffay)
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions.
Expand Down
33 changes: 18 additions & 15 deletions cmd/tempo-query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,29 @@ package main
import (
"flag"
"net"
"os"
"strings"

"github.com/hashicorp/go-hclog"
hcplugin "github.com/hashicorp/go-plugin"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
zaplogfmt "github.com/jsternberg/zap-logfmt"
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
google_grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/grafana/tempo/cmd/tempo-query/tempo"
)

func main() {
logger := hclog.New(&hclog.LoggerOptions{
Name: "jaeger-tempo",
Level: hclog.Error,
JSONFormat: true,
})
config := zap.NewProductionEncoderConfig()
logger := zap.New(zapcore.NewCore(
zaplogfmt.NewEncoder(config),
os.Stdout,
zapcore.InfoLevel,
))

var configPath string
flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file")
Expand All @@ -37,16 +40,16 @@ func main() {

err := v.ReadInConfig()
if err != nil {
logger.Error("failed to parse configuration file", "error", err)
logger.Error("failed to parse configuration file", zap.Error(err))
}
}

cfg := &tempo.Config{}
cfg.InitFromViper(v)

backend, err := tempo.New(cfg)
backend, err := tempo.New(logger, cfg)
if err != nil {
logger.Error("failed to init tracer backend", "error", err)
logger.Error("failed to init tracer backend", zap.Error(err))
}

grpcOpts := []google_grpc.ServerOption{
Expand All @@ -57,25 +60,25 @@ func main() {
if cfg.TLSEnabled {
creds, err := credentials.NewClientTLSFromFile(cfg.TLS.CertPath, cfg.TLS.ServerName)
if err != nil {
logger.Error("failed to load TLS credentials", "error", err)
logger.Error("failed to load TLS credentials", zap.Error(err))
} else {
grpcOpts = append(grpcOpts, google_grpc.Creds(creds))
}
}

srv := hcplugin.DefaultGRPCServer(grpcOpts)
srv := google_grpc.NewServer(grpcOpts...)

storage_v1.RegisterSpanReaderPluginServer(srv, backend)
storage_v1.RegisterDependenciesReaderPluginServer(srv, backend)
storage_v1.RegisterSpanWriterPluginServer(srv, backend)

lis, err := net.Listen("tcp", cfg.Address)
if err != nil {
logger.Error("failed to listen", "error", err)
logger.Error("failed to listen", zap.Error(err))
}

logger.Info("Server starts serving", "address", cfg.Address)
logger.Info("Server starts serving", zap.String("address", cfg.Address))
if err := srv.Serve(lis); err != nil {
logger.Error("failed to serve", "error", err)
logger.Error("failed to serve", zap.Error(err))
}
}
6 changes: 6 additions & 0 deletions cmd/tempo-query/tempo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type Config struct {
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
QueryServicesDuration string `yaml:"services_query_duration"`
// FindTracesConcurrentRequests is the number of get-trace requests that a trace search issues concurrently while fetching the traces it found. A value of 0 is replaced with 1 by InitFromViper.
FindTracesConcurrentRequests int `yaml:"find_traces_concurrent_requests"`
}

// InitFromViper initializes the options struct with values from Viper
Expand All @@ -33,7 +35,11 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.TLS.CipherSuites = v.GetString("tls_cipher_suites")
c.TLS.MinVersion = v.GetString("tls_min_version")
c.QueryServicesDuration = v.GetString("services_query_duration")
c.FindTracesConcurrentRequests = v.GetInt("find_traces_concurrent_requests")

if c.FindTracesConcurrentRequests == 0 {
c.FindTracesConcurrentRequests = 1
}
tenantHeader := v.GetString("tenant_header_key")
if tenantHeader == "" {
tenantHeader = shared.BearerTokenKey
Expand Down
95 changes: 75 additions & 20 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/go-logfmt/logfmt"
Expand All @@ -22,6 +23,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"

jaeger "github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -62,15 +64,17 @@ var (
var tracer = otel.Tracer("cmd/tempo-query/tempo")

type Backend struct {
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
QueryServicesDuration *time.Duration
logger *zap.Logger
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
QueryServicesDuration *time.Duration
findTracesConcurrentRequests int
}

func New(cfg *Config) (*Backend, error) {
func New(logger *zap.Logger, cfg *Config) (*Backend, error) {
httpClient, err := createHTTPClient(cfg)
if err != nil {
return nil, err
Expand All @@ -88,12 +92,14 @@ func New(cfg *Config) (*Backend, error) {
}

return &Backend{
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
QueryServicesDuration: queryServiceDuration,
logger: logger,
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
QueryServicesDuration: queryServiceDuration,
findTracesConcurrentRequests: cfg.FindTracesConcurrentRequests,
}, nil
}

Expand Down Expand Up @@ -306,6 +312,28 @@ func (b *Backend) GetOperations(ctx context.Context, _ *storage_v1.GetOperations
}, nil
}

// job is one unit of work handed to worker over the jobs channel: a request
// to fetch a single trace. FindTraces enqueues one job per trace ID.
type job struct {
// ctx is the context that worker passes to getTrace for this fetch.
ctx context.Context
// traceID identifies the trace to fetch.
traceID jaeger.TraceID
}

// jobResult is what worker sends on the results channel for each job it has
// processed.
type jobResult struct {
// traceID is copied from the job so that a failure can be attributed to a trace.
traceID jaeger.TraceID
// trace is the value returned by getTrace for traceID.
trace *jaeger.Trace
// err is the error returned by getTrace; FindTraces treats a non-nil err as a failed fetch.
err error
}

// worker receives jobs until the jobs channel is closed and drained, then
// returns. For every job received it calls b.getTrace with the job's own
// context and trace ID, and sends exactly one jobResult on results carrying
// that trace ID together with the trace and error getTrace returned.
func worker(b *Backend, jobs <-chan job, results chan<- jobResult) {
	for {
		next, open := <-jobs
		if !open {
			// jobs was closed and fully drained: nothing left to do.
			return
		}

		res := jobResult{traceID: next.traceID}
		res.trace, res.err = b.getTrace(next.ctx, next.traceID)
		results <- res
	}
}

func (b *Backend) FindTraces(req *storage_v1.FindTracesRequest, stream storage_v1.SpanReaderPlugin_FindTracesServer) error {
ctx, span := tracer.Start(stream.Context(), "tempo-query.FindTraces")
defer span.End()
Expand All @@ -316,19 +344,46 @@ func (b *Backend) FindTraces(req *storage_v1.FindTracesRequest, stream storage_v
}

span.AddEvent(fmt.Sprintf("Found %d trace IDs", len(resp.TraceIDs)))
b.logger.Info("FindTraces: fetching traces", zap.Int("traceids", len(resp.TraceIDs)))

numWorkers := b.findTracesConcurrentRequests
jobs := make(chan job, len(resp.TraceIDs))
results := make(chan jobResult, len(resp.TraceIDs))
var workersDone sync.WaitGroup
// Start workers
for w := 0; w < numWorkers; w++ {
pavolloffay marked this conversation as resolved.
Show resolved Hide resolved
workersDone.Add(1)
go func() { defer workersDone.Done(); worker(b, jobs, results) }()
}

// for every traceID, get the full trace
var jaegerTraces []*jaeger.Trace
for _, traceID := range resp.TraceIDs {
trace, err := b.getTrace(ctx, traceID)
if err != nil {
// TODO this seems to be an internal inconsistency error, ignore so we can still show the rest
span.AddEvent(fmt.Sprintf("could not get trace for traceID %v", traceID))
jobs <- job{
ctx: ctx,
traceID: traceID,
}
}
close(jobs)
workersDone.Wait()

var failedTraces []jobResult
// Collecting results
for i := 0; i < len(resp.TraceIDs); i++ {
result := <-results
if result.err != nil {
// TODO: this seems to be an internal inconsistency error; ignore it so we can still show the rest
b.logger.Info("failed to get a trace", zap.Error(err), zap.String("traceid", result.traceID.String()))
span.AddEvent(fmt.Sprintf("could not get trace for traceID %v", result.traceID))
span.RecordError(err)
continue
failedTraces = append(failedTraces, result)
} else {
jaegerTraces = append(jaegerTraces, result.trace)
}

jaegerTraces = append(jaegerTraces, trace)
}
close(results)
if len(failedTraces) > 0 {
b.logger.Info("FindTraces: failed to find traces, getTrace failed", zap.Int32("limit", req.Query.NumTraces), zap.Int("failed", len(failedTraces)))
}

span.AddEvent(fmt.Sprintf("Returning %d traces", len(jaegerTraces)))
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ require (
github.com/gorilla/mux v1.8.1
github.com/grafana/dskit v0.0.0-20240801171758-736c44c85382
github.com/grafana/e2e v0.1.1
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-plugin v1.6.0
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/jaegertracing/jaeger v1.57.0
github.com/jedib0t/go-pretty/v6 v6.2.4
github.com/json-iterator/go v1.1.12
Expand Down