Skip to content

Commit

Permalink
Rewrite page too large exception based on network protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
highker committed Jan 17, 2020
1 parent 785eb79 commit be031b1
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 13 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,12 @@
<version>${dep.drift.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-spi</artifactId>
<version>${dep.drift.version}</version>
</dependency>

<dependency>
<groupId>io.airlift.tpch</groupId>
<artifactId>tpch</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@
<artifactId>drift-transport-netty</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-spi</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.http.client.ResponseTooLargeException;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.operator.PageBufferClient.PagesResponse;
Expand Down Expand Up @@ -112,6 +113,15 @@ public ListenableFuture<?> abortResults()
return httpClient.executeAsync(prepareDelete().setUri(location).build(), createStatusResponseHandler());
}

@Override
public Throwable rewriteException(Throwable throwable)
{
if (throwable instanceof ResponseTooLargeException) {
return new PageTooLargeException(throwable);
}
return throwable;
}

public static class PageResponseHandler
implements ResponseHandler<PagesResponse, RuntimeException>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator;

import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.ResponseTooLargeException;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.server.remotetask.Backoff;
Expand Down Expand Up @@ -342,7 +341,7 @@ public void onFailure(Throwable t)
log.debug("Request to %s failed %s", uri, t);
checkNotHoldsLock(this);

t = rewriteException(t);
t = resultClient.rewriteException(t);
if (!(t instanceof PrestoException) && backoff.failure()) {
String message = format("%s (%s - %s failures, failure duration %s, total failed request time %s)",
WORKER_NODE_ERROR,
Expand Down Expand Up @@ -470,14 +469,6 @@ else if (future != null) {
.toString();
}

private static Throwable rewriteException(Throwable t)
{
if (t instanceof ResponseTooLargeException) {
return new PageTooLargeException();
}
return t;
}

public static class PagesResponse
{
public static PagesResponse createPagesResponse(String taskInstanceId, long token, long nextToken, Iterable<SerializedPage> pages, boolean complete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
public class PageTooLargeException
extends PrestoException
{
public PageTooLargeException()
public PageTooLargeException(Throwable e)
{
super(PAGE_TOO_LARGE, "Remote page is too large");
super(PAGE_TOO_LARGE, "Remote page is too large", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public interface RpcShuffleClient
* Close remote buffer
*/
ListenableFuture<?> abortResults();

/**
* Rewrite network related exception to Presto exception
*/
Throwable rewriteException(Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.log.Logger;
import com.facebook.drift.client.DriftClient;
import com.facebook.drift.transport.client.MessageTooLargeException;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
Expand Down Expand Up @@ -92,4 +93,13 @@ public ListenableFuture<?> abortResults()
{
return thriftClient.abortResults(taskId, outputBufferId);
}

@Override
public Throwable rewriteException(Throwable throwable)
{
if (throwable instanceof MessageTooLargeException) {
return new PageTooLargeException(throwable);
}
return throwable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public void testExceptionFromResponseHandler()
@Test
public void testErrorCodes()
{
assertEquals(new PageTooLargeException().getErrorCode(), PAGE_TOO_LARGE.toErrorCode());
assertEquals(new PageTooLargeException(null).getErrorCode(), PAGE_TOO_LARGE.toErrorCode());
assertEquals(new PageTransportErrorException("").getErrorCode(), PAGE_TRANSPORT_ERROR.toErrorCode());
assertEquals(new PageTransportTimeoutException(HostAddress.fromParts("127.0.0.1", 8080), "", null).getErrorCode(), PAGE_TRANSPORT_TIMEOUT.toErrorCode());
}
Expand Down

0 comments on commit be031b1

Please sign in to comment.