Skip to content

Commit

Permalink
added replicaLabels to api
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev committed Aug 2, 2019
1 parent a35f320 commit d8d83e0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 10 deletions.
36 changes: 31 additions & 5 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

func (api *API) parseReplicaLabelsParam(r *http.Request) (replicaLabels []string, _ *ApiError) {
if err := r.ParseForm(); err != nil {
return nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")}
}

if len(r.Form["replicaLabels[]"]) > 0 {
replicaLabels = r.Form["match[]"]
}
return replicaLabels, nil
}

func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
Expand Down Expand Up @@ -274,6 +285,11 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -294,7 +310,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand Down Expand Up @@ -367,6 +383,11 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -393,7 +414,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {

begin := api.now()
qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse, warningReporter),
api.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, warningReporter),
r.FormValue("query"),
start,
end,
Expand Down Expand Up @@ -444,7 +465,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -510,6 +531,11 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

replicaLabels, apiErr := api.parseReplicaLabelsParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
Expand All @@ -526,7 +552,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, replicaLabels, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -637,7 +663,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, nil, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return func(_ bool, _ []string, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return queryable
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ type WarningReporter func(error)

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default.
// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifing
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabels []string) QueryableCreator {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
return func(deduplicate bool, replicaLabelsOverwrite []string, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
if len(replicaLabelsOverwrite) > 0 {
replicaLabels = replicaLabelsOverwrite
}
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, testProxy, []string{"test"})

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, oneHourMillis, false, func(err error) {})
queryable := queryableCreator(false, nil, oneHourMillis, false, func(err error) {})

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
Expand All @@ -55,7 +55,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
},
}

q := NewQueryableCreator(nil, testProxy, []string{""})(false, 9999999, false, nil)
q := NewQueryableCreator(nil, testProxy, []string{""})(false, nil, 9999999, false, nil)

engine := promql.NewEngine(
promql.EngineOpts{
Expand Down

0 comments on commit d8d83e0

Please sign in to comment.