Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fanout options to select namespaces #1328

Merged
merged 11 commits into from
Jan 31, 2019
1 change: 0 additions & 1 deletion scripts/docker-integration-tests/carbon/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,3 @@ echo "foo.min.catch-all.baz 10 $t" | nc 0.0.0.0 7204
echo "foo.min.catch-all.baz 20 $t" | nc 0.0.0.0 7204
echo "Attempting to read mean aggregated carbon metric"
ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.catch-all.baz 15
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally deleted


Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,36 @@ import (
)

const (
// SearchURL is the url for searching graphite metrics.
SearchURL = handler.RoutePrefixV1 + "/graphite/metrics/find"
// FindURL is the url for finding graphite metrics.
FindURL = handler.RoutePrefixV1 + "/graphite/metrics/find"
)

var (
// SearchHTTPMethods is the HTTP methods used with this resource.
SearchHTTPMethods = []string{http.MethodGet, http.MethodPost}
// FindHTTPMethods is the HTTP methods used with this resource.
FindHTTPMethods = []string{http.MethodGet, http.MethodPost}
)

type grahiteSearchHandler struct {
type grahiteFindHandler struct {
storage storage.Storage
}

// NewSearchHandler returns a new instance of handler.
func NewSearchHandler(
// NewFindHandler returns a new instance of handler.
func NewFindHandler(
storage storage.Storage,
) http.Handler {
return &grahiteSearchHandler{
return &grahiteFindHandler{
storage: storage,
}
}

func (h *grahiteSearchHandler) ServeHTTP(
func (h *grahiteFindHandler) ServeHTTP(
w http.ResponseWriter,
r *http.Request,
) {
ctx := context.WithValue(r.Context(), handler.HeaderKey, r.Header)
logger := logging.WithContext(ctx)
w.Header().Set("Content-Type", "application/json")
query, rErr := parseSearchParamsToQuery(r)
query, rErr := parseFindParamsToQuery(r)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
Expand Down Expand Up @@ -108,8 +108,8 @@ func (h *grahiteSearchHandler) ServeHTTP(
}

// TODO: Support multiple result types
if err = searchResultsJSON(w, prefix, seenMap); err != nil {
logger.Error("unable to print search results", zap.Error(err))
if err = findResultsJSON(w, prefix, seenMap); err != nil {
logger.Error("unable to print find results", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/m3db/m3/src/x/net/http"
)

func parseSearchParamsToQuery(r *http.Request) (
func parseFindParamsToQuery(r *http.Request) (
*storage.FetchQuery,
*xhttp.ParseError,
) {
Expand Down Expand Up @@ -86,7 +86,7 @@ func parseSearchParamsToQuery(r *http.Request) (
}, nil
}

func searchResultsJSON(
func findResultsJSON(
w io.Writer,
prefix string,
tags map[string]bool,
Expand Down
11 changes: 9 additions & 2 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.promReadMetrics.fetchSuccess.Inc(1)
}

func (h *PromReadHandler) parseRequest(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) {
func (h *PromReadHandler) parseRequest(
r *http.Request,
) (*prompb.ReadRequest, *xhttp.ParseError) {
reqBuf, err := prometheus.ParsePromCompressedRequest(r)
if err != nil {
return nil, err
Expand All @@ -142,7 +144,12 @@ func (h *PromReadHandler) parseRequest(r *http.Request) (*prompb.ReadRequest, *x
return &req, nil
}

func (h *PromReadHandler) read(reqCtx context.Context, w http.ResponseWriter, r *prompb.ReadRequest, timeout time.Duration) ([]*prompb.QueryResult, error) {
func (h *PromReadHandler) read(
reqCtx context.Context,
w http.ResponseWriter,
r *prompb.ReadRequest,
timeout time.Duration,
) ([]*prompb.QueryResult, error) {
// TODO: Handle multi query use case
if len(r.Queries) != 1 {
return nil, fmt.Errorf("prometheus read endpoint currently only supports one query at a time")
Expand Down
6 changes: 3 additions & 3 deletions src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ func (h *Handler) RegisterRoutes() error {
logged(graphite.NewRenderHandler(h.storage)).ServeHTTP,
).Methods(graphite.ReadHTTPMethods...)

h.router.HandleFunc(graphite.SearchURL,
logged(graphite.NewSearchHandler(h.storage)).ServeHTTP,
).Methods(graphite.SearchHTTPMethods...)
h.router.HandleFunc(graphite.FindURL,
logged(graphite.NewFindHandler(h.storage)).ServeHTTP,
).Methods(graphite.FindHTTPMethods...)

if h.clusterClient != nil {
placementOpts := placement.HandlerOptions{
Expand Down
19 changes: 16 additions & 3 deletions src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,16 @@ func newEngineMetrics(scope tally.Scope) *engineMetrics {
}

// Execute runs the query and closes the results channel once done
func (e *Engine) Execute(ctx context.Context, query *storage.FetchQuery, opts *EngineOptions, results chan *storage.QueryResult) {
func (e *Engine) Execute(
ctx context.Context,
query *storage.FetchQuery,
opts *EngineOptions,
results chan *storage.QueryResult,
) {
defer close(results)
result, err := e.store.Fetch(ctx, query, &storage.FetchOptions{})
fetchOpts := storage.NewFetchOptions()
fetchOpts.Limit = 0
result, err := e.store.Fetch(ctx, query, fetchOpts)
if err != nil {
results <- &storage.QueryResult{Err: err}
return
Expand All @@ -115,7 +122,13 @@ func (e *Engine) Execute(ctx context.Context, query *storage.FetchQuery, opts *E

// ExecuteExpr runs the query DAG and closes the results channel once done
// nolint: unparam
func (e *Engine) ExecuteExpr(ctx context.Context, parser parser.Parser, opts *EngineOptions, params models.RequestParams, results chan Query) {
func (e *Engine) ExecuteExpr(
ctx context.Context,
parser parser.Parser,
opts *EngineOptions,
params models.RequestParams,
results chan Query,
) {
defer close(results)

req := newRequest(e, params)
Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func (n *FetchNode) Execute(ctx context.Context) error {
// No need to adjust start and ends since physical plan already considers the offset, range
startTime := timeSpec.Start
endTime := timeSpec.End
opts := storage.NewFetchOptions()
opts.BlockType = n.blockType
blockResult, err := n.storage.FetchBlocks(ctx, &storage.FetchQuery{
Start: startTime,
End: endTime,
TagMatchers: n.op.Matchers,
Interval: timeSpec.Step,
}, &storage.FetchOptions{
BlockType: n.blockType,
})
}, opts)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,14 @@ func (s *m3WrappedStore) FetchByQuery(
m3query := translateQuery(query, opts)
m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout)
defer cancel()
fetchOptions := storage.NewFetchOptions()
fetchOptions.FanoutOptions = &storage.FanoutOptions{
FanoutUnaggregated: storage.FanoutForceDisable,
FanoutAggregated: storage.FanoutDefault,
FanoutAggregatedOptimized: storage.FanoutForceDisable,
}

m3result, err := s.m3.Fetch(m3ctx, m3query,
storage.NewFetchOptions())
m3result, err := s.m3.Fetch(m3ctx, m3query, fetchOptions)
if err != nil {
return nil, err
}
Expand Down
14 changes: 9 additions & 5 deletions src/query/storage/fanout/storage_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// +build big
//
// Copyright (c) 2018 Uber Technologies, Inc.
//
Expand Down Expand Up @@ -141,16 +140,20 @@ func TestFanoutReadEmpty(t *testing.T) {

func TestFanoutReadError(t *testing.T) {
store := setupFanoutRead(t, true)
_, err := store.Fetch(context.TODO(), &storage.FetchQuery{}, &storage.FetchOptions{})
opts := storage.NewFetchOptions()
_, err := store.Fetch(context.TODO(), &storage.FetchQuery{}, opts)
assert.Error(t, err)
}

func TestFanoutReadSuccess(t *testing.T) {
store := setupFanoutRead(t, true, &fetchResponse{result: fakeIterator(t)}, &fetchResponse{result: fakeIterator(t)})
store := setupFanoutRead(t, true, &fetchResponse{
result: fakeIterator(t)},
&fetchResponse{result: fakeIterator(t)},
)
res, err := store.Fetch(context.TODO(), &storage.FetchQuery{
Start: time.Now().Add(-time.Hour),
End: time.Now(),
}, &storage.FetchOptions{})
}, storage.NewFetchOptions())
require.NoError(t, err, "no error on read")
assert.NotNil(t, res)
assert.NoError(t, store.Close())
Expand All @@ -166,7 +169,8 @@ func TestFanoutSearchEmpty(t *testing.T) {

func TestFanoutSearchError(t *testing.T) {
store := setupFanoutRead(t, true)
_, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, &storage.FetchOptions{})
opts := storage.NewFetchOptions()
_, err := store.FetchTags(context.TODO(), &storage.FetchQuery{}, opts)
assert.Error(t, err)
}

Expand Down
Loading