Skip to content

Commit

Permalink
Improve Watcher test framework resiliency (elastic#40658)
Browse files Browse the repository at this point in the history
It is possible for the watches tracked by ScheduleTriggerEngineMock to
get out of sync with the Watches in the ScheduleTriggerEngine
production code, which can lead to watches failing to run.

This commit:

1. Changes TimeWarp to try to run the watch on all schedulers, rather than stopping after one which claims to have the watch registered. This reduces the impact of desynchronization between the mocking code and the backing production code.
2. Makes ScheduleTriggerEngineMock respect pauses of execution again. This is necessary to prevent duplicate watch invocations due to the above change.
3. Tweaks how watches are registered in ScheduleTriggerEngineMock to prevent race conditions due to concurrent modification.
4. Tweaks WatcherConcreteIndexTests to use TimeWarp instead of waiting for watches to be triggered, as TimeWarp is more reliable and accomplishes the same goal.
  • Loading branch information
gwbrown committed Apr 9, 2019
1 parent fdc1bdd commit 3f9ec34
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@

public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {

@Override
protected boolean timeWarped() {
return false;
}

public void testCanUseAnyConcreteIndexName() throws Exception {
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
createIndex(watchResultsIndex);

stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
ensureGreen(newWatcherIndexName);
startWatcher();

PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
Expand All @@ -45,6 +41,9 @@ public void testCanUseAnyConcreteIndexName() throws Exception {
.get();

assertTrue(putWatchResponse.isCreated());
refresh();

timeWarp().trigger("mywatch");

assertBusy(() -> {
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.Scheme;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
Expand Down Expand Up @@ -67,7 +67,6 @@ public void stopWebservice() throws Exception {
webServer.close();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public void testHttps() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.actions.webhook;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -18,9 +17,9 @@
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
Expand All @@ -44,7 +43,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase {

private MockWebServer webServer = new MockWebServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.test;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -580,6 +582,7 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String
}

protected static class TimeWarp {
private static final Logger logger = LogManager.getLogger(TimeWarp.class);

private final List<ScheduleTriggerEngineMock> schedulers;
private final ClockMock clock;
Expand All @@ -598,9 +601,14 @@ public ClockMock clock() {
}

public void trigger(String watchId, int times, TimeValue timeValue) {
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
assertThat(msg, isTriggered, is(true));
long triggeredCount = schedulers.stream()
.filter(scheduler -> scheduler.trigger(watchId, times, timeValue))
.count();
String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount);
if (triggeredCount > 1) {
logger.warn(msg);
}
assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
Expand Down Expand Up @@ -63,7 +62,6 @@

@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
"org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {

public void testIndexWatch() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return super.nodeSettings(nodeOrdinal);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587")
public void testHttpInput() throws Exception {
WatcherClient watcherClient = watcherClient();
watcherClient.preparePutWatch("_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testAckSingleAction() throws Exception {
assertThat(throttledCount, greaterThan(0L));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506")
public void testAckAllActions() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
.setId("_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631")
public void testWatchMetadata() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("foo", "bar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger
Expand All @@ -31,7 +33,8 @@
public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class);

private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>();
private final AtomicReference<Map<String, Watch>> watches = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicBoolean paused = new AtomicBoolean(false);

public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) {
super(scheduleRegistry, clock);
Expand All @@ -49,38 +52,44 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat
}

@Override
public void start(Collection<Watch> jobs) {
jobs.forEach(this::add);
public synchronized void start(Collection<Watch> jobs) {
Map<String, Watch> newWatches = new ConcurrentHashMap<>();
jobs.forEach((watch) -> newWatches.put(watch.id(), watch));
watches.set(newWatches);
paused.set(false);
}

@Override
public void stop() {
watches.clear();
watches.set(new ConcurrentHashMap<>());
}

@Override
public void add(Watch watch) {
public synchronized void add(Watch watch) {
logger.debug("adding watch [{}]", watch.id());
watches.put(watch.id(), watch);
watches.get().put(watch.id(), watch);
}

@Override
public void pauseExecution() {
// No action is needed because this engine does not trigger watches on a schedule (instead
// they must be triggered manually).
paused.set(true);
}

@Override
public boolean remove(String jobId) {
return watches.remove(jobId) != null;
public synchronized boolean remove(String jobId) {
return watches.get().remove(jobId) != null;
}

public boolean trigger(String jobName) {
return trigger(jobName, 1, null);
}

public boolean trigger(String jobName, int times, TimeValue interval) {
if (watches.containsKey(jobName) == false) {
if (watches.get().containsKey(jobName) == false) {
return false;
}
if (paused.get()) {
logger.info("not executing watch [{}] on this scheduler because it is paused", jobName);
return false;
}

Expand All @@ -89,7 +98,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) {
logger.debug("firing watch [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event)));
if (interval != null) {
if (interval != null) {
if (clock instanceof ClockMock) {
((ClockMock) clock).fastForward(interval);
} else {
Expand Down

0 comments on commit 3f9ec34

Please sign in to comment.