Skip to content

Commit

Permalink
Dedicated threadpool for system index writes (#61655)
Browse files Browse the repository at this point in the history
This commit adds a dedicated threadpool for system index write
operations. The dedicated resources for system index writes serves as
a means to ensure that user activity does not block important system
operations from occurring such as the management of users and roles.
  • Loading branch information
jaymode authored Sep 22, 2020
1 parent 3ee74a4 commit 242083a
Show file tree
Hide file tree
Showing 30 changed files with 399 additions and 132 deletions.
5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ There are several thread pools, but the important ones include:
Thread pool type is `fixed` and a default maximum size of
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.

`system_write`::
For write operations on system indices.
Thread pool type is `fixed` and a default maximum size of
`min(5, (`<<node.processors, `# of allocated processors`>>`) / 2)`.

Changing a specific thread pool can be done by setting its type-specific
parameters; for example, changing the number of threads in the `write` thread
pool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -394,4 +395,8 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) {
public long ramBytesUsed() {
return SHALLOW_SIZE + requests.stream().mapToLong(Accountable::ramBytesUsed).sum();
}

public Set<String> getIndices() {
return Collections.unmodifiableSet(indices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -81,6 +83,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
Expand Down Expand Up @@ -110,20 +113,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressure indexingPressure;
private final SystemIndices systemIndices;

@Inject
public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure) {
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices) {
this(threadPool, transportService, clusterService, ingestService, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, indexingPressure, System::nanoTime);
indexNameExpressionResolver, autoCreateIndex, indexingPressure, systemIndices, System::nanoTime);
}

public TransportBulkAction(ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, LongSupplier relativeTimeProvider) {
AutoCreateIndex autoCreateIndex, IndexingPressure indexingPressure, SystemIndices systemIndices,
LongSupplier relativeTimeProvider) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, ThreadPool.Names.SAME);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
Expand All @@ -135,6 +140,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressure = indexingPressure;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}

Expand All @@ -158,17 +164,19 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes);
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
try {
doInternalExecute(task, bulkRequest, releasingListener);
doInternalExecute(task, bulkRequest, executorName, releasingListener);
} catch (Exception e) {
releasingListener.onFailure(e);
}
}

protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

Expand Down Expand Up @@ -206,7 +214,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
processBulkIndexIngestRequest(task, bulkRequest, executorName, listener);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
Expand Down Expand Up @@ -255,7 +263,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, ActionListe
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
threadPool.executor(ThreadPool.Names.WRITE).execute(
threadPool.executor(executorName).execute(
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
}
}
Expand All @@ -272,10 +280,11 @@ public void onFailure(Exception e) {
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
threadPool.executor(executorName).execute(() -> executeBulk(task, bulkRequest, startTime,
ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
}), responses, indicesThatCannotBeCreated));
}
}
});
Expand Down Expand Up @@ -336,6 +345,18 @@ static void prohibitCustomRoutingOnDataStream(DocWriteRequest<?> writeRequest, M
}
}

boolean isOnlySystem(BulkRequest request, SortedMap<String, IndexAbstraction> indicesLookup, SystemIndices systemIndices) {
final boolean onlySystem = request.getIndices().stream().allMatch(indexName -> {
final IndexAbstraction abstraction = indicesLookup.get(indexName);
if (abstraction != null) {
return abstraction.isSystem();
} else {
return systemIndices.isSystemIndex(indexName);
}
});
return onlySystem;
}

boolean needToCheck() {
return autoCreateIndex.needToCheck();
}
Expand Down Expand Up @@ -656,7 +677,8 @@ private long relativeTime() {
return relativeTimeProvider.getAsLong();
}

private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
private void processBulkIndexIngestRequest(Task task, BulkRequest original, String executorName,
ActionListener<BulkResponse> listener) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
Expand All @@ -681,18 +703,18 @@ private void processBulkIndexIngestRequest(Task task, BulkRequest original, Acti
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a write thread:
if (originalThread == Thread.currentThread()) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE);
doInternalExecute(task, bulkRequest, actionListener);
assert Thread.currentThread().getName().contains(executorName);
doInternalExecute(task, bulkRequest, executorName, actionListener);
} else {
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
threadPool.executor(executorName).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
doInternalExecute(task, bulkRequest, actionListener);
doInternalExecute(task, bulkRequest, executorName, actionListener);
}

@Override
Expand All @@ -708,7 +730,8 @@ public boolean isForceExecution() {
}
}
},
bulkRequestModifier::markItemAsDropped
bulkRequestModifier::markItemAsDropped,
executorName
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,18 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Performs shard-level bulk (index, delete or update) operations */
Expand All @@ -86,6 +89,13 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public static final ActionType<BulkShardResponse> TYPE = new ActionType<>(ACTION_NAME, BulkShardResponse::new);

private static final Logger logger = LogManager.getLogger(TransportShardBulkAction.class);
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
if (shard.indexSettings().getIndexMetadata().isSystem()) {
return Names.SYSTEM_WRITE;
} else {
return Names.WRITE;
}
};

private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
Expand All @@ -94,9 +104,9 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexingPressure indexingPressure) {
IndexingPressure indexingPressure, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false, indexingPressure);
BulkShardRequest::new, BulkShardRequest::new, EXECUTOR_NAME_FUNCTION, false, indexingPressure, systemIndices);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
}
Expand Down Expand Up @@ -136,7 +146,7 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool
}), listener, threadPool, executor(primary)
);
}

Expand All @@ -153,10 +163,11 @@ public static void performOnPrimary(
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
ThreadPool threadPool) {
ThreadPool threadPool,
String executorName) {
new ActionRunnable<>(listener) {

private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE);
private final Executor executor = threadPool.executor(executorName);

private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand All @@ -33,35 +32,46 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Function;
import java.util.stream.Stream;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {

private static String ACTION_NAME = "internal:index/seq_no/resync";
private static final Function<IndexShard, String> EXECUTOR_NAME_FUNCTION = shard -> {
if (shard.indexSettings().getIndexMetadata().isSystem()) {
return Names.SYSTEM_WRITE;
} else {
return Names.WRITE;
}
};

@Inject
public TransportResyncReplicationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ActionFilters actionFilters,
IndexingPressure indexingPressure) {
IndexingPressure indexingPressure, SystemIndices systemIndices) {
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
ResyncReplicationRequest::new, ResyncReplicationRequest::new, EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressure);
indexingPressure, systemIndices);
}

@Override
Expand Down
Loading

0 comments on commit 242083a

Please sign in to comment.