Skip to content

Commit

Permalink
Tenant metrics (#135)
Browse files Browse the repository at this point in the history
* Fix stream tracking metric

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Track lines and bytes received per user

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Use instance instead of org

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Nuke dead code

Usage has been removed as part of perf-optimisation:
cortexproject/cortex@3006e39#diff-180f56d9aaf5a9aa079e6fc9cfcf1bc8L365

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve authored Dec 18, 2018
1 parent 56db733 commit 7bcdf0e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 26 deletions.
40 changes: 26 additions & 14 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

Expand All @@ -19,29 +20,28 @@ import (
)

var (
sendDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "distributor_send_duration_seconds",
Help: "Time spent sending a sample batch to multiple replicated ingesters.",
Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5, 1},
}, []string{"method", "status_code"})
ingesterAppends = prometheus.NewCounterVec(prometheus.CounterOpts{
ingesterAppends = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_appends_total",
Help: "The total number of batch appends sent to ingesters.",
}, []string{"ingester"})
ingesterAppendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
ingesterAppendFailures = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_ingester_append_failures_total",
Help: "The total number of failed batch appends sent to ingesters.",
}, []string{"ingester"})
)

func init() {
prometheus.MustRegister(sendDuration)
prometheus.MustRegister(ingesterAppends)
prometheus.MustRegister(ingesterAppendFailures)
}
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per instance",
}, []string{"instance"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per instance",
}, []string{"instance"})
)

// Config for a Distributor.
type Config struct {
Expand Down Expand Up @@ -97,6 +97,18 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, err
}

// Track metrics.
bytesCount := 0
lineCount := 0
for _, stream := range req.Streams {
for _, entry := range stream.Entries {
bytesCount += len(entry.Line)
lineCount++
}
}
bytesIngested.WithLabelValues(userID).Add(float64(bytesCount))
linesIngested.WithLabelValues(userID).Add(float64(lineCount))

// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.streamsRemovedTotal.Inc()
}
}

Expand Down
26 changes: 14 additions & 12 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -28,37 +29,37 @@ var (
)

var (
streamsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_created_total",
Help: "The total number of streams created in the ingester.",
}, []string{"org"})
streamsRemovedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of streams created per instance in the ingester.",
}, []string{"instance"})
streamsRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_removed_total",
Help: "The total number of streams removed by the ingester.",
}, []string{"org"})
Help: "The total number of streams removed per instance by the ingester.",
}, []string{"instance"})
)

func init() {
prometheus.MustRegister(streamsCreatedTotal)
prometheus.MustRegister(streamsRemovedTotal)
}

type instance struct {
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream
index *index.InvertedIndex

instanceID string

streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter
}

func newInstance(instanceID string) *instance {
streamsCreatedTotal.WithLabelValues(instanceID).Inc()
return &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
}
}

Expand All @@ -78,6 +79,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
stream = newStream(fp, labels)
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
}

if err := stream.Push(ctx, s.Entries); err != nil {
Expand Down

0 comments on commit 7bcdf0e

Please sign in to comment.