Skip to content

Commit

Permalink
The watcher indexing listener didn't handle document level exceptions. (
Browse files Browse the repository at this point in the history
#51466)

Prior to the change the watcher index listener didn't implement the
`postIndex(ShardId, Engine.Index, Engine.IndexResult)` method. This
caused document level exceptions like VersionConflictEngineException
to be ignored. This commit fixes this.

The watcher indexing listener did implement the `postIndex(ShardId, Engine.Index, Exception)`
method, but that only handles engine level exceptions.

This change also unmutes the SmokeTestWatcherTestSuiteIT#testMonitorClusterHealth test again.

Relates to #32299
  • Loading branch information
martijnvg committed Jan 29, 2020
1 parent 206c8ac commit b253af3
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,19 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
}

/**
*
* In case of an error, we have to ensure that the triggerservice does not leave anything behind
* In case of a document related failure (for example version conflict), then clean up resources for a watch
* in the trigger service.
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
assert result.getFailure() != null;
postIndex(shardId, index, result.getFailure());
}
}

/**
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
*
* TODO: If the configuration changes between preindex and postindex methods and we add a
* watch, that could not be indexed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -200,15 +201,48 @@ public void testPreIndexCheckParsingException() throws Exception {
assertThat(exc.getMessage(), containsString(id));
}

public void testPostIndexRemoveTriggerOnException() throws Exception {
public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Exception {
when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verify(triggerService).remove(eq("_id"));
}

public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);

when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(randomFrom(types));
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verifyZeroInteractions(triggerService);
}

public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreNonWatcherDocument() throws Exception {
when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(randomAlphaOfLength(4));

listener.postIndex(shardId, operation, result);
verifyZeroInteractions(triggerService);
}

public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception {
when(operation.id()).thenReturn("_id");
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
verify(triggerService).remove(eq("_id"));
}

public void testPostIndexDontInvokeForOtherDocuments() throws Exception {
public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {
when(operation.id()).thenReturn("_id");
when(shardId.getIndexName()).thenReturn("anything");
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ protected Settings restAdminSettings() {
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32299")
public void testMonitorClusterHealth() throws Exception {
final String watchId = "cluster_health_watch";

Expand Down

0 comments on commit b253af3

Please sign in to comment.