Skip to content

Commit

Permalink
only fetches one chunk per series in /series (#1914)
Browse files Browse the repository at this point in the history
* only fetches one chunk per series in /series

* Update pkg/storage/store.go

* chunksBySeries naming

* sorts series response, adds tests for chunks < batchsize
  • Loading branch information
owen-d authored Apr 8, 2020
1 parent 0e53190 commit 3347871
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 49 deletions.
12 changes: 12 additions & 0 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package logproto

import "github.com/prometheus/prometheus/pkg/labels"

// SeriesIdentifiers is a slice of SeriesIdentifier values. Together with the
// Less method it carries the Len/Swap/Less method set expected by the
// standard library's sort package.
type SeriesIdentifiers []SeriesIdentifier

// Len reports how many identifiers the slice holds.
func (ids SeriesIdentifiers) Len() int {
	return len(ids)
}

// Swap exchanges the identifiers stored at positions i and j.
func (ids SeriesIdentifiers) Swap(i, j int) {
	ids[j], ids[i] = ids[i], ids[j]
}
// Less reports whether the identifier at index i sorts strictly before the
// identifier at index j. Both label maps are converted with labels.FromMap
// and ordered with labels.Compare.
//
// The comparison must be strict (< 0, not <= 0): the sort package requires
// Less(i, j) to be false when the two elements are equal, otherwise each of
// two equal elements claims to be "less" than the other and the ordering
// contract is violated.
func (ids SeriesIdentifiers) Less(i, j int) bool {
	a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
	return labels.Compare(a, b) < 0
}
17 changes: 2 additions & 15 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
Expand Down Expand Up @@ -484,12 +483,9 @@ func (q *Querier) seriesForMatchers(
groups []string,
) ([]logproto.SeriesIdentifier, error) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

var results []logproto.SeriesIdentifier
for _, group := range groups {
iter, err := q.store.LazyQuery(ctx, logql.SelectParams{
ids, err := q.store.GetSeries(ctx, logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: group,
Limit: 1,
Expand All @@ -502,19 +498,10 @@ func (q *Querier) seriesForMatchers(
return nil, err
}

for iter.Next() {
ls, err := marshal.NewLabelSet(iter.Labels())
if err != nil {
return nil, err
}
results = append(results, ids...)

results = append(results, logproto.SeriesIdentifier{
Labels: ls.Map(),
})
}
}
return results, nil

}

func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error {
Expand Down
18 changes: 9 additions & 9 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func (s *storeMock) DeleteSeriesIDs(ctx context.Context, from, through model.Tim
panic("don't call me please")
}

// GetSeries records the call on the mock and replays the values configured
// for it: return argument 0 as the series identifiers (nil when unset) and
// return argument 1 as the error.
func (s *storeMock) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) {
	args := s.Called(ctx, req)

	if configured := args.Get(0); configured != nil {
		return configured.([]logproto.SeriesIdentifier), args.Error(1)
	}

	// No identifiers were configured; hand back a nil slice with the error.
	var none []logproto.SeriesIdentifier
	return none, args.Error(1)
}

// Stop is a no-op on the mock store.
func (s *storeMock) Stop() {}
Expand Down Expand Up @@ -314,15 +323,6 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}

// mockStreamIterFromLabelSets creates one mock stream per entry in sets, each
// built by mockStreamWithLabels with the same from and quantity, and returns a
// forward-direction iterator over all of them.
func mockStreamIterFromLabelSets(from, quantity int, sets []string) iter.EntryIterator {
	var mocked []*logproto.Stream
	for idx := range sets {
		mocked = append(mocked, mockStreamWithLabels(from, quantity, sets[idx]))
	}
	return iter.NewStreamsIterator(context.Background(), mocked, logproto.FORWARD)
}

// mockStream returns a stream with quantity entries, where each entry's timestamp
// and line string are constructed as sequential numbers starting at from.
func mockStream(from int, quantity int) *logproto.Stream {
Expand Down
27 changes: 12 additions & 15 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestQuerier_SeriesAPI(t *testing.T) {
func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) {
ingester.On("Series", mock.Anything, req, mock.Anything).Return(nil, errors.New("tst-err"))

store.On("LazyQuery", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 0), nil)
store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil)
},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
Expand All @@ -345,7 +345,7 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{"a": "1"},
}), nil)

store.On("LazyQuery", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded)
store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, context.DeadlineExceeded)
},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
Expand All @@ -358,9 +358,7 @@ func TestQuerier_SeriesAPI(t *testing.T) {
mkReq([]string{`{a="1"}`}),
func(store *storeMock, querier *queryClientMock, ingester *querierClientMock, limits validation.Limits, req *logproto.SeriesRequest) {
ingester.On("Series", mock.Anything, req, mock.Anything).Return(mockSeriesResponse(nil), nil)

store.On("LazyQuery", mock.Anything, mock.Anything).
Return(mockStreamIterator(0, 0), nil)
store.On("GetSeries", mock.Anything, mock.Anything).Return(nil, nil)
},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
Expand All @@ -378,11 +376,10 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{"a": "1", "b": "3"},
}), nil)

store.On("LazyQuery", mock.Anything, mock.Anything).
Return(mockStreamIterFromLabelSets(0, 10, []string{
`{a="1",b="4"}`,
`{a="1",b="5"}`,
}), nil)
store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{
{Labels: map[string]string{"a": "1", "b": "4"}},
{Labels: map[string]string{"a": "1", "b": "5"}},
}, nil)
},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
Expand All @@ -404,11 +401,11 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{"a": "1", "b": "2"},
}), nil)

store.On("LazyQuery", mock.Anything, mock.Anything).
Return(mockStreamIterFromLabelSets(0, 10, []string{
`{a="1",b="2"}`,
`{a="1",b="3"}`,
}), nil)
store.On("GetSeries", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{
{Labels: map[string]string{"a": "1", "b": "2"}},
{Labels: map[string]string{"a": "1", "b": "3"}},
}, nil)

},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
Expand Down
108 changes: 98 additions & 10 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"flag"
"sort"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
Expand All @@ -34,6 +36,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Store is the storage interface used for log queries. It embeds chunk.Store
// and adds the log-specific query entry points below.
type Store interface {
chunk.Store
// LazyQuery returns an entry iterator for the given select parameters.
LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error)
// GetSeries returns the series identifiers for the given select parameters.
GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error)
}

type store struct {
Expand All @@ -53,34 +56,38 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
}, nil
}

// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront
// for that request.
func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
storeStats := stats.GetStoreData(ctx)

// decodeReq sanitizes an incoming request, rounds bounds, and appends the __name__ matcher
func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, model.Time, model.Time, error) {
expr, err := req.LogSelector()
if err != nil {
return nil, err
return nil, nil, 0, 0, err
}

filter, err := expr.Filter()
if err != nil {
return nil, err
return nil, nil, 0, 0, err
}

matchers := expr.Matchers()
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
return nil, nil, 0, 0, err
}
matchers = append(matchers, nameLabelMatcher)

from, through := util.RoundToMilliseconds(req.Start, req.End)
return matchers, filter, from, through, nil
}

// lazyChunks resolves a set of lazy chunks from the store without actually loading them. It is used internally by `LazyQuery` and `GetSeries`.
func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*chunkenc.LazyChunk, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

matchers = append(matchers, nameLabelMatcher)
from, through := util.RoundToMilliseconds(req.Start, req.End)
storeStats := stats.GetStoreData(ctx)

chks, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
Expand All @@ -99,6 +106,87 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent
lazyChunks = append(lazyChunks, &chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]})
}
}
return lazyChunks, nil
}

// GetSeries returns the identifiers (label sets) of the series that have
// chunks matching req, sorted with sort.Sort over logproto.SeriesIdentifiers.
//
// Only one chunk per series is fetched: the chunk references are grouped by
// series and all but the first chunk of each group are discarded. The
// remaining chunks are fetched in batches of at most s.cfg.MaxChunkBatchSize.
// A non-positive batch size is treated as "one batch containing everything"
// instead of causing a divide-by-zero or a negative slice capacity.
//
// The reserved metric-name label is removed from every returned label set.
// Any error from decoding the request, resolving chunk references, or
// fetching a batch is returned as-is with a nil result.
func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) {
	// The line filter returned by decodeReq is not needed to list series.
	matchers, _, from, through, err := decodeReq(req)
	if err != nil {
		return nil, err
	}

	lazyChunks, err := s.lazyChunks(ctx, matchers, from, through)
	if err != nil {
		return nil, err
	}

	// group chunks by series
	chunksBySeries := partitionBySeriesChunks(lazyChunks)

	// discard all but one chunk per series
	firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(chunksBySeries))
	for _, chks := range chunksBySeries {
		firstChunksPerSeries = append(firstChunksPerSeries, chks[0][0])
	}

	// bound concurrency: never fetch more than batchSize chunks at once.
	// Clamping to the number of chunks also keeps start+batchSize from
	// overflowing and makes the loop below a no-op when there is nothing to
	// fetch.
	batchSize := s.cfg.MaxChunkBatchSize
	if batchSize <= 0 || batchSize > len(firstChunksPerSeries) {
		batchSize = len(firstChunksPerSeries)
	}

	results := make(logproto.SeriesIdentifiers, 0, len(firstChunksPerSeries))

	for start := 0; start < len(firstChunksPerSeries); start += batchSize {
		end := start + batchSize
		if end > len(firstChunksPerSeries) {
			end = len(firstChunksPerSeries)
		}
		group := firstChunksPerSeries[start:end]

		if err := fetchLazyChunks(ctx, group); err != nil {
			return nil, err
		}

	outer:
		for _, chk := range group {
			// Skip chunks whose metric does not satisfy every matcher.
			for _, matcher := range matchers {
				if !matcher.Matches(chk.Chunk.Metric.Get(matcher.Name)) {
					continue outer
				}
			}

			m := chk.Chunk.Metric.Map()
			delete(m, labels.MetricName)
			results = append(results, logproto.SeriesIdentifier{
				Labels: m,
			})
		}
	}

	sort.Sort(results)
	return results, nil
}

// LazyQuery returns an iterator that queries the store for more chunks while
// it is being iterated, rather than fetching every chunk for the request up
// front. The request is decoded with decodeReq, chunk references are resolved
// with lazyChunks, and the result is wrapped in a batch chunk iterator that
// uses the configured MaxChunkBatchSize.
func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) {
	matchers, filter, from, through, err := decodeReq(req)
	if err != nil {
		return nil, err
	}

	chks, err := s.lazyChunks(ctx, matchers, from, through)
	if err != nil {
		return nil, err
	}

	batchSize := s.cfg.MaxChunkBatchSize
	it := newBatchChunkIterator(ctx, chks, batchSize, matchers, filter, req.QueryRequest)
	return it, nil
}
Expand Down
Loading

0 comments on commit 3347871

Please sign in to comment.