-
Notifications
You must be signed in to change notification settings - Fork 466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clean shutdown when multilang deamen is terminated by giving one last chance to checkpoint. #174
Conversation
… chance to checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for submitting this, just some minor changes before I can accept the request.
} | ||
try { | ||
LOG.info("Requesting a checkpoint on shutdown notification."); | ||
checkpointer.checkpoint(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether to checkpoint is determined by the application, so checkpointing here isn't safe. This should use a similar dispatch mechanism that processRecords uses. It might be possible to shortcut the dispatch at this time, and just make whether to checkpoint a configuration flag.
The dispatch code for processRecords for reference.
config.getKinesisClientLibConfiguration(), | ||
config.getRecordProcessorFactory(), | ||
executorService); | ||
|
||
Runtime.getRuntime().addShutdownHook(new Thread() | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please match the existing style with the curly brace on the same line.
LOG.info("Process terminanted, will initiate shutdown."); | ||
try { | ||
Future<Void> fut = daemon.worker.requestShutdown(); | ||
fut.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a worst case scenario this could cause the shutdown to become stuck. It would be best to start with some timeout value of how long to wait for the shutdown request to complete. I usually start with something in the range of 5 seconds. In the future it may make sense to make it configurable.
LOG.info("Requesting a checkpoint on shutdown notification."); | ||
checkpointer.checkpoint(); | ||
} catch (InvalidStateException e) { | ||
LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java will automatically call toString during string concatenation. Additionally for exceptions it's common to pass the exception as the second argument to the logger call. This will allow the logger to emit a stack trace as well e.g.
LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e);
{ | ||
LOG.info("Process terminanted, will initiate shutdown."); | ||
try { | ||
Future<Void> fut = daemon.worker.requestShutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to ensure that when requestShutdown
is invoked. There is no process records invoked. Need a lock may be ?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way dispatch is handled in the worker already guarantees this. The record processors are represented as a state machine, and all request shutdown does is request that on the change event to go to the Shutdown Notification Requested state. This is the diagram for the state changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if you can add some simple tests. Integration testing will require updating one of the MultiLang Daemon Clients.
/** | ||
* The name used for the action field in {@link Message}. | ||
*/ | ||
public static final String ACTION = "shutdownrequested"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: Please use camelCase for the action name for consistency.
* @param checkpointer A checkpointer. | ||
* @return Whether or not this operation succeeded. | ||
*/ | ||
boolean shutdownRequested(IRecordProcessorCheckpointer checkpointer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add some tests for this method in MultiLangProtocolTest.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
Thanks everything looks good. Starting the process to merge the changes now. |
One last thing: Please confirm that we can use, modify, copy, and redistribute this contribution. Thanks. |
@pfifer - yes confirmed. You can modify, copy and redistribute this contribution. Thanks, |
* Added support for graceful shutdown in MultiLang Clients * PR awslabs#174 * PR awslabs#182 * Updated documentation for `v2.IRecordProcessor#shutdown`, and `KinesisClientLibConfiguration#idleTimeBetweenReadsMillis` * PR awslabs#170 * Updated to version 1.11.151 of the AWS Java SDK * PR awslabs#183
No description provided.