Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

error handling changes for Athena federation sdk #2278

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
*/
package com.amazonaws.athena.connector.lambda;

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionRequest;
import software.amazon.awssdk.services.athena.model.GetQueryExecutionResponse;
import software.amazon.awssdk.services.athena.model.InvalidRequestException;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
*/

import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
import com.amazonaws.athena.connector.lambda.exceptions.FederationThrottleException;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.google.common.base.MoreObjects;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -99,15 +101,15 @@ public ThrottlingInvoker(Builder builder)
BlockSpiller spiller)
{
if (decrease > 1 || decrease < .001) {
throw new IllegalArgumentException("decrease was " + decrease + " but should be between .001 and 1");
throw new AthenaConnectorException("decrease was " + decrease + " but should be between .001 and 1", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

if (maxDelayMs < 1) {
throw new IllegalArgumentException("maxDelayMs was " + maxDelayMs + " but must be >= 1");
throw new AthenaConnectorException("maxDelayMs was " + maxDelayMs + " but must be >= 1", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

if (increase < 1) {
throw new IllegalArgumentException("increase was " + increase + " but must be >= 1");
throw new AthenaConnectorException("increase was " + increase + " but must be >= 1", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

this.initialDelayMs = initialDelayMs;
Expand Down Expand Up @@ -198,7 +200,7 @@ public <T> T invoke(Callable<T> callable, long timeoutMillis)
}
while (!isTimedOut(startTime, timeoutMillis));

throw new TimeoutException("Timed out before call succeeded after " + (System.currentTimeMillis() - startTime) + " ms");
throw new AthenaConnectorException("Timed out before call succeeded after " + (System.currentTimeMillis() - startTime) + " ms", ErrorDetails.builder().errorCode(FederationSourceErrorCode.OPERATION_TIMEOUT_EXCEPTION.toString()).build());
}

/**
Expand Down Expand Up @@ -254,7 +256,7 @@ else if (newDelay > maxDelayMs) {

if (spillerRef.get() != null && !spillerRef.get().spilled()) {
//If no blocks have spilled, it is better to signal the Throttle to Athena by propagating.
throw new FederationThrottleException("ThrottlingInvoker requesting slow down due to " + ex, ex);
throw new AthenaConnectorException("ThrottlingInvoker requesting slow down due to " + ex, ErrorDetails.builder().errorCode(FederationSourceErrorCode.THROTTLING_EXCEPTION.toString()).build());
}
}

Expand All @@ -281,7 +283,7 @@ private void applySleep()
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
throw new AthenaConnectorException(ex, ex.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
* #L%
*/

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.bouncycastle.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.math.BigDecimal;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -115,7 +118,7 @@ else if (lhs.hashCode() < rhs.hashCode()) {
default:
//logging because throwing in a comparator gets swallowed in many unit tests that use equality asserts
logger.warn("compare: Unknown type " + type + " object: " + lhs.getClass());
throw new IllegalArgumentException("Unknown type " + type + " object: " + lhs.getClass());
throw new AthenaConnectorException("Unknown type " + type + " object: " + lhs.getClass(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

package com.amazonaws.athena.connector.lambda.data;

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.types.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.lang.reflect.Field;

Expand Down Expand Up @@ -60,7 +63,7 @@ private static IpcOption getIpcOption()
}
catch (ReflectiveOperationException ex) {
// Rethrow as unchecked because most of the callers do not already declare any throws
throw new RuntimeException(ex);
throw new AthenaConnectorException(ex.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).errorMessage(ex.getMessage()).build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* #L%
*/

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand All @@ -31,6 +32,8 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -197,7 +200,7 @@ public synchronized ArrowRecordBatch registerBatch(BatchGenerator generator)
throw ex;
}
catch (Exception ex) {
throw new RuntimeException(ex);
throw new AthenaConnectorException(ex.getMessage(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* #L%
*/

import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.VisibleForTesting;
Expand Down Expand Up @@ -74,6 +75,8 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.apache.commons.codec.Charsets;
import software.amazon.awssdk.services.glue.model.ErrorDetails;
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;

import java.math.BigDecimal;
import java.math.RoundingMode;
Expand Down Expand Up @@ -152,7 +155,7 @@ public static Block newBlock(BlockAllocator allocator, String columnName, ArrowT
setValue(block.getFieldVector(columnName), count++, next);
}
catch (Exception ex) {
throw new RuntimeException("Error for " + type + " " + columnName + " " + next, ex);
throw new AthenaConnectorException("Error for " + type + " " + columnName + " " + next, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).errorMessage(ex.getMessage()).build());
}
}
block.setRowCount(count);
Expand Down Expand Up @@ -218,8 +221,8 @@ else if (vector instanceof StructVector) {
resolver);
}
else {
throw new RuntimeException("Unsupported 'Complex' vector " +
vector.getClass().getSimpleName() + " for field " + vector.getField().getName());
throw new AthenaConnectorException("Unsupported 'Complex' vector " +
vector.getClass().getSimpleName() + " for field " + vector.getField().getName(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.OPERATION_NOT_SUPPORTED_EXCEPTION.toString()).build());
}
}

Expand Down Expand Up @@ -395,14 +398,14 @@ else if (value instanceof Boolean && (boolean) value) {
}
break;
default:
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}
catch (RuntimeException ex) {
String fieldName = (vector != null) ? vector.getField().getName() : "null_vector";
throw new RuntimeException("Unable to set value for field " + fieldName
throw new AthenaConnectorException("Unable to set value for field " + fieldName
+ " using value " + value
+ " of type " + vector.getMinorType(), ex);
+ " of type " + vector.getMinorType(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

Expand All @@ -417,7 +420,7 @@ else if (value instanceof Boolean && (boolean) value) {
public static String rowToString(Block block, int row)
{
if (row > block.getRowCount()) {
throw new IllegalArgumentException(row + " exceeds available rows " + block.getRowCount());
throw new AthenaConnectorException(row + " exceeds available rows " + block.getRowCount(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

StringBuilder sb = new StringBuilder();
Expand All @@ -434,7 +437,7 @@ public static String rowToString(Block block, int row)
sb.append("]");
}
catch (RuntimeException ex) {
throw new RuntimeException("Error processing field " + nextReader.getField().getName(), ex);
throw new AthenaConnectorException("Error processing field " + nextReader.getField().getName(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).errorMessage(ex.getMessage()).build());
}
}

Expand Down Expand Up @@ -531,8 +534,8 @@ public static String fieldToString(FieldReader reader)
public static int copyRows(Block srcBlock, Block dstBlock, int firstRow, int lastRow)
{
if (firstRow > lastRow || lastRow > srcBlock.getRowCount() - 1) {
throw new RuntimeException("src has " + srcBlock.getRowCount()
+ " but requested copy of " + firstRow + " to " + lastRow);
throw new AthenaConnectorException("src has " + srcBlock.getRowCount()
+ " but requested copy of " + firstRow + " to " + lastRow, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString()).build());
}

for (FieldReader src : srcBlock.getFieldReaders()) {
Expand All @@ -559,8 +562,8 @@ public static int copyRows(Block srcBlock, Block dstBlock, int firstRow, int las
public static boolean isNullRow(Block block, int row)
{
if (row > block.getRowCount() - 1) {
throw new RuntimeException("block has " + block.getRowCount()
+ " rows but requested to check " + row);
throw new AthenaConnectorException("block has " + block.getRowCount()
+ " rows but requested to check " + row, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}

//If any column is non-null then return false
Expand Down Expand Up @@ -681,26 +684,26 @@ protected static void writeMap(BufferAllocator allocator,
List<Field> children = field.getChildren();
Field keyValueStructField;
if (children.size() != 1) {
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
else {
keyValueStructField = children.get(0);
if (!MapVector.DATA_VECTOR_NAME.equals(keyValueStructField.getName()) || !(keyValueStructField.getType() instanceof ArrowType.Struct)) {
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

List<Field> keyValueChildren = keyValueStructField.getChildren();
Field keyField;
Field valueField;
if (keyValueChildren.size() != 2) {
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
else {
keyField = keyValueChildren.get(0);
valueField = keyValueChildren.get(1);
if (!MapVector.KEY_NAME.equals(keyField.getName()) || !MapVector.VALUE_NAME.equals(valueField.getName())) {
throw new IllegalStateException("Invalid Arrow Map schema: " + field);
throw new AthenaConnectorException("Invalid Arrow Map schema: " + field, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

Expand Down Expand Up @@ -1015,13 +1018,13 @@ else if (value instanceof Boolean && (boolean) value) {
}
break;
default:
throw new IllegalArgumentException("Unknown type " + type);
throw new AthenaConnectorException("Unknown type " + type, ErrorDetails.builder().errorCode(FederationSourceErrorCode.OPERATION_NOT_SUPPORTED_EXCEPTION.toString()).build());
}
}
catch (RuntimeException ex) {
throw new RuntimeException("Unable to write value for field "
throw new AthenaConnectorException("Unable to write value for field "
+ field.getName() + " using value " + value
+ " with minor type " + Types.getMinorTypeForArrowType(type), ex);
+ " with minor type " + Types.getMinorTypeForArrowType(type), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

Expand Down Expand Up @@ -1089,7 +1092,7 @@ private static void setNullValue(FieldVector vector, int pos)
((BitVector) vector).setNull(pos);
break;
default:
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

Expand Down Expand Up @@ -1173,7 +1176,7 @@ public static void unsetRow(int row, Block block)
((MapVector) vector).setNull(row);
break;
default:
throw new IllegalArgumentException("Unknown type " + vector.getMinorType());
throw new AthenaConnectorException("Unknown type " + vector.getMinorType(), ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}
}
Expand Down Expand Up @@ -1224,7 +1227,7 @@ public static Class getJavaType(Types.MinorType minorType)
case STRUCT:
return Map.class;
default:
throw new IllegalArgumentException("Unknown type " + minorType);
throw new AthenaConnectorException("Unknown type " + minorType, ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
}

Expand Down
Loading