From 6c06d50589f5c0cd9d9798d8a3167119e80de117 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 27 Jan 2021 19:05:49 +0530 Subject: [PATCH 1/5] Checkpoint: first pass at RunJobs Signed-off-by: Annanay --- tempodb/pool/config.go | 8 ++++++++ tempodb/pool/pool.go | 26 +++++++++----------------- tempodb/tempodb.go | 20 ++++++++++++++++++-- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/tempodb/pool/config.go b/tempodb/pool/config.go index 1bd3c193f41..73fbceb8ed4 100644 --- a/tempodb/pool/config.go +++ b/tempodb/pool/config.go @@ -4,3 +4,11 @@ type Config struct { MaxWorkers int `yaml:"max_workers"` QueueDepth int `yaml:"queue_depth"` } + +// default is concurrency disabled +func defaultConfig() *Config { + return &Config{ + MaxWorkers: 30, + QueueDepth: 10000, + } +} diff --git a/tempodb/pool/pool.go b/tempodb/pool/pool.go index bce13f45a0a..be70a18682a 100644 --- a/tempodb/pool/pool.go +++ b/tempodb/pool/pool.go @@ -75,7 +75,7 @@ func NewPool(cfg *Config) *Pool { return p } -func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([]byte, error) { +func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) ([][]byte, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -86,7 +86,7 @@ func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) return nil, fmt.Errorf("queue doesn't have room for %d jobs", len(payloads)) } - resultsCh := make(chan []byte, 1) // way for jobs to send back results + resultsCh := make(chan []byte, totalJobs) // way for jobs to send back results err := atomic.NewError(nil) // way for jobs to send back an error stop := atomic.NewBool(false) // way to signal to the jobs to quit wg := &sync.WaitGroup{} // way to wait for all jobs to complete @@ -118,16 +118,16 @@ func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) // wait for all jobs to finish wg.Wait() - // see if anything ended up in the results channel - var msg []byte - select { - case msg = <-resultsCh: - default: + // read all from results channel + var msg [][]byte + // fixme: this loop seems to be running infinitely :/ + for res := range resultsCh { + msg = append(msg, res) } // ignore err if msg != nil. otherwise errors like "context cancelled" // will take precedence over the err - if msg != nil { + if len(msg) > 0 { return msg, nil } @@ -172,13 +172,13 @@ func (p *Pool) reportQueueLength() { func runJob(job *job) { defer job.wg.Done() + // bail in case not all jobs could be enqueued if job.stop.Load() { return } msg, err := job.fn(job.ctx, job.payload) if msg != nil { - job.stop.Store(true) // one job was successful. stop all others // Commenting out job cancellations for now because of a resource leak suspected in the GCS golang client. // Issue logged here: https://github.com/googleapis/google-cloud-go/issues/3018 // job.cancel() @@ -191,11 +191,3 @@ func runJob(job *job) { job.err.Store(err) } } - -// default is concurrency disabled -func defaultConfig() *Config { - return &Config{ - MaxWorkers: 30, - QueueDepth: 10000, - } -} diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index c5082c518e9..b50cc306e0d 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -234,7 +234,7 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return nil, metrics, nil } - foundBytes, err := rw.pool.RunJobs(derivedCtx, copiedBlocklist, func(ctx context.Context, payload interface{}) ([]byte, error) { + partialTraces, err := rw.pool.RunJobs(derivedCtx, copiedBlocklist, func(ctx context.Context, payload interface{}) ([]byte, error) { meta := payload.(*backend.BlockMeta) block, err := encoding.NewBackendBlock(meta) if err != nil { @@ -256,7 +256,23 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return foundObject, nil }) - return foundBytes, metrics, err + //if len(partialTraces) == 0 { + // return nil, metrics, err + //} + // + //// merge all partial trace bytes into partialTraces[0] + //for i := range partialTraces { + // if i == 0 { + // continue + // } + // partialTraces[0], err = tempo_util.CombineTraces(partialTraces[0], partialTraces[i]) + // // todo: we may want to ignore the error here + // if err != nil { + // return nil, metrics, err + // } + //} + + return partialTraces[0], metrics, err } func (rw *readerWriter) Shutdown() { From 5549b113ab673b7b1c64e6389669b798682f3950 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 29 Jan 2021 19:55:06 +0530 Subject: [PATCH 2/5] Complete exhaustive search Signed-off-by: Annanay --- modules/querier/querier.go | 49 ++++++++++++++++++++++---------------- tempodb/pool/pool.go | 4 +++- tempodb/pool/pool_test.go | 15 ++++++++---- tempodb/tempodb.go | 31 ++++++++++++------------ 4 files changed, 58 insertions(+), 41 deletions(-) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 1452ffacc64..4e0206a601a 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -184,37 +184,44 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque if trace != nil { var spanCountA, spanCountB, spanCountTotal int completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, trace) - span.LogFields(ot_log.String("msg", "combined trace protos"), + span.LogFields(ot_log.String("msg", "combined trace protos from ingesters"), ot_log.Int("spansCountA", spanCountA), ot_log.Int("spansCountB", spanCountB), ot_log.Int("spansCountTotal", spanCountTotal)) } } - } - // if the ingester didn't have it check the store. - if completeTrace == nil { - foundBytes, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd) - if err != nil { - return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID") - } + span.LogFields(ot_log.String("msg", "done searching ingesters"), ot_log.Bool("found", completeTrace != nil)) + } - out := &tempopb.Trace{} - err = proto.Unmarshal(foundBytes, out) - if err != nil { - return nil, err - } + span.LogFields(ot_log.String("msg", "searching store")) + foundBytes, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd) + if err != nil { + return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID") + } - span.LogFields(ot_log.String("msg", "found backend trace"), ot_log.Int("len", len(foundBytes))) - completeTrace = out - metricQueryReads.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterReads.Load())) - metricQueryBytesRead.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterBytesRead.Load())) - metricQueryReads.WithLabelValues("index").Observe(float64(metrics.IndexReads.Load())) - metricQueryBytesRead.WithLabelValues("index").Observe(float64(metrics.IndexBytesRead.Load())) - metricQueryReads.WithLabelValues("block").Observe(float64(metrics.BlockReads.Load())) - metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load())) + storeTrace := &tempopb.Trace{} + err = proto.Unmarshal(foundBytes, storeTrace) + if err != nil { + return nil, err } + span.LogFields(ot_log.String("msg", "found backend trace"), ot_log.Int("len", len(foundBytes))) + metricQueryReads.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterReads.Load())) + metricQueryBytesRead.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterBytesRead.Load())) + metricQueryReads.WithLabelValues("index").Observe(float64(metrics.IndexReads.Load())) + metricQueryBytesRead.WithLabelValues("index").Observe(float64(metrics.IndexBytesRead.Load())) + metricQueryReads.WithLabelValues("block").Observe(float64(metrics.BlockReads.Load())) + metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load())) + + // combine out with completeTrace + var spanCountA, spanCountB, spanCountTotal int + completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, storeTrace) + span.LogFields(ot_log.String("msg", "combined trace protos from ingesters and store"), + ot_log.Int("spansCountA", spanCountA), + ot_log.Int("spansCountB", spanCountB), + ot_log.Int("spansCountTotal", spanCountTotal)) + return &tempopb.TraceByIDResponse{ Trace: completeTrace, }, nil diff --git a/tempodb/pool/pool.go b/tempodb/pool/pool.go index be70a18682a..e1e7fa7747d 100644 --- a/tempodb/pool/pool.go +++ b/tempodb/pool/pool.go @@ -118,9 +118,11 @@ func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) // wait for all jobs to finish wg.Wait() + // close resultsCh + close(resultsCh) + // read all from results channel var msg [][]byte - // fixme: this loop seems to be running infinitely :/ for res := range resultsCh { msg = append(msg, res) } diff --git a/tempodb/pool/pool_test.go b/tempodb/pool/pool_test.go index 207338a2921..46ff69ebc36 100644 --- a/tempodb/pool/pool_test.go +++ b/tempodb/pool/pool_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" ) @@ -34,7 +35,8 @@ func TestResults(t *testing.T) { msg, err := p.RunJobs(context.Background(), payloads, fn) assert.NoError(t, err) - assert.Equal(t, ret, msg) + require.Len(t, msg, 1) + assert.Equal(t, ret, msg[0]) goleak.VerifyNone(t, opts) p.Shutdown() @@ -80,7 +82,10 @@ func TestMultipleHits(t *testing.T) { payloads := []interface{}{1, 2, 3, 4, 5} msg, err := p.RunJobs(context.Background(), payloads, fn) - assert.Equal(t, ret, msg) + require.Len(t, msg, 5) + for i, _ := range payloads { + assert.Equal(t, ret, msg[i]) + } assert.Nil(t, err) goleak.VerifyNone(t, opts) @@ -186,7 +191,8 @@ func TestOneWorker(t *testing.T) { msg, err := p.RunJobs(context.Background(), payloads, fn) assert.NoError(t, err) - assert.Equal(t, ret, msg) + require.Len(t, msg, 1) + assert.Equal(t, ret, msg[0]) goleak.VerifyNone(t, opts) p.Shutdown() @@ -221,7 +227,8 @@ func TestGoingHam(t *testing.T) { msg, err := p.RunJobs(context.Background(), payloads, fn) assert.NoError(t, err) - assert.Equal(t, ret, msg) + require.Len(t, msg, 1) + assert.Equal(t, ret, msg[0]) wg.Done() }() } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 8497ff7a66f..d111ff6db2e 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -5,6 +5,8 @@ import ( "context" "encoding/hex" "fmt" + tempo_util "github.com/grafana/tempo/pkg/util" + "github.com/pkg/errors" "sort" "sync" "time" @@ -256,21 +258,20 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return foundObject, nil }) - //if len(partialTraces) == 0 { - // return nil, metrics, err - //} - // - //// merge all partial trace bytes into partialTraces[0] - //for i := range partialTraces { - // if i == 0 { - // continue - // } - // partialTraces[0], err = tempo_util.CombineTraces(partialTraces[0], partialTraces[i]) - // // todo: we may want to ignore the error here - // if err != nil { - // return nil, metrics, err - // } - //} + if len(partialTraces) == 0 { + return nil, metrics, err + } + + // merge all partial trace bytes into partialTraces[0] + for i := range partialTraces { + if i == 0 { + continue + } + partialTraces[0], err = tempo_util.CombineTraces(partialTraces[0], partialTraces[i]) + if err != nil { + return nil, metrics, errors.Wrap(err, "error combining traces in store.Find") + } + } return partialTraces[0], metrics, err } From b58dfd143d2e4334000ba77c5618858460dbc805 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 3 Feb 2021 12:26:24 +0530 Subject: [PATCH 3/5] Added tests, address comments Signed-off-by: Annanay --- modules/querier/querier.go | 21 +++--- modules/querier/querier_test.go | 116 ++++++++++++++++++++++++++++++++ pkg/util/test/req.go | 20 ++++++ pkg/util/trace_test.go | 23 +------ tempodb/compactor_test.go | 2 +- tempodb/pool/pool.go | 14 ++-- tempodb/pool/pool_test.go | 2 +- tempodb/tempodb.go | 29 ++------ tempodb/tempodb_test.go | 4 +- 9 files changed, 165 insertions(+), 66 deletions(-) create mode 100644 modules/querier/querier_test.go diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 4e0206a601a..81b971d7f61 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -195,18 +195,12 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque } span.LogFields(ot_log.String("msg", "searching store")) - foundBytes, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd) + partialTraces, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd) if err != nil { return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID") } - storeTrace := &tempopb.Trace{} - err = proto.Unmarshal(foundBytes, storeTrace) - if err != nil { - return nil, err - } - - span.LogFields(ot_log.String("msg", "found backend trace"), ot_log.Int("len", len(foundBytes))) + span.LogFields(ot_log.String("msg", "done searching store")) metricQueryReads.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterReads.Load())) metricQueryBytesRead.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterBytesRead.Load())) metricQueryReads.WithLabelValues("index").Observe(float64(metrics.IndexReads.Load())) @@ -214,9 +208,16 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque metricQueryReads.WithLabelValues("block").Observe(float64(metrics.BlockReads.Load())) metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load())) - // combine out with completeTrace + // combine partialTraces with completeTrace var spanCountA, spanCountB, spanCountTotal int - completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, storeTrace) + for _, partialTrace := range partialTraces { + storeTrace := &tempopb.Trace{} + err = proto.Unmarshal(partialTrace, storeTrace) + if err != nil { + return nil, err + } + completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, storeTrace) + } span.LogFields(ot_log.String("msg", "combined trace protos from ingesters and store"), ot_log.Int("spansCountA", spanCountA), ot_log.Int("spansCountB", spanCountB), diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go new file mode 100644 index 00000000000..6b67582cab9 --- /dev/null +++ b/modules/querier/querier_test.go @@ -0,0 +1,116 @@ +package querier + +import ( + "context" + "io/ioutil" + "math/rand" + "os" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/golang/protobuf/proto" + "github.com/google/uuid" + v1 "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb" + "github.com/grafana/tempo/tempodb/backend/local" + "github.com/grafana/tempo/tempodb/pool" + "github.com/grafana/tempo/tempodb/wal" +) + +type mockSharder struct { +} + +func (m *mockSharder) Owns(hash string) bool { + return true +} + +func (m *mockSharder) Combine(objA []byte, objB []byte) []byte { + combined, _ := util.CombineTraces(objA, objB) + return combined +} + +func TestReturnAllHits(t *testing.T) { + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, _, err := tempodb.New(&tempodb.Config{ + Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + IndexDownsample: 10, + BloomFP: .05, + }, + BlocklistPoll: 50 * time.Millisecond, + }, log.NewNopLogger()) + assert.NoError(t, err, "unexpected error creating tempodb") + + wal := w.WAL() + assert.NoError(t, err) + + blockCount := 2 + testTraceID := make([]byte, 16) + _, err = rand.Read(testTraceID) + assert.NoError(t, err) + + // keep track of traces sent + testTraces := make([]*tempopb.Trace, 0, blockCount) + + // split the same trace across multiple blocks + for i := 0; i < blockCount; i++ { + blockID := uuid.New() + head, err := wal.NewBlock(blockID, util.FakeTenantID) + assert.NoError(t, err) + + req := test.MakeRequest(10, testTraceID) + testTraces = append(testTraces, &tempopb.Trace{Batches: []*v1.ResourceSpans{req.Batch}}) + bReq, err := proto.Marshal(req) + assert.NoError(t, err) + + err = head.Write(testTraceID, bReq) + assert.NoError(t, err, "unexpected error writing req") + + complete, err := head.Complete(wal, &mockSharder{}) + assert.NoError(t, err) + + err = w.WriteBlock(context.Background(), complete) + assert.NoError(t, err) + } + + // sleep for blocklist poll + time.Sleep(100 * time.Millisecond) + + // find should return both now + foundBytes, _, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax) + assert.NoError(t, err) + require.Len(t, foundBytes, 2) + + // expected trace + expectedTrace, _, _, _ := util.CombineTraceProtos(testTraces[0], testTraces[1]) + test.SortTrace(expectedTrace) + + // actual trace + actualTraceBytes, err := util.CombineTraces(foundBytes[1], foundBytes[0]) + assert.NoError(t, err) + actualTrace := &tempopb.Trace{} + err = proto.Unmarshal(actualTraceBytes, actualTrace) + assert.NoError(t, err) + + test.SortTrace(actualTrace) + assert.Equal(t, expectedTrace, actualTrace) +} diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index f13f1765294..fe30f0f99b1 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -1,7 +1,9 @@ package test import ( + "bytes" "math/rand" + "sort" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1" @@ -69,3 +71,21 @@ func MakeTraceWithSpanCount(requests int, spansEach int, traceID []byte) *tempop return trace } + +func SortTrace(t *tempopb.Trace) { + sort.Slice(t.Batches, func(i, j int) bool { + return bytes.Compare(t.Batches[i].InstrumentationLibrarySpans[0].Spans[0].SpanId, t.Batches[j].InstrumentationLibrarySpans[0].Spans[0].SpanId) == 1 + }) + + for _, b := range t.Batches { + sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool { + return bytes.Compare(b.InstrumentationLibrarySpans[i].Spans[0].SpanId, b.InstrumentationLibrarySpans[j].Spans[0].SpanId) == 1 + }) + + for _, ils := range b.InstrumentationLibrarySpans { + sort.Slice(ils.Spans, func(i, j int) bool { + return bytes.Compare(ils.Spans[i].SpanId, ils.Spans[j].SpanId) == 1 + }) + } + } +} diff --git a/pkg/util/trace_test.go b/pkg/util/trace_test.go index 4a3815d54f6..fb2c0859119 100644 --- a/pkg/util/trace_test.go +++ b/pkg/util/trace_test.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "math/rand" - "sort" "testing" "github.com/golang/protobuf/proto" @@ -96,32 +95,14 @@ func TestCombine(t *testing.T) { err = proto.Unmarshal(actual, actualTrace) assert.NoError(t, err) - sortTrace(actualTrace) - sortTrace(expectedTrace) + test.SortTrace(actualTrace) + test.SortTrace(expectedTrace) assert.Equal(t, expectedTrace, actualTrace) } } } -func sortTrace(t *tempopb.Trace) { - sort.Slice(t.Batches, func(i, j int) bool { - return bytes.Compare(t.Batches[i].InstrumentationLibrarySpans[0].Spans[0].SpanId, t.Batches[j].InstrumentationLibrarySpans[0].Spans[0].SpanId) == 1 - }) - - for _, b := range t.Batches { - sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool { - return bytes.Compare(b.InstrumentationLibrarySpans[i].Spans[0].SpanId, b.InstrumentationLibrarySpans[j].Spans[0].SpanId) == 1 - }) - - for _, ils := range b.InstrumentationLibrarySpans { - sort.Slice(ils.Spans, func(i, j int) bool { - return bytes.Compare(ils.Spans[i].SpanId, ils.Spans[j].SpanId) == 1 - }) - } - } -} - // logic of actually combining traces should be tested above. focusing on the spancounts here func TestCombineProtos(t *testing.T) { sameTrace := test.MakeTraceWithSpanCount(10, 10, []byte{0x01, 0x03}) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 52803e4fbb4..e2f2cbf0e63 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -153,7 +153,7 @@ func TestCompaction(t *testing.T) { assert.NoError(t, err) out := &tempopb.PushRequest{} - err = proto.Unmarshal(b, out) + err = proto.Unmarshal(b[0], out) assert.NoError(t, err) assert.True(t, proto.Equal(allReqs[i], out)) diff --git a/tempodb/pool/pool.go b/tempodb/pool/pool.go index e1e7fa7747d..3161e7d3cf3 100644 --- a/tempodb/pool/pool.go +++ b/tempodb/pool/pool.go @@ -87,9 +87,9 @@ func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) } resultsCh := make(chan []byte, totalJobs) // way for jobs to send back results - err := atomic.NewError(nil) // way for jobs to send back an error - stop := atomic.NewBool(false) // way to signal to the jobs to quit - wg := &sync.WaitGroup{} // way to wait for all jobs to complete + err := atomic.NewError(nil) // way for jobs to send back an error + stop := atomic.NewBool(false) // way to signal to the jobs to quit + wg := &sync.WaitGroup{} // way to wait for all jobs to complete // add each job one at a time. even though we checked length above these might still fail for _, payload := range payloads { @@ -127,13 +127,11 @@ func (p *Pool) RunJobs(ctx context.Context, payloads []interface{}, fn JobFunc) msg = append(msg, res) } - // ignore err if msg != nil. otherwise errors like "context cancelled" - // will take precedence over the err - if len(msg) > 0 { - return msg, nil + if err := err.Load(); err != nil { + return nil, err } - return nil, err.Load() + return msg, nil } func (p *Pool) Shutdown() { diff --git a/tempodb/pool/pool_test.go b/tempodb/pool/pool_test.go index 46ff69ebc36..9d15a5cc951 100644 --- a/tempodb/pool/pool_test.go +++ b/tempodb/pool/pool_test.go @@ -83,7 +83,7 @@ func TestMultipleHits(t *testing.T) { msg, err := p.RunJobs(context.Background(), payloads, fn) require.Len(t, msg, 5) - for i, _ := range payloads { + for i := range payloads { assert.Equal(t, ret, msg[i]) } assert.Nil(t, err) diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index d111ff6db2e..ff3ac00aed8 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -5,8 +5,6 @@ import ( "context" "encoding/hex" "fmt" - tempo_util "github.com/grafana/tempo/pkg/util" - "github.com/pkg/errors" "sort" "sync" "time" @@ -85,7 +83,7 @@ type Writer interface { } type Reader interface { - Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]byte, common.FindMetrics, error) + Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([][]byte, common.FindMetrics, error) Shutdown() } @@ -189,12 +187,12 @@ func (rw *readerWriter) WAL() *wal.WAL { return rw.wal } -func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([]byte, common.FindMetrics, error) { +func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, blockStart string, blockEnd string) ([][]byte, common.FindMetrics, error) { metrics := common.NewFindMetrics() // tracing instrumentation logger := util.WithContext(ctx, util.Logger) - span, derivedCtx := opentracing.StartSpanFromContext(ctx, "store.Find") + span, ctx := opentracing.StartSpanFromContext(ctx, "store.Find") defer span.Finish() blockStartUUID, err := uuid.Parse(blockStart) @@ -236,14 +234,14 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return nil, metrics, nil } - partialTraces, err := rw.pool.RunJobs(derivedCtx, copiedBlocklist, func(ctx context.Context, payload interface{}) ([]byte, error) { + partialTraces, err := rw.pool.RunJobs(ctx, copiedBlocklist, func(ctx context.Context, payload interface{}) ([]byte, error) { meta := payload.(*backend.BlockMeta) block, err := encoding.NewBackendBlock(meta) if err != nil { return nil, err } - foundObject, err := block.Find(derivedCtx, rw.r, id, &metrics) + foundObject, err := block.Find(ctx, rw.r, id, &metrics) if err != nil { return nil, err } @@ -258,22 +256,7 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID, return foundObject, nil }) - if len(partialTraces) == 0 { - return nil, metrics, err - } - - // merge all partial trace bytes into partialTraces[0] - for i := range partialTraces { - if i == 0 { - continue - } - partialTraces[0], err = tempo_util.CombineTraces(partialTraces[0], partialTraces[i]) - if err != nil { - return nil, metrics, errors.Wrap(err, "error combining traces in store.Find") - } - } - - return partialTraces[0], metrics, err + return partialTraces, metrics, err } func (rw *readerWriter) Shutdown() { diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 4f678bccb17..615f37ecc65 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -91,7 +91,7 @@ func TestDB(t *testing.T) { assert.NoError(t, err) out := &tempopb.PushRequest{} - err = proto.Unmarshal(bFound, out) + err = proto.Unmarshal(bFound[0], out) assert.NoError(t, err) assert.True(t, proto.Equal(out, reqs[i])) @@ -162,7 +162,7 @@ func TestBlockSharding(t *testing.T) { assert.Greater(t, len(bFound), 0) out := &tempopb.PushRequest{} - err = proto.Unmarshal(bFound, out) + err = proto.Unmarshal(bFound[0], out) assert.NoError(t, err) assert.True(t, proto.Equal(out, req)) From 3e306076afca627e21a37d55f60684aba12fce96 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 3 Feb 2021 13:41:03 +0530 Subject: [PATCH 4/5] Update CHANGELOG & Troubleshooting guide Signed-off-by: Annanay --- CHANGELOG.md | 1 + docs/tempo/website/troubleshooting/_index.md | 29 ++++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 806e0db61fd..60bab74aecd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461) * [CHANGE] Ingester cut blocks based on size instead of trace count. Replace ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474) * [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446) +* [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489) * [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442) * [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481) diff --git a/docs/tempo/website/troubleshooting/_index.md b/docs/tempo/website/troubleshooting/_index.md index 1d93cd6a101..c75b25a917f 100644 --- a/docs/tempo/website/troubleshooting/_index.md +++ b/docs/tempo/website/troubleshooting/_index.md @@ -7,7 +7,7 @@ weight: 470 This topic helps with day zero operational issues that may come up when getting started with Tempo. It walks through debugging each part of the ingestion and query pipeline to drill down and diagnose issues. ## Problem 1. I am unable to see any of my traces in Tempo ->** Potential causes** +** Potential causes** - There could be issues in ingestion of the data into Tempo, that is, spans are either not being sent correctly to Tempo or they are not getting sampled. - There could be issues querying for traces that have been received by Tempo. @@ -57,19 +57,42 @@ This can also be confirmed by checking the metric `tempo_request_duration_second ### Diagnosing and fixing issues with querying traces If you have determined that data has been ingested correctly into Tempo, then it is time to investigate possible issues with querying the data. -The presence of the following errors in the Tempo Querier log files may explain why traces are missing: +Check the logs of the Tempo Query Frontend. The Query Frontend pod runs with two containers (Query Frontend & Tempo Query), so lets use the following command to view Query Frontend logs - +```console +kubectl logs -f pod/query-frontend-xxxxx -c query-frontend +``` + +The presence of the following errors in the log may explain issues with querying traces: + +- `level=info ts=XXXXXXX caller=frontend.go:63 method=GET traceID=XXXXXXXXX url=/api/traces/XXXXXXXXX duration=5m41.729449877s status=500` - `no org id` - `could not dial 10.X.X.X:3100 connection refused` - `tenant-id not found` Possible reasons for the above errors are: +- Tempo Querier not connected to Tempo Query Frontend. Check the value of the metric `cortex_query_frontend_connected_clients` exposed by the Query Frontend. + It should be > 0, which indicates that Queriers are connected to the Query Frontend. +- Grafana Tempo Datasource not configured to pass tenant-id in Authorization header (only applicable to multi-tenant deployments). - Not connected to Tempo Querier correctly - Insufficient permissions #### Solution - Fixing connection issues - - In case the application is not connected to Tempo Querier correctly, update the `backend.yaml` configuration file so that it is attempting to connect to the right port of the querier. + - In case we the queriers are not connected to the Query Frontend, check the following section in Querier configuration and make sure the address of the Query Frontend is correct + ``` + querier: + frontend_worker: + frontend_address: query-frontend-discovery.default.svc.cluster.local:9095 + ``` + - Verify the `backend.yaml` configuration file present on the Tempo Query container and make sure it is attempting to connect to the right port of the query frontend. + - Verify the Grafana Tempo Datasource configuration, and make sure it has the following settings configured: + ``` + jsonData: + httpHeaderName1: 'Authorization' + secureJsonData: + httpHeaderValue1: 'Bearer ' + ``` - Fixing insufficient permissions issue - Verify that the Querier has the LIST and GET permissions on the bucket. From 93bd57512ec928b4a45d99b879758c69db57f6f0 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 4 Feb 2021 17:39:14 +0530 Subject: [PATCH 5/5] Add tag with total span count Signed-off-by: Annanay --- modules/querier/querier.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 81b971d7f61..8d1ef103c0b 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -160,6 +160,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque defer span.Finish() var completeTrace *tempopb.Trace + var spanCount, spanCountTotal int if req.QueryIngesters { key := tempo_util.TokenFor(userID, req.TraceID) @@ -182,12 +183,11 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque for _, r := range responses { trace := r.response.Trace if trace != nil { - var spanCountA, spanCountB, spanCountTotal int - completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, trace) - span.LogFields(ot_log.String("msg", "combined trace protos from ingesters"), - ot_log.Int("spansCountA", spanCountA), - ot_log.Int("spansCountB", spanCountB), - ot_log.Int("spansCountTotal", spanCountTotal)) + completeTrace, _, _, spanCount = tempo_util.CombineTraceProtos(completeTrace, trace) + if spanCount > 0 { + spanCountTotal = spanCount + } + span.LogFields(ot_log.String("msg", "combined trace protos from ingesters")) } } @@ -209,19 +209,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load())) // combine partialTraces with completeTrace - var spanCountA, spanCountB, spanCountTotal int for _, partialTrace := range partialTraces { storeTrace := &tempopb.Trace{} err = proto.Unmarshal(partialTrace, storeTrace) if err != nil { return nil, err } - completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, storeTrace) + completeTrace, _, _, spanCount = tempo_util.CombineTraceProtos(completeTrace, storeTrace) + if spanCount > 0 { + spanCountTotal = spanCount + } } span.LogFields(ot_log.String("msg", "combined trace protos from ingesters and store"), - ot_log.Int("spansCountA", spanCountA), - ot_log.Int("spansCountB", spanCountB), - ot_log.Int("spansCountTotal", spanCountTotal)) + ot_log.Int("spanCountTotal", spanCountTotal)) return &tempopb.TraceByIDResponse{ Trace: completeTrace,