Skip to content

Commit

Permalink
Add dispatching to HandledTransportAction (#38050) (#38066)
Browse files Browse the repository at this point in the history
This commit allows implementors of the `HandledTransportAction` to
specify what thread the action should be executed on. The motivation for
this commit is that certain CCR requests should be performed on the
generic threadpool.
  • Loading branch information
Tim-Brooks authored Jan 31, 2019
1 parent 63c57b3 commit 8f327f3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
}

protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request, String executor) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request, executor);
}

protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
IndexNameExpressionResolver indexNameExpressionResolver) {
Expand All @@ -51,8 +57,15 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
this(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
request, ThreadPool.Names.SAME);
}

protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, String executor) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
transportService.registerRequestHandler(actionName, request, executor, false, canTripCircuitBreaker,
new TransportHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,21 @@ public static class TransportDeleteCcrRestoreSessionAction
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {

private final CcrRestoreSourceService ccrRestoreService;
private final ThreadPool threadPool;

@Inject
public TransportDeleteCcrRestoreSessionAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver resolver,
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(settings, NAME, threadPool, transportService, actionFilters, resolver, ClearCcrRestoreSessionRequest::new);
super(settings, NAME, threadPool, transportService, actionFilters, resolver, ClearCcrRestoreSessionRequest::new,
ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
this.ccrRestoreService = ccrRestoreService;
this.threadPool = transportService.getThreadPool();
}

@Override
protected void doExecute(ClearCcrRestoreSessionRequest request, ActionListener<ClearCcrRestoreSessionResponse> listener) {
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
// may be unnecessary when we remove these callbacks.
threadPool.generic().execute(() -> {
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
});
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -68,7 +67,7 @@ public TransportGetCcrRestoreFileChunkAction(Settings settings, BigArrays bigArr
IndexNameExpressionResolver resolver,
ActionFilters actionFilters, CcrRestoreSourceService restoreSourceService) {
super(settings, NAME, transportService.getThreadPool(), transportService, actionFilters, resolver,
GetCcrRestoreFileChunkRequest::new);
GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
this.threadPool = transportService.getThreadPool();
this.restoreSourceService = restoreSourceService;
Expand All @@ -77,29 +76,21 @@ public TransportGetCcrRestoreFileChunkAction(Settings settings, BigArrays bigArr

@Override
protected void doExecute(GetCcrRestoreFileChunkRequest request, ActionListener<GetCcrRestoreFileChunkResponse> listener) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
// structure on the same thread. So the bytes will be copied before the reference is released.
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}

@Override
protected void doRun() throws Exception {
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
// structure on the same thread. So the bytes will be copied before the reference is released.
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}
}
}
});
} catch (IOException e) {
listener.onFailure(e);
}
}
}

Expand Down

0 comments on commit 8f327f3

Please sign in to comment.