Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retention label to loki_distributor_bytes_received_total metrics #3840

Merged
merged 8 commits into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
)

type PushTarget struct {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := util.ParseRequest(logger, userID, r)
req, err := push.ParseRequest(logger, userID, r, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/operations/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The Loki Distributors expose the following metrics:
| ------------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------------------------------------------------ |
| `loki_distributor_ingester_appends_total` | Counter | The total number of batch appends sent to ingesters. |
| `loki_distributor_ingester_append_failures_total` | Counter | The total number of failed batch appends sent to ingesters. |
| `loki_distributor_bytes_received_total` | Counter | The total number of uncompressed bytes received per tenant. |
| `loki_distributor_bytes_received_total`           | Counter     | The total number of uncompressed bytes received, partitioned by tenant and retention period in hours.                                  |
| `loki_distributor_lines_received_total` | Counter | The total number of log _entries_ received per tenant (not necessarily of _lines_, as an entry can have more than one line of text). |

The Loki Ingesters expose the following metrics:
Expand Down
19 changes: 10 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/validation"
)

var (
maxLabelCacheSize = 100000
)
var maxLabelCacheSize = 100000

// Config for a Distributor.
type Config struct {
Expand All @@ -53,12 +52,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Distributor struct {
services.Service

cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
tenantsRetention *retention.TenantsRetention
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool

// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
Expand Down Expand Up @@ -118,6 +118,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,
Expand Down
5 changes: 2 additions & 3 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,16 @@ import (

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

lokiutil "github.com/grafana/loki/pkg/util"
)

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := lokiutil.ParseRequest(logger, userID, r)
req, err := push.ParseRequest(logger, userID, r, d.tenantsRetention)
if err != nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
Expand Down
108 changes: 59 additions & 49 deletions pkg/util/parse.go → pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package push

import (
"compress/gzip"
Expand All @@ -14,22 +14,24 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
loki_util "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/unmarshal"
unmarshal2 "github.com/grafana/loki/pkg/util/unmarshal/legacy"
)

var (
contentType = http.CanonicalHeaderKey("Content-Type")
contentEnc = http.CanonicalHeaderKey("Content-Encoding")

contentType = http.CanonicalHeaderKey("Content-Type")
contentEnc = http.CanonicalHeaderKey("Content-Encoding")
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
}, []string{"tenant", "retention_hours"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Expand All @@ -39,12 +41,15 @@ var (

const applicationJSON = "application/json"

func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {
// TenantsRetention maps a tenant ID and a stream's label set to the
// retention period that applies to that stream. A nil TenantsRetention
// may be passed to ParseRequest, in which case the retention_hours
// metric label is left empty.
type TenantsRetention interface {
	RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
bodySize := NewSizeReader(r.Body)
bodySize := loki_util.NewSizeReader(r.Body)
contentEncoding := r.Header.Get(contentEnc)
switch contentEncoding {
case "":
Expand All @@ -66,48 +71,12 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.
}

contentType := r.Header.Get(contentType)
var req logproto.PushRequest

defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

mostRecentEntry := time.Unix(0, 0)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"contentEncoding", contentEncoding,
"bodySize", humanize.Bytes(uint64(bodySize.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
}()
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
req logproto.PushRequest
)

switch contentType {
case applicationJSON:
Expand All @@ -132,5 +101,46 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request) (*logproto.
return nil, err
}
}

mostRecentEntry := time.Unix(0, 0)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
var retentionHours string
if tenantsRetention != nil {
lbs, err := logql.ParseLabels(s.Labels)
if err != nil {
return nil, err
}
retentionHours = fmt.Sprintf("%d", int64(math.Floor(tenantsRetention.RetentionPeriodFor(userID, lbs).Hours())))
}
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line))))
if e.Timestamp.After(mostRecentEntry) {
mostRecentEntry = e.Timestamp
}
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"contentEncoding", contentEncoding,
"bodySize", humanize.Bytes(uint64(bodySize.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
"mostRecentLagMs", time.Since(mostRecentEntry).Milliseconds(),
)
return &req, nil
}
25 changes: 16 additions & 9 deletions pkg/util/parse_test.go → pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package push

import (
"bytes"
Expand Down Expand Up @@ -26,7 +26,7 @@ func gzipString(source string) string {
}

func TestParseRequest(t *testing.T) {
var tests = []struct {
tests := []struct {
path string
body string
contentType string
Expand All @@ -37,35 +37,42 @@ func TestParseRequest(t *testing.T) {
path: `/loki/api/v1/push`,
body: ``,
contentType: `application/json`,
valid: false},
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: ``,
valid: false},
valid: false,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`,
contentType: `application/json`,
contentEncoding: ``,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `gzip`,
valid: true},
valid: true,
},
{
path: `/loki/api/v1/push`,
body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`),
contentType: `application/json`,
contentEncoding: `snappy`,
valid: false}}
valid: false,
},
}

// Testing input array
for index, test := range tests {
Expand All @@ -76,7 +83,7 @@ func TestParseRequest(t *testing.T) {
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(util_log.Logger, "", request)
data, err := ParseRequest(util_log.Logger, "", request, nil)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
Expand Down
30 changes: 23 additions & 7 deletions pkg/storage/stores/shipper/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/validation"
)
Expand All @@ -13,7 +14,7 @@ type ExpirationChecker interface {
}

type expirationChecker struct {
limits Limits
tenantsRetention *TenantsRetention
}

type Limits interface {
Expand All @@ -23,23 +24,38 @@ type Limits interface {

func NewExpirationChecker(limits Limits) ExpirationChecker {
return &expirationChecker{
limits: limits,
tenantsRetention: NewTenantsRetention(limits),
}
}

// Expired tells if a ref chunk is expired based on retention rules.
func (e *expirationChecker) Expired(ref ChunkEntry, now model.Time) (bool, []model.Interval) {
userID := unsafeGetString(ref.UserID)
streamRetentions := e.limits.StreamRetention(userID)
globalRetention := e.limits.RetentionPeriod(userID)
period := e.tenantsRetention.RetentionPeriodFor(userID, ref.Labels)
return now.Sub(ref.Through) > period, nil
}

// TenantsRetention resolves the retention period for a tenant's streams
// from the configured Limits (global retention plus per-stream
// retention rules).
type TenantsRetention struct {
	limits Limits
}

// NewTenantsRetention returns a TenantsRetention that looks up
// retention periods through the given limits.
func NewTenantsRetention(l Limits) *TenantsRetention {
	tr := &TenantsRetention{limits: l}
	return tr
}

func (tr *TenantsRetention) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {
streamRetentions := tr.limits.StreamRetention(userID)
globalRetention := tr.limits.RetentionPeriod(userID)
var (
matchedRule validation.StreamRetention
found bool
)
Outer:
for _, streamRetention := range streamRetentions {
for _, m := range streamRetention.Matchers {
if !m.Matches(ref.Labels.Get(m.Name)) {
if !m.Matches(lbs.Get(m.Name)) {
continue Outer
}
}
Expand All @@ -58,7 +74,7 @@ Outer:
matchedRule = streamRetention
}
if found {
return now.Sub(ref.Through) > time.Duration(matchedRule.Period), nil
return time.Duration(matchedRule.Period)
}
return now.Sub(ref.Through) > globalRetention, nil
return globalRetention
}