Skip to content

Commit

Permalink
Configure jaeger (basic)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewmains12 committed Feb 21, 2019
1 parent b31e764 commit cdd4a00
Show file tree
Hide file tree
Showing 35 changed files with 942 additions and 154 deletions.
74 changes: 74 additions & 0 deletions docs/operational_guide/monitoring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
## Metrics

TODO

## Logs

TODO

## Tracing

M3DB is integrated with [opentracing](https://opentracing.io/) to provide
insight into query performance and errors.

### Configuration
Currently, only [Jaeger](https://www.jaegertracing.io/) is supported as a tracing backend.

To enable it, set `tracing.backend` to `jaeger` in the configuration file:

```
tracing:
  backend: jaeger # enables jaeger with default configs
  jaeger:
    # optional configuration for jaeger -- see
    # https://github.com/jaegertracing/jaeger-client-go/blob/master/config/config.go#L37
    # for options
    ...
```

Jaeger can be run locally with docker as described in
https://www.jaegertracing.io/docs/1.9/getting-started/.

The default configuration reports traces via UDP to `localhost:6831`;
when using the all-in-one Jaeger container, they can be viewed in the Jaeger UI at

http://localhost:16686


#### Alternative backends

If you'd like additional backends, we'd love to support them!

File an issue against M3 and we can work with you on how best to add
the backend. The first time's going to be a little rough--opentracing
unfortunately doesn't support Go plugins (yet--see
https://github.com/opentracing/opentracing-go/issues/133), and `glide`'s
update model means that adding dependencies directly will update
*everything*, which isn't ideal for an isolated dependency change.
These problems are all solvable though,
and we'll work with you to make it happen!

### Use cases

Note: all URLs assume a local jaeger setup as described in Jaeger's
[docs](https://www.jaegertracing.io/docs/1.9/getting-started/).


#### Finding slow queries

To find Prometheus queries that take longer than `<threshold>`, filter for
`minDuration >= <threshold>` on `operation="GET /api/v1/query_range"`.

Sample query:
http://localhost:16686/search?end=1548876672544000&limit=20&lookback=1h&maxDuration&minDuration=1ms&operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&start=1548873072544000

#### Finding queries with errors

Search for `error=true` on `operation="GET /api/v1/query_range"`:

http://localhost:16686/search?operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&tags=%7B%22error%22%3A%22true%22%7D

#### Finding 500 (Internal Server Error) responses

Search for `http.status_code=500`.

http://localhost:16686/search?limit=20&lookback=24h&maxDuration&minDuration&operation=GET%20%2Fapi%2Fv1%2Fquery_range&service=m3query&start=1548802430108000&tags=%7B%22http.status_code%22%3A%22500%22%7D
3 changes: 3 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type Configuration struct {
// Metrics configuration.
Metrics instrument.MetricsConfiguration `yaml:"metrics"`

// Tracing configures opentracing. If not provided, tracing is disabled.
Tracing instrument.TracingConfiguration `yaml:"tracing"`

// Clusters is the DB cluster configurations for read, write and
// query endpoints.
Clusters m3.ClustersStaticConfiguration `yaml:"clusters"`
Expand Down
10 changes: 8 additions & 2 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util/httperrors"
"github.com/m3db/m3/src/query/util/logging"
xhttp "github.com/m3db/m3/src/x/net/http"
opentracingutil "github.com/m3db/m3/src/query/util/opentracing"

opentracingext "github.com/opentracing/opentracing-go/ext"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/tally"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -124,7 +127,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

result, params, respErr := h.ServeHTTPWithEngine(w, r, h.engine)
if respErr != nil {
xhttp.Error(w, respErr.Err, respErr.Code)
httperrors.ErrorWithReqID(w, r, respErr.Err, respErr.Code)
return
}

Expand Down Expand Up @@ -167,6 +170,9 @@ func (h *PromReadHandler) ServeHTTPWithEngine(

result, err := read(ctx, engine, h.tagOpts, w, params)
if err != nil {
sp := opentracingutil.SpanFromContextOrRoot(ctx)
sp.LogFields(opentracinglog.Error(err))
opentracingext.Error.Set(sp, true)
logger.Error("unable to fetch data", zap.Error(err))
h.promReadMetrics.fetchErrorsServer.Inc(1)
return nil, emptyReqParams, &RespError{Err: err, Code: http.StatusInternalServerError}
Expand Down
12 changes: 12 additions & 0 deletions src/query/api/v1/handler/prometheus/native/read_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/ts"
opentracingutil "github.com/m3db/m3/src/query/util/opentracing"

opentracinglog "github.com/opentracing/opentracing-go/log"
)

func read(
Expand All @@ -45,6 +48,15 @@ func read(
ctx, cancel := context.WithTimeout(reqCtx, params.Timeout)
defer cancel()

sp := opentracingutil.SpanFromContextOrRoot(ctx)
sp.LogFields(
opentracinglog.String("params.query", params.Query),
opentracingutil.Time("params.start", params.Start),
opentracingutil.Time("params.end", params.End),
opentracingutil.Time("params.now", params.Now),
opentracingutil.Duration("params.step", params.Step),
)

opts := &executor.EngineOptions{}
// Detect clients closing connections
handler.CloseWatcher(ctx, cancel, w)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/util/httperrors"
"github.com/m3db/m3/src/query/util/logging"
xhttp "github.com/m3db/m3/src/x/net/http"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -69,7 +69,7 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
logger := logging.WithContext(ctx)
params, rErr := parseInstantaneousParams(r, h.timeoutOpts)
if rErr != nil {
xhttp.Error(w, rErr.Inner(), rErr.Code())
httperrors.ErrorWithReqID(w, r, rErr, rErr.Code())
return
}

Expand All @@ -80,7 +80,7 @@ func (h *PromReadInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
result, err := read(ctx, h.engine, h.tagOpts, w, params)
if err != nil {
logger.Error("unable to fetch data", zap.Error(err))
xhttp.Error(w, err, http.StatusBadRequest)
httperrors.ErrorWithReqID(w, r, rErr, http.StatusBadRequest)
return
}

Expand Down
15 changes: 12 additions & 3 deletions src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package httpd
import (
"encoding/json"
"errors"
"fmt"
"net/http"
_ "net/http/pprof" // needed for pprof handler registration
"time"
Expand Down Expand Up @@ -52,6 +53,8 @@ import (
"github.com/m3db/m3/src/x/net/http/cors"

"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
)

Expand Down Expand Up @@ -102,13 +105,19 @@ func NewHandler(
) (*Handler, error) {
r := mux.NewRouter()

// apply middleware. Just CORS for now, but we could add more here as needed.
withMiddleware := &cors.Handler{
// apply middleware.
withMiddleware := http.Handler(&cors.Handler{
Handler: r,
Info: &cors.Info{
"*": true,
},
}
})

// add jaeger tracing to our endpoints
withMiddleware = nethttp.Middleware(opentracing.GlobalTracer(), withMiddleware,
nethttp.OperationNameFunc(func(r *http.Request) string {
return fmt.Sprintf("%s %s", r.Method, r.URL.Path)
}))

var timeoutOpts = &prometheus.TimeoutOpts{}
if embeddedDbCfg == nil || embeddedDbCfg.Client.FetchTimeout == nil {
Expand Down
6 changes: 6 additions & 0 deletions src/query/config/m3query-dev-etcd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ writeWorkerPoolPolicy:

tagOptions:
idScheme: quoted

# Uncomment this to enable local jaeger tracing. See https://www.jaegertracing.io/docs/1.9/getting-started/
# for quick local setup (which this config will send data to).

# tracing:
# backend: jaeger
8 changes: 6 additions & 2 deletions src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/opentracing"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -134,7 +135,7 @@ func (e *Engine) ExecuteExpr(
defer close(results)

req := newRequest(e, params)
defer req.finish()

nodes, edges, err := req.compile(ctx, parser)
if err != nil {
results <- Query{Err: err}
Expand All @@ -147,13 +148,16 @@ func (e *Engine) ExecuteExpr(
return
}

state, err := req.execute(ctx, pp)
state, err := req.generateExecutionState(ctx, pp)
// free up resources
if err != nil {
results <- Query{Err: err}
return
}

sp, ctx := opentracingutil.StartSpanFromContext(ctx, "executing")
defer sp.Finish()

result := state.resultNode
results <- Query{Result: result}

Expand Down
68 changes: 14 additions & 54 deletions src/query/executor/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ package executor
import (
"context"
"fmt"
"time"

"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
"github.com/m3db/m3/src/query/plan"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/query/util/opentracing"

"github.com/uber-go/tally"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -65,42 +64,36 @@ func (s State) durationString() string {

// Request represents a single request.
type Request struct {
engine *Engine
params models.RequestParams
parentSpan *span
engine *Engine
params models.RequestParams
}

func newRequest(engine *Engine, params models.RequestParams) *Request {
parentSpan := startSpan(engine.metrics.activeHist, engine.metrics.all)
return &Request{
engine: engine,
params: params,
parentSpan: parentSpan,
}
return &Request{engine: engine, params: params}
}

func (r *Request) compile(ctx context.Context, parser parser.Parser) (parser.Nodes, parser.Edges, error) {
sp := startSpan(r.engine.metrics.compilingHist, r.engine.metrics.compiling)
sp, ctx := opentracingutil.StartSpanFromContext(ctx, "compile")
defer sp.Finish()
// TODO: Change DAG interface to take in a context
nodes, edges, err := parser.DAG()
if err != nil {
sp.finish(err)
return nil, nil, err
}

if r.params.Debug {
logging.WithContext(ctx).Info("compiling dag", zap.Any("nodes", nodes), zap.Any("edges", edges))
}

sp.finish(nil)
return nodes, edges, nil
}

func (r *Request) plan(ctx context.Context, nodes parser.Nodes, edges parser.Edges) (plan.PhysicalPlan, error) {
sp := startSpan(r.engine.metrics.planningHist, r.engine.metrics.planning)
sp, ctx := opentracingutil.StartSpanFromContext(ctx, "plan")
defer sp.Finish()

lp, err := plan.NewLogicalPlan(nodes, edges)
if err != nil {
sp.finish(err)
return plan.PhysicalPlan{}, err
}

Expand All @@ -110,63 +103,30 @@ func (r *Request) plan(ctx context.Context, nodes parser.Nodes, edges parser.Edg

pp, err := plan.NewPhysicalPlan(lp, r.engine.store, r.params, r.engine.lookbackDuration)
if err != nil {
sp.finish(err)
return plan.PhysicalPlan{}, err
}

if r.params.Debug {
logging.WithContext(ctx).Info("physical plan", zap.String("plan", pp.String()))
}

sp.finish(nil)
return pp, nil
}

func (r *Request) execute(ctx context.Context, pp plan.PhysicalPlan) (*ExecutionState, error) {
sp := startSpan(r.engine.metrics.executingHist, r.engine.metrics.executing)
func (r *Request) generateExecutionState(ctx context.Context, pp plan.PhysicalPlan) (*ExecutionState, error) {
sp, ctx := opentracingutil.StartSpanFromContext(ctx,
"generate_execution_state")
defer sp.Finish()

state, err := GenerateExecutionState(pp, r.engine.store)
// free up resources
if err != nil {
sp.finish(err)
return nil, err
}

if r.params.Debug {
logging.WithContext(ctx).Info("execution state", zap.String("state", state.String()))
}

sp.finish(nil)
return state, nil
}

func (r *Request) finish() {
r.parentSpan.finish(nil)
}

// span is a simple wrapper around opentracing.Span in order to
// get access to the duration of the span for metrics reporting.
type span struct {
start time.Time
durationHist tally.Histogram
counter *counterWithDecrement
}

func startSpan(durationHist tally.Histogram, counter *counterWithDecrement) *span {
now := time.Now()
counter.Inc()
return &span{
durationHist: durationHist,
start: now,
counter: counter,
}
}

func (s *span) finish(err error) {
s.counter.Dec()
// Don't record duration for error cases
if err == nil {
now := time.Now()
duration := now.Sub(s.start)
s.durationHist.RecordDuration(duration)
}
}
Loading

0 comments on commit cdd4a00

Please sign in to comment.