diff --git a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java index 0b35bc8fb89d6..c0bc0af839967 100644 --- a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java @@ -45,6 +45,11 @@ protected HandledTransportAction(String actionName, TransportService transportSe this(actionName, true, transportService, actionFilters, requestReader); } + protected HandledTransportAction(String actionName, TransportService transportService, + ActionFilters actionFilters, Writeable.Reader requestReader, String executor) { + this(actionName, true, transportService, actionFilters, requestReader, executor); + } + protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, TransportService transportService, ActionFilters actionFilters, Supplier request) { super(actionName, actionFilters, transportService.getTaskManager()); @@ -55,8 +60,14 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader) { + this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME); + } + + protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, + TransportService transportService, ActionFilters actionFilters, + Writeable.Reader requestReader, String executor) { super(actionName, actionFilters, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader, + transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 81cde2984f500..317890edb4206 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -43,26 +43,20 @@ public static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction { private final CcrRestoreSourceService ccrRestoreService; - private final ThreadPool threadPool; @Inject public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new); + super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC); TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new); this.ccrRestoreService = ccrRestoreService; - this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, ActionListener listener) { - // TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch - // may be unnecessary when we remove these callbacks. - threadPool.generic().execute(() -> { - ccrRestoreService.closeSession(request.getSessionUUID()); - listener.onResponse(new ClearCcrRestoreSessionResponse()); - }); + ccrRestoreService.closeSession(request.getSessionUUID()); + listener.onResponse(new ClearCcrRestoreSessionResponse()); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 3f473f25c2411..cf8d2e5c55f48 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportActionProxy; @@ -58,7 +57,7 @@ public static class TransportGetCcrRestoreFileChunkAction @Inject public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, CcrRestoreSourceService restoreSourceService) { - super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new); + super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC); TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new); this.threadPool = transportService.getThreadPool(); this.restoreSourceService = restoreSourceService; @@ -68,29 +67,21 @@ public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportServi @Override protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request, ActionListener listener) { - threadPool.generic().execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + int bytesRequested = request.getSize(); + ByteArray array = bigArrays.newByteArray(bytesRequested, false); + String fileName = request.getFileName(); + String sessionUUID = request.getSessionUUID(); + // This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data + // structure on the same thread. So the bytes will be copied before the reference is released. + try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { + long offsetAfterRead = sessionReader.readFileBytes(fileName, reference); + long offsetBeforeRead = offsetAfterRead - reference.length(); + listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference)); } - - @Override - protected void doRun() throws Exception { - int bytesRequested = request.getSize(); - ByteArray array = bigArrays.newByteArray(bytesRequested, false); - String fileName = request.getFileName(); - String sessionUUID = request.getSessionUUID(); - // This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data - // structure on the same thread. So the bytes will be copied before the reference is released. - try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) { - try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { - long offsetAfterRead = sessionReader.readFileBytes(fileName, reference); - long offsetBeforeRead = offsetAfterRead - reference.length(); - listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference)); - } - } - } - }); + } catch (IOException e) { + listener.onFailure(e); + } } }