diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index e6461c676b..945a91181e 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -59,8 +59,8 @@ const ( var ( // errConflict is returned whenever an operation fails due to any conflict-type error. - errConflict = errors.New("conflict") - + errConflict = errors.New("conflict") + errOutOfBounds = storage.ErrOutOfBounds errBadReplica = errors.New("request replica exceeds receiver replication factor") errNotReady = errors.New("target not ready") errUnavailable = errors.New("target not available") @@ -360,6 +360,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusServiceUnavailable) case errConflict: http.Error(w, err.Error(), http.StatusConflict) + case errOutOfBounds: + http.Error(w, err.Error(), http.StatusBadRequest) case errBadReplica: http.Error(w, err.Error(), http.StatusBadRequest) default: @@ -679,7 +681,6 @@ func isConflict(err error) bool { return err == errConflict || err == storage.ErrDuplicateSampleForTimestamp || err == storage.ErrOutOfOrderSample || - err == storage.ErrOutOfBounds || status.Code(err) == codes.AlreadyExists } @@ -696,6 +697,11 @@ func isUnavailable(err error) bool { status.Code(err) == codes.Unavailable } +func isOutOfBounds(err error) bool { + return err == storage.ErrOutOfBounds || + status.Code(err) == codes.OutOfRange +} + // retryState encapsulates the number of request attempt made against a peer and, // next allowed time for the next attempt. type retryState struct { @@ -738,6 +744,7 @@ func determineWriteErrorCause(err error, threshold int) error { {err: errConflict, cause: isConflict}, {err: errNotReady, cause: isNotReady}, {err: errUnavailable, cause: isUnavailable}, + {err: errOutOfBounds, cause: isOutOfBounds}, } for _, exp := range expErrs { exp.count = 0 diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 6c92fbabc8..a5c0989c19 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -331,7 +331,8 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin func TestReceiveQuorum(t *testing.T) { appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } + conflictErrFn := func() error { return storage.ErrOutOfOrderSample } + outOfBoundsErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ @@ -399,6 +400,17 @@ func TestReceiveQuorum(t *testing.T) { }, }, }, + { + name: "size 1 outofbound", + status: http.StatusBadRequest, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(outOfBoundsErrFn, nil, nil), + }, + }, + }, { name: "size 2 success", status: http.StatusOK, @@ -667,7 +679,8 @@ func TestReceiveQuorum(t *testing.T) { func TestReceiveWithConsistencyDelay(t *testing.T) { appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } + conflictErrFn := func() error { return storage.ErrOutOfOrderSample } + outOfBoundsErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } wreq1 := &prompb.WriteRequest{ Timeseries: []prompb.TimeSeries{ @@ -735,6 +748,17 @@ func TestReceiveWithConsistencyDelay(t *testing.T) { }, }, }, + { + name: "size 1 outofbound", + status: http.StatusBadRequest, + replicationFactor: 1, + wreq: wreq1, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(outOfBoundsErrFn, nil, nil), + }, + }, + }, { name: "size 2 success", status: http.StatusOK, diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index decc5c68f5..a59989ce15 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -85,13 +85,13 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR switch err { case storage.ErrOutOfOrderSample: numOutOfOrder++ - level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample", s) + level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample value", s.Value, "sample timestamp", s.Timestamp) case storage.ErrDuplicateSampleForTimestamp: numDuplicates++ - level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample", s) + level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample value", s.Value, "sample timestamp", s.Timestamp) case storage.ErrOutOfBounds: numOutOfBounds++ - level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample", s) + level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample value", s.Value, "sample timestamp", s.Timestamp) } }