-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
error handling changes for Athena federation sdk #2278
base: master
Are you sure you want to change the base?
error handling changes for Athena federation sdk #2278
Conversation
f3cfdc5
to
623454c
Compare
a4ac524
to
2cf646f
Compare
@@ -126,7 +129,7 @@ private void checkStatus(String queryId, int attempt) | |||
if (e instanceof InvalidRequestException) { | |||
// query does not exist, so no need to keep calling Athena | |||
logger.debug("Athena reports query {} not found. Interrupting checker thread", queryId); | |||
throw new InterruptedException(); | |||
throw new AthenaConnectorException(e, e.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be an internal service exception. This code path is best effort and is meant to kill the lambda early in the case that the calling Athena query is no longer running. We should keep this as InterruptedException since its just killing the thread. This doesn't get sent back to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted back to InterruptedException.
@@ -75,7 +78,7 @@ public Block read(S3SpillLocation spillLocation, EncryptionKey key, Schema schem | |||
return block; | |||
} | |||
catch (IOException ex) { | |||
throw new RuntimeException(ex); | |||
throw new AthenaConnectorException(ex, ex.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably shouldn't be an internal service error. I'm wondering if we should introduce a RuntimeException to FederationSourceErrorCode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
took a look at the available exceptions from Glue. Lets keep this as a service error and we'll fix later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
throw new RuntimeException("block has " + block.getRowCount() | ||
+ " rows but requested to check " + row); | ||
throw new AthenaConnectorException("block has " + block.getRowCount() | ||
+ " rows but requested to check " + row, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InvalidInputException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
+ field.getName() + " using value " + value | ||
+ " with minor type " + Types.getMinorTypeForArrowType(type), ex); | ||
+ " with minor type " + Types.getMinorTypeForArrowType(type), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FederationSourceErrorCode.INVALID_INPUT_EXCEPTION
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -1015,13 +1018,13 @@ else if (value instanceof Boolean && (boolean) value) { | |||
} | |||
break; | |||
default: | |||
throw new IllegalArgumentException("Unknown type " + type); | |||
throw new AthenaConnectorException("Unknown type " + type, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OperationNotSupportedException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
throw new RuntimeException("Call generated more than " + maxRowsPerCall + "rows. Generating " + | ||
"too many rows per call to writeRows(...) can result in blocks that exceed the max size."); | ||
throw new AthenaConnectorException("Call generated more than " + maxRowsPerCall + "rows. Generating " + | ||
"too many rows per call to writeRows(...) can result in blocks that exceed the max size.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InvalidInputException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -512,7 +514,7 @@ private ThreadPoolExecutor makeAsyncSpillPool(SpillConfig config) | |||
} | |||
catch (InterruptedException e) { | |||
Thread.currentThread().interrupt(); | |||
throw new RejectedExecutionException("Received an exception while submitting spillBlock task: ", e); | |||
throw new AthenaConnectorException("Received an exception while submitting spillBlock task: ", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to stay as a RejectedExecutionException. The caller can apply a retry policy and the retry policy expects a RejectedExecutionException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted back.
@@ -93,7 +95,7 @@ void updateBucketState() | |||
state = BucketState.INVALID; | |||
} | |||
else { | |||
throw new RuntimeException("Error while checking bucket ownership for " + bucket, ex); | |||
throw new AthenaConnectorException(ex, "Error while checking bucket ownership for " + bucket, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AccessDeniedException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
case INVALID: | ||
throw new RuntimeException(String.format("spill_bucket: \"%s\" not found under your account. Please make sure you have access to the bucket and spill_bucket input has no trailing '/'", bucket)); | ||
throw new AthenaConnectorException(String.format("spill_bucket: \"%s\" not found under your account. Please make sure you have access to the bucket and spill_bucket input has no trailing '/'", bucket), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InvalidInputException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
Thank you for the review and suggestion. I have updated the code with the changes you recommended. However, I noticed that there are multiple exceptions applicable in certain scenarios. Earlier also we got different exceptions recommendations from other reviewers. If possible, could you provide a specific rule set or guidelines for handling these exceptions? This would help ensure consistency and correctness in the implementation. |
Issue #, if available:
Description of changes:
Error handling improvement changes for Athena Federation SDK.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.