ingest consumer: handle Push errors (#6940)
* ingest consumer: handle Push errors

This adds handling for errors returned by the ingesters' Push. Client errors are only logged at warning level (as they are today). Server errors trigger a backoff in the consumer; the backoff is unbounded and the same batch of records is retried until it is successfully ingested. A condensed sketch of the resulting policy follows the changed-file summary below.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove race in TestReader_ConsumerError

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Remove outdated comment

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Only allow gRPC errors when running ingest-storage

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Simplify retries

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Better error tracking and metrics

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Shortcut consumeFetches if there aren't any fetches

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Introduce cortex_ingest_storage_reader_records_failed_total and cortex_ingest_storage_reader_records_total

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Dec 18, 2023
1 parent cd5780f commit 4170afa
Showing 5 changed files with 144 additions and 23 deletions.
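At a glance, the policy implemented by the diffs below (a condensed, illustrative sketch, not the verbatim code; helper wiring is simplified):

	// Per record: swallow client errors, abort the batch on server errors.
	if _, err := c.p.Push(ctx, wr.WriteRequest); err != nil {
		if isClientIngesterError(err) {
			level.Warn(c.l).Log("msg", "client error; skipping", "err", err)
		} else {
			return err // surfaces to the PartitionReader
		}
	}

	// In the reader: retry the same batch with backoff, forever.
	for boff.Ongoing() {
		if err := r.consumer.consume(ctx, records); err == nil {
			break
		}
		boff.Wait()
	}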
3 changes: 3 additions & 0 deletions pkg/mimir/mimir.go
@@ -240,6 +240,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.IngestStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid ingest storage config")
}
if c.IngestStorage.Enabled && !c.Ingester.ReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
}
if err := c.BlocksStorage.Validate(c.Ingester.ActiveSeriesMetrics, log); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
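For context, a deployment enabling ingest storage now needs both flags set together; a hypothetical invocation (the -target value is illustrative, the two flag names come from the error message above):

	mimir -target=ingester -ingest-storage.enabled=true -ingester.return-only-grpc-errors=true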
45 changes: 42 additions & 3 deletions pkg/storage/ingest/pusher.go
@@ -4,11 +4,13 @@ package ingest

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -25,6 +27,9 @@ type pusherConsumer struct {
p Pusher

processingTimeSeconds prometheus.Observer
clientErrRequests prometheus.Counter
serverErrRequests prometheus.Counter
totalRequests prometheus.Counter
l log.Logger
}

@@ -35,6 +40,11 @@ type parsedRecord struct {
}

func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pusherConsumer {
errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_failed_total",
Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
}, []string{"cause"})

return &pusherConsumer{
p: p,
l: l,
@@ -45,6 +55,12 @@ func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pushe
MaxAge: time.Minute,
AgeBuckets: 10,
}),
clientErrRequests: errRequestsCounter.WithLabelValues("client"),
serverErrRequests: errRequestsCounter.WithLabelValues("server"),
totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_total",
Help: "Number of attempted records (write requests).",
}),
}
}
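With both counters registered, the share of failing records by cause can be read as, e.g., sum(rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[5m])) / sum(rate(cortex_ingest_storage_reader_records_total[5m])) (an illustrative PromQL sketch; the 5m window is arbitrary).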

@@ -63,7 +79,9 @@ func (c pusherConsumer) consume(ctx context.Context, records []record) error {
}

func (c pusherConsumer) pushRequests(ctx context.Context, reqC <-chan parsedRecord) error {
recordIdx := -1
for wr := range reqC {
recordIdx++
if wr.err != nil {
level.Error(c.l).Log("msg", "failed to parse write request; skipping", "err", wr.err)
continue
@@ -74,15 +92,36 @@
_, err := c.p.Push(ctx, wr.WriteRequest)

c.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
+ c.totalRequests.Inc()
if err != nil {
- level.Error(c.l).Log("msg", "failed to push write request; skipping", "err", err)
- // TODO move distributor's isClientError to a separate package and use that here to swallow only client errors and abort on others
- continue
+ if !isClientIngesterError(err) {
+ c.serverErrRequests.Inc()
+ return fmt.Errorf("consuming record at index %d for tenant %s: %w", recordIdx, wr.tenantID, err)
+ }
+ c.clientErrRequests.Inc()
+ level.Warn(c.l).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "err", err, "user", wr.tenantID)
}
}
return nil
}

func isClientIngesterError(err error) bool {
stat, ok := grpcutil.ErrorToStatus(err)
if !ok {
// This should not be reached but in case it is, fall back to assuming it's our fault.
return false
}

if details := stat.Details(); len(details) > 0 {
if errDetails, ok := details[0].(*mimirpb.ErrorDetails); ok {
// This is usually the case.
return errDetails.Cause == mimirpb.BAD_DATA
}
}
// This should not be reached but in case it is, fall back to assuming it's our fault.
return false
}
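A quick usage sketch of the classifier; the error construction mirrors the ingesterError test helper further down, and the example function itself is hypothetical (imports: fmt, github.com/gogo/status, google.golang.org/grpc/codes):

	func ExampleIsClientIngesterError() {
		// A BAD_DATA cause marks the error as the client's fault,
		// regardless of the gRPC status code it travels with.
		details := &mimirpb.ErrorDetails{Cause: mimirpb.BAD_DATA}
		stat, _ := status.New(codes.Unavailable, "sample out of order").WithDetails(details)
		fmt.Println(isClientIngesterError(stat.Err()))
		// Output: true
	}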

func (c pusherConsumer) unmarshalRequests(ctx context.Context, records []record, recC chan<- parsedRecord) {
defer close(recC)
done := ctx.Done()
52 changes: 47 additions & 5 deletions pkg/storage/ingest/pusher_test.go
@@ -7,10 +7,12 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"

"github.com/grafana/mimir/pkg/mimirpb"
)
@@ -27,6 +29,8 @@ func TestPusherConsumer(t *testing.T) {
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1"), mockPreallocTimeseries("series_2")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_4")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_5")}},
{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_6")}},
}

wrBytes := make([][]byte, len(writeReqs))
@@ -93,10 +97,9 @@
responses: []response{
okResponse,
{err: assert.AnError},
- okResponse,
},
- expectedWRs: writeReqs[0:3],
- expErr: "",
+ expectedWRs: writeReqs[0:2],
+ expErr: assert.AnError.Error(),
},
"failed processing of last record": {
records: []record{
@@ -108,7 +111,7 @@
{err: assert.AnError},
},
expectedWRs: writeReqs[0:2],
- expErr: "",
+ expErr: assert.AnError.Error(),
},
"failed processing & failed unmarshalling": {
records: []record{
@@ -121,9 +124,38 @@
{err: assert.AnError},
},
expectedWRs: writeReqs[0:2],
- expErr: "",
+ expErr: assert.AnError.Error(),
},
"no records": {},
"ingester client error": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
{content: wrBytes[2], tenantID: tenantID},
},
responses: []response{
{err: ingesterError(mimirpb.BAD_DATA, codes.InvalidArgument, "ingester test error")},
{err: ingesterError(mimirpb.BAD_DATA, codes.Unknown, "ingester test error")}, // status code doesn't matter
okResponse,
},
expectedWRs: writeReqs[0:3],
expErr: "", // since all fof those were client errors, we don't return an error
},
"ingester server error": {
records: []record{
{content: wrBytes[0], tenantID: tenantID},
{content: wrBytes[1], tenantID: tenantID},
{content: wrBytes[2], tenantID: tenantID},
{content: wrBytes[3], tenantID: tenantID},
{content: wrBytes[4], tenantID: tenantID},
},
responses: []response{
{err: ingesterError(mimirpb.BAD_DATA, codes.InvalidArgument, "ingester test error")},
{err: ingesterError(mimirpb.TSDB_UNAVAILABLE, codes.Unavailable, "ingester internal error")},
},
expectedWRs: writeReqs[0:2], // the rest of the requests are not attempted
expErr: "ingester internal error",
},
}

for name, tc := range testCases {
@@ -155,3 +187,13 @@
})
}
}

// ingesterError mimics how the ingester constructs errors
func ingesterError(cause mimirpb.ErrorCause, statusCode codes.Code, message string) error {
errorDetails := &mimirpb.ErrorDetails{Cause: cause}
statWithDetails, err := status.New(statusCode, message).WithDetails(errorDetails)
if err != nil {
panic(err)
}
return statWithDetails.Err()
}
30 changes: 25 additions & 5 deletions pkg/storage/ingest/reader.go
@@ -32,6 +32,7 @@ type record struct {
}

type recordConsumer interface {
// consume should return an error only if there is a recoverable error. Returning an error will cause consumption to slow down.
consume(context.Context, []record) error
}

@@ -154,7 +155,10 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
}

func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetches) {
- records := make([]record, 0, len(fetches.Records()))
+ if fetches.NumRecords() == 0 {
+ return
+ }
+ records := make([]record, 0, fetches.NumRecords())

var (
minOffset = math.MaxInt
@@ -169,11 +173,27 @@
})
})

- err := r.consumer.consume(ctx, records)
- if err != nil {
- level.Error(r.logger).Log("msg", "encountered error processing records; skipping", "min_offset", minOffset, "max_offset", maxOffset, "err", err)
- // TODO abort ingesting & back off if it's a server error, ignore error if it's a client error
+ boff := backoff.New(ctx, backoff.Config{
+ MinBackoff: 250 * time.Millisecond,
+ MaxBackoff: 2 * time.Second,
+ MaxRetries: 0, // retry forever
+ })
+
+ for boff.Ongoing() {
+ err := r.consumer.consume(ctx, records)
+ if err == nil {
+ break
+ }
+ level.Error(r.logger).Log(
+ "msg", "encountered error while ingesting data from Kafka; will retry",
+ "err", err,
+ "record_min_offset", minOffset,
+ "record_max_offset", maxOffset,
+ "num_retries", boff.NumRetries(),
+ )
+ boff.Wait()
}

}
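With this configuration the wait starts at roughly 250ms and doubles per failed attempt up to the 2s cap (dskit's backoff adds jitter). MaxRetries: 0 means the loop never gives up on its own; boff.Ongoing() only turns false once the context is cancelled, e.g. during shutdown.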

func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) {
37 changes: 27 additions & 10 deletions pkg/storage/ingest/reader_test.go
@@ -17,6 +17,7 @@ import (
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/atomic"
)

func TestReader(t *testing.T) {
@@ -45,7 +46,7 @@ func TestReader(t *testing.T) {
assert.Equal(t, [][]byte{content, content}, records)
}

- func TestReader_IgnoredConsumerErrors(t *testing.T) {
+ func TestReader_ConsumerError(t *testing.T) {
const (
topicName = "test"
partitionID = 1
@@ -56,24 +57,34 @@

_, clusterAddr := createTestCluster(t, partitionID+1, topicName)

- content := []byte("special content")
- consumer := newTestConsumer(1)
+ invocations := atomic.NewInt64(0)
+ returnErrors := atomic.NewBool(true)
+ trackingConsumer := newTestConsumer(2)
+ consumer := consumerFunc(func(ctx context.Context, records []record) error {
+ invocations.Inc()
+ if !returnErrors.Load() {
+ return trackingConsumer.consume(ctx, records)
+ }
+ // There may be more records, but we only care that the one we failed to consume in the first place is still there.
+ assert.Equal(t, "1", string(records[0].content))
+ return errors.New("consumer error")
+ })
startReader(ctx, t, clusterAddr, topicName, partitionID, consumer)

// Write to Kafka.
writeClient := newKafkaProduceClient(t, clusterAddr)

- produceRecord(ctx, t, writeClient, topicName, partitionID, content)
+ produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("1"))
+ produceRecord(ctx, t, writeClient, topicName, partitionID, []byte("2"))

- records, err := consumer.waitRecords(1, time.Second, 0)
- assert.NoError(t, err)
- assert.Equal(t, [][]byte{content}, records)
+ // There is more than one invocation because the reader retries on error.
+ assert.Eventually(t, func() bool { return invocations.Load() > 1 }, 5*time.Second, 100*time.Millisecond)

- produceRecord(ctx, t, writeClient, topicName, partitionID, content)
+ returnErrors.Store(false)

- records, err = consumer.waitRecords(1, time.Second, 0)
+ records, err := trackingConsumer.waitRecords(2, time.Second, 0)
assert.NoError(t, err)
- assert.Equal(t, [][]byte{content}, records)
+ assert.Equal(t, [][]byte{[]byte("1"), []byte("2")}, records)
}

func newKafkaProduceClient(t *testing.T, addrs string) *kgo.Client {
@@ -326,3 +337,9 @@ func (t testConsumer) waitRecords(numRecords int, waitTimeout, drainPeriod time.
}
}
}

type consumerFunc func(ctx context.Context, records []record) error

func (c consumerFunc) consume(ctx context.Context, records []record) error {
return c(ctx, records)
}
