Skip to content

Commit

Permalink
Overhaul Query{Params,Result} parsing, merging, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Jan 24, 2017
1 parent 219b916 commit 00b9845
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 67 deletions.
161 changes: 107 additions & 54 deletions pkg/store/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -76,14 +77,10 @@ func (a *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case method == "GET" && path == "/":
r.URL.Path = APIPathUserQuery
http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
case method == "GET" && path == APIPathUserQuery:
a.handleUserQuery(w, r, false)
case method == "HEAD" && path == APIPathUserQuery:
a.handleUserQuery(w, r, true)
case method == "GET" && path == APIPathInternalQuery:
a.handleInternalQuery(w, r, false)
case method == "HEAD" && path == APIPathInternalQuery:
a.handleInternalQuery(w, r, true)
case (method == "GET" || method == "HEAD") && path == APIPathUserQuery:
a.handleUserQuery(w, r)
case (method == "GET" || method == "HEAD") && path == APIPathInternalQuery:
a.handleInternalQuery(w, r)
case method == "GET" && path == APIPathUserStream:
a.handleUserStream(w, r)
case method == "GET" && path == APIPathInternalStream:
Expand All @@ -107,71 +104,89 @@ func (iw *interceptingWriter) WriteHeader(code int) {
iw.ResponseWriter.WriteHeader(code)
}

func (a *API) handleUserQuery(w http.ResponseWriter, r *http.Request, statsOnly bool) {
func (a *API) handleUserQuery(w http.ResponseWriter, r *http.Request) {
begin := time.Now()

// Validate user input.
var qp QueryParams
if err := qp.DecodeFrom(r.URL); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

members := a.peer.Current(cluster.PeerTypeStore)
if len(members) <= 0 {
// Very odd; we should at least find ourselves!
http.Error(w, "no store nodes available", http.StatusServiceUnavailable)
return
}

query, err := MakeQueryParams(r.URL.Query())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

method := "GET"
if statsOnly {
method = "HEAD"
}

var requests []*http.Request
for _, hostport := range members {
u, err := url.Parse(fmt.Sprintf("http://%s/store%s", hostport, APIPathInternalQuery))
// Copy original URL, to save all the query params, etc.
u, err := url.Parse(r.URL.String())
if err != nil {
err = errors.Wrapf(err, "constructing URL for %s", hostport)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
query.EncodeTo(u) // use query directly, no translation needed
req, err := http.NewRequest(method, u.String(), nil)

// Fix the scheme, host, and path.
// (These may be empty due to StripPrefix.)
u.Scheme = "http"
u.Host = hostport
u.Path = fmt.Sprintf("store%s", APIPathInternalQuery)

// Construct a new request.
req, err := http.NewRequest(r.Method, u.String(), nil)
if err != nil {
err = errors.Wrapf(err, "constructing request for %s", hostport)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Execute that request later.
requests = append(requests, req)
}

// Execute all requests concurrently.
type response struct {
resp *http.Response
err error
}
c := make(chan response, len(requests))
for _, req := range requests {
go func(req *http.Request) {
// TODO(pb): don't use http.DefaultClient
resp, err := http.DefaultClient.Do(req)
resp, err := a.client.Do(req)
c <- response{resp, err}
}(req)
}

// We'll collect responses into a single QueryResult.
qr := QueryResult{Params: qp}

// We'll merge all records in a single pass.
var readClosers []io.ReadCloser
defer func() {
// Don't leak if we need to make an early return.
for _, rc := range readClosers {
rc.Close()
}
}()

// Collect responses.
responses := make([]response, cap(c))
for i := 0; i < cap(c); i++ {
for i := 0; i < len(responses); i++ {
responses[i] = <-c
}
result := QueryResult{
Params: query,
}
for _, response := range responses {
// Direct error, network problem?
if response.err != nil {
level.Error(a.logger).Log("during", "query_gather", "err", response.err)
result.ErrorCount++
qr.ErrorCount++
continue
}

// Non-200, bad parameters or internal server error?
if response.resp.StatusCode != http.StatusOK {
buf, err := ioutil.ReadAll(response.resp.Body)
if err != nil {
Expand All @@ -182,29 +197,60 @@ func (a *API) handleUserQuery(w http.ResponseWriter, r *http.Request, statsOnly
}
response.resp.Body.Close()
level.Error(a.logger).Log("during", "query_gather", "status_code", response.resp.StatusCode, "err", strings.TrimSpace(string(buf)))
result.ErrorCount++
qr.ErrorCount++
continue
}

// Decode the individual result.
var partialResult QueryResult
partialResult.DecodeFrom(response.resp)
if err := result.Merge(partialResult); err != nil {
if err := partialResult.DecodeFrom(response.resp); err != nil {
err = errors.Wrap(err, "decoding partial result")
level.Error(a.logger).Log("during", "query_gather", "err", err)
qr.ErrorCount++
continue
}

// We do a single lazy merge of all records, at the end!
// Extract the records ReadCloser, for later processing.
readClosers = append(readClosers, partialResult.Records)
partialResult.Records = nil

// Merge everything else, though.
if err := qr.Merge(partialResult); err != nil {
err = errors.Wrap(err, "merging results")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
result.Duration = time.Since(begin).String()
result.EncodeTo(w)
}

func (a *API) handleInternalQuery(w http.ResponseWriter, r *http.Request, statsOnly bool) {
query, err := MakeQueryParams(r.URL.Query())
// Now bind all the partial ReadClosers together.
mrc, err := newMergeReadCloser(readClosers)
if err != nil {
err = errors.Wrap(err, "constructing merging reader")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
qr.Records = mrc // lazy reader
readClosers = []io.ReadCloser{} // don't double-close on return

// Return!
qr.Duration = time.Since(begin).String() // overwrite
qr.EncodeTo(w)
}

func (a *API) handleInternalQuery(w http.ResponseWriter, r *http.Request) {
var qp QueryParams
if err := qp.DecodeFrom(r.URL); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

result, err := a.log.Query(query, statsOnly)
statsOnly := false
if r.Method == "HEAD" {
statsOnly = true
}

result, err := a.log.Query(qp, statsOnly)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -214,8 +260,9 @@ func (a *API) handleInternalQuery(w http.ResponseWriter, r *http.Request, statsO
}

func (a *API) handleUserStream(w http.ResponseWriter, r *http.Request) {
query, err := MakeQueryParams(r.URL.Query())
if err != nil {
// Validate user input.
var qp QueryParams
if err := qp.DecodeFrom(r.URL); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -231,11 +278,19 @@ func (a *API) handleUserStream(w http.ResponseWriter, r *http.Request) {
}

readerFactory := stream.HTTPReaderFactory(a.client, func(addr string) string {
u, err := url.Parse(fmt.Sprintf("http://%s/store%s", addr, APIPathInternalStream))
// Copy original URL, to save all the query params, etc.
u, err := url.Parse(r.URL.String())
if err != nil {
panic(err)
}
query.EncodeTo(u)

// Fix the scheme, host, and path.
// (These may be empty due to StripPrefix.)
u.Scheme = "http"
u.Host = addr
u.Path = fmt.Sprintf("store%s", APIPathInternalStream)

// That's our internal stream URL.
return u.String()
})

Expand All @@ -261,8 +316,8 @@ func (a *API) handleUserStream(w http.ResponseWriter, r *http.Request) {
}

func (a *API) handleInternalStream(w http.ResponseWriter, r *http.Request) {
query, err := MakeQueryParams(r.URL.Query())
if err != nil {
var qp QueryParams
if err := qp.DecodeFrom(r.URL); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -273,20 +328,18 @@ func (a *API) handleInternalStream(w http.ResponseWriter, r *http.Request) {
return
}

records := a.log.Stream(r.Context(), query)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
records := a.log.Stream(r.Context(), qp)

for {
select {
case <-r.Context().Done():
return // the cancelation is transitive, just need to return

case record := <-records:
fmt.Fprintf(w, "%s\n", record)
flusher.Flush()

case <-r.Context().Done():
// Context cancelation is transitive.
// We just need to exit.
return
}
}
}
Expand Down
42 changes: 29 additions & 13 deletions pkg/store/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,20 @@ type QueryParams struct {
Regex bool `json:"regex"`
}

// MakeQueryParams is a bit of sugar for DecodeFrom.
func MakeQueryParams(values url.Values) (QueryParams, error) {
var params QueryParams
return params, params.DecodeFrom(values)
}

// DecodeFrom populates a QueryParams from a set of url.Values.
func (qp *QueryParams) DecodeFrom(values url.Values) error {
from, err := time.Parse(time.RFC3339Nano, values.Get("from"))
// DecodeFrom populates a QueryParams from a URL.
func (qp *QueryParams) DecodeFrom(u *url.URL) error {
from, err := time.Parse(time.RFC3339Nano, u.Query().Get("from"))
if err != nil {
return errors.Wrap(err, "parsing 'from'")
}
to, err := time.Parse(time.RFC3339Nano, values.Get("to"))
to, err := time.Parse(time.RFC3339Nano, u.Query().Get("to"))
if err != nil {
return errors.Wrap(err, "parsing 'to'")
}
qp.From = from
qp.To = to
qp.Q = values.Get("q")
_, qp.Regex = values["regex"]
qp.Q = u.Query().Get("q")
_, qp.Regex = u.Query()["regex"]
return nil
}

Expand Down Expand Up @@ -91,6 +85,22 @@ func (qp *QueryParams) UnmarshalJSON(data []byte) error {
return nil
}

// MarshalJSON implements json.Marshaler.
// It marshals the times as RFC3339Nano timestamps.
func (qp *QueryParams) MarshalJSON() ([]byte, error) {
return json.Marshal(struct {
From string `json:"from"`
To string `json:"to"`
Q string `json:"q"`
Regex bool `json:"regex"`
}{
From: qp.From.Format(time.RFC3339Nano),
To: qp.To.Format(time.RFC3339Nano),
Q: qp.Q,
Regex: qp.Regex,
})
}

// QueryResult contains statistics about, and matching records for, a query.
type QueryResult struct {
Params QueryParams `json:"query"`
Expand Down Expand Up @@ -407,8 +417,14 @@ func (rc *mergeReadCloser) Read(p []byte) (int, error) {
if !rc.ok[i] {
continue // already drained
}
if smallest < 0 || bytes.Compare(rc.id[i], rc.id[smallest]) < 0 {
switch {
case smallest < 0, bytes.Compare(rc.id[i], rc.id[smallest]) < 0:
smallest = i
case bytes.Compare(rc.id[i], rc.id[smallest]) == 0: // duplicate
if err := rc.advance(i); err != nil {
return 0, err
}
continue
}
}
if smallest < 0 {
Expand Down

0 comments on commit 00b9845

Please sign in to comment.