ingest consumer: handle Push errors #6940
Conversation
This adds error handling for ingester errors. Client errors are only logged at warning level (like they are today). Server errors trigger a backoff at the consumer; the backoff is unlimited and retries the same batch of records until it is successfully ingested.
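For readers skimming the thread, here is a minimal sketch of that retry behaviour, assuming dskit's backoff package with MaxRetries set to 0 (unlimited). The record type, consumer interface, and function name below are illustrative placeholders, not the actual Mimir implementation:

package ingestsketch

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/backoff"
)

// record is a placeholder for whatever the reader hands to its consumer.
type record struct {
	tenantID string
	content  []byte
}

// recordConsumer is expected to swallow client errors itself, so any error it
// returns is treated as a server error and retried.
type recordConsumer interface {
	consume(ctx context.Context, records []record) error
}

// consumeWithRetries keeps retrying the same batch until it is ingested or the
// context is cancelled.
func consumeWithRetries(ctx context.Context, c recordConsumer, records []record, logger log.Logger) error {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 250 * time.Millisecond,
		MaxBackoff: 2 * time.Second,
		MaxRetries: 0, // 0 means the backoff is unlimited
	})
	for boff.Ongoing() {
		err := c.consume(ctx, records)
		if err == nil {
			return nil
		}
		level.Error(logger).Log(
			"msg", "encountered error while ingesting data from Kafka; will retry",
			"err", err,
			"num_retries", boff.NumRetries(),
		)
		boff.Wait()
	}
	// Only reachable once the context is cancelled.
	return boff.Err()
}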
Force-pushed from 68ec2ed to 0fbfaeb.
pkg/storage/ingest/reader.go
Outdated
for boff.Ongoing() {
	err := r.consumer.consume(ctx, records)
	if err != nil {
		level.Error(r.logger).Log(
We need a metric here, to be able to alert on it (we need to compute the failure rate).
Generally speaking, this for loop could be an infinite loop in case of a persistent error, so I would like to better understand how we're going to alert on it.
I was thinking that we alert on end-to-end latency. If the consume errors are infrequent enough, we can probably get away with them without user-visible effects. Since we block consumption, the failure rate will either be 0% or 100%, which doesn't sound very insightful.
// TODO move distributor's isClientError to a separate package and use that here to swallow only client errors and abort on others
continue
if !isClientIngesterError(err) {
	return fmt.Errorf("consuming record at index %d for tenant %s: %w", recordIdx, wr.tenantID, err)
What's the actual value of knowing the index? It looks like something unpredictable, so I can't understand what we'll use it for.
My idea was to give enough information to be able to deduce the exact Kafka record offset from the errors here and the error in the retry:
mimir/pkg/storage/ingest/reader.go
Lines 185 to 189 in a5d37b9

"msg", "encountered error while ingesting data from Kafka; will retry",
"err", err,
"record_min_offset", minOffset,
"record_max_offset", maxOffset,
"num_retries", boff.NumRetries(),
Thanks for addressing my feedback. I just have one last comment about the metric.
pkg/storage/ingest/pusher.go
Outdated
@@ -45,6 +49,14 @@ func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pushe
	MaxAge:     time.Minute,
	AgeBuckets: 10,
}),
clientErrRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
	Name: "cortex_ingest_storage_reader_client_error_requests_total",
There's a bit of a naming mismatch with the rest of the code. In the other metrics we're calling it "records" and not "requests", which I think makes sense because it allows us to distinguish them from actual requests issued (e.g. requests to Kafka).
WDYT if we have:
- cortex_ingest_storage_reader_records_failed_total: generalised to "failed", and then we differentiate whether it's a client or server error through a label, so we also keep the count of server errors?
- cortex_ingest_storage_reader_records_total
(A rough sketch of this shape follows below.)
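Purely as an illustration of that shape, here is a sketch using a counter vector with a "cause" label; the label name and the helper are assumptions, not necessarily what the PR ends up using:

package ingestsketch

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type readerMetrics struct {
	processedRecords prometheus.Counter
	failedRecords    *prometheus.CounterVec
}

func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
	return readerMetrics{
		processedRecords: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "cortex_ingest_storage_reader_records_total",
			Help: "Number of records consumed from Kafka by the reader.",
		}),
		failedRecords: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_ingest_storage_reader_records_failed_total",
			Help: "Number of records that failed to be ingested, partitioned by error cause.",
		}, []string{"cause"}),
	}
}

Usage would then be something like failedRecords.WithLabelValues("client").Inc() for client errors and failedRecords.WithLabelValues("server").Inc() for server errors, so the failure rate per cause can be computed from the two counters.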
done in bf4b27d
…x_ingest_storage_reader_records_total Signed-off-by: Dimitar Dimitrov <[email protected]>
Thanks!
This adds error handling for ingester errors. Client errors are only logged at warning level (like they are with regular gRPC ingestion) and ignored otherwise. Server errors trigger a backoff at the consumer; the backoff is unlimited and retries the same batch of records until it is successfully ingested.