Skip to content

Commit

Permalink
Shards Series API. (#3856)
Browse files Browse the repository at this point in the history
* Shards Series API.

This PR introduces Series Queries Sharding. It does not check the boundaries of ingester data, since I'm assuming
#3852 will be merged first.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes tests sorting.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Jul 7, 2021
1 parent 97912b6 commit bee4567
Show file tree
Hide file tree
Showing 13 changed files with 513 additions and 179 deletions.
2 changes: 1 addition & 1 deletion pkg/loghttp/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) {
Start: start,
End: end,
Groups: deduped,
Shards: shards(r),
}, nil

}

func union(cols ...[]string) []string {
Expand Down
243 changes: 157 additions & 86 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ message SeriesRequest {
google.protobuf.Timestamp start = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp end = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
repeated string groups = 3;
repeated string shards = 4 [(gogoproto.jsontag) = "shards,omitempty"];
}

message SeriesResponse {
Expand Down Expand Up @@ -163,4 +164,4 @@ message GetChunkIDsRequest {

message GetChunkIDsResponse {
repeated string chunkIDs = 1;
}
}
14 changes: 6 additions & 8 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ const (

// ShardingMetrics is the metrics wrapper used in shard mapping.
// Instances are built by NewShardingMetrics, which creates each collector
// through the supplied registerer.
type ShardingMetrics struct {
shards *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
Shards *prometheus.CounterVec // sharded queries total, partitioned by (streams/metric)
ShardFactor prometheus.Histogram // per request shard factor
parsed *prometheus.CounterVec // parsed ASTs total, partitioned by (success/failure/noop)
shardFactor prometheus.Histogram // per request shard factor
}

func NewShardingMetrics(registerer prometheus.Registerer) *ShardingMetrics {

return &ShardingMetrics{
shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Shards: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_shards_total",
}, []string{"type"}),
parsed: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "query_frontend_sharding_parsed_queries_total",
}, []string{"type"}),
shardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
ShardFactor: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Name: "query_frontend_shard_factor",
Help: "Number of shards per request",
Expand Down Expand Up @@ -67,14 +66,14 @@ type shardRecorder struct {
// Add both increments the shard counter labelled by key by x and adds x to the
// running total that Finish later reports as a single histogram observation.
func (r *shardRecorder) Add(x int, key string) {
r.total += x
r.shards.WithLabelValues(key).Add(float64(x))
r.Shards.WithLabelValues(key).Add(float64(x))
}

// Finish idempotently records a histogram entry with the total shard factor.
// The done flag guards the observation, so calls after the first are no-ops.
func (r *shardRecorder) Finish() {
if !r.done {
r.done = true
r.shardFactor.Observe(float64(r.total))
r.ShardFactor.Observe(float64(r.total))
}
}

Expand Down Expand Up @@ -203,7 +202,6 @@ func (m ShardMapper) mapSampleExpr(expr SampleExpr, r *shardRecorder) SampleExpr
// technically, std{dev,var} are also parallelizable if there is no cross-shard merging
// in descendent nodes in the AST. This optimization is currently avoided for simplicity.
func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *shardRecorder) (SampleExpr, error) {

// if this AST contains unshardable operations, don't shard this at this level,
// but attempt to shard a child node.
if !expr.Shardable() {
Expand Down
10 changes: 6 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)
}

go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups())
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
if err != nil {
errs <- err
return
Expand Down Expand Up @@ -441,20 +441,21 @@ func (q *Querier) seriesForMatchers(
ctx context.Context,
from, through time.Time,
groups []string,
shards []string,
) ([]logproto.SeriesIdentifier, error) {

var results []logproto.SeriesIdentifier
// If no matchers were specified for the series query,
// we send a query with an empty matcher which will match every series.
if len(groups) == 0 {
var err error
results, err = q.seriesForMatcher(ctx, from, through, "")
results, err = q.seriesForMatcher(ctx, from, through, "", shards)
if err != nil {
return nil, err
}
} else {
for _, group := range groups {
ids, err := q.seriesForMatcher(ctx, from, through, group)
ids, err := q.seriesForMatcher(ctx, from, through, group, shards)
if err != nil {
return nil, err
}
Expand All @@ -465,14 +466,15 @@ func (q *Querier) seriesForMatchers(
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) {
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
ids, err := q.store.GetSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: matcher,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
Shards: shards,
},
})
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (r *LokiSeriesRequest) LogToSpan(sp opentracing.Span) {
otlog.String("matchers", strings.Join(r.GetMatch(), ",")),
otlog.String("start", timestamp.Time(r.GetStart()).String()),
otlog.String("end", timestamp.Time(r.GetEnd()).String()),
otlog.String("shards", strings.Join(r.GetShards(), ",")),
)
}

Expand Down Expand Up @@ -196,7 +197,6 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Reque
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf("unknown request path: %s", r.URL.Path))
}

}

func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
Expand Down Expand Up @@ -235,7 +235,9 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
"end": []string{fmt.Sprintf("%d", request.EndTs.UnixNano())},
"match[]": request.Match,
}

if len(request.Shards) > 0 {
params["shards"] = request.Shards
}
u := &url.URL{
Path: "/loki/api/v1/series",
RawQuery: params.Encode(),
Expand Down Expand Up @@ -355,7 +357,6 @@ func (Codec) DecodeResponse(ctx context.Context, r *http.Response, req queryrang
return nil, httpgrpc.Errorf(http.StatusBadRequest, "unsupported response type")
}
}

}

func (Codec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
Expand Down Expand Up @@ -482,7 +483,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons
lokiSeriesData = append(lokiSeriesData, series)
uniqueSeries[series.String()] = struct{}{}
}

}
}

Expand All @@ -504,7 +504,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons
names = append(names, labelName)
uniqueNames[labelName] = struct{}{}
}

}
}

Expand All @@ -520,7 +519,6 @@ func (Codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons

// mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values
func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {

var total int

// turn resps -> map[labels] []entries
Expand Down Expand Up @@ -612,7 +610,6 @@ func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, dire
}

return results

}

func toProto(m loghttp.Matrix) []queryrange.SampleStream {
Expand Down Expand Up @@ -642,7 +639,6 @@ func (res LokiResponse) Count() int64 {
result += int64(len(s.Entries))
}
return result

}

type paramsWrapper struct {
Expand All @@ -658,12 +654,15 @@ func paramsFromRequest(req queryrange.Request) *paramsWrapper {
// Query returns the Query field of the wrapped LokiRequest.
func (p paramsWrapper) Query() string {
	query := p.LokiRequest.Query
	return query
}

// Start returns the StartTs value reachable through the wrapper.
func (p paramsWrapper) Start() time.Time {
	startTs := p.StartTs
	return startTs
}

// End returns the EndTs value reachable through the wrapper.
func (p paramsWrapper) End() time.Time {
	endTs := p.EndTs
	return endTs
}

// Step converts the wrapped LokiRequest's Step value into a time.Duration.
// The raw value is multiplied by 1e6 before the conversion.
// NOTE(review): presumably Step is expressed in milliseconds, making the
// result nanoseconds — confirm against the LokiRequest definition.
func (p paramsWrapper) Step() time.Duration {
	scaled := p.LokiRequest.Step * 1e6
	return time.Duration(scaled)
}
Expand Down
Loading

0 comments on commit bee4567

Please sign in to comment.