Skip to content

Commit

Permalink
fix(lokihandler): return proper error status code
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed May 30, 2024
1 parent 63715c1 commit 9398bdc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 36 deletions.
42 changes: 42 additions & 0 deletions internal/lokihandler/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package lokihandler

import (
"fmt"
"net/http"

"github.com/go-faster/errors"

"github.com/go-faster/oteldb/internal/logql"
"github.com/go-faster/oteldb/internal/logql/lexer"
"github.com/go-faster/oteldb/internal/lokiapi"
)

func evalErr(err error) error {
_, isLexerErr := errors.Into[*lexer.Error](err)
_, isParseErr := errors.Into[*logql.ParseError](err)
if isLexerErr || isParseErr {
return &lokiapi.ErrorStatusCode{
StatusCode: http.StatusBadRequest,
Response: lokiapi.Error(err.Error()),
}
}

return &lokiapi.ErrorStatusCode{
StatusCode: http.StatusInternalServerError,
Response: lokiapi.Error(fmt.Sprintf("eval: %s", err)),
}
}

func validationErr(err error, msg string) error {
return &lokiapi.ErrorStatusCode{
StatusCode: http.StatusBadRequest,
Response: lokiapi.Error(fmt.Sprintf("%s: %s", msg, err)),
}
}

func executionErr(err error, msg string) error {
return &lokiapi.ErrorStatusCode{
StatusCode: http.StatusInternalServerError,
Response: lokiapi.Error(fmt.Sprintf("%s: %s", msg, err)),
}
}
55 changes: 19 additions & 36 deletions internal/lokihandler/lokihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ func (h *LokiAPI) LabelValues(ctx context.Context, params lokiapi.LabelValuesPar
params.Since,
)
if err != nil {
return nil, errors.Wrap(err, "parse time range")
return nil, validationErr(err, "parse time range")
}

iter, err := h.q.LabelValues(ctx, params.Name, logstorage.LabelsOptions{
Start: otelstorage.NewTimestampFromTime(start),
End: otelstorage.NewTimestampFromTime(end),
})
if err != nil {
return nil, errors.Wrap(err, "query")
return nil, executionErr(err, "get label values")
}
defer func() {
_ = iter.Close()
Expand All @@ -79,7 +79,7 @@ func (h *LokiAPI) LabelValues(ctx context.Context, params lokiapi.LabelValuesPar
values = append(values, tag.Value)
return nil
}); err != nil {
return nil, errors.Wrap(err, "map tags")
return nil, executionErr(err, "read tags")
}
lg.Debug("Got tag values",
zap.String("label_name", params.Name),
Expand Down Expand Up @@ -108,15 +108,15 @@ func (h *LokiAPI) Labels(ctx context.Context, params lokiapi.LabelsParams) (*lok
params.Since,
)
if err != nil {
return nil, errors.Wrap(err, "parse time range")
return nil, validationErr(err, "parse time range")
}

names, err := h.q.LabelNames(ctx, logstorage.LabelsOptions{
Start: otelstorage.NewTimestampFromTime(start),
End: otelstorage.NewTimestampFromTime(end),
})
if err != nil {
return nil, errors.Wrap(err, "query")
return nil, executionErr(err, "get label names")
}
lg.Debug("Got label names", zap.Int("count", len(names)))

Expand All @@ -141,21 +141,14 @@ func (h *LokiAPI) Push(context.Context, lokiapi.PushReq) error {
//
// GET /loki/api/v1/query
func (h *LokiAPI) Query(ctx context.Context, params lokiapi.QueryParams) (*lokiapi.QueryResponse, error) {
lg := zctx.From(ctx)

ts, err := ParseTimestamp(params.Time.Value, time.Now())
if err != nil {
return nil, errors.Wrap(err, "parse time")
return nil, validationErr(err, "parse time")
}

var direction logqlengine.Direction
switch d := params.Direction.Or(lokiapi.DirectionBackward); d {
case lokiapi.DirectionBackward:
direction = logqlengine.DirectionBackward
case lokiapi.DirectionForward:
direction = logqlengine.DirectionForward
default:
return nil, errors.Errorf("invalid direction %q", d)
direction, err := parseDirection(params.Direction)
if err != nil {
return nil, validationErr(err, "parse direction")
}

data, err := h.eval(ctx, params.Query, logqlengine.EvalParams{
Expand All @@ -166,11 +159,9 @@ func (h *LokiAPI) Query(ctx context.Context, params lokiapi.QueryParams) (*lokia
Limit: params.Limit.Or(100),
})
if err != nil {
return nil, err
return nil, executionErr(err, "evaluate instant query")
}

lg.Debug("Query", zap.String("type", string(data.Type)))

return &lokiapi.QueryResponse{
Status: "success",
Data: data,
Expand All @@ -183,31 +174,24 @@ func (h *LokiAPI) Query(ctx context.Context, params lokiapi.QueryParams) (*lokia
//
// GET /loki/api/v1/query_range
func (h *LokiAPI) QueryRange(ctx context.Context, params lokiapi.QueryRangeParams) (*lokiapi.QueryResponse, error) {
lg := zctx.From(ctx)

start, end, err := parseTimeRange(
time.Now(),
params.Start,
params.End,
params.Since,
)
if err != nil {
return nil, errors.Wrap(err, "parse time range")
return nil, validationErr(err, "parse time range")
}

step, err := parseStep(params.Step, start, end)
if err != nil {
return nil, errors.Wrap(err, "parse step")
return nil, validationErr(err, "parse step")
}

var direction logqlengine.Direction
switch d := params.Direction.Or(lokiapi.DirectionBackward); d {
case lokiapi.DirectionBackward:
direction = logqlengine.DirectionBackward
case lokiapi.DirectionForward:
direction = logqlengine.DirectionForward
default:
return nil, errors.Errorf("invalid direction %q", d)
direction, err := parseDirection(params.Direction)
if err != nil {
return nil, validationErr(err, "parse direction")
}

data, err := h.eval(ctx, params.Query, logqlengine.EvalParams{
Expand All @@ -218,9 +202,8 @@ func (h *LokiAPI) QueryRange(ctx context.Context, params lokiapi.QueryRangeParam
Limit: params.Limit.Or(100),
})
if err != nil {
return nil, err
return nil, executionErr(err, "evaluate range query")
}
lg.Debug("Query range", zap.String("type", string(data.Type)))

return &lokiapi.QueryResponse{
Status: "success",
Expand All @@ -241,10 +224,10 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok
params.Since,
)
if err != nil {
return nil, errors.Wrap(err, "parse time range")
return nil, validationErr(err, "parse time range")
}

out := make([]lokiapi.MapsDataItem, 0, len(params.Match))
zctx.From(ctx).Info("Series", zap.Int("match", len(params.Match)))
for _, q := range params.Match {
// TODO(ernado): offload
data, err := h.eval(ctx, q, logqlengine.EvalParams{
Expand All @@ -254,7 +237,7 @@ func (h *LokiAPI) Series(ctx context.Context, params lokiapi.SeriesParams) (*lok
Limit: 1_000,
})
if err != nil {
return nil, errors.Wrap(err, "eval")
return nil, executionErr(err, "evaluate series query")
}
if streams, ok := data.GetStreamsResult(); ok {
for _, stream := range streams.Result {
Expand Down
12 changes: 12 additions & 0 deletions internal/lokihandler/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-faster/errors"
"github.com/prometheus/common/model"

"github.com/go-faster/oteldb/internal/logql/logqlengine"
"github.com/go-faster/oteldb/internal/lokiapi"
)

Expand Down Expand Up @@ -108,3 +109,14 @@ func ParseDuration[S ~string](param S) (time.Duration, error) {
md, err := model.ParseDuration(value)
return time.Duration(md), err
}

func parseDirection(opt lokiapi.OptDirection) (r logqlengine.Direction, _ error) {
switch d := opt.Or(lokiapi.DirectionBackward); d {
case lokiapi.DirectionBackward:
return logqlengine.DirectionBackward, nil
case lokiapi.DirectionForward:
return logqlengine.DirectionForward, nil
default:
return r, errors.Errorf("invalid direction %q", d)
}
}

0 comments on commit 9398bdc

Please sign in to comment.