diff --git a/pom.xml b/pom.xml index 1a6ad62611da..78a5921257f8 100644 --- a/pom.xml +++ b/pom.xml @@ -735,6 +735,12 @@ ${dep.drift.version} + + com.facebook.drift + drift-transport-spi + ${dep.drift.version} + + io.airlift.tpch tpch diff --git a/presto-main/pom.xml b/presto-main/pom.xml index 8b3bd84902be..9aea0928a737 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -206,6 +206,11 @@ drift-transport-netty + + com.facebook.drift + drift-transport-spi + + com.facebook.drift drift-client diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java index dbabda9ef0c2..788afc5d7192 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/HttpRpcShuffleClient.java @@ -18,6 +18,7 @@ import com.facebook.airlift.http.client.Request; import com.facebook.airlift.http.client.Response; import com.facebook.airlift.http.client.ResponseHandler; +import com.facebook.airlift.http.client.ResponseTooLargeException; import com.facebook.airlift.log.Logger; import com.facebook.presto.execution.buffer.SerializedPage; import com.facebook.presto.operator.PageBufferClient.PagesResponse; @@ -112,6 +113,15 @@ public ListenableFuture abortResults() return httpClient.executeAsync(prepareDelete().setUri(location).build(), createStatusResponseHandler()); } + @Override + public Throwable rewriteException(Throwable throwable) + { + if (throwable instanceof ResponseTooLargeException) { + return new PageTooLargeException(throwable); + } + return throwable; + } + public static class PageResponseHandler implements ResponseHandler { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java index f5b20bcc820a..b3d1ee9cf148 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PageBufferClient.java @@ -14,7 +14,6 @@ package com.facebook.presto.operator; import com.facebook.airlift.http.client.HttpUriBuilder; -import com.facebook.airlift.http.client.ResponseTooLargeException; import com.facebook.airlift.log.Logger; import com.facebook.presto.execution.buffer.SerializedPage; import com.facebook.presto.server.remotetask.Backoff; @@ -342,7 +341,7 @@ public void onFailure(Throwable t) log.debug("Request to %s failed %s", uri, t); checkNotHoldsLock(this); - t = rewriteException(t); + t = resultClient.rewriteException(t); if (!(t instanceof PrestoException) && backoff.failure()) { String message = format("%s (%s - %s failures, failure duration %s, total failed request time %s)", WORKER_NODE_ERROR, @@ -470,14 +469,6 @@ else if (future != null) { .toString(); } - private static Throwable rewriteException(Throwable t) - { - if (t instanceof ResponseTooLargeException) { - return new PageTooLargeException(); - } - return t; - } - public static class PagesResponse { public static PagesResponse createPagesResponse(String taskInstanceId, long token, long nextToken, Iterable pages, boolean complete) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PageTooLargeException.java b/presto-main/src/main/java/com/facebook/presto/operator/PageTooLargeException.java index 31b3885fe0d7..98b847a8972a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PageTooLargeException.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PageTooLargeException.java @@ -20,8 +20,8 @@ public class PageTooLargeException extends PrestoException { - public PageTooLargeException() + public PageTooLargeException(Throwable e) { - super(PAGE_TOO_LARGE, "Remote page is too large"); + super(PAGE_TOO_LARGE, "Remote page is too large", e); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/RpcShuffleClient.java b/presto-main/src/main/java/com/facebook/presto/operator/RpcShuffleClient.java index 2ee9f01e26b2..b03ea89e042b 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/RpcShuffleClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/RpcShuffleClient.java @@ -36,4 +36,9 @@ public interface RpcShuffleClient * Close remote buffer */ ListenableFuture abortResults(); + + /** + * Rewrite network related exception to Presto exception + */ + Throwable rewriteException(Throwable throwable); } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ThriftRpcShuffleClient.java b/presto-main/src/main/java/com/facebook/presto/operator/ThriftRpcShuffleClient.java index a5e27a7c252a..5c8fec4dde98 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ThriftRpcShuffleClient.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ThriftRpcShuffleClient.java @@ -15,6 +15,7 @@ import com.facebook.airlift.log.Logger; import com.facebook.drift.client.DriftClient; +import com.facebook.drift.transport.client.MessageTooLargeException; import com.facebook.presto.execution.TaskId; import com.facebook.presto.execution.buffer.BufferResult; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; @@ -92,4 +93,13 @@ public ListenableFuture abortResults() { return thriftClient.abortResults(taskId, outputBufferId); } + + @Override + public Throwable rewriteException(Throwable throwable) + { + if (throwable instanceof MessageTooLargeException) { + return new PageTooLargeException(throwable); + } + return throwable; + } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java b/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java index 2f2568f2870c..2a8a72e378dc 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestPageBufferClient.java @@ -404,7 +404,7 @@ public void testExceptionFromResponseHandler() @Test public void testErrorCodes() { - assertEquals(new PageTooLargeException().getErrorCode(), PAGE_TOO_LARGE.toErrorCode()); + assertEquals(new PageTooLargeException(null).getErrorCode(), PAGE_TOO_LARGE.toErrorCode()); assertEquals(new PageTransportErrorException("").getErrorCode(), PAGE_TRANSPORT_ERROR.toErrorCode()); assertEquals(new PageTransportTimeoutException(HostAddress.fromParts("127.0.0.1", 8080), "", null).getErrorCode(), PAGE_TRANSPORT_TIMEOUT.toErrorCode()); }