Force execution of finish shard bulk request #51957

Merged on Feb 15, 2020 (5 commits)
@@ -170,16 +170,29 @@ protected void doRun() throws Exception {

@Override
public void onRejection(Exception e) {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
while (context.hasMoreOperationsToExecute()) {
context.setRequestToExecute(context.getCurrent());
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
onComplete(
exceptionToResult(
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
context, null);
}
finishRequest();
// We must finish the outstanding request. Finishing the outstanding request can include
// refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
executor.execute(new ActionRunnable<>(listener) {

@Override
protected void doRun() {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
while (context.hasMoreOperationsToExecute()) {
context.setRequestToExecute(context.getCurrent());
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
onComplete(
exceptionToResult(
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
context, null);
}
finishRequest();
Member

I was thinking that rather than scheduling only finishRequest on the write thread pool, we should execute the entirety of onRejection on the write thread pool (while still forcing execution, since rejecting execution here would be very bad):

ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) {

I think this because we're still doing work in onRejection that is linear in the number of documents in the bulk request. It seems we'd want to get that off of the networking/cluster state applier thread too, given that we're going to fork anyway.
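
As a rough illustration of what "forcing execution" means here, the following is a minimal sketch in plain `java.util.concurrent`. It is not the Elasticsearch implementation: `ForceableTask`, `SoftBoundedQueue`, and `newExecutor` are hypothetical names invented for this example, and the assumption is simply that a task flagged with `isForceExecution()` gets queued even when the queue is at capacity, while ordinary tasks are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: none of these types are Elasticsearch classes.
class ForceExecutionSketch {

    /** A task that can ask to be queued even when the queue is full. */
    interface ForceableTask extends Runnable {
        boolean isForceExecution();
    }

    /** Unbounded queue that refuses ordinary offers once a soft capacity is reached. */
    static final class SoftBoundedQueue extends LinkedBlockingQueue<Runnable> {
        private final int softCapacity;

        SoftBoundedQueue(int softCapacity) {
            this.softCapacity = softCapacity;
        }

        @Override
        public boolean offer(Runnable task) {
            // The size check is not atomic with the insert; good enough for a sketch.
            return size() < softCapacity && super.offer(task);
        }

        void forceOffer(Runnable task) {
            super.offer(task); // bypasses the soft capacity
        }
    }

    static ThreadPoolExecutor newExecutor(int queueSize) {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SoftBoundedQueue(queueSize),
            (task, pool) -> {
                if (task instanceof ForceableTask && ((ForceableTask) task).isForceExecution()) {
                    ((SoftBoundedQueue) pool.getQueue()).forceOffer(task);
                } else {
                    throw new RejectedExecutionException("queue is full");
                }
            });
    }

    /** Saturates the pool, then submits one ordinary and one forced task. */
    static String demo() {
        ThreadPoolExecutor executor = newExecutor(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch forcedDone = new CountDownLatch(1);
        AtomicReference<Thread> forcedThread = new AtomicReference<>();
        try {
            executor.execute(() -> awaitQuietly(release)); // occupies the only worker
            executor.execute(() -> {});                    // fills the queue
            boolean plainRejected = false;
            try {
                executor.execute(() -> {});
            } catch (RejectedExecutionException e) {
                plainRejected = true;
            }
            executor.execute(new ForceableTask() {
                @Override
                public void run() {
                    forcedThread.set(Thread.currentThread());
                    forcedDone.countDown();
                }

                @Override
                public boolean isForceExecution() {
                    return true;
                }
            });
            release.countDown(); // unblock the worker so the queue drains
            awaitQuietly(forcedDone);
            boolean ranOnPool = forcedThread.get() != Thread.currentThread();
            return "plainRejected=" + plainRejected + " forcedRanOnPoolThread=" + ranOnPool;
        } finally {
            executor.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the forced task still runs on the pool's worker thread, only later, which is the property the suggestion relies on: the per-document failure handling moves off the submitting thread without any risk of a second rejection.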

Contributor

I wonder if we should instead let onRejection force-push the onRejection handling onto the queue of the requested executor. The current "direct" handling kind of prioritizes the onRejection handling over everything else in the queue, which I think there is really no good reason for.

Looking at the various onRejection handlers, some call listener.onFailure, and verifying that none of those do bad things is tricky.

Also notice that onAfter is called in the caller thread when requests are rejected, which poses similar issues (not that I found a smoking gun there).

Finally, notice that AbstractRunnable.onRejection by default calls onFailure.

I doubt that we have carefully considered that onAfter and onFailure might run in the current thread when using AbstractRunnable. Executing the onRejection handling on the target thread pool would fix this and make it easier to reason about.

That said, I think this PR is good and I am not objecting to it going in. Following my suggestion is likely to surface a few additional things to resolve.
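
The concern about callbacks running on the caller thread can be sketched with plain `java.util.concurrent`. This is a simplified stand-in, not the Elasticsearch code: `CallbackRunnable` and `submit` are hypothetical names that only mimic the behaviour described in this comment, namely that `onRejection` delegates to `onFailure` by default and that both it and `onAfter` are invoked by whoever submitted the rejected task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins; not the Elasticsearch classes themselves.
class RejectionThreadSketch {

    /** A runnable with failure callbacks; onRejection delegates to onFailure by default. */
    abstract static class CallbackRunnable implements Runnable {
        abstract void doRun() throws Exception;

        abstract void onFailure(Exception e);

        void onRejection(Exception e) {
            onFailure(e);
        }

        void onAfter() {
        }

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            } finally {
                onAfter();
            }
        }
    }

    /** Submits the task and, on rejection, runs its callbacks directly on the submitting thread. */
    static void submit(ThreadPoolExecutor executor, CallbackRunnable task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            try {
                task.onRejection(e);
            } finally {
                task.onAfter();
            }
        }
    }

    /** Returns true if onFailure ran on the thread that submitted the rejected task. */
    static boolean failureRunsOnCallerThread() {
        // One worker and no queue capacity: a second task is rejected while the first runs.
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> failureThread = new AtomicReference<>();
        try {
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            submit(executor, new CallbackRunnable() {
                @Override
                void doRun() {
                    // never reached: the pool is saturated
                }

                @Override
                void onFailure(Exception e) {
                    failureThread.set(Thread.currentThread());
                }
            });
            return failureThread.get() == Thread.currentThread();
        } finally {
            release.countDown();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureRunsOnCallerThread());
    }
}
```

Under these assumptions, any work done in `onFailure` (for example, failing every remaining item of a bulk request) would run on whichever thread happened to submit the task, which is why moving the rejection handling onto the target pool makes the threading easier to reason about.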

Contributor Author

I made @jasontedor's change.

I think Henning's suggestion that onRejection should be executed on the thread pool anyway is beyond the scope of this PR, or at least warrants a larger discussion.

Contributor

@tbrooks8, yes, that is fine, I just found it most natural to put it here. I will open a PR with that change so we can discuss based on that PR instead.

}

@Override
public boolean isForceExecution() {
return true;
}
});
}

private void finishRequest() {
@@ -40,6 +40,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
@@ -53,13 +54,18 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.arrayWithSize;
@@ -809,6 +815,105 @@ public void testRetries() throws Exception {
latch.await();
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Settings.builder()
.put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1)
.put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1)
.build());
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
cyclicBarrier.await();
logger.info("blocking the write executor");
cyclicBarrier.await();
logger.info("unblocked the write executor");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
cyclicBarrier.await();
// Place a task in the queue to block next enqueue
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});

BulkItemRequest[] items = new BulkItemRequest[2];
DocWriteRequest<IndexRequest> writeRequest1 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", 1);
DocWriteRequest<IndexRequest> writeRequest2 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
items[0] = new BulkItemRequest(0, writeRequest1);
items[1] = new BulkItemRequest(1, writeRequest2);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1);
Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2);

IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(success1, mappingUpdate, success2);
when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod();
when(shard.mapperService()).thenReturn(mock(MapperService.class));

randomlySetIgnoredPrimaryResponse(items[0]);

AtomicInteger updateCalled = new AtomicInteger();

final CountDownLatch latch = new CountDownLatch(1);
TransportShardBulkAction.performOnPrimary(
bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, listener) -> {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
try {
// Release blocking task now that the continue write execution has been rejected and
// the finishRequest execution has been force enqueued
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new IllegalStateException(e);
}
}, listener -> listener.onResponse(null), new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(result ->
// Assert that we still need to fsync the location that was successfully written
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
equalTo(resultLocation1))), latch),
rejectingThreadPool);
latch.await();

assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));

verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());

BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
assertThat(primaryResponse1.getItemId(), equalTo(0));
assertThat(primaryResponse1.getId(), equalTo("id"));
assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertFalse(primaryResponse1.isFailed());
assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));

BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
assertThat(primaryResponse2.getItemId(), equalTo(1));
assertThat(primaryResponse2.getId(), equalTo("id"));
assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse2.isFailed());
assertNull(primaryResponse2.getResponse());
assertEquals(RestStatus.TOO_MANY_REQUESTS, primaryResponse2.status());
assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class));

closeShards(shard);
} finally {
rejectingThreadPool.shutdownNow();
}
}

private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.