Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tenant metrics #135

Merged
merged 4 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

Expand All @@ -19,29 +20,28 @@ import (
)

var (
sendDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "distributor_send_duration_seconds",
Help: "Time spent sending a sample batch to multiple replicated ingesters.",
Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
}, []string{"method", "status_code"})
ingesterAppends = prometheus.NewCounterVec(prometheus.CounterOpts{
ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"})
ingesterAppendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"})
)

func init() {
prometheus.MustRegister(sendDuration)
prometheus.MustRegister(ingesterAppends)
prometheus.MustRegister(ingesterAppendFailures)
}
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per instance",
}, []string{"instance"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per instance",
}, []string{"instance"})
)

// Config for a Distributor.
type Config struct {
Expand Down Expand Up @@ -97,6 +97,18 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, err
}

// Track metrics.
bytesCount := 0
lineCount := 0
for _, stream := range req.Streams {
for _, entry := range stream.Entries {
bytesCount += len(entry.Line)
lineCount++
}
}
bytesIngested.WithLabelValues(userID).Add(float64(bytesCount))
linesIngested.WithLabelValues(userID).Add(float64(lineCount))

// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.streamsRemovedTotal.Inc()
}
}

Expand Down
26 changes: 14 additions & 12 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -28,37 +29,37 @@ var (
)

var (
streamsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_created_total",
Help: "The total number of streams created in the ingester.",
}, []string{"org"})
streamsRemovedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of streams created per instance in the ingester.",
}, []string{"instance"})
streamsRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_removed_total",
Help: "The total number of streams removed by the ingester.",
}, []string{"org"})
Help: "The total number of streams removed per instance by the ingester.",
}, []string{"instance"})
)

func init() {
prometheus.MustRegister(streamsCreatedTotal)
prometheus.MustRegister(streamsRemovedTotal)
}

type instance struct {
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream
index *index.InvertedIndex

instanceID string

streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter
}

func newInstance(instanceID string) *instance {
streamsCreatedTotal.WithLabelValues(instanceID).Inc()
return &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
}
}

Expand All @@ -78,6 +79,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
stream = newStream(fp, labels)
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
}

if err := stream.Push(ctx, s.Entries); err != nil {
Expand Down