diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java index e6b253d17397c..237c0a2bdf153 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherConcreteIndexTests.java @@ -23,11 +23,6 @@ public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase { - @Override - protected boolean timeWarped() { - return false; - } - public void testCanUseAnyConcreteIndexName() throws Exception { String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT); @@ -35,6 +30,7 @@ public void testCanUseAnyConcreteIndexName() throws Exception { stopWatcher(); replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName); + ensureGreen(newWatcherIndexName); startWatcher(); PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder() @@ -45,6 +41,9 @@ public void testCanUseAnyConcreteIndexName() throws Exception { .get(); assertTrue(putWatchResponse.isCreated()); + refresh(); + + timeWarp().trigger("mywatch"); assertBusy(() -> { SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java index adbf43140328b..bdaa2377fd1d7 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookHttpsIntegrationTests.java @@ -15,10 +15,10 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.actions.ActionBuilders; +import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.http.HttpMethod; import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; import org.elasticsearch.xpack.watcher.common.http.Scheme; -import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; @@ -67,7 +67,6 @@ public void stopWebservice() throws Exception { webServer.close(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public void testHttps() throws Exception { webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body")); HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort()) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java index 521cc2d49fc3f..2c961db6187fe 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/actions/webhook/WebhookIntegrationTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher.actions.webhook; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.transport.TransportAddress; @@ -18,9 +17,9 @@ import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.actions.ActionBuilders; +import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.http.HttpMethod; import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate; -import org.elasticsearch.xpack.watcher.common.http.BasicAuth; import org.elasticsearch.xpack.watcher.common.text.TextTemplate; import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition; import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; @@ -44,7 +43,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase { private MockWebServer webServer = new MockWebServer(); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 21e5751029f52..8c44ba831b359 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.watcher.test; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -580,6 +582,7 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String } protected static class TimeWarp { + private static final Logger logger = LogManager.getLogger(TimeWarp.class); private final List schedulers; private final ClockMock clock; @@ -598,9 +601,14 @@ public ClockMock clock() { } public void trigger(String watchId, int times, TimeValue timeValue) { - boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue)); - String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId); - assertThat(msg, isTriggered, is(true)); + long triggeredCount = schedulers.stream() + .filter(scheduler -> scheduler.trigger(watchId, times, timeValue)) + .count(); + String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount); + if (triggeredCount > 1) { + logger.warn(msg); + } + assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L)); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java index 05d8b4ef29ded..2f2299d7d65e0 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BasicWatcherTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.watcher.test.integration; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -63,7 +62,6 @@ @TestLogging("org.elasticsearch.xpack.watcher:DEBUG," + "org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE") -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503") public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase { public void testIndexWatch() throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java index 3eefa03137146..f8ddc3065f79d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/HttpSecretsIntegrationTests.java @@ -87,7 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return super.nodeSettings(nodeOrdinal); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587") public void testHttpInput() throws Exception { WatcherClient watcherClient = watcherClient(); watcherClient.preparePutWatch("_id") diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java index 0e95a15b2a35c..a0ef5e97d8534 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java @@ -122,7 +122,6 @@ public void testAckSingleAction() throws Exception { assertThat(throttledCount, greaterThan(0L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506") public void testAckAllActions() throws Exception { PutWatchResponse putWatchResponse = watcherClient().preparePutWatch() .setId("_id") diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java index aff3a62c12cf1..1e2c1ddbc64f1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchMetadataTests.java @@ -38,7 +38,6 @@ public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631") public void testWatchMetadata() throws Exception { Map metadata = new HashMap<>(); metadata.put("foo", "bar"); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java index f58954658fc1e..3e46f7102c192 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/ScheduleTriggerEngineMock.java @@ -21,8 +21,10 @@ import java.time.ZonedDateTime; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger @@ -31,7 +33,8 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine { private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class); - private final ConcurrentMap watches = new ConcurrentHashMap<>(); + private final AtomicReference> watches = new AtomicReference<>(new ConcurrentHashMap<>()); + private final AtomicBoolean paused = new AtomicBoolean(false); public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) { super(scheduleRegistry, clock); @@ -49,30 +52,32 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat } @Override - public void start(Collection jobs) { - jobs.forEach(this::add); + public synchronized void start(Collection jobs) { + Map newWatches = new ConcurrentHashMap<>(); + jobs.forEach((watch) -> newWatches.put(watch.id(), watch)); + watches.set(newWatches); + paused.set(false); } @Override public void stop() { - watches.clear(); + watches.set(new ConcurrentHashMap<>()); } @Override - public void add(Watch watch) { + public synchronized void add(Watch watch) { logger.debug("adding watch [{}]", watch.id()); - watches.put(watch.id(), watch); + watches.get().put(watch.id(), watch); } @Override public void pauseExecution() { - // No action is needed because this engine does not trigger watches on a schedule (instead - // they must be triggered manually). + paused.set(true); } @Override - public boolean remove(String jobId) { - return watches.remove(jobId) != null; + public synchronized boolean remove(String jobId) { + return watches.get().remove(jobId) != null; } public boolean trigger(String jobName) { @@ -80,7 +85,11 @@ public boolean trigger(String jobName) { } public boolean trigger(String jobName, int times, TimeValue interval) { - if (watches.containsKey(jobName) == false) { + if (watches.get().containsKey(jobName) == false) { + return false; + } + if (paused.get()) { + logger.info("not executing watch [{}] on this scheduler because it is paused", jobName); return false; } @@ -89,7 +98,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) { logger.debug("firing watch [{}] at [{}]", jobName, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now); consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event))); - if (interval != null) { + if (interval != null) { if (clock instanceof ClockMock) { ((ClockMock) clock).fastForward(interval); } else {