Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query path - Add exhaustive search to combine traces split across blocks #489

Merged
merged 7 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
* [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475)
* [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431)
* [ENHANCEMENT] Add exhaustive search to combine traces from all blocks in the backend. [#489](https://github.com/grafana/tempo/pull/489)
* [BUGFIX] Upgrade cortex dependency to 1.6 to address issue with forgetting ring membership [#442](https://github.com/grafana/tempo/pull/442)
* [BUGFIX] No longer raise the `tempodb_blocklist_poll_errors_total` metric if a block doesn't have meta or compacted meta. [#481](https://github.com/grafana/tempo/pull/481)

Expand Down
29 changes: 26 additions & 3 deletions docs/tempo/website/troubleshooting/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ weight: 470
This topic helps with day zero operational issues that may come up when getting started with Tempo. It walks through debugging each part of the ingestion and query pipeline to drill down and diagnose issues.

## Problem 1. I am unable to see any of my traces in Tempo
>** Potential causes**
** Potential causes**
- There could be issues in ingestion of the data into Tempo, that is, spans are either not being sent correctly to Tempo or they are not getting sampled.
- There could be issues querying for traces that have been received by Tempo.

Expand Down Expand Up @@ -57,19 +57,42 @@ This can also be confirmed by checking the metric `tempo_request_duration_second
### Diagnosing and fixing issues with querying traces
If you have determined that data has been ingested correctly into Tempo, then it is time to investigate possible issues with querying the data.

The presence of the following errors in the Tempo Querier log files may explain why traces are missing:
Check the logs of the Tempo Query Frontend. The Query Frontend pod runs with two containers (Query Frontend & Tempo Query), so lets use the following command to view Query Frontend logs -

```console
kubectl logs -f pod/query-frontend-xxxxx -c query-frontend
```

The presence of the following errors in the log may explain issues with querying traces:

- `level=info ts=XXXXXXX caller=frontend.go:63 method=GET traceID=XXXXXXXXX url=/api/traces/XXXXXXXXX duration=5m41.729449877s status=500`
- `no org id`
- `could not dial 10.X.X.X:3100 connection refused`
- `tenant-id not found`

Possible reasons for the above errors are:
- Tempo Querier not connected to Tempo Query Frontend. Check the value of the metric `cortex_query_frontend_connected_clients` exposed by the Query Frontend.
It should be > 0, which indicates that Queriers are connected to the Query Frontend.
- Grafana Tempo Datasource not configured to pass tenant-id in Authorization header (only applicable to multi-tenant deployments).
- Not connected to Tempo Querier correctly
- Insufficient permissions

#### Solution
- Fixing connection issues
- In case the application is not connected to Tempo Querier correctly, update the `backend.yaml` configuration file so that it is attempting to connect to the right port of the querier.
- In case we the queriers are not connected to the Query Frontend, check the following section in Querier configuration and make sure the address of the Query Frontend is correct
```
querier:
frontend_worker:
frontend_address: query-frontend-discovery.default.svc.cluster.local:9095
```
- Verify the `backend.yaml` configuration file present on the Tempo Query container and make sure it is attempting to connect to the right port of the query frontend.
- Verify the Grafana Tempo Datasource configuration, and make sure it has the following settings configured:
```
jsonData:
httpHeaderName1: 'Authorization'
secureJsonData:
httpHeaderValue1: 'Bearer <tenant-id>'
```
- Fixing insufficient permissions issue
- Verify that the Querier has the LIST and GET permissions on the bucket.

Expand Down
44 changes: 26 additions & 18 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,36 +184,44 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
if trace != nil {
var spanCountA, spanCountB, spanCountTotal int
completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, trace)
span.LogFields(ot_log.String("msg", "combined trace protos"),
span.LogFields(ot_log.String("msg", "combined trace protos from ingesters"),
ot_log.Int("spansCountA", spanCountA),
ot_log.Int("spansCountB", spanCountB),
ot_log.Int("spansCountTotal", spanCountTotal))
}
}

span.LogFields(ot_log.String("msg", "done searching ingesters"), ot_log.Bool("found", completeTrace != nil))
}

// if the ingester didn't have it check the store.
if completeTrace == nil {
foundBytes, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, metrics, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}

out := &tempopb.Trace{}
err = proto.Unmarshal(foundBytes, out)
span.LogFields(ot_log.String("msg", "done searching store"))
metricQueryReads.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterReads.Load()))
metricQueryBytesRead.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterBytesRead.Load()))
metricQueryReads.WithLabelValues("index").Observe(float64(metrics.IndexReads.Load()))
metricQueryBytesRead.WithLabelValues("index").Observe(float64(metrics.IndexBytesRead.Load()))
metricQueryReads.WithLabelValues("block").Observe(float64(metrics.BlockReads.Load()))
metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load()))

// combine partialTraces with completeTrace
var spanCountA, spanCountB, spanCountTotal int
for _, partialTrace := range partialTraces {
storeTrace := &tempopb.Trace{}
err = proto.Unmarshal(partialTrace, storeTrace)
if err != nil {
return nil, err
}

span.LogFields(ot_log.String("msg", "found backend trace"), ot_log.Int("len", len(foundBytes)))
completeTrace = out
metricQueryReads.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterReads.Load()))
metricQueryBytesRead.WithLabelValues("bloom").Observe(float64(metrics.BloomFilterBytesRead.Load()))
metricQueryReads.WithLabelValues("index").Observe(float64(metrics.IndexReads.Load()))
metricQueryBytesRead.WithLabelValues("index").Observe(float64(metrics.IndexBytesRead.Load()))
metricQueryReads.WithLabelValues("block").Observe(float64(metrics.BlockReads.Load()))
metricQueryBytesRead.WithLabelValues("block").Observe(float64(metrics.BlockBytesRead.Load()))
completeTrace, spanCountA, spanCountB, spanCountTotal = tempo_util.CombineTraceProtos(completeTrace, storeTrace)
}
span.LogFields(ot_log.String("msg", "combined trace protos from ingesters and store"),
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
ot_log.Int("spansCountA", spanCountA),
ot_log.Int("spansCountB", spanCountB),
ot_log.Int("spansCountTotal", spanCountTotal))

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Expand Down
116 changes: 116 additions & 0 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package querier

import (
"context"
"io/ioutil"
"math/rand"
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
v1 "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
)

type mockSharder struct {
}

func (m *mockSharder) Owns(hash string) bool {
return true
}

func (m *mockSharder) Combine(objA []byte, objB []byte) []byte {
combined, _ := util.CombineTraces(objA, objB)
return combined
}

func TestReturnAllHits(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")

r, w, _, err := tempodb.New(&tempodb.Config{
Backend: "local",
Pool: &pool.Config{
MaxWorkers: 10,
QueueDepth: 100,
},
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
IndexDownsample: 10,
BloomFP: .05,
},
BlocklistPoll: 50 * time.Millisecond,
}, log.NewNopLogger())
assert.NoError(t, err, "unexpected error creating tempodb")

wal := w.WAL()
assert.NoError(t, err)

blockCount := 2
testTraceID := make([]byte, 16)
_, err = rand.Read(testTraceID)
assert.NoError(t, err)

// keep track of traces sent
testTraces := make([]*tempopb.Trace, 0, blockCount)

// split the same trace across multiple blocks
for i := 0; i < blockCount; i++ {
blockID := uuid.New()
head, err := wal.NewBlock(blockID, util.FakeTenantID)
assert.NoError(t, err)

req := test.MakeRequest(10, testTraceID)
testTraces = append(testTraces, &tempopb.Trace{Batches: []*v1.ResourceSpans{req.Batch}})
bReq, err := proto.Marshal(req)
assert.NoError(t, err)

err = head.Write(testTraceID, bReq)
assert.NoError(t, err, "unexpected error writing req")

complete, err := head.Complete(wal, &mockSharder{})
assert.NoError(t, err)

err = w.WriteBlock(context.Background(), complete)
assert.NoError(t, err)
}

// sleep for blocklist poll
time.Sleep(100 * time.Millisecond)

// find should return both now
foundBytes, _, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
assert.NoError(t, err)
require.Len(t, foundBytes, 2)

// expected trace
expectedTrace, _, _, _ := util.CombineTraceProtos(testTraces[0], testTraces[1])
test.SortTrace(expectedTrace)

// actual trace
actualTraceBytes, err := util.CombineTraces(foundBytes[1], foundBytes[0])
assert.NoError(t, err)
actualTrace := &tempopb.Trace{}
err = proto.Unmarshal(actualTraceBytes, actualTrace)
assert.NoError(t, err)

test.SortTrace(actualTrace)
assert.Equal(t, expectedTrace, actualTrace)
}
20 changes: 20 additions & 0 deletions pkg/util/test/req.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package test

import (
"bytes"
"math/rand"
"sort"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
Expand Down Expand Up @@ -69,3 +71,21 @@ func MakeTraceWithSpanCount(requests int, spansEach int, traceID []byte) *tempop

return trace
}

func SortTrace(t *tempopb.Trace) {
sort.Slice(t.Batches, func(i, j int) bool {
return bytes.Compare(t.Batches[i].InstrumentationLibrarySpans[0].Spans[0].SpanId, t.Batches[j].InstrumentationLibrarySpans[0].Spans[0].SpanId) == 1
})

for _, b := range t.Batches {
sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool {
return bytes.Compare(b.InstrumentationLibrarySpans[i].Spans[0].SpanId, b.InstrumentationLibrarySpans[j].Spans[0].SpanId) == 1
})

for _, ils := range b.InstrumentationLibrarySpans {
sort.Slice(ils.Spans, func(i, j int) bool {
return bytes.Compare(ils.Spans[i].SpanId, ils.Spans[j].SpanId) == 1
})
}
}
}
23 changes: 2 additions & 21 deletions pkg/util/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"math/rand"
"sort"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -96,32 +95,14 @@ func TestCombine(t *testing.T) {
err = proto.Unmarshal(actual, actualTrace)
assert.NoError(t, err)

sortTrace(actualTrace)
sortTrace(expectedTrace)
test.SortTrace(actualTrace)
test.SortTrace(expectedTrace)

assert.Equal(t, expectedTrace, actualTrace)
}
}
}

func sortTrace(t *tempopb.Trace) {
sort.Slice(t.Batches, func(i, j int) bool {
return bytes.Compare(t.Batches[i].InstrumentationLibrarySpans[0].Spans[0].SpanId, t.Batches[j].InstrumentationLibrarySpans[0].Spans[0].SpanId) == 1
})

for _, b := range t.Batches {
sort.Slice(b.InstrumentationLibrarySpans, func(i, j int) bool {
return bytes.Compare(b.InstrumentationLibrarySpans[i].Spans[0].SpanId, b.InstrumentationLibrarySpans[j].Spans[0].SpanId) == 1
})

for _, ils := range b.InstrumentationLibrarySpans {
sort.Slice(ils.Spans, func(i, j int) bool {
return bytes.Compare(ils.Spans[i].SpanId, ils.Spans[j].SpanId) == 1
})
}
}
}

// logic of actually combining traces should be tested above. focusing on the spancounts here
func TestCombineProtos(t *testing.T) {
sameTrace := test.MakeTraceWithSpanCount(10, 10, []byte{0x01, 0x03})
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestCompaction(t *testing.T) {
assert.NoError(t, err)

out := &tempopb.PushRequest{}
err = proto.Unmarshal(b, out)
err = proto.Unmarshal(b[0], out)
assert.NoError(t, err)

assert.True(t, proto.Equal(allReqs[i], out))
Expand Down
8 changes: 8 additions & 0 deletions tempodb/pool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ type Config struct {
MaxWorkers int `yaml:"max_workers"`
QueueDepth int `yaml:"queue_depth"`
}

// default is concurrency disabled
func defaultConfig() *Config {
return &Config{
MaxWorkers: 30,
QueueDepth: 10000,
}
}
Loading