diff --git a/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java index e61f2b62f6374..2644e2cb58683 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/LocationFactory.java @@ -26,6 +26,16 @@ public interface LocationFactory URI createLocalTaskLocation(TaskId taskId); + /** + * TODO: this method is required since not not all RPC call is supported by thrift. + * It should be merged into {@code createTaskLocation} once full thrift support is in-place for v1/task + */ + @Deprecated + URI createLegacyTaskLocation(InternalNode node, TaskId taskId); + + /** + * TODO: implement full thrift support for v1/task + */ URI createTaskLocation(InternalNode node, TaskId taskId); URI createMemoryInfoLocation(InternalNode node); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java index 32358eddf01f0..73ea659501e29 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java @@ -20,6 +20,8 @@ import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; +import java.net.URI; + public interface RemoteTask { TaskId getTaskId(); @@ -30,6 +32,11 @@ public interface RemoteTask TaskStatus getTaskStatus(); + /** + * TODO: this should be merged into getTaskStatus once full thrift support is in-place for v1/task + */ + URI getRemoteTaskLocation(); + void start(); void addSplits(Multimap splits); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java index 8e480a61ed424..a6e7e12b99af2 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java @@ -365,7 +365,7 @@ public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set newSplits = ImmutableMultimap.builder(); for (RemoteTask sourceTask : sourceTasks) { TaskStatus sourceTaskStatus = sourceTask.getTaskStatus(); - newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTaskStatus.getSelf(), sourceTaskStatus.getTaskId())); + newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTask.getRemoteTaskLocation(), sourceTaskStatus.getTaskId())); } task.addSplits(newSplits.build()); } @@ -491,7 +491,7 @@ private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, M sourceTasks.forEach((planNodeId, task) -> { TaskStatus status = task.getTaskStatus(); if (status.getState() != TaskState.FINISHED) { - initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf(), status.getTaskId())); + initialSplits.put(planNodeId, createRemoteSplitFor(taskId, task.getRemoteTaskLocation(), status.getTaskId())); } }); diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpLocationFactory.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpLocationFactory.java index 3a7872d3808d4..ff80ac7a260b2 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpLocationFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpLocationFactory.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.server.remotetask; +import com.facebook.airlift.http.client.HttpUriBuilder; import com.facebook.airlift.http.server.HttpServerInfo; import com.facebook.presto.execution.LocationFactory; import com.facebook.presto.execution.StageId; @@ -26,6 +27,7 @@ import javax.inject.Inject; import java.net.URI; +import java.util.OptionalInt; import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static java.util.Objects.requireNonNull; @@ -73,7 +75,13 @@ public URI createStageLocation(StageId stageId) @Override public URI createLocalTaskLocation(TaskId taskId) { - return createTaskLocation(nodeManager.getCurrentNode(), taskId); + return createHttpTaskLocation(nodeManager.getCurrentNode(), taskId); + } + + @Override + public URI createLegacyTaskLocation(InternalNode node, TaskId taskId) + { + return createHttpTaskLocation(node, taskId); } @Override @@ -81,10 +89,23 @@ public URI createTaskLocation(InternalNode node, TaskId taskId) { requireNonNull(node, "node is null"); requireNonNull(taskId, "taskId is null"); - return uriBuilderFrom(node.getInternalUri()) - .appendPath("/v1/task") - .appendPath(taskId.toString()) - .build(); + + if (taskCommunicationProtocol.equals(CommunicationProtocol.HTTP)) { + return createLegacyTaskLocation(node, taskId); + } + + OptionalInt thriftPort = node.getThriftPort(); + + HttpUriBuilder builder = uriBuilderFrom(node.getInternalUri()); + if (taskCommunicationProtocol.equals(CommunicationProtocol.THRIFT) && thriftPort.isPresent()) { + builder.scheme("thrift"); + builder.port(thriftPort.getAsInt()); + } + else { + // fall back to http case + } + + return builder.appendPath("/v1/task").appendPath(taskId.toString()).build(); } @Override @@ -94,4 +115,14 @@ public URI createMemoryInfoLocation(InternalNode node) return uriBuilderFrom(node.getInternalUri()) .appendPath("/v1/memory").build(); } + + private URI createHttpTaskLocation(InternalNode node, TaskId taskId) + { + requireNonNull(node, "node is null"); + requireNonNull(taskId, "taskId is null"); + return uriBuilderFrom(node.getInternalUri()) + .appendPath("/v1/task") + .appendPath(taskId.toString()) + .build(); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index 3984ccf64244b..5a5cfaa387430 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -124,6 +124,7 @@ public final class HttpRemoteTask private final TaskId taskId; private final URI taskLocation; + private final URI remoteTaskLocation; private final Session session; private final String nodeId; @@ -190,6 +191,7 @@ public HttpRemoteTask( TaskId taskId, String nodeId, URI location, + URI remoteLocation, PlanFragment planFragment, Multimap initialSplits, OutputBuffers outputBuffers, @@ -215,6 +217,7 @@ public HttpRemoteTask( requireNonNull(taskId, "taskId is null"); requireNonNull(nodeId, "nodeId is null"); requireNonNull(location, "location is null"); + requireNonNull(remoteLocation, "remoteLocation is null"); requireNonNull(planFragment, "planFragment is null"); requireNonNull(outputBuffers, "outputBuffers is null"); requireNonNull(httpClient, "httpClient is null"); @@ -231,6 +234,7 @@ public HttpRemoteTask( try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) { this.taskId = taskId; this.taskLocation = location; + this.remoteTaskLocation = remoteLocation; this.session = session; this.nodeId = nodeId; this.planFragment = planFragment; @@ -337,6 +341,12 @@ public TaskStatus getTaskStatus() return taskStatusFetcher.getTaskStatus(); } + @Override + public URI getRemoteTaskLocation() + { + return remoteTaskLocation; + } + @Override public void start() { diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java index dcdc167174e1e..77ae401cb6190 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTaskFactory.java @@ -156,6 +156,7 @@ public RemoteTask createRemoteTask( session, taskId, node.getNodeIdentifier(), + locationFactory.createLegacyTaskLocation(node, taskId), locationFactory.createTaskLocation(node, taskId), fragment, initialSplits, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java index a1cffffb35b49..9907cdc67ac87 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java @@ -270,6 +270,12 @@ public TaskInfo getTaskInfo() true); } + @Override + public URI getRemoteTaskLocation() + { + return location; + } + @Override public TaskStatus getTaskStatus() { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java index cbaa439ad8d5c..6069ccbb78fa8 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java @@ -314,11 +314,17 @@ public URI createLocalTaskLocation(TaskId taskId) } @Override - public URI createTaskLocation(InternalNode node, TaskId taskId) + public URI createLegacyTaskLocation(InternalNode node, TaskId taskId) { return URI.create("http://fake.invalid/task/" + node.getNodeIdentifier() + "/" + taskId); } + @Override + public URI createTaskLocation(InternalNode node, TaskId taskId) + { + return createLegacyTaskLocation(node, taskId); + } + @Override public URI createMemoryInfoLocation(InternalNode node) {