Skip to content

Commit

Permalink
Respect shard number in series api (#4577)
Browse files Browse the repository at this point in the history
* Respect shard number in series api

* Refactor codes

* remove duplicate logic in instance
* format with goimport

* Improve complexity of getting all fingerprints
  • Loading branch information
taisho6339 authored Oct 29, 2021
1 parent 0c97afa commit 0e21f02
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 16 deletions.
42 changes: 36 additions & 6 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,26 @@ func labelsString(b *bytes.Buffer, ls labels.Labels) {

// Lookup all fingerprints for the provided matchers.
func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher, shard *astmapper.ShardAnnotation) ([]model.Fingerprint, error) {
if len(matchers) == 0 {
return nil, nil
}

if err := validateShard(ii.totalShards, shard); err != nil {
return nil, err
}

result := []model.Fingerprint{}
var result []model.Fingerprint
shards := ii.getShards(shard)

// if no matcher is specified, all fingerprints would be returned
if len(matchers) == 0 {
for i := range shards {
fps := shards[i].allFPs()
result = append(result, fps...)
}
return result, nil
}

for i := range shards {
fps := shards[i].lookup(matchers)
result = append(result, fps...)
}

return result, nil
}

Expand Down Expand Up @@ -310,6 +315,31 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint
return result
}

func (shard *indexShard) allFPs() model.Fingerprints {
shard.mtx.RLock()
defer shard.mtx.RUnlock()

var fps model.Fingerprints
for _, ie := range shard.idx {
for _, ive := range ie.fps {
fps = append(fps, ive.fps...)
}
}
if len(fps) == 0 {
return nil
}

var result model.Fingerprints
var m = map[model.Fingerprint]struct{}{}
for _, fp := range fps {
if _, ok := m[fp]; !ok {
m[fp] = struct{}{}
result = append(result, fp)
}
}
return result
}

func (shard *indexShard) labelNames() []string {
shard.mtx.RLock()
defer shard.mtx.RUnlock()
Expand Down
20 changes: 20 additions & 0 deletions pkg/ingester/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,26 @@ func Test_hash_mapping(t *testing.T) {
}
}

func Test_NoMatcherLookup(t *testing.T) {
lbs := labels.Labels{
labels.Label{Name: "foo", Value: "bar"},
labels.Label{Name: "hi", Value: "hello"},
}
// with no shard param
ii := NewWithShards(16)
ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1)
ids, err := ii.Lookup(nil, nil)
require.Nil(t, err)
require.Equal(t, model.Fingerprint(1), ids[0])

// with shard param
ii = NewWithShards(16)
ii.Add(cortexpb.FromLabelsToLabelAdapters(lbs), 1)
ids, err = ii.Lookup(nil, &astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16})
require.Nil(t, err)
require.Equal(t, model.Fingerprint(1), ids[0])
}

func Test_ConsistentMapping(t *testing.T) {
a := NewWithShards(16)
b := NewWithShards(32)
Expand Down
32 changes: 22 additions & 10 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,10 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter
ingStats := stats.GetIngesterData(ctx)
var iters []iter.EntryIterator

var shard *astmapper.ShardAnnotation
shards, err := logql.ParseShards(req.Shards)
shard, err := parseShardFromRequest(req.Shards)
if err != nil {
return nil, err
}
if len(shards) > 1 {
return nil, errors.New("only one shard per ingester query is supported")
}
if len(shards) == 1 {
shard = &shards[0]
}

err = i.forMatchingStreams(
ctx,
Expand Down Expand Up @@ -418,13 +411,17 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
if err != nil {
return nil, err
}
shard, err := parseShardFromRequest(req.Shards)
if err != nil {
return nil, err
}

var series []logproto.SeriesIdentifier

// If no matchers were supplied we include all streams.
if len(groups) == 0 {
series = make([]logproto.SeriesIdentifier, 0, len(i.streams))
err = i.forAllStreams(ctx, func(stream *stream) error {
err = i.forMatchingStreams(ctx, nil, shard, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
series = append(series, logproto.SeriesIdentifier{
Expand All @@ -439,7 +436,7 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
} else {
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(ctx, matchers, nil, func(stream *stream) error {
err = i.forMatchingStreams(ctx, matchers, shard, func(stream *stream) error {
// consider the stream only if it overlaps the request time range
if shouldConsiderStream(stream, req) {
// exit early when this stream was added by an earlier group
Expand Down Expand Up @@ -613,6 +610,21 @@ func (i *instance) openTailersCount() uint32 {
return uint32(len(i.tailers))
}

func parseShardFromRequest(reqShards []string) (*astmapper.ShardAnnotation, error) {
var shard *astmapper.ShardAnnotation
shards, err := logql.ParseShards(reqShards)
if err != nil {
return nil, err
}
if len(shards) > 1 {
return nil, errors.New("only one shard per ingester query is supported")
}
if len(shards) == 1 {
shard = &shards[0]
}
return shard, nil
}

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand Down
20 changes: 20 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/querier/astmapper"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -160,11 +162,13 @@ func Test_SeriesQuery(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
indexShards := 2

// just some random values
cfg := defaultConfig()
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards

instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)

Expand Down Expand Up @@ -212,6 +216,22 @@ func Test_SeriesQuery(t *testing.T) {
{Labels: map[string]string{"app": "test2", "job": "varlogs"}},
},
},
{
"overlapping request with shard param",
&logproto.SeriesRequest{
Start: currentTime.Add(1 * time.Nanosecond),
End: currentTime.Add(7 * time.Nanosecond),
Groups: []string{`{job="varlogs"}`},
Shards: []string{astmapper.ShardAnnotation{
Shard: 1,
Of: indexShards,
}.String()},
},
[]logproto.SeriesIdentifier{
// Separated by shard number
{Labels: map[string]string{"app": "test2", "job": "varlogs"}},
},
},
{
"request end time overlaps stream start time",
&logproto.SeriesRequest{
Expand Down

0 comments on commit 0e21f02

Please sign in to comment.