
ingest consumer: handle Push errors #6940

Merged
merged 8 commits into main from dimitar/ingest/ingester-error-handling on Dec 18, 2023

Conversation

dimitarvdimitrov (Contributor)

This adds error handling for ingester errors. Client errors are only logged at warning level (as they are with regular gRPC ingestion) and otherwise ignored. Server errors trigger a backoff in the consumer; the backoff is unlimited and retries the same batch of records until it is successfully ingested.

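Below is a minimal sketch (not the PR's actual code) of the behaviour described above, assuming dskit's backoff package and go-kit logging; the record type, consumer interface, and consumeWithRetries function are illustrative stand-ins, and client errors are assumed to be swallowed inside consume itself.

    // Sketch only: retry the same batch of records with an unbounded backoff on
    // server errors, as described in the PR description. Client errors are assumed
    // to be logged at warning level and swallowed inside consume itself.
    package ingest

    import (
        "context"
        "time"

        "github.com/go-kit/log"
        "github.com/go-kit/log/level"
        "github.com/grafana/dskit/backoff"
    )

    // record and consumer are illustrative stand-ins for the PR's actual types.
    type record struct {
        tenantID string
        content  []byte
    }

    type consumer interface {
        consume(ctx context.Context, records []record) error
    }

    func consumeWithRetries(ctx context.Context, c consumer, records []record, logger log.Logger) error {
        boff := backoff.New(ctx, backoff.Config{
            MinBackoff: 250 * time.Millisecond,
            MaxBackoff: 2 * time.Second,
            MaxRetries: 0, // 0 means retry forever: the batch is retried until it is ingested
        })
        for boff.Ongoing() {
            err := c.consume(ctx, records)
            if err == nil {
                return nil
            }
            level.Error(logger).Log(
                "msg", "encountered error while ingesting data from Kafka; will retry",
                "err", err,
                "num_retries", boff.NumRetries(),
            )
            boff.Wait()
        }
        return boff.Err() // only reached when the context is cancelled
    }

With MaxRetries set to 0 the backoff never gives up, which is why the review below asks how an effectively infinite retry loop would be alerted on.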

Signed-off-by: Dimitar Dimitrov <[email protected]>
@dimitarvdimitrov dimitarvdimitrov requested a review from a team as a code owner December 16, 2023 21:20
@dimitarvdimitrov dimitarvdimitrov force-pushed the dimitar/ingest/ingester-error-handling branch from 68ec2ed to 0fbfaeb on December 16, 2023 21:26
Signed-off-by: Dimitar Dimitrov <[email protected]>
@dimitarvdimitrov dimitarvdimitrov changed the title from "ingest consumer: handler Push errors" to "ingest consumer: handle Push errors" on Dec 16, 2023
@pracucci pracucci self-requested a review December 18, 2023 10:14
for boff.Ongoing() {
    err := r.consumer.consume(ctx, records)
    if err != nil {
        level.Error(r.logger).Log(
Collaborator:

We need a metric here to be able to alert on it (we need to compute the failure rate).

Collaborator:

Generally speaking, this for loop could be an infinite loop in case of a persistent error, so I would like to better understand how we're going to alert on it.

dimitarvdimitrov (Contributor, author):

I was thinking that we alert on end-to-end latency. If the consume errors are infrequent enough, we can probably get away with them without user-visible effects. Since we block consumption, the failure rate will either be 0% or 100%, which doesn't sound very insightful.

// TODO move distributor's isClientError to a separate package and use that here to swallow only client errors and abort on others
continue
if !isClientIngesterError(err) {
return fmt.Errorf("consuming record at index %d for tenant %s: %w", recordIdx, wr.tenantID, err)
Collaborator:

What's the actual value of knowing the index? It looks like something unpredictable, so I can't understand what we'll use it for.

dimitarvdimitrov (Contributor, author):

My idea was to give enough information to be able to deduce the exact Kafka record offset from the errors here and the error in the retry.
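
As a side note on the TODO in the excerpt further above (swallow only client errors, abort on others), here is a hedged sketch of one way such a classification could look, based on gRPC status codes; the PR's actual isClientIngesterError, and the distributor's isClientError it refers to, may classify errors differently.

    // Hedged sketch: one possible classification of Push errors as client errors,
    // based on gRPC status codes. The PR's isClientIngesterError (and the
    // distributor's isClientError it refers to) may use different criteria.
    package ingest

    import (
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    func isClientIngesterError(err error) bool {
        s, ok := status.FromError(err)
        if !ok {
            // Not a gRPC status error: treat it as a server-side problem and retry.
            return false
        }
        switch s.Code() {
        case codes.InvalidArgument, codes.AlreadyExists, codes.FailedPrecondition, codes.OutOfRange:
            // Bad input from the client; logging it and moving on is enough.
            return true
        default:
            return false
        }
    }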

"msg", "encountered error while ingesting data from Kafka; will retry",
"err", err,
"record_min_offset", minOffset,
"record_max_offset", maxOffset,
"num_retries", boff.NumRetries(),

@pracucci (Collaborator) left a comment:

Thanks for addressing my feedback. I just have a last comment about the metric.

@@ -45,6 +49,14 @@ func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pushe
MaxAge: time.Minute,
AgeBuckets: 10,
}),
clientErrRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_client_error_requests_total",
Collaborator:

There's a bit of a naming mismatch with the rest of the code. In the other metrics we're calling these "records" and not "requests", which I think makes sense because it allows us to distinguish them from actual requests issued (e.g. requests to Kafka).

WDYT if we have:

  • cortex_ingest_storage_reader_records_failed_total: generalised to "failed", and then we differentiate whether it's a client or server error through a label, so we also keep the count of server errors?
  • cortex_ingest_storage_reader_records_total
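
A minimal sketch of how the suggested metrics could be registered, assuming a hypothetical "cause" label to distinguish client from server errors (the label name actually used in the PR may differ):

    // Sketch of the suggested metric naming; the "cause" label is an assumption
    // made for illustration and may not match the label used in the PR.
    package ingest

    import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
    )

    type readerMetrics struct {
        recordsTotal  prometheus.Counter
        recordsFailed *prometheus.CounterVec
    }

    func newReaderMetrics(reg prometheus.Registerer) readerMetrics {
        return readerMetrics{
            // Every record processed by the reader, regardless of outcome.
            recordsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
                Name: "cortex_ingest_storage_reader_records_total",
                Help: "Total number of records consumed by the reader.",
            }),
            // Failed records, partitioned by cause so server errors remain countable.
            recordsFailed: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
                Name: "cortex_ingest_storage_reader_records_failed_total",
                Help: "Total number of records that failed to be ingested, partitioned by cause.",
            }, []string{"cause"}), // e.g. cause="client" or cause="server"
        }
    }

Keeping a single failure counter with a cause label preserves the server-error count that a dedicated client-error counter would lose.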

dimitarvdimitrov (Contributor, author):

Done in bf4b27d.

…x_ingest_storage_reader_records_total

Signed-off-by: Dimitar Dimitrov <[email protected]>
@pracucci (Collaborator) left a comment:

Thanks!

@dimitarvdimitrov dimitarvdimitrov merged commit 4170afa into main Dec 18, 2023
@dimitarvdimitrov dimitarvdimitrov deleted the dimitar/ingest/ingester-error-handling branch December 18, 2023 13:13
3 participants