Skip to content

Commit

Permalink
Enforce the time range on ingester search (#2783)
Browse files Browse the repository at this point in the history
* enforce range on ingester search

Signed-off-by: Joe Elliott <[email protected]>

* tests

Signed-off-by: Joe Elliott <[email protected]>

* changelog

Signed-off-by: Joe Elliott <[email protected]>

---------

Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott authored Aug 11, 2023
1 parent 24b7c27 commit bc3f543
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Update /api/metrics/summary to correctly handle missing attributes and improve performance of TraceQL `select()` queries. [#2765](https://github.com/grafana/tempo/pull/2765) (@mdisibio)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix node role auth IDMSv1 [#2760](https://github.com/grafana/tempo/pull/2760) (@coufalja)
* [BUGFIX] Only search ingester blocks that fall within the request time range. [#2783](https://github.com/grafana/tempo/pull/2783) (@joe-elliott)

## v2.2.0 / 2023-07-31

Expand Down
32 changes: 27 additions & 5 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -49,7 +49,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
// then headblockMtx. Even if the likelihood is low it is a statistical certainly
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
i.searchBlock(ctx, req, sr, i.headBlock.BlockMeta().BlockID, i.headBlock, i.headBlockMtx.RUnlock)
i.searchBlock(ctx, req, sr, i.headBlock.BlockMeta(), i.headBlock, i.headBlockMtx.RUnlock)

// Lock blocks mutex until all search tasks are finished and this function exists. This avoids
// deadlocking with other activity (ingest, flushing), caused by releasing
Expand All @@ -58,11 +58,11 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
defer i.blocksMtx.RUnlock()

for _, b := range i.completingBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta().BlockID, b, nil)
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

for _, b := range i.completeBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta().BlockID, b, nil)
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

sr.AllWorkersStarted()
Expand Down Expand Up @@ -101,7 +101,17 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem

// searchBlock starts a search task for the given block. The block must already be under lock,
// and this method calls cleanup to unlock the block when done.
func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest, sr *search.Results, blockID uuid.UUID, block common.Searcher, cleanup func()) {
func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest, sr *search.Results, meta *backend.BlockMeta, block common.Searcher, cleanup func()) {
// confirm block should be included in search
if !includeBlock(meta, req) {
if cleanup != nil {
cleanup()
}
return
}

blockID := meta.BlockID

sr.StartWorker()
go func(e common.Searcher, cleanup func()) {
if cleanup != nil {
Expand Down Expand Up @@ -537,3 +547,15 @@ func extractMatchers(query string) string {

return q.String()
}

// includeBlock uses the provided time range to determine if the block should be included in the search.
func includeBlock(b *backend.BlockMeta, req *tempopb.SearchRequest) bool {
start := int64(req.Start)
end := int64(req.End)

if start == 0 || end == 0 {
return true
}

return b.StartTime.Unix() <= end && b.EndTime.Unix() >= start
}
94 changes: 89 additions & 5 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
)

const (
Expand Down Expand Up @@ -189,16 +190,16 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {

// writeTracesForSearch will build spans that end 1 second from now
// query 2 min range to have extra slack and always be within range
sr = search(req, uint32(time.Now().Add(-time.Minute).Unix()), uint32(time.Now().Add(time.Minute).Unix()))
sr = search(req, uint32(time.Now().Add(-5*time.Minute).Unix()), uint32(time.Now().Add(5*time.Minute).Unix()))
assert.Len(t, sr.Traces, len(ids))
assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
checkEqual(t, ids, sr)

// search with start=1m from now, end=2m from now
sr = search(req, uint32(time.Now().Add(time.Minute).Unix()), uint32(time.Now().Add(2*time.Minute).Unix()))
// search with start=5m from now, end=10m from now
sr = search(req, uint32(time.Now().Add(5*time.Minute).Unix()), uint32(time.Now().Add(10*time.Minute).Unix()))
// no results and should inspect 100 traces in wal
assert.Len(t, sr.Traces, 0)
assert.Equal(t, sr.Metrics.InspectedTraces, inspectedTraces)
assert.Equal(t, uint32(0), sr.Metrics.InspectedTraces)
}

req := &tempopb.SearchRequest{
Expand Down Expand Up @@ -477,6 +478,7 @@ func writeTracesForSearch(t *testing.T, i *instance, tagKey string, tagValue str
ids := [][]byte{}
expectedTagValues := []string{}

now := time.Now()
for j := 0; j < numTraces; j++ {
id := make([]byte, 16)
_, err := crand.Read(id)
Expand All @@ -491,13 +493,22 @@ func writeTracesForSearch(t *testing.T, i *instance, tagKey string, tagValue str
ids = append(ids, id)

testTrace := test.MakeTrace(10, id)
// add the time
for _, batch := range testTrace.Batches {
for _, ils := range batch.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
span.StartTimeUnixNano = uint64(now.UnixNano())
span.EndTimeUnixNano = uint64(now.UnixNano())
}
}
}
testTrace.Batches[0].ScopeSpans[0].Spans[0].Attributes = append(
testTrace.Batches[0].ScopeSpans[0].Spans[0].Attributes,
kv,
)
trace.SortTrace(testTrace)

traceBytes, err := dec.PrepareForWrite(testTrace, 0, 0)
traceBytes, err := dec.PrepareForWrite(testTrace, uint32(now.Unix()), uint32(now.Unix()))
require.NoError(t, err)

// searchData will be nil if not
Expand Down Expand Up @@ -931,3 +942,76 @@ func BenchmarkExtractMatchers(b *testing.B) {
})
}
}

func TestIncludeBlock(t *testing.T) {
tests := []struct {
blocKStart int64
blockEnd int64
reqStart uint32
reqEnd uint32
expected bool
}{
// if request is 0s, block start/end don't matter
{
blocKStart: 100,
blockEnd: 200,
reqStart: 0,
reqEnd: 0,
expected: true,
},
// req before
{
blocKStart: 100,
blockEnd: 200,
reqStart: 50,
reqEnd: 99,
expected: false,
},
// overlap front
{
blocKStart: 100,
blockEnd: 200,
reqStart: 50,
reqEnd: 150,
expected: true,
},
// inside block
{
blocKStart: 100,
blockEnd: 200,
reqStart: 110,
reqEnd: 150,
expected: true,
},
// overlap end
{
blocKStart: 100,
blockEnd: 200,
reqStart: 150,
reqEnd: 250,
expected: true,
},
// after block
{
blocKStart: 100,
blockEnd: 200,
reqStart: 201,
reqEnd: 250,
expected: false,
},
}

for _, tc := range tests {
t.Run(fmt.Sprintf("%d-%d-%d-%d", tc.blocKStart, tc.blockEnd, tc.reqStart, tc.reqEnd), func(t *testing.T) {
actual := includeBlock(&backend.BlockMeta{
StartTime: time.Unix(tc.blocKStart, 0),
EndTime: time.Unix(tc.blockEnd, 0),
}, &tempopb.SearchRequest{
Start: tc.reqStart,
End: tc.reqEnd,
})

require.Equal(t, tc.expected, actual)
})
}
}

0 comments on commit bc3f543

Please sign in to comment.