Skip to content

Commit

Permalink
Add thrift support for v1/task/results
Browse files Browse the repository at this point in the history
  • Loading branch information
highker committed Jan 17, 2020
1 parent a2e20c7 commit 785eb79
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ public interface LocationFactory

URI createLocalTaskLocation(TaskId taskId);

/**
* TODO: this method is required since not not all RPC call is supported by thrift.
* It should be merged into {@code createTaskLocation} once full thrift support is in-place for v1/task
*/
@Deprecated
URI createLegacyTaskLocation(InternalNode node, TaskId taskId);

/**
* TODO: implement full thrift support for v1/task
*/
URI createTaskLocation(InternalNode node, TaskId taskId);

URI createMemoryInfoLocation(InternalNode node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;

import java.net.URI;

public interface RemoteTask
{
TaskId getTaskId();
Expand All @@ -30,6 +32,11 @@ public interface RemoteTask

TaskStatus getTaskStatus();

/**
* TODO: this should be merged into getTaskStatus once full thrift support is in-place for v1/task
*/
URI getRemoteTaskLocation();

void start();

void addSplits(Multimap<PlanNodeId, Split> splits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<Rem
ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
for (RemoteTask sourceTask : sourceTasks) {
TaskStatus sourceTaskStatus = sourceTask.getTaskStatus();
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTaskStatus.getSelf(), sourceTaskStatus.getTaskId()));
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), sourceTask.getRemoteTaskLocation(), sourceTaskStatus.getTaskId()));
}
task.addSplits(newSplits.build());
}
Expand Down Expand Up @@ -491,7 +491,7 @@ private synchronized RemoteTask scheduleTask(InternalNode node, TaskId taskId, M
sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf(), status.getTaskId()));
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, task.getRemoteTaskLocation(), status.getTaskId()));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.server.remotetask;

import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.server.HttpServerInfo;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.StageId;
Expand All @@ -26,6 +27,7 @@
import javax.inject.Inject;

import java.net.URI;
import java.util.OptionalInt;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -73,18 +75,37 @@ public URI createStageLocation(StageId stageId)
@Override
public URI createLocalTaskLocation(TaskId taskId)
{
return createTaskLocation(nodeManager.getCurrentNode(), taskId);
return createHttpTaskLocation(nodeManager.getCurrentNode(), taskId);
}

@Override
public URI createLegacyTaskLocation(InternalNode node, TaskId taskId)
{
return createHttpTaskLocation(node, taskId);
}

@Override
public URI createTaskLocation(InternalNode node, TaskId taskId)
{
requireNonNull(node, "node is null");
requireNonNull(taskId, "taskId is null");
return uriBuilderFrom(node.getInternalUri())
.appendPath("/v1/task")
.appendPath(taskId.toString())
.build();

if (taskCommunicationProtocol.equals(CommunicationProtocol.HTTP)) {
return createLegacyTaskLocation(node, taskId);
}

OptionalInt thriftPort = node.getThriftPort();

HttpUriBuilder builder = uriBuilderFrom(node.getInternalUri());
if (taskCommunicationProtocol.equals(CommunicationProtocol.THRIFT) && thriftPort.isPresent()) {
builder.scheme("thrift");
builder.port(thriftPort.getAsInt());
}
else {
// fall back to http case
}

return builder.appendPath("/v1/task").appendPath(taskId.toString()).build();
}

@Override
Expand All @@ -94,4 +115,14 @@ public URI createMemoryInfoLocation(InternalNode node)
return uriBuilderFrom(node.getInternalUri())
.appendPath("/v1/memory").build();
}

private URI createHttpTaskLocation(InternalNode node, TaskId taskId)
{
requireNonNull(node, "node is null");
requireNonNull(taskId, "taskId is null");
return uriBuilderFrom(node.getInternalUri())
.appendPath("/v1/task")
.appendPath(taskId.toString())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public final class HttpRemoteTask

private final TaskId taskId;
private final URI taskLocation;
private final URI remoteTaskLocation;

private final Session session;
private final String nodeId;
Expand Down Expand Up @@ -190,6 +191,7 @@ public HttpRemoteTask(
TaskId taskId,
String nodeId,
URI location,
URI remoteLocation,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
Expand All @@ -215,6 +217,7 @@ public HttpRemoteTask(
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
requireNonNull(remoteLocation, "remoteLocation is null");
requireNonNull(planFragment, "planFragment is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
Expand All @@ -231,6 +234,7 @@ public HttpRemoteTask(
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.taskLocation = location;
this.remoteTaskLocation = remoteLocation;
this.session = session;
this.nodeId = nodeId;
this.planFragment = planFragment;
Expand Down Expand Up @@ -337,6 +341,12 @@ public TaskStatus getTaskStatus()
return taskStatusFetcher.getTaskStatus();
}

@Override
public URI getRemoteTaskLocation()
{
return remoteTaskLocation;
}

@Override
public void start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public RemoteTask createRemoteTask(
session,
taskId,
node.getNodeIdentifier(),
locationFactory.createLegacyTaskLocation(node, taskId),
locationFactory.createTaskLocation(node, taskId),
fragment,
initialSplits,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ public TaskInfo getTaskInfo()
true);
}

@Override
public URI getRemoteTaskLocation()
{
return location;
}

@Override
public TaskStatus getTaskStatus()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,17 @@ public URI createLocalTaskLocation(TaskId taskId)
}

@Override
public URI createTaskLocation(InternalNode node, TaskId taskId)
public URI createLegacyTaskLocation(InternalNode node, TaskId taskId)
{
return URI.create("http://fake.invalid/task/" + node.getNodeIdentifier() + "/" + taskId);
}

@Override
public URI createTaskLocation(InternalNode node, TaskId taskId)
{
return createLegacyTaskLocation(node, taskId);
}

@Override
public URI createMemoryInfoLocation(InternalNode node)
{
Expand Down

0 comments on commit 785eb79

Please sign in to comment.