Skip to content

Commit

Permalink
Promtail: Add a stream lagging metric (#2618)
Browse files Browse the repository at this point in the history
* add a metric which displays how far a stream lags the last timestamp in a batch

* register your metrics

* recorded metric in the wrong spot, moved to the correct spot

* cardinality is much too high if we add all the stream labels, instead just use filename.

* work on using an interface to set/remove metrics

* reworking interface again

* for now just register this metric based on filename only, we can improve this in the future if the need arises

* change the timeout to 1min for expiring the metric

* add host label

* include host label when removing metric

* change the batch size for promtail from 100kb to 1Mb, this will not affect sending for lower volumes which are still driven by the BatchWait setting. Through the addition of this metric it was found that for higher volume files > 100kb/sec this was much too low causing too many batches to be unnecessarily sent.
  • Loading branch information
slim-bean authored Sep 18, 2020
1 parent 29790ea commit 0b1dbe2
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ func (c *metricVec) With(labels model.LabelSet) prometheus.Metric {
return metric
}

func (c *metricVec) Delete(labels model.LabelSet) bool {
c.mtx.Lock()
defer c.mtx.Unlock()
fp := labels.Fingerprint()
_, ok := c.metrics[fp]
if ok {
delete(c.metrics, fp)
}
return ok
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/promtail/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"github.com/prometheus/common/model"
)

type InstrumentedEntryHandler interface {
EntryHandler
UnregisterLatencyMetric(labels model.LabelSet)
}

// EntryHandler is something that can "handle" entries.
type EntryHandler interface {
Handle(labels model.LabelSet, time time.Time, entry string) error
Expand Down
54 changes: 48 additions & 6 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"sync"
"time"

"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/logentry/metric"
"github.com/grafana/loki/pkg/promtail/api"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -34,39 +37,43 @@ const (
// Label reserved to override the tenant ID while processing
// pipeline stages
ReservedLabelTenantID = "__tenant_id__"

LatencyLabel = "filename"
HostLabel = "host"
)

var (
encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "encoded_bytes_total",
Help: "Number of bytes encoded and ready to send.",
}, []string{"host"})
}, []string{HostLabel})
sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_bytes_total",
Help: "Number of bytes sent.",
}, []string{"host"})
}, []string{HostLabel})
droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_bytes_total",
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
}, []string{HostLabel})
sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "sent_entries_total",
Help: "Number of log entries sent to the ingester.",
}, []string{"host"})
}, []string{HostLabel})
droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "promtail",
Name: "dropped_entries_total",
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.",
}, []string{"host"})
}, []string{HostLabel})
requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "promtail",
Name: "request_duration_seconds",
Help: "Duration of send requests.",
}, []string{"status_code", "host"})
}, []string{"status_code", HostLabel})
streamLag *metric.Gauges

countersWithHost = []*prometheus.CounterVec{
encodedBytes, sentBytes, droppedBytes, sentEntries, droppedEntries,
Expand All @@ -82,6 +89,16 @@ func init() {
prometheus.MustRegister(sentEntries)
prometheus.MustRegister(droppedEntries)
prometheus.MustRegister(requestDuration)
var err error
streamLag, err = metric.NewGauges("promtail_stream_lag_seconds",
"Difference between current time and last batch timestamp for successful sends",
metric.GaugeConfig{Action: "set"},
int64(1*time.Minute.Seconds()), // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic(err)
}
prometheus.MustRegister(streamLag)
}

// Client pushes entries to Loki and can be stopped
Expand Down Expand Up @@ -234,6 +251,26 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
return
}
var lblSet model.LabelSet
for i := range lbls {
if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{
model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
}
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Now().Sub(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
return
}

Expand Down Expand Up @@ -330,3 +367,8 @@ func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
}}
return nil
}

func (c *client) UnregisterLatencyMetric(labels model.LabelSet) {
labels[HostLabel] = model.LabelValue(c.cfg.URL.Host)
streamLag.Delete(labels)
}
2 changes: 1 addition & 1 deletion pkg/promtail/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Config struct {
func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&c.URL, prefix+"client.url", "URL of log server")
f.DurationVar(&c.BatchWait, prefix+"client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")
f.IntVar(&c.BatchSize, prefix+"client.batch-size-bytes", 1024*1024, "Maximum batch size to accrue before sending. ")
// Default backoff schedule: 0.5s, 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s(4.267m) For a total time of 511.5s(8.5m) before logs are lost
f.IntVar(&c.BackoffConfig.MaxRetries, prefix+"client.max-retries", 10, "Maximum number of retires when sending batches.")
f.DurationVar(&c.BackoffConfig.MinBackoff, prefix+"client.min-backoff", 500*time.Millisecond, "Initial backoff time between retries.")
Expand Down
4 changes: 4 additions & 0 deletions pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/targets/target"
)
Expand Down Expand Up @@ -316,6 +317,9 @@ func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
t.positions.Remove(tailer.path)
delete(t.tails, p)
}
if h, ok := t.handler.(api.InstrumentedEntryHandler); ok {
h.UnregisterLatencyMetric(model.LabelSet{model.LabelName(client.LatencyLabel): model.LabelValue(p)})
}
}
}

Expand Down

0 comments on commit 0b1dbe2

Please sign in to comment.