Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receiver: Validate labels in write requests #5508

Merged
merged 13 commits into from
Aug 6, 2022
6 changes: 3 additions & 3 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type HealthStats struct {
MetricLabelValuesCount int64
}

// PrometheusIssue5372Err returns an error if the HealthStats object indicates
// OutOfOrderLabelsErr returns an error if the HealthStats object indicates
// postings with out of order labels. This is corrected by Prometheus Issue
// #5372 and affects Prometheus versions 2.8.0 and below.
func (i HealthStats) PrometheusIssue5372Err() error {
func (i HealthStats) OutOfOrderLabelsErr() error {
if i.OutOfOrderLabels > 0 {
return errors.Errorf("index contains %d postings with out of order labels",
i.OutOfOrderLabels)
Expand Down Expand Up @@ -157,7 +157,7 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.PrometheusIssue5372Err(); err != nil {
if err := i.OutOfOrderLabelsErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
if err := stats.OutOfOrderLabelsErr(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
Expand Down
31 changes: 26 additions & 5 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,13 +812,34 @@ func isConflict(err error) bool {
return false
}
return err == errConflict ||
err == storage.ErrDuplicateSampleForTimestamp ||
isSampleConflictErr(err) ||
isExemplarConflictErr(err) ||
isLabelsConflictErr(err) ||
status.Code(err) == codes.AlreadyExists
}

// isSampleConflictErr returns whether or not the given error represents
// a sample-related conflict.
func isSampleConflictErr(err error) bool {
return err == storage.ErrDuplicateSampleForTimestamp ||
err == storage.ErrOutOfOrderSample ||
err == storage.ErrOutOfBounds ||
err == storage.ErrDuplicateExemplar ||
err == storage.ErrOutOfBounds
}

// isExemplarConflictErr returns whether or not the given error represents
// a exemplar-related conflict.
func isExemplarConflictErr(err error) bool {
return err == storage.ErrDuplicateExemplar ||
err == storage.ErrOutOfOrderExemplar ||
err == storage.ErrExemplarLabelLength ||
status.Code(err) == codes.AlreadyExists
err == storage.ErrExemplarLabelLength
}

// isLabelsConflictErr returns whether or not the given error represents
// a labels-related conflict.
func isLabelsConflictErr(err error) bool {
return err == labelpb.ErrDuplicateLabels ||
err == labelpb.ErrEmptyLabels ||
err == labelpb.ErrOutOfOrderLabels
}

// isNotReady returns whether or not the given error represents a not ready error.
Expand Down
26 changes: 25 additions & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 1,
exp: errConflict,
},
{
name: "matching multierror (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrEmptyLabels,
errors.New("foo"),
errors.New("bar"),
}),
threshold: 1,
exp: errConflict,
},
{
name: "matching but below threshold multierror",
err: errutil.NonNilMultiError([]error{
Expand Down Expand Up @@ -163,7 +173,7 @@ func TestDetermineWriteErrorCause(t *testing.T) {
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict have precedence",
name: "matching multierror many, both above threshold, conflict has precedence",
err: errutil.NonNilMultiError([]error{
storage.ErrOutOfOrderSample,
errConflict,
Expand All @@ -176,6 +186,20 @@ func TestDetermineWriteErrorCause(t *testing.T) {
threshold: 2,
exp: errConflict,
},
{
name: "matching multierror many, both above threshold, conflict has precedence (labels error)",
err: errutil.NonNilMultiError([]error{
labelpb.ErrDuplicateLabels,
labelpb.ErrDuplicateLabels,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
tsdb.ErrNotReady,
labelpb.ErrDuplicateLabels,
errors.New("foo"),
}),
threshold: 2,
exp: errConflict,
},
{
name: "nested matching multierror",
err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{
Expand Down
106 changes: 75 additions & 31 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
tLogger := log.With(r.logger, "tenant", tenantID)

var (
numOutOfOrder = 0
numDuplicates = 0
numOutOfBounds = 0
numLabelsOutOfOrder = 0
numLabelsDuplicates = 0
numLabelsEmpty = 0

numSamplesOutOfOrder = 0
numSamplesDuplicates = 0
numSamplesOutOfBounds = 0

numExemplarsOutOfOrder = 0
numExemplarsDuplicate = 0
numExemplarsLabelLength = 0
Expand All @@ -70,6 +75,26 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
errs errutil.MultiError
)
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
if err := labelpb.ValidateLabels(t.Labels); err != nil {
switch err {
case labelpb.ErrOutOfOrderLabels:
numLabelsOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order labels in the label set", "lset", t.Labels)
case labelpb.ErrDuplicateLabels:
numLabelsDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate labels in the label set", "lset", t.Labels)
case labelpb.ErrEmptyLabels:
numLabelsEmpty++
level.Debug(tLogger).Log("msg", "Labels with empty name in the label set", "lset", t.Labels)
default:
level.Debug(tLogger).Log("msg", "Error validating labels", "err", err)
}

continue
matej-g marked this conversation as resolved.
Show resolved Hide resolved
}

lset := labelpb.ZLabelsToPromLabels(t.Labels)

// Check if the TSDB has cached reference for those labels.
Expand All @@ -86,14 +111,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
numSamplesOutOfOrder++
level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
numSamplesDuplicates++
level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrOutOfBounds:
numOutOfBounds++
numSamplesOutOfBounds++
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
default:
if err != nil {
level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
}
}
}

Expand All @@ -102,45 +131,60 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref != 0 && len(t.Exemplars) > 0 {
for _, ex := range t.Exemplars {
exLset := labelpb.ZLabelsToPromLabels(ex.Labels)
logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String())
exLogger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String())

_, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
if _, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
Labels: exLset,
Value: ex.Value,
Ts: ex.Timestamp,
HasTs: true,
})
switch err {
case storage.ErrOutOfOrderExemplar:
numExemplarsOutOfOrder++
level.Debug(logger).Log("msg", "Out of order exemplar")
case storage.ErrDuplicateExemplar:
numExemplarsDuplicate++
level.Debug(logger).Log("msg", "Duplicate exemplar")
case storage.ErrExemplarLabelLength:
numExemplarsLabelLength++
level.Debug(logger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(logger).Log("msg", "Error ingesting exemplar", "err", err)
}); err != nil {
switch err {
case storage.ErrOutOfOrderExemplar:
numExemplarsOutOfOrder++
level.Debug(exLogger).Log("msg", "Out of order exemplar")
case storage.ErrDuplicateExemplar:
numExemplarsDuplicate++
level.Debug(exLogger).Log("msg", "Duplicate exemplar")
case storage.ErrExemplarLabelLength:
numExemplarsLabelLength++
level.Debug(exLogger).Log("msg", "Label length for exemplar exceeds max limit", "limit", exemplar.ExemplarMaxLabelSetLength)
default:
if err != nil {
level.Debug(exLogger).Log("msg", "Error ingesting exemplar", "err", err)
}
}
}
}
}
}

if numOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder))
if numLabelsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on series with out-of-order labels", "numDropped", numLabelsOutOfOrder)
errs.Add(errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add %d series", numLabelsOutOfOrder))
}
if numLabelsDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on series with duplicate labels", "numDropped", numLabelsDuplicates)
errs.Add(errors.Wrapf(labelpb.ErrDuplicateLabels, "add %d series", numLabelsDuplicates))
}
if numLabelsEmpty > 0 {
level.Warn(tLogger).Log("msg", "Error on series with empty label name or value", "numDropped", numLabelsEmpty)
errs.Add(errors.Wrapf(labelpb.ErrEmptyLabels, "add %d series", numLabelsEmpty))
}

if numSamplesOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numSamplesOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numSamplesOutOfOrder))
}
if numDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates))
if numSamplesDuplicates > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numSamplesDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numSamplesDuplicates))
}
if numOutOfBounds > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds))
if numSamplesOutOfBounds > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds))
}

if numExemplarsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder))
Expand Down
98 changes: 98 additions & 0 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,94 @@ func TestWriter(t *testing.T) {
expectedIngested []prompb.TimeSeries
maxExemplars int64
}{
"should error out on series with no labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: []labelpb.ZLabel{{Name: "__name__", Value: ""}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrEmptyLabels, "add 2 series"),
},
"should succeed on series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should error out and skip series with out-of-order labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "Z", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 1 series"),
},
"should error out and skip series with duplicate labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}, labelpb.ZLabel{Name: "z", Value: "1"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrDuplicateLabels, "add 1 series"),
},
"should error out and skip series with out-of-order labels; accept series with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "A", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
{
Labels: append(lbls, labelpb.ZLabel{Name: "E", Value: "1"}, labelpb.ZLabel{Name: "f", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
},
expectedErr: errors.Wrapf(labelpb.ErrOutOfOrderLabels, "add 2 series"),
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "c", Value: "1"}, labelpb.ZLabel{Name: "d", Value: "2"}),
Samples: []prompb.Sample{{Value: 1, Timestamp: 10}},
},
},
},
"should succeed on valid series with exemplars": {
reqs: []*prompb.WriteRequest{{
Timeseries: []prompb.TimeSeries{
Expand Down Expand Up @@ -172,6 +260,16 @@ func TestWriter(t *testing.T) {
testutil.Equals(t, testData.expectedErr.Error(), err.Error())
}
}

// On each expected series, assert we have a ref available.
a, err := app.Appender(context.Background())
testutil.Ok(t, err)
gr := a.(storage.GetRef)

for _, ts := range testData.expectedIngested {
ref, _ := gr.GetRef(labelpb.ZLabelsToPromLabels(ts.Labels))
testutil.Assert(t, ref != 0, fmt.Sprintf("appender should have reference to series %v", ts))
}
})
}
}
Expand Down
Loading