From ff61badcb884003acca2e53413d1c0ee04f50066 Mon Sep 17 00:00:00 2001 From: Gordon Brown Date: Tue, 9 Apr 2019 08:49:00 -0600 Subject: [PATCH] Improve Watcher test framework resiliency (#40658) It is possible for the watches tracked by ScheduleTriggerEngineMock to get out of sync with the Watches in the ScheduleTriggerEngine production code, which can lead to watches failing to run. This commit: 1. Changes TimeWarp to try to run the watch on all schedulers, rather than stopping after one which claims to have the watch registered. This reduces the impact of desynchronization between the mocking code and the backing production code. 2. Makes ScheduleTriggerEngineMock respect pauses of execution again. This is necessary to prevent duplicate watch invocations due to the above change. 3. Tweaks how watches are registered in ScheduleTriggerEngineMock to prevent race conditions due to concurrent modification. 4. Tweaks WatcherConcreteIndexTests to use TimeWarp instead of waiting for watches to be triggered, as TimeWarp is more reliable and accomplishes the same goal. --- .../watcher/WatcherConcreteIndexTests.java | 9 +++-- .../webhook/WebhookHttpsIntegrationTests.java | 3 +- .../webhook/WebhookIntegrationTests.java | 4 +-- .../AbstractWatcherIntegrationTestCase.java | 14 ++++++-- .../test/integration/BasicWatcherTests.java | 2 -- .../HttpSecretsIntegrationTests.java | 1 - .../test/integration/WatchAckTests.java | 1 - .../test/integration/WatchMetadataTests.java | 1 - .../trigger/ScheduleTriggerEngineMock.java | 35 ++++++++++++------- 9 files changed, 39 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java index e6b253d17397c..237c0a2bdf153 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java @@ -23,11 +23,6 @@ public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase { - @Override - protected boolean timeWarped() { - return false; - } - public void testCanUseAnyConcreteIndexName() throws Exception { String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT); @@ -35,6 +30,7 @@ public void testCanUseAnyConcreteIndexName() throws Exception { stopWatcher(); replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName); + ensureGreen(newWatcherIndexName); startWatcher(); PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder() @@ -45,6 +41,9 @@ public void testCanUseAnyConcreteIndexName() throws Exception { .get(); assertTrue(putWatchResponse.isCreated()); + refresh(); + + timeWarp().trigger("mywatch"); assertBusy(() -> { SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java index adbf43140328b..bdaa2377fd1d7 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java @@ -15,10 +15,10 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.actions.ActionBuilders; +import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.http.HttpMethod; import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; import org.elasticsearch.xpack.watcher.common.http.Scheme; -import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; @@ -67,7 +67,6 @@ public void stopWebservice() throws Exception { webServer.close(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public void testHttps() throws Exception { webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body")); HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort()) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java index 521cc2d49fc3f..2c961db6187fe 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher.actions.webhook; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.transport.TransportAddress; @@ -18,9 +17,9 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.actions.ActionBuilders; +import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.http.HttpMethod; import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; -import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; @@ -44,7 +43,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase { private MockWebServer webServer = new MockWebServer(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 21e5751029f52..8c44ba831b359 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher.test; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -580,6 +582,7 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String } protected static class TimeWarp { + private static final Logger logger = LogManager.getLogger(TimeWarp.class); private final List schedulers; private final ClockMock clock; @@ -598,9 +601,14 @@ public ClockMock clock() { } public void trigger(String watchId, int times, TimeValue timeValue) { - boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue)); - String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId); - assertThat(msg, isTriggered, is(true)); + long triggeredCount = schedulers.stream() + .filter(scheduler -> scheduler.trigger(watchId, times, timeValue)) + .count(); + String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount); + if (triggeredCount > 1) { + logger.warn(msg); + } + assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L)); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java index 05d8b4ef29ded..2f2299d7d65e0 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher.test.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -63,7 +62,6 @@ @TestLogging("org.elasticsearch.xpack.watcher:DEBUG," + "org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE") -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase { public void testIndexWatch() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java index 3eefa03137146..f8ddc3065f79d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java @@ -87,7 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return super.nodeSettings(nodeOrdinal); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587") public void testHttpInput() throws Exception { WatcherClient watcherClient = watcherClient(); watcherClient.preparePutWatch("_id") diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java index 0e95a15b2a35c..a0ef5e97d8534 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java @@ -122,7 +122,6 @@ public void testAckSingleAction() throws Exception { assertThat(throttledCount, greaterThan(0L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506") public void testAckAllActions() throws Exception { PutWatchResponse putWatchResponse = watcherClient().preparePutWatch() .setId("_id") diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java index aff3a62c12cf1..1e2c1ddbc64f1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java @@ -38,7 +38,6 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631") public void testWatchMetadata() throws Exception { Map metadata = new HashMap<>(); metadata.put("foo", "bar"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index f58954658fc1e..3e46f7102c192 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -21,8 +21,10 @@ import java.time.ZonedDateTime; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger @@ -31,7 +33,8 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class); - private final ConcurrentMap watches = new ConcurrentHashMap<>(); + private final AtomicReference> watches = new AtomicReference<>(new ConcurrentHashMap<>()); + private final AtomicBoolean paused = new AtomicBoolean(false); public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) { super(scheduleRegistry, clock); @@ -49,30 +52,32 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat } @Override - public void start(Collection jobs) { - jobs.forEach(this::add); + public synchronized void start(Collection jobs) { + Map newWatches = new ConcurrentHashMap<>(); + jobs.forEach((watch) -> newWatches.put(watch.id(), watch)); + watches.set(newWatches); + paused.set(false); } @Override public void stop() { - watches.clear(); + watches.set(new ConcurrentHashMap<>()); } @Override - public void add(Watch watch) { + public synchronized void add(Watch watch) { logger.debug("adding watch [{}]", watch.id()); - watches.put(watch.id(), watch); + watches.get().put(watch.id(), watch); } @Override public void pauseExecution() { - // No action is needed because this engine does not trigger watches on a schedule (instead - // they must be triggered manually). + paused.set(true); } @Override - public boolean remove(String jobId) { - return watches.remove(jobId) != null; + public synchronized boolean remove(String jobId) { + return watches.get().remove(jobId) != null; } public boolean trigger(String jobName) { @@ -80,7 +85,11 @@ public boolean trigger(String jobName) { } public boolean trigger(String jobName, int times, TimeValue interval) { - if (watches.containsKey(jobName) == false) { + if (watches.get().containsKey(jobName) == false) { + return false; + } + if (paused.get()) { + logger.info("not executing watch [{}] on this scheduler because it is paused", jobName); return false; } @@ -89,7 +98,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) { logger.debug("firing watch [{}] at [{}]", jobName, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event))); - if (interval != null) { + if (interval != null) { if (clock instanceof ClockMock) { ((ClockMock) clock).fastForward(interval); } else {