Skip to content

Commit

Permalink
Inline TransportReplAction#registerRequestHandlers
Browse files Browse the repository at this point in the history
It is important that resync actions are not rejected on the primary even if its
`write` threadpool is overloaded. Today we do this by exposing
`registerRequestHandlers` to subclasses and overriding it in
`TransportResyncReplicationAction`. This isn't ideal because it obscures the
difference between this action and other replication actions, and also might
allow subclasses to try and use some state before they are properly
initialised. This change replaces this override with a constructor parameter to
solve these issues.

Relates elastic#40706
  • Loading branch information
DaveCTurner committed Apr 3, 2019
1 parent 9ad9f16 commit dd60098
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
this.threadPool = threadPool;
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
Expand All @@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
}

@Override
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
// we should never reject resync because of thread pool capacity on primary
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request),
executor, true, true,
this::handlePrimaryRequest);
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true,
this::handleReplicaRequest);
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
true /* we should never reject resync because of thread pool capacity on primary */);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, false);
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
}


Expand All @@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
this.transportService = transportService;
Expand All @@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans

this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);

this.transportOptions = transportOptions(settings);
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}
transportService.registerRequestHandler(transportPrimaryAction,
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);

protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
this::handlePrimaryRequest);
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
executor, true, true, this::handleReplicaRequest);

this.transportOptions = transportOptions(settings);

this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {

protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor) {
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
indexNameExpressionResolver, request, replicaRequest, executor, true);
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
}

/** Syncs operation result to the translog or throws a shard not available failure */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RetentionLeaseSyncAction(
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT);
ThreadPool.Names.MANAGEMENT, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
}
Expand All @@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
super(settings, actionName, transportService, clusterService,
mockIndicesService(clusterService), threadPool, shardStateAction,
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction(
indexNameExpressionResolver,
BulkShardOperationsRequest::new,
BulkShardOperationsRequest::new,
ThreadPool.Names.WRITE);
ThreadPool.Names.WRITE, false);
}

@Override
Expand Down

0 comments on commit dd60098

Please sign in to comment.