Skip to content

Commit

Permalink
Add dispatching to HandledTransportAction (#38050)
Browse files Browse the repository at this point in the history
This commit allows implementors of the `HandledTransportAction` to
specify what thread the action should be executed on. The motivation for
this commit is that certain CCR requests should be performed on the
generic threadpool.
  • Loading branch information
Tim-Brooks authored Jan 30, 2019
1 parent 54dbf94 commit b88bdfe
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ protected HandledTransportAction(String actionName, TransportService transportSe
this(actionName, true, transportService, actionFilters, requestReader);
}

protected HandledTransportAction(String actionName, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader, String executor) {
this(actionName, true, transportService, actionFilters, requestReader, executor);
}

protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ActionFilters actionFilters, Supplier<Request> request) {
super(actionName, actionFilters, transportService.getTaskManager());
Expand All @@ -55,8 +60,14 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestReader) {
this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
}

protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestReader, String executor) {
super(actionName, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,
new TransportHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,20 @@ public static class TransportDeleteCcrRestoreSessionAction
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {

private final CcrRestoreSourceService ccrRestoreService;
private final ThreadPool threadPool;

@Inject
public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService,
CcrRestoreSourceService ccrRestoreService) {
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new);
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
this.ccrRestoreService = ccrRestoreService;
this.threadPool = transportService.getThreadPool();
}

@Override
protected void doExecute(Task task, ClearCcrRestoreSessionRequest request,
ActionListener<ClearCcrRestoreSessionResponse> listener) {
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
// may be unnecessary when we remove these callbacks.
threadPool.generic().execute(() -> {
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
});
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
Expand Down Expand Up @@ -58,7 +57,7 @@ public static class TransportGetCcrRestoreFileChunkAction
@Inject
public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService) {
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new);
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
this.threadPool = transportService.getThreadPool();
this.restoreSourceService = restoreSourceService;
Expand All @@ -68,29 +67,21 @@ public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportServi
@Override
protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request,
ActionListener<GetCcrRestoreFileChunkResponse> listener) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
// structure on the same thread. So the bytes will be copied before the reference is released.
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}

@Override
protected void doRun() throws Exception {
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
// structure on the same thread. So the bytes will be copied before the reference is released.
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}
}
}
});
} catch (IOException e) {
listener.onFailure(e);
}
}
}

Expand Down

0 comments on commit b88bdfe

Please sign in to comment.