
Prevent failed KafkaConsumer creation from blocking overlord startup #6383

Merged 5 commits into apache:master on Oct 4, 2018

Conversation

jon-wei (Contributor) commented Sep 26, 2018

Taking a stab at fixing #6114

This PR changes the initialization behavior for KafkaSupervisor so that it does not block the lifecycle if supervisor init fails. The supervisor will retry the initialization indefinitely.

Marking WIP as it needs more testing, but opening it in case anyone has comments about the approach.
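To make the approach concrete, here is a minimal sketch of the non-blocking startup idea (a sketch only; names like tryInit and the retry loop are illustrative, not the exact code in this patch, which wires the retry into KafkaSupervisor's start):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: a supervisor whose start() never blocks the lifecycle.
// A failed first init hands retries off to a background thread.
class NonBlockingInitSketch
{
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final ExecutorService initExec = Executors.newSingleThreadExecutor();

  // Stand-in for building the KafkaConsumer and finishing supervisor setup.
  private void tryInit() throws Exception
  {
    // ... new KafkaConsumer(...), start state management, etc. ...
    started.set(true);
  }

  // Called from the overlord lifecycle; returns immediately even if init fails.
  void start()
  {
    initExec.submit(() -> {
      while (!started.get() && !stopped.get()) {
        try {
          tryInit();
        }
        catch (Exception e) {
          System.err.println("Supervisor init failed, will retry: " + e);
          try {
            TimeUnit.SECONDS.sleep(30); // retry period
          }
          catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    });
  }
}
```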

@jon-wei jon-wei changed the title Prevent failed KafkaConsumer creation from blocking overlord startup [WIP] Prevent failed KafkaConsumer creation from blocking overlord startup Sep 26, 2018
b-slim (Contributor) commented Sep 26, 2018

How about changing the workflow to use a consumerProvider that is invoked at runtime (when needed)? The provider could be a memoizing supplier (or something similar) if the consumer is to be reused, and the task that needs it would fail if the provider times out.
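A rough sketch of what that suggestion could look like with a memoizing Guava supplier (illustrative only; not code from this PR):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ConsumerProviderSketch
{
  // Memoized provider: the consumer is built on the first get() and reused.
  // If construction throws, the failure surfaces at use time (in the task)
  // rather than at supervisor start time.
  static Supplier<KafkaConsumer<byte[], byte[]>> consumerProvider(final Properties props)
  {
    return Suppliers.memoize(() -> new KafkaConsumer<byte[], byte[]>(props));
  }
}
```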

jon-wei (Contributor, Author) commented Sep 26, 2018

@b-slim I initially thought about lazily creating the consumer, but decided that the supervisor couldn't really do anything useful if it couldn't get a consumer. I think it makes sense to have the supervisor not finish initialization in that case, so it's as if the supervisor isn't there (that seems like a simpler failure mode than "running but with an invalid consumer").

b-slim (Contributor) commented Sep 26, 2018

In my opinion they both provide the same functionality (starting with a try in the background is like starting and trying later), but one makes the code complex while the other is cleaner, since it moves the complexity of building the consumer out of the start method. Anyway, it is just a suggestion...

@jihoonson jihoonson added this to the 0.13.0 milestone Sep 28, 2018
Review thread on the initial patch (excerpt of the code under review):

```java
// Try normal initialization first, if that fails then schedule periodic initialization retries
tryInit();
if (!started) {
  initializationExec.scheduleAtFixedRate(
      // ...
```
Contributor commented:
How about using RetryUtils.retry()? It has a backoff mechanism which I think makes sense to apply here.
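For reference, a hedged sketch of what that might look like (assuming the RetryUtils.retry(task, shouldRetry, maxTries) overload from Druid's java-util; the exact package path and signature may differ by version):

```java
import java.util.Properties;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ConsumerRetrySketch
{
  // Backoff between attempts is handled inside RetryUtils, capped by its
  // MAX_SLEEP_MILLIS constant (mentioned later in this thread).
  static KafkaConsumer<byte[], byte[]> buildConsumer(Properties props) throws Exception
  {
    return RetryUtils.retry(
        () -> new KafkaConsumer<byte[], byte[]>(props), // RetryUtils.Task<T>
        e -> true, // treat every exception as retryable
        10         // maximum attempts
    );
  }
}
```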

jon-wei (Contributor, Author) commented Oct 1, 2018:

Since there are checks for submitting an invalid supervisor spec (e.g., if nothing in "bootstrap.servers" is resolvable), the error condition this is addressing seems to be situations where some external state changes and the supervisor spec no longer works. Taking the unresolvable bootstrap.servers issue as an example, maybe it's a transient DNS error, so I feel it's better to retry indefinitely without increasing backoff times (with retries tied to the configured supervisor run period instead). Otherwise, suppose the backoff time became extremely large but the user had fixed the underlying issue in their environment; with a huge backoff time they would have to restart the overlord for the change to take effect.

RetryUtils.retry() is also a blocking call, which I want to avoid here in the lifecycle startup.

Contributor commented:

Hmm, I think backoff is more appropriate for transient errors. It starts by retrying at a very short interval, but the retry interval grows as the failure continues. This makes sense because it can recover quickly with short retry periods if the error is transient, and if the error is permanent, the retry period grows so that it won't use excessive resources. Regarding a too-large backoff time, RetryUtils.retry() has MAX_SLEEP_MILLIS, which I think makes sense to make configurable. Regarding the blocking call, please check my comment below. What do you think?
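For reference, the general shape of the capped exponential backoff being described (constants illustrative, not Druid's actual values):

```java
import java.util.concurrent.ThreadLocalRandom;

class BackoffSketch
{
  static final long BASE_SLEEP_MILLIS = 1_000;  // first retry after ~1s
  static final long MAX_SLEEP_MILLIS = 60_000;  // cap, analogous to RetryUtils.MAX_SLEEP_MILLIS

  // attempt 0 -> ~1s, 1 -> ~2s, 2 -> ~4s, ... capped at ~60s, with jitter.
  static long backoffMillis(int attempt)
  {
    long exp = BASE_SLEEP_MILLIS << Math.min(attempt, 20); // shift, avoiding overflow
    long capped = Math.min(exp, MAX_SLEEP_MILLIS);
    double jitter = 0.75 + 0.5 * ThreadLocalRandom.current().nextDouble(); // +/-25%
    return (long) (capped * jitter);
  }
}
```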

Contributor commented:

Also, does it make sense to add a sort of max limit for retrying?

jon-wei (Contributor, Author) commented Oct 3, 2018:

Hm, I suppose the user can resubmit the supervisor if they don't want to wait for the retries or if the retries are exhausted. I'll change to limited retries with backoff.

Diff under review:

```diff
@@ -260,6 +261,7 @@ public String toString()

   private volatile DateTime firstRunTime;
   private volatile KafkaConsumer consumer;
+  private volatile boolean lifecycleStarted = false;
```
Contributor commented:

How about adding a new InitNotice? It can be processed by the existing exec, so we don't have to add a new initializationExec.
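For context, a sketch of that suggestion, modeled on the supervisor's existing notice pattern (the interface shape here is an assumption, not the actual KafkaSupervisor internals):

```java
// Assumed shape of the internal notice pattern; the real interface is a
// private member of KafkaSupervisor and may differ.
interface Notice
{
  void handle() throws Exception;
}

// An InitNotice would run initialization on the existing notice-handler
// thread instead of a dedicated initializationExec.
class InitNotice implements Notice
{
  @Override
  public void handle() throws Exception
  {
    // tryInit(); // same init attempted at lifecycle start
  }
}
```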

jon-wei (Contributor, Author) replied:

I would need to start the notice handler thread to use InitNotice, which I was trying to avoid doing (the idea of the patch was to keep the supervisor in a state similar to what it would have been in if it were stuck in the current lifecycle start, while allowing the rest of the lifecycle to start).

Contributor replied:

> the idea of the patch was to keep the supervisor in a state similar to what it would have been in if it were stuck in the current lifecycle start, while allowing the rest of the lifecycle to start

Got it. Makes sense.

The reason I commented about InitNotice was that I'm not sure initializationExec is needed. Maybe we can use scheduledExec instead and cancel the scheduledFuture once tryInit succeeds? Or we could submit a runnable task to exec which calls RetryUtils.retry().
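I.e., roughly this (a sketch using names from the PR discussion; the RetryUtils overload is assumed as above, and exec, stopped, and maxInitRetries are hypothetical field names):

```java
// Reuse the existing exec for bounded init retries instead of a dedicated
// initializationExec.
exec.submit(() -> {
  try {
    RetryUtils.retry(
        () -> {
          tryInit();     // same init attempted at lifecycle start
          return null;
        },
        e -> !stopped,   // give up once the supervisor is shutting down
        maxInitRetries
    );
  }
  catch (Exception e) {
    log.error(e, "Supervisor initialization failed after retries");
  }
});
```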

jon-wei (Contributor, Author) replied:

Sure, I'll change to submit a runnable to exec for the init retries

@jon-wei jon-wei removed the WIP label Oct 3, 2018
@jon-wei jon-wei changed the title [WIP] Prevent failed KafkaConsumer creation from blocking overlord startup Prevent failed KafkaConsumer creation from blocking overlord startup Oct 3, 2018
jon-wei (Contributor, Author) commented Oct 3, 2018

Addressed comments.

I also found an issue with RealtimeIndexTask.makeRandomId() introduced in #6226: the random portion of the task ID was shortened from 8 characters to 1. This PR fixes that.
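For the record, the arithmetic behind that bug: Integer.BYTES is 4 (bytes), not 32 (bits), so dividing it by the 4 bits-per-symbol produced a 1-character random suffix instead of 8:

```java
class TaskIdLengthCheck
{
  private static final int TASK_ID_BITS_PER_SYMBOL = 4;

  public static void main(String[] args)
  {
    System.out.println(Integer.BYTES / TASK_ID_BITS_PER_SYMBOL);       // 1  (the bug from #6226)
    System.out.println((Integer.BYTES * 8) / TASK_ID_BITS_PER_SYMBOL); // 8  (this PR's fix)
    System.out.println(Integer.SIZE / TASK_ID_BITS_PER_SYMBOL);        // 8  (final form, per review)
  }
}
```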

jihoonson (Contributor) left a review:

LGTM after Travis. Thanks!

```diff
@@ -89,7 +89,7 @@

   private static final int TASK_ID_BITS_PER_SYMBOL = 4;
   private static final int TASK_ID_SYMBOL_MASK = (1 << TASK_ID_BITS_PER_SYMBOL) - 1;
-  private static final int TASK_ID_LENGTH = Integer.BYTES / TASK_ID_BITS_PER_SYMBOL;
+  private static final int TASK_ID_LENGTH = (Integer.BYTES * 8) / TASK_ID_BITS_PER_SYMBOL;
```
Contributor commented:

nit: Integer.SIZE returns 32.

jon-wei (Contributor, Author) replied:

Changed to Integer.SIZE.

@fjy fjy merged commit c7ac878 into apache:master Oct 4, 2018
jsun98 added a commit to jsun98/druid that referenced this pull request Oct 5, 2018
jsun98 added a commit to jsun98/druid that referenced this pull request Oct 19, 2018
jsun98 added a commit to jsun98/druid that referenced this pull request Oct 26, 2018
dclim pushed a commit that referenced this pull request Dec 21, 2018
clintropolis added a commit to implydata/druid-public that referenced this pull request Feb 5, 2019

Prevent failed KafkaConsumer creation from blocking overlord startup (apache#6383)

* Prevent failed KafkaConsumer creation from blocking overlord startup
* PR comments
* Fix random task ID length
* Adjust test timer
* Use Integer.SIZE