Search: drop use of TagCache, extract tags and tag values on-demand (#1068)

* Search: drop use of TagCache, extract tags and tag values on-demand

* Fix compile-error in tests

* fmt

* Remove TagCache :cat-salute:

* Add error handling

* lint

* Clean up and optimisations

* Fix compilation errors

* Refactor methods for consistency with instance.Search

* Tweak tag lookup binary search to eliminate last comparison and fetch of data from flatbuffers. Add new FindTag function

* Use tempofb.FindTag

* Add SearchDataMap RangeKeys and RangeKeyValues

* make fmt

* Update CHANGELOG.md

* Cast to string once

* Reuse SearchEntry buffer where possible

* fix key/value typo

* Add limit on response size for a tag-values query

Signed-off-by: Annanay <[email protected]>

* Lint and CHANGELOG

Signed-off-by: Annanay <[email protected]>

* lint, fix test by adding userID to ctx

Signed-off-by: Annanay <[email protected]>

Co-authored-by: Martin Disibio <[email protected]>
Co-authored-by: Annanay <[email protected]>
3 people authored Dec 2, 2021
1 parent 678f31c commit 6ec3eb8
Showing 24 changed files with 496 additions and 251 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
## main / unreleased

* [CHANGE] Search: Add new per-tenant limit `max_bytes_per_tag_values_query` to limit the size of tag-values response. [#1068](https://github.com/grafana/tempo/pull/1068) (@annanay25)
* [ENHANCEMENT] Expose `upto` parameter on hedged requests for each backend with `hedge_requests_up_to`. [#1085](https://github.com/grafana/tempo/pull/1085) (@joe-elliott)
* [ENHANCEMENT] Search: drop use of TagCache, extract tags and tag values on-demand [#1068](https://github.com/grafana/tempo/pull/1068) (@kvrhdn)
* [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno)
* [ENHANCEMENT] Add middleware to compress frontend HTTP responses with gzip if requested [#1080](https://github.com/grafana/tempo/pull/1080) (@kvrhdn, @zalegrala)
* [ENHANCEMENT] Allow query disablement in vulture [#1117](https://github.com/grafana/tempo/pull/1117) (@zalegrala)
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
@@ -310,7 +310,7 @@ func (t *App) setupModuleManager() error {
Ring: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Querier: {Store, Ring, Overrides},
Compactor: {Store, Server, Overrides, MemberlistKV},
SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor},
ScalableSingleBinary: {SingleBinary},
1 change: 1 addition & 0 deletions example/docker-compose/tempo-search/overrides.yaml
@@ -9,4 +9,5 @@ overrides:
max_global_traces_per_user: 0
max_bytes_per_trace: 50000
max_search_bytes_per_trace: 0
max_bytes_per_tag_values_query: 5000000
block_retention: 0s
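
The new per-tenant `max_bytes_per_tag_values_query` override (see the CHANGELOG entry above) caps, in bytes, how large a single tag-values response may grow before the ingester gives up and returns an empty result instead. A minimal sketch of how such a per-tenant lookup can behave — the type and field names below are illustrative, not Tempo's actual overrides implementation:

```go
package main

import "fmt"

// perTenantLimits holds a global default plus per-tenant overrides.
// Hypothetical names; Tempo's overrides module is more involved.
type perTenantLimits struct {
	defaultMaxBytesPerTagValuesQuery int
	tenantOverrides                  map[string]int
}

// MaxBytesPerTagValuesQuery returns the tenant's override when one is set,
// otherwise the global default.
func (l *perTenantLimits) MaxBytesPerTagValuesQuery(userID string) int {
	if v, ok := l.tenantOverrides[userID]; ok {
		return v
	}
	return l.defaultMaxBytesPerTagValuesQuery
}

func main() {
	limits := &perTenantLimits{
		defaultMaxBytesPerTagValuesQuery: 5000000, // same value as the example overrides.yaml above
		tenantOverrides:                  map[string]int{"noisy-tenant": 1000000},
	}
	fmt.Println(limits.MaxBytesPerTagValuesQuery("noisy-tenant")) // 1000000
	fmt.Println(limits.MaxBytesPerTagValuesQuery("other-tenant")) // 5000000
}
```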
3 changes: 0 additions & 3 deletions modules/ingester/flush.go
@@ -179,9 +179,6 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// periodically purge tag cache, keep tags within complete block timeout (i.e. data that is locally)
instance.PurgeExpiredSearchTags(time.Now().Add(-i.cfg.CompleteBlockTimeout))
}

func (i *Ingester) flushLoop(j int) {
18 changes: 8 additions & 10 deletions modules/ingester/ingester_search.go
@@ -37,13 +37,12 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques
return &tempopb.SearchTagsResponse{}, nil
}

tags := inst.GetSearchTags()

resp := &tempopb.SearchTagsResponse{
TagNames: tags,
res, err := inst.SearchTags(ctx)
if err != nil {
return nil, err
}

return resp, nil
return res, nil
}

func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) {
@@ -56,13 +55,12 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa
return &tempopb.SearchTagValuesResponse{}, nil
}

vals := inst.GetSearchTagValues(req.TagName)

resp := &tempopb.SearchTagValuesResponse{
TagValues: vals,
res, err := inst.SearchTagValues(ctx, req.TagName)
if err != nil {
return nil, err
}

return resp, nil
return res, nil
}

// todo(search): consolidate. this only exists so that the ingester continues to implement the tempopb.QuerierServer interface.
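
On the instance, `SearchTags` and `SearchTagValues` are now context-aware and can fail, and `SearchTagValues` resolves the tenant's `max_bytes_per_tag_values_query` limit from the org ID carried in the context. A minimal sketch of the caller-side contract — illustrative only, not the actual querier wiring; the interface here exists just for this example:

```go
package example

import (
	"context"

	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/weaveworks/common/user"
)

// tagValueSearcher is the narrow slice of the ingester instance this sketch
// needs; instance.SearchTagValues satisfies it after this change.
type tagValueSearcher interface {
	SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error)
}

// lookupTagValues shows the caller-side contract: the tenant (org) ID must
// already be in the context, because SearchTagValues reads it to look up the
// per-tenant limit before scanning live traces and blocks.
func lookupTagValues(inst tagValueSearcher, tenant, tagName string) ([]string, error) {
	ctx := user.InjectOrgID(context.Background(), tenant)

	resp, err := inst.SearchTagValues(ctx, tagName)
	if err != nil {
		return nil, err
	}
	return resp.TagValues, nil
}
```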
6 changes: 0 additions & 6 deletions modules/ingester/instance.go
@@ -75,7 +75,6 @@ type instance struct {
searchHeadBlock *searchStreamingBlockEntry
searchAppendBlocks map[*wal.AppendBlock]*searchStreamingBlockEntry
searchCompleteBlocks map[*wal.LocalBlock]*searchLocalBlockEntry
searchTagCache *search.TagCache

lastBlockCut time.Time

@@ -107,7 +106,6 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *
traces: map[uint32]*trace{},
searchAppendBlocks: map[*wal.AppendBlock]*searchStreamingBlockEntry{},
searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{},
searchTagCache: search.NewTagCache(),

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
@@ -176,10 +174,6 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte,
return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err)
}

if searchData != nil {
i.RecordSearchLookupValues(searchData)
}

i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

193 changes: 182 additions & 11 deletions modules/ingester/instance_search.go
@@ -3,12 +3,13 @@ package ingester
import (
"context"
"sort"
"time"

cortex_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/util"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
@@ -93,6 +94,8 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *

span.LogFields(ot_log.Event("live traces mtx acquired"))

entry := &tempofb.SearchEntry{} // buffer

for _, t := range i.traces {
if sr.Quit() {
return
@@ -106,7 +109,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *
for _, s := range t.searchData {
sr.AddBytesInspected(uint64(len(s)))

entry := tempofb.SearchEntryFromBytes(s)
entry.Reset(s)
if p.Matches(entry) {
newResult := search.GetSearchResultFromData(entry)
if result != nil {
@@ -181,19 +184,187 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr
}
}

func (i *instance) GetSearchTags() []string {
return i.searchTagCache.GetNames()
func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) {
tags := map[string]struct{}{}

kv := &tempofb.KeyValues{}
err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
for i, ii := 0, entry.TagsLength(); i < ii; i++ {
entry.Tags(kv, i)
key := string(kv.Key())
// check the tag is already set, this is more performant with repetitive values
if _, ok := tags[key]; !ok {
tags[key] = struct{}{}
}
}
})
if err != nil {
return nil, err
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
return block.Tags(ctx, tags)
})
if err != nil {
return nil, err
}

return &tempopb.SearchTagsResponse{
TagNames: extractKeys(tags),
}, nil
}

func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) {
values := map[string]struct{}{}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
// get limit from override
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)

kv := &tempofb.KeyValues{}
tagNameBytes := []byte(tagName)
err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
kv := tempofb.FindTag(entry, kv, tagNameBytes)
if kv != nil {
for i, ii := 0, kv.ValueLength(); i < ii; i++ {
key := string(kv.Value(i))
// check the value is already set, this is more performant with repetitive values
if _, ok := values[key]; !ok {
values[key] = struct{}{}
}
}
}
})
if err != nil {
return nil, err
}

// check if size of values map is within limit after scanning live traces
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(cortex_util.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
return block.TagValues(ctx, tagName, values)
})
if err != nil {
return nil, err
}

// check if size of values map is within limit after scanning all blocks
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(cortex_util.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
}

return &tempopb.SearchTagValuesResponse{
TagValues: extractKeys(values),
}, nil
}
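
`SearchTagValues` guards the distinct-values set twice with `util.MapSizeWithinLimit`: once after scanning live traces and again after scanning blocks, returning an empty response rather than risking a querier OOM. The sketch below shows one plausible way such a check can work — an assumption for illustration, not necessarily the actual helper: sum the byte length of the collected values and compare against the tenant's budget.

```go
package main

import "fmt"

// mapSizeWithinLimit is a hypothetical stand-in for util.MapSizeWithinLimit:
// it approximates the response size by summing the byte length of every
// distinct value collected so far and checks it against the budget.
func mapSizeWithinLimit(values map[string]struct{}, maxBytes int) bool {
	size := 0
	for v := range values {
		size += len(v)
	}
	return size <= maxBytes
}

func main() {
	values := map[string]struct{}{"api-gateway": {}, "checkout": {}, "payments": {}}
	fmt.Println(mapSizeWithinLimit(values, 5000000)) // true: far below the budget
	fmt.Println(mapSizeWithinLimit(values, 10))      // false: would exceed a 10-byte budget
}
```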

func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn func(entry *tempofb.SearchEntry)) error {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces")
defer span.Finish()

i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

se := &tempofb.SearchEntry{}
for _, t := range i.traces {
for _, s := range t.searchData {
se.Reset(s)
visitFn(se)

if err := ctx.Err(); err != nil {
return err
}
}
}
return nil
}

func (i *instance) visitSearchableBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

err := i.visitSearchableBlocksWAL(ctx, visitFn)
if err != nil {
return err
}

return i.visitSearchableBlocksLocalBlocks(ctx, visitFn)
}

func (i *instance) GetSearchTagValues(tagName string) []string {
return i.searchTagCache.GetValues(tagName)
// visitSearchableBlocksWAL visits every WAL block. Must be called under lock.
func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL")
defer span.Finish()

visitUnderLock := func(entry *searchStreamingBlockEntry) error {
entry.mtx.RLock()
defer entry.mtx.RUnlock()

return visitFn(entry.b)
}

err := visitUnderLock(i.searchHeadBlock)
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}

for _, b := range i.searchAppendBlocks {
err := visitUnderLock(b)
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
}
return nil
}

func (i *instance) RecordSearchLookupValues(b []byte) {
s := tempofb.SearchEntryFromBytes(b)
i.searchTagCache.SetData(time.Now(), s)
// visitSearchableBlocksLocalBlocks visits every local block. Must be called under lock.
func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks")
defer span.Finish()

visitUnderLock := func(entry *searchLocalBlockEntry) error {
entry.mtx.RLock()
defer entry.mtx.RUnlock()

return visitFn(entry.b)
}

for _, b := range i.searchCompleteBlocks {
err := visitUnderLock(b)
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
}
return nil
}

func (i *instance) PurgeExpiredSearchTags(before time.Time) {
i.searchTagCache.PurgeExpired(before)
func extractKeys(set map[string]struct{}) []string {
keys := make([]string, 0, len(set))
for k := range set {
keys = append(keys, k)
}
return keys
}
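
One commit in this PR tweaks the tag lookup binary search "to eliminate last comparison and fetch of data from flatbuffers" and introduces `tempofb.FindTag`, used above in `SearchTagValues`. A simplified, slice-based analogue of that idea — hypothetical code, not the flatbuffers implementation — is to return as soon as the probe compares equal, so no trailing equality check (and, in the flatbuffers case, no extra fetch of the key bytes) is needed after the loop ends:

```go
package main

import (
	"fmt"
	"strings"
)

// findTag is a simplified analogue of tempofb.FindTag: the real function
// walks a sorted flatbuffers vector of KeyValues. Returning on equality
// inside the loop avoids a final comparison after the search narrows.
func findTag(sortedKeys []string, want string) (int, bool) {
	lo, hi := 0, len(sortedKeys)-1
	for lo <= hi {
		mid := (lo + hi) / 2
		switch c := strings.Compare(sortedKeys[mid], want); {
		case c == 0:
			return mid, true // exact hit: no trailing equality check needed
		case c < 0:
			lo = mid + 1
		default:
			hi = mid - 1
		}
	}
	return -1, false
}

func main() {
	keys := []string{"cluster", "http.status_code", "namespace", "service.name"}
	idx, ok := findTag(keys, "namespace")
	fmt.Println(idx, ok) // 2 true
	_, ok = findTag(keys, "pod")
	fmt.Println(ok) // false
}
```

The saving matters more for flatbuffers than for a plain slice, since each key access goes back into the serialized buffer.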
16 changes: 11 additions & 5 deletions modules/ingester/instance_search_test.go
@@ -10,16 +10,18 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/search"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
@@ -247,11 +249,15 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
})

go concurrent(func() {
i.GetSearchTags()
_, err := i.SearchTags(context.Background())
require.NoError(t, err, "error getting search tags")
})

go concurrent(func() {
i.GetSearchTagValues(tagKey)
// SearchTagValues queries now require userID in ctx
ctx := user.InjectOrgID(context.Background(), "test")
_, err := i.SearchTagValues(ctx, tagKey)
require.NoError(t, err, "error getting search tag values")
})

time.Sleep(2000 * time.Millisecond)
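
The race test drives `SearchTags` and `SearchTagValues` from several goroutines while data is being pushed. The `concurrent` helper's body is outside this diff; a plausible shape for it — an assumption, shown here with a dummy workload — is a tight loop gated by a shared atomic flag, so data races surface under `go test -race`:

```go
package main

import (
	"fmt"
	"time"

	"github.com/uber-go/atomic"
)

func main() {
	// Hypothetical shape of the test's `concurrent` helper: hammer a
	// callback in a loop until the test flips a shared atomic flag.
	end := atomic.NewBool(false)

	concurrent := func(f func()) {
		for !end.Load() {
			f()
		}
	}

	calls := atomic.NewInt64(0)
	go concurrent(func() { calls.Inc() })

	time.Sleep(100 * time.Millisecond)
	end.Store(true)
	fmt.Println("calls made:", calls.Load())
}
```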