Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Search: drop use of TagCache, extract tags and tag values on-demand #1068

Merged
merged 30 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
654efd3
Search: drop use of TagCache, extract tags and tag values on-demand
Oct 21, 2021
f4faba8
Fix compile-error in tests
Oct 21, 2021
e8b5d5f
fmt
Oct 21, 2021
b02db6c
Remove TagCache :cat-salute:
Oct 21, 2021
8a177ed
Add error handling
Oct 21, 2021
30c453e
lint
Oct 21, 2021
ec96dc0
Clean up and optimisations
Oct 24, 2021
c260a9a
Merge branch 'main' into drop-tag-cache
Oct 25, 2021
689eba8
Merge branch 'main' into drop-tag-cache
Oct 26, 2021
5f2d553
Fix compilation errors
Oct 26, 2021
f4b62c6
Refactor methods for consistency with instance.Search
Oct 26, 2021
d16fef7
Tweak tag lookup binary search to eliminate last comparison and fetch…
mdisibio Oct 25, 2021
ba89666
Use tempofb.FindTag
Oct 26, 2021
fbb66c1
Add SearchDataMap RangeKeys and RangeKeyValues
Oct 26, 2021
f3dfef9
make fmt
Oct 26, 2021
744be0c
Update CHANGELOG.md
Oct 26, 2021
b8724ee
Cast to string once
Oct 27, 2021
16570c7
Reuse SearchEntry buffer where possible
mdisibio Oct 27, 2021
ab3b5b4
fix key/value typo
mdisibio Oct 27, 2021
f0ceedc
Merge pull request #1 from mdisibio/drog-tag-cache-2
Oct 28, 2021
b66d696
Merge branch 'main' into drop-tag-cache
Oct 28, 2021
15292e1
Merge branch 'main' into drop-tag-cache
Oct 28, 2021
1fd1015
Merge branch 'main' into drop-tag-cache
Nov 8, 2021
6fb4a90
Merge branch 'main' into drop-tag-cache
Nov 8, 2021
6943eb3
Merge branch 'main' into pr-1068
annanay25 Nov 25, 2021
362d97c
Add limit on response size for a tag-values query
annanay25 Nov 25, 2021
f7e95f0
Lint and CHANGELOG
annanay25 Dec 1, 2021
535263c
lint, fix test by adding userID to ctx
annanay25 Dec 2, 2021
c33b392
Merge branch 'main' into pr-1068
annanay25 Dec 2, 2021
c0d8ac5
Merge branch 'main' into pr-1068
annanay25 Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
## main / unreleased

* [CHANGE] Search: Add new per-tenant limit `max_bytes_per_tag_values_query` to limit the size of tag-values response. [#1068](https://github.com/grafana/tempo/pull/1068) (@annanay25)
* [ENHANCEMENT] Expose `upto` parameter on hedged requests for each backend with `hedge_requests_up_to`. [#1085](https://github.com/grafana/tempo/pull/1085) (@joe-elliott)
* [ENHANCEMENT] Search: drop use of TagCache, extract tags and tag values on-demand [#1068](https://github.com/grafana/tempo/pull/1068) (@kvrhdn)
* [ENHANCEMENT] Jsonnet: add `$._config.namespace` to filter by namespace in cortex metrics [#1098](https://github.com/grafana/tempo/pull/1098) (@mapno)
* [ENHANCEMENT] Add middleware to compress frontend HTTP responses with gzip if requested [#1080](https://github.com/grafana/tempo/pull/1080) (@kvrhdn, @zalegrala)
* [ENHANCEMENT] Allow query disablement in vulture [#1117](https://github.com/grafana/tempo/pull/1117) (@zalegrala)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (t *App) setupModuleManager() error {
Ring: {Server, MemberlistKV},
Distributor: {Ring, Server, Overrides},
Ingester: {Store, Server, Overrides, MemberlistKV},
Querier: {Store, Ring},
Querier: {Store, Ring, Overrides},
Compactor: {Store, Server, Overrides, MemberlistKV},
SingleBinary: {Compactor, QueryFrontend, Querier, Ingester, Distributor},
ScalableSingleBinary: {SingleBinary},
Expand Down
1 change: 1 addition & 0 deletions example/docker-compose/tempo-search/overrides.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ overrides:
max_global_traces_per_user: 0
max_bytes_per_trace: 50000
max_search_bytes_per_trace: 0
max_bytes_per_tag_values_query: 5000000
block_retention: 0s
3 changes: 0 additions & 3 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// periodically purge tag cache, keep tags within complete block timeout (i.e. data that is locally)
instance.PurgeExpiredSearchTags(time.Now().Add(-i.cfg.CompleteBlockTimeout))
}

func (i *Ingester) flushLoop(j int) {
Expand Down
18 changes: 8 additions & 10 deletions modules/ingester/ingester_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ func (i *Ingester) SearchTags(ctx context.Context, req *tempopb.SearchTagsReques
return &tempopb.SearchTagsResponse{}, nil
}

tags := inst.GetSearchTags()

resp := &tempopb.SearchTagsResponse{
TagNames: tags,
res, err := inst.SearchTags(ctx)
if err != nil {
return nil, err
}

return resp, nil
return res, nil
}

func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagValuesRequest) (*tempopb.SearchTagValuesResponse, error) {
Expand All @@ -56,13 +55,12 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa
return &tempopb.SearchTagValuesResponse{}, nil
}

vals := inst.GetSearchTagValues(req.TagName)

resp := &tempopb.SearchTagValuesResponse{
TagValues: vals,
res, err := inst.SearchTagValues(ctx, req.TagName)
if err != nil {
return nil, err
}

return resp, nil
return res, nil
}

// todo(search): consolidate. this only exists so that the ingester continues to implement the tempopb.QuerierServer interface.
Expand Down
6 changes: 0 additions & 6 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type instance struct {
searchHeadBlock *searchStreamingBlockEntry
searchAppendBlocks map[*wal.AppendBlock]*searchStreamingBlockEntry
searchCompleteBlocks map[*wal.LocalBlock]*searchLocalBlockEntry
searchTagCache *search.TagCache

lastBlockCut time.Time

Expand Down Expand Up @@ -107,7 +106,6 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *
traces: map[uint32]*trace{},
searchAppendBlocks: map[*wal.AppendBlock]*searchStreamingBlockEntry{},
searchCompleteBlocks: map[*wal.LocalBlock]*searchLocalBlockEntry{},
searchTagCache: search.NewTagCache(),

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -176,10 +174,6 @@ func (i *instance) PushBytes(ctx context.Context, id []byte, traceBytes []byte,
return status.Errorf(codes.FailedPrecondition, "%s max live traces per tenant exceeded: %v", overrides.ErrorPrefixLiveTracesExceeded, err)
}

if searchData != nil {
i.RecordSearchLookupValues(searchData)
}

i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

Expand Down
193 changes: 182 additions & 11 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package ingester
import (
"context"
"sort"
"time"

cortex_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/util"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -93,6 +94,8 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *

span.LogFields(ot_log.Event("live traces mtx acquired"))

entry := &tempofb.SearchEntry{} // buffer

for _, t := range i.traces {
if sr.Quit() {
return
Expand All @@ -106,7 +109,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *
for _, s := range t.searchData {
sr.AddBytesInspected(uint64(len(s)))

entry := tempofb.SearchEntryFromBytes(s)
entry.Reset(s)
if p.Matches(entry) {
newResult := search.GetSearchResultFromData(entry)
if result != nil {
Expand Down Expand Up @@ -181,19 +184,187 @@ func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr
}
}

func (i *instance) GetSearchTags() []string {
return i.searchTagCache.GetNames()
// SearchTags returns the set of tag keys present in this instance's live
// traces and in all searchable blocks (head/WAL append blocks and complete
// local blocks). Returns an error if the context is cancelled or a block
// visit fails.
func (i *instance) SearchTags(ctx context.Context) (*tempopb.SearchTagsResponse, error) {
	tags := map[string]struct{}{}

	kv := &tempofb.KeyValues{} // reusable buffer for flatbuffer tag access
	err := i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
		// note: loop variables renamed so they do not shadow the receiver `i`
		for j, n := 0, entry.TagsLength(); j < n; j++ {
			entry.Tags(kv, j)
			// The inline []byte->string conversion inside a map index does not
			// allocate (recognized by the Go compiler), so repeated keys are
			// skipped without paying a string allocation per entry; only a
			// first-seen key is converted and stored.
			if _, ok := tags[string(kv.Key())]; !ok {
				tags[string(kv.Key())] = struct{}{}
			}
		}
	})
	if err != nil {
		return nil, err
	}

	// merge in tag keys from WAL and complete blocks
	err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
		return block.Tags(ctx, tags)
	})
	if err != nil {
		return nil, err
	}

	return &tempopb.SearchTagsResponse{
		TagNames: extractKeys(tags),
	}, nil
}

// SearchTagValues returns the distinct values of tagName across this
// instance's live traces and searchable blocks. To protect queriers from
// OOMing on high-cardinality tags, the accumulated value set is checked
// against the per-tenant max_bytes_per_tag_values_query limit after scanning
// live traces and again after scanning blocks; if exceeded, an empty response
// is returned (not an error) and a warning is logged.
func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempopb.SearchTagValuesResponse, error) {
	values := map[string]struct{}{}

	userID, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}
	// get limit from override
	maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)

	kv := &tempofb.KeyValues{} // reusable buffer for flatbuffer tag access
	tagNameBytes := []byte(tagName)
	err = i.visitSearchEntriesLiveTraces(ctx, func(entry *tempofb.SearchEntry) {
		// renamed from `kv :=` to avoid shadowing the buffer above
		tag := tempofb.FindTag(entry, kv, tagNameBytes)
		if tag == nil {
			return
		}
		// loop variables renamed so they do not shadow the receiver `i`;
		// the previous local was (misleadingly) named `key` for a tag value
		for j, n := 0, tag.ValueLength(); j < n; j++ {
			// The inline []byte->string conversion inside a map index does not
			// allocate (recognized by the Go compiler), so repeated values are
			// skipped without a per-entry string allocation.
			if _, ok := values[string(tag.Value(j))]; !ok {
				values[string(tag.Value(j))] = struct{}{}
			}
		}
	})
	if err != nil {
		return nil, err
	}

	// check if size of values map is within limit after scanning live traces
	if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
		level.Warn(cortex_util.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName)
		// return empty response to avoid querier OOMs
		return &tempopb.SearchTagValuesResponse{
			TagValues: []string{},
		}, nil
	}

	err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
		return block.TagValues(ctx, tagName, values)
	})
	if err != nil {
		return nil, err
	}

	// check if size of values map is within limit after scanning all blocks
	if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
		level.Warn(cortex_util.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName)
		// return empty response to avoid querier OOMs
		return &tempopb.SearchTagValuesResponse{
			TagValues: []string{},
		}, nil
	}

	return &tempopb.SearchTagValuesResponse{
		TagValues: extractKeys(values),
	}, nil
}

// visitSearchEntriesLiveTraces invokes visitFn for every search entry of every
// live trace, holding tracesMtx for the whole walk. A single SearchEntry
// buffer is reused across entries to avoid per-entry allocations. Returns the
// context's error if it is cancelled mid-walk (checked after each entry).
func (i *instance) visitSearchEntriesLiveTraces(ctx context.Context, visitFn func(entry *tempofb.SearchEntry)) error {
	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchEntriesLiveTraces")
	defer span.Finish()

	i.tracesMtx.Lock()
	defer i.tracesMtx.Unlock()

	entry := &tempofb.SearchEntry{} // reusable decode buffer
	for _, tr := range i.traces {
		for _, data := range tr.searchData {
			entry.Reset(data)
			visitFn(entry)

			// bail out promptly if the caller gave up
			if err := ctx.Err(); err != nil {
				return err
			}
		}
	}
	return nil
}

// visitSearchableBlocks applies visitFn to every searchable block in this
// instance — WAL blocks first, then complete local blocks — holding blocksMtx
// (read) for the duration. The first error from either pass is returned.
func (i *instance) visitSearchableBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
	i.blocksMtx.RLock()
	defer i.blocksMtx.RUnlock()

	if err := i.visitSearchableBlocksWAL(ctx, visitFn); err != nil {
		return err
	}
	return i.visitSearchableBlocksLocalBlocks(ctx, visitFn)
}

func (i *instance) GetSearchTagValues(tagName string) []string {
return i.searchTagCache.GetValues(tagName)
// visitSearchableBlocksWAL visits the head block and every WAL append block,
// taking each entry's read lock around the visit. Must be called under
// i.blocksMtx. Stops at the first visit error or context cancellation.
func (i *instance) visitSearchableBlocksWAL(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksWAL")
	defer span.Finish()

	// visitUnderLock holds the entry's read lock so the block cannot be
	// mutated mid-visit.
	visitUnderLock := func(entry *searchStreamingBlockEntry) error {
		entry.mtx.RLock()
		defer entry.mtx.RUnlock()

		return visitFn(entry.b)
	}

	// NOTE(review): guard against a nil head block to avoid a nil pointer
	// dereference in visitUnderLock — confirm whether searchHeadBlock can be
	// nil here (e.g. before the first block is created or with search
	// disabled). When non-nil, behavior is unchanged.
	if i.searchHeadBlock != nil {
		if err := visitUnderLock(i.searchHeadBlock); err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}
	}

	for _, b := range i.searchAppendBlocks {
		if err := visitUnderLock(b); err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}
	}
	return nil
}

func (i *instance) RecordSearchLookupValues(b []byte) {
s := tempofb.SearchEntryFromBytes(b)
i.searchTagCache.SetData(time.Now(), s)
// visitSearchableBlocksLocalBlocks visits every complete local block, taking
// each entry's read lock around the visit. Must be called under i.blocksMtx.
// (Previous comment named visitSearchableBlocksWAL — copy-paste error.)
func (i *instance) visitSearchableBlocksLocalBlocks(ctx context.Context, visitFn func(block search.SearchableBlock) error) error {
	span, _ := opentracing.StartSpanFromContext(ctx, "instance.visitSearchableBlocksLocalBlocks")
	defer span.Finish()

	// visitUnderLock holds the entry's read lock so the block cannot be
	// mutated mid-visit.
	visitUnderLock := func(entry *searchLocalBlockEntry) error {
		entry.mtx.RLock()
		defer entry.mtx.RUnlock()

		return visitFn(entry.b)
	}

	for _, b := range i.searchCompleteBlocks {
		err := visitUnderLock(b)
		if err != nil {
			return err
		}
		// honor cancellation between blocks
		if err := ctx.Err(); err != nil {
			return err
		}
	}
	return nil
}

func (i *instance) PurgeExpiredSearchTags(before time.Time) {
i.searchTagCache.PurgeExpired(before)
func extractKeys(set map[string]struct{}) []string {
keys := make([]string, 0, len(set))
for k := range set {
keys = append(keys, k)
}
return keys
}
6 changes: 4 additions & 2 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
})

go concurrent(func() {
i.GetSearchTags()
_, err := i.SearchTags(context.Background())
require.NoError(t, err, "error getting search tags")
})

go concurrent(func() {
i.GetSearchTagValues(tagKey)
_, err := i.SearchTagValues(context.Background(), tagKey)
require.NoError(t, err, "error getting search tag values")
})

time.Sleep(2000 * time.Millisecond)
Expand Down
22 changes: 15 additions & 7 deletions modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ const (
ErrorPrefixRateLimited = "RATE_LIMITED:"

// metrics
MetricMaxLocalTracesPerUser = "max_local_traces_per_user"
MetricMaxGlobalTracesPerUser = "max_global_traces_per_user"
MetricMaxBytesPerTrace = "max_bytes_per_trace"
MetricMaxSearchBytesPerTrace = "max_search_bytes_per_trace"
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
MetricMaxLocalTracesPerUser = "max_local_traces_per_user"
MetricMaxGlobalTracesPerUser = "max_global_traces_per_user"
MetricMaxBytesPerTrace = "max_bytes_per_trace"
MetricMaxSearchBytesPerTrace = "max_search_bytes_per_trace"
MetricMaxBytesPerTagValuesQuery = "max_bytes_per_tag_values_query"
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
)

var (
Expand Down Expand Up @@ -57,6 +58,9 @@ type Limits struct {
// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`

// Querier enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`

// Configuration for overrides, convenient if it goes here.
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"`
Expand All @@ -75,6 +79,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxBytesPerTrace, "ingester.max-bytes-per-trace", 50e5, "Maximum size of a trace in bytes. 0 to disable.")
f.IntVar(&l.MaxSearchBytesPerTrace, "ingester.max-search-bytes-per-trace", 50e3, "Maximum size of search data per trace in bytes. 0 to disable.")

// Querier limits
f.IntVar(&l.MaxBytesPerTagValuesQuery, "querier.max-bytes-per-tag-values-query", 50e5, "Maximum size of response for a tag-values query. Used mainly to limit large the number of values associated with a particular tag")

f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
_ = l.PerTenantOverridePeriod.Set("10s")
f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.")
Expand All @@ -89,6 +96,7 @@ func (l *Limits) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxGlobalTracesPerUser), MetricMaxGlobalTracesPerUser)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTrace), MetricMaxBytesPerTrace)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxSearchBytesPerTrace), MetricMaxSearchBytesPerTrace)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.MaxBytesPerTagValuesQuery), MetricMaxBytesPerTagValuesQuery)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionRateLimitBytes), MetricIngestionRateLimitBytes)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.IngestionBurstSizeBytes), MetricIngestionBurstSizeBytes)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(l.BlockRetention), MetricBlockRetention)
Expand Down
Loading