Receiver: Validate labels in write requests #5508

Merged · 13 commits · Aug 6, 2022
6 changes: 3 additions & 3 deletions pkg/block/index.go
@@ -92,10 +92,10 @@ type HealthStats struct {
MetricLabelValuesCount int64
}

-// PrometheusIssue5372Err returns an error if the HealthStats object indicates
+// OutOfOrderLabelsErr returns an error if the HealthStats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
-func (i HealthStats) PrometheusIssue5372Err() error {
+func (i HealthStats) OutOfOrderLabelsErr() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
@@ -157,7 +157,7 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

-if err := i.PrometheusIssue5372Err(); err != nil {
+if err := i.OutOfOrderLabelsErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

2 changes: 1 addition & 1 deletion pkg/compact/compact.go
@@ -1053,7 +1053,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

-if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
+if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
31 changes: 26 additions & 5 deletions pkg/receive/handler.go
@@ -812,13 +812,34 @@ func isConflict(err error) bool {
return false
}
return err == errConflict ||
-err == storage.ErrDuplicateSampleForTimestamp ||
+isSampleConflictErr(err) ||
+isExemplarConflictErr(err) ||
+isLabelsConflictErr(err) ||
+status.Code(err) == codes.AlreadyExists
+}
+
+// isSampleConflictErr returns whether or not the given error represents
+// a sample-related conflict.
+func isSampleConflictErr(err error) bool {
+return err == storage.ErrDuplicateSampleForTimestamp ||
err == storage.ErrOutOfOrderSample ||
err == storage.ErrOutOfBounds ||
-err == storage.ErrDuplicateExemplar ||
+err == storage.ErrOutOfBounds
+}
+
+// isExemplarConflictErr returns whether or not the given error represents
+// an exemplar-related conflict.
+func isExemplarConflictErr(err error) bool {
+return err == storage.ErrDuplicateExemplar ||
err == storage.ErrOutOfOrderExemplar ||
err == storage.ErrExemplarLabelLength ||
-status.Code(err) == codes.AlreadyExists
+err == storage.ErrExemplarLabelLength
+}
+
+// isLabelsConflictErr returns whether or not the given error represents
+// a labels-related conflict.
+func isLabelsConflictErr(err error) bool {
+return err == labelpb.ErrDuplicateLabels ||
+err == labelpb.ErrEmptyLabels ||
+err == labelpb.ErrOutOfOrderLabels
}

// isNotReady returns whether or not the given error represents a not ready error.
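Not part of the diff: the practical effect of the new helpers is that label-validation failures are now classified as conflicts, so the receiver reports them to remote-write clients the same way it reports duplicate or out-of-order samples. A hypothetical in-package test sketch (file and test names invented; it relies on the unexported predicates above) could look like this:

```go
package receive

import (
	"testing"

	"github.com/thanos-io/thanos/pkg/store/labelpb"
)

// Illustrative only: exercises the unexported predicates introduced above.
func TestLabelErrorsAreConflicts(t *testing.T) {
	if !isLabelsConflictErr(labelpb.ErrDuplicateLabels) {
		t.Fatal("expected duplicate-labels error to be a labels conflict")
	}
	// isConflict now also picks up label-validation failures, so they are
	// surfaced to clients as conflicts rather than as internal errors.
	if !isConflict(labelpb.ErrOutOfOrderLabels) {
		t.Fatal("expected out-of-order-labels error to be treated as a conflict")
	}
}
```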
26 changes: 25 additions & 1 deletion pkg/receive/handler_test.go
@@ -114,6 +114,16 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 1,
exp: errConflict,
},
{
name: "matching multierror (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrEmptyLabels,
errors.New("foo"),
errors.New("bar"),
}),
threshold: 1,
exp: errConflict,
},
{
name: "matching but below threshold multierror",
err: errutil.NonNilMultiError([]error{
@@ -163,7 +173,7 @@ func TestDetermineWriteErrorCause(t *testing.T) {
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict have precedence",
name: "matching multierror many, both above threshold, conflict has precedence",
err: errutil.NonNilMultiError([]error{
storage.ErrOutOfOrderSample,
errConflict,
@@ -176,6 +186,20 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 2,
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict has precedence (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrDuplicateLabels,
labelpb.ErrDuplicateLabels,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
labelpb.ErrDuplicateLabels,
errors.New("foo"),
}),
threshold: 2,
exp: errConflict,
},
{
name: "nested matching multierror",
err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{
68 changes: 53 additions & 15 deletions pkg/receive/writer.go
@@ -43,9 +43,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
tLogger := log.With(r.logger, "tenant", tenantID)

var (
-numOutOfOrder = 0
-numDuplicates = 0
-numOutOfBounds = 0
+numLabelsOutOfOrder = 0
+numLabelsDuplicates = 0
+numLabelsEmpty = 0
+
+numSamplesOutOfOrder = 0
+numSamplesDuplicates = 0
+numSamplesOutOfBounds = 0
+
numExemplarsOutOfOrder = 0
numExemplarsDuplicate = 0
numExemplarsLabelLength = 0
@@ -70,6 +75,25 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
errs errutil.MultiError
)
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
err := labelpb.ValidateLabels(t.Labels)
if err != nil {
switch err {
case labelpb.ErrOutOfOrderLabels:
numLabelsOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order labels in the label set", "lset", t.Labels)
case labelpb.ErrDuplicateLabels:
numLabelsDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate labels in the label set", "lset", t.Labels)
case labelpb.ErrEmptyLabels:
numLabelsEmpty++
level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels)
}

continue
}

lset := labelpb.ZLabelsToPromLabels(t.Labels)

// Check if the TSDB has cached reference for those labels.
@@ -86,13 +110,13 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
switch err {
case storage.ErrOutOfOrderSample:
-numOutOfOrder++
+numSamplesOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrDuplicateSampleForTimestamp:
-numDuplicates++
+numSamplesDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrOutOfBounds:
-numOutOfBounds++
+numSamplesOutOfBounds++
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
}
}
@@ -129,18 +153,32 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}
}

-if numOutOfOrder > 0 {
-level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
-errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder))
+if numLabelsOutOfOrder > 0 {
+level.Warn(tLogger).Log("msg", "Error on series with out-of-order labels", "numDropped", numLabelsOutOfOrder)
+errs.Add(errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add %d series", numLabelsOutOfOrder))
+}
+if numLabelsDuplicates > 0 {
+level.Warn(tLogger).Log("msg", "Error on series with duplicate labels", "numDropped", numLabelsDuplicates)
+errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates))
+}
+if numLabelsEmpty > 0 {
+level.Warn(tLogger).Log("msg", "Error on series with empty label name or value", "numDropped", numLabelsEmpty)
+errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty))
+}
+
+if numSamplesOutOfOrder > 0 {
+level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numSamplesOutOfOrder)
+errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numSamplesOutOfOrder))
}
-if numDuplicates > 0 {
-level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
-errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates))
+if numSamplesDuplicates > 0 {
+level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numSamplesDuplicates)
+errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numSamplesDuplicates))
}
-if numOutOfBounds > 0 {
-level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
-errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds))
+if numSamplesOutOfBounds > 0 {
+level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds)
+errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds))
}

if numExemplarsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder))
98 changes: 98 additions & 0 deletions pkg/receive/writer_test.go
@@ -35,6 +35,94 @@ func TestWriter(t *testing.T) {
expectedIngested []prompb.TimeSeries
maxExemplars int64
}{
"should error out on series with no labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: []labelpb.ZLabel{{Name: "__name__", Value: ""}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 2 series"),
},
"should succeed on series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should error out and skip series with out-of-order labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"),
},
"should error out and skip series with duplicate labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}, labelpb.ZLabel{Name: "z", Value: "1"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"),
},
"should error out and skip series with out-of-order labels; accept series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "E", Value: "1"}, labelpb.ZLabel{Name: "f", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 2 series"),
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should succeed on valid series with exemplars": {
reqs: []*prompb.WriteRequest{{
Timeseries: []prompb.TimeSeries{
@@ -172,6 +260,16 @@ func TestWriter(t *testing.T) {
testutil.Equals(t, testData.expectedErr.Error(), err.Error())
}
}

// On each expected series, assert we have a ref available.
a, err := app.Appender(context.Background())
testutil.Ok(t, err)
gr := a.(storage.GetRef)

for _, ts := range testData.expectedIngested {
ref, _ := gr.GetRef(labelpb.ZLabelsToPromLabels(ts.Labels))
testutil.Assert(t, ref != 0, fmt.Sprintf("appender should have reference to series %v", ts))
}
})
}
}
42 changes: 41 additions & 1 deletion pkg/store/labelpb/label.go
@@ -19,7 +19,13 @@ import (
"github.com/prometheus/prometheus/model/labels"
)

-var sep = []byte{'\xff'}
+var (
+ErrOutOfOrderLabels = errors.New("out of order labels")
+ErrEmptyLabels = errors.New("label set contains a label with empty name or value")
+ErrDuplicateLabels = errors.New("label set contains duplicate label names")
+
+sep = []byte{'\xff'}
+)

func noAllocString(buf []byte) string {
return *(*string)(unsafe.Pointer(&buf))
@@ -365,6 +371,40 @@ func HashWithPrefix(prefix string, lbls []ZLabel) uint64 {
return xxhash.Sum64(b)
}

// ValidateLabels validates label names and values (checks for empty
// names and values, out-of-order labels, and duplicate label names).
// It returns an appropriate error if validation fails on a label.
func ValidateLabels(lbls []ZLabel) error {
Review thread:

Contributor:
Any concerns with increasing write latency due to this added validation?

Member:
We could benchmark it so that we can make an informed decision about how much the latency changes. Maybe the validation is negligible compared to storing the metrics in the TSDB, maybe not.

Contributor:
Benchmarking if possible would be great. If that's too complicated, I can keep an eye on this after deploying.

Now that you mentioned TSDB, I wonder why it even accepts writing samples with OOO labels. Should we also open an issue in prometheus/prometheus?

Collaborator (author):
I think that is a valid concern; I'll see how complicated it would be to add a benchmark for this.

As for TSDB, I'm not sure whether this was ever a requirement or a problem. I've seen various references to ordering sprinkled throughout the repo/codebase (prometheus/prometheus#8861 (comment), prometheus/prometheus#10532, prometheus/prometheus#5372), but I cannot find any authoritative doc saying how labels must be ordered at time of ingestion.

Collaborator (author):
I actually realized that we can simplify the validation: since we expect the labels to be ordered, we can first check order and then duplication, meaning we don't have to use a map to check for duplicates (saving us allocations).

I ran the benchmarks from #5533 on the old and new code and there's no discernible difference:

name                                     old time/op    new time/op    delta
WriterTimeSeriesWithSingleLabel_10-12      4.08µs ± 0%    4.02µs ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWithSingleLabel_100-12     22.6µs ± 0%    21.3µs ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWithSingleLabel_1000-12     238µs ± 0%     197µs ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWith10Labels_10-12         6.18µs ± 0%    5.57µs ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWith10Labels_100-12        37.9µs ± 0%    41.1µs ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWith10Labels_1000-12        368µs ± 0%     430µs ± 0%   ~     (p=1.000 n=1+1)

name                                     old alloc/op   new alloc/op   delta
WriterTimeSeriesWithSingleLabel_10-12        411B ± 0%      411B ± 0%   ~     (all equal)
WriterTimeSeriesWithSingleLabel_100-12       413B ± 0%      411B ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWithSingleLabel_1000-12      454B ± 0%      453B ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWith10Labels_10-12           410B ± 0%      410B ± 0%   ~     (all equal)
WriterTimeSeriesWith10Labels_100-12          411B ± 0%      412B ± 0%   ~     (p=1.000 n=1+1)
WriterTimeSeriesWith10Labels_1000-12         460B ± 0%      471B ± 0%   ~     (p=1.000 n=1+1)

name                                     old allocs/op  new allocs/op  delta
WriterTimeSeriesWithSingleLabel_10-12        9.00 ± 0%      9.00 ± 0%   ~     (all equal)
WriterTimeSeriesWithSingleLabel_100-12       9.00 ± 0%      9.00 ± 0%   ~     (all equal)
WriterTimeSeriesWithSingleLabel_1000-12      9.00 ± 0%      9.00 ± 0%   ~     (all equal)
WriterTimeSeriesWith10Labels_10-12           9.00 ± 0%      9.00 ± 0%   ~     (all equal)
WriterTimeSeriesWith10Labels_100-12          9.00 ± 0%      9.00 ± 0%   ~     (all equal)
WriterTimeSeriesWith10Labels_1000-12         9.00 ± 0%      9.00 ± 0%   ~     (all equal)

if len(lbls) == 0 {
return ErrEmptyLabels
}

// Check first label.
l0 := lbls[0]
if l0.Name == "" || l0.Value == "" {
return ErrEmptyLabels
}

// Iterate over the rest, check each for empty names/values and duplicates,
// and check lexicographic (alphabetical) ordering.
for _, l := range lbls[1:] {
if l.Name == "" || l.Value == "" {
return ErrEmptyLabels
}

if l.Name == l0.Name {
return ErrDuplicateLabels
}

if l.Name < l0.Name {
return ErrOutOfOrderLabels
}
l0 = l
}

return nil
}
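Following up on the latency discussion in the thread above: the quoted numbers come from the Writer benchmarks in #5533, which exercise the whole write path. A standalone micro-benchmark of ValidateLabels alone might look like the sketch below; the benchmark name and label shapes are invented for illustration and are not part of this PR:

```go
package labelpb

import (
	"fmt"
	"testing"
)

// Hypothetical micro-benchmark (not in this PR): measures ValidateLabels in
// isolation from the TSDB write path.
func BenchmarkValidateLabels(b *testing.B) {
	// A sorted, valid 10-label set; "__name__" sorts before "label_*".
	lbls := []ZLabel{{Name: "__name__", Value: "test_metric"}}
	for i := 0; i < 9; i++ {
		lbls = append(lbls, ZLabel{Name: fmt.Sprintf("label_%d", i), Value: fmt.Sprintf("value_%d", i)})
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		if err := ValidateLabels(lbls); err != nil {
			b.Fatal(err)
		}
	}
}
```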

// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted.
type ZLabelSets []ZLabelSet
