Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Improve Watcher test framework resiliency (#40658) #41020

Merged
merged 2 commits into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@

public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {

@Override
protected boolean timeWarped() {
return false;
}

public void testCanUseAnyConcreteIndexName() throws Exception {
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
createIndex(watchResultsIndex);

stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
ensureGreen(newWatcherIndexName);
startWatcher();

PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
Expand All @@ -45,6 +41,9 @@ public void testCanUseAnyConcreteIndexName() throws Exception {
.get();

assertTrue(putWatchResponse.isCreated());
refresh();

timeWarp().trigger("mywatch");

assertBusy(() -> {
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.Scheme;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
Expand Down Expand Up @@ -67,7 +67,6 @@ public void stopWebservice() throws Exception {
webServer.close();
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public void testHttps() throws Exception {
webServer.enqueue(new MockResponse().setResponseCode(200).setBody("body"));
HttpRequestTemplate.Builder builder = HttpRequestTemplate.builder("localhost", webServer.getPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.actions.webhook;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -18,9 +17,9 @@
import org.elasticsearch.xpack.core.watcher.history.WatchRecord;
import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource;
import org.elasticsearch.xpack.watcher.actions.ActionBuilders;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
Expand All @@ -44,7 +43,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class WebhookIntegrationTests extends AbstractWatcherIntegrationTestCase {

private MockWebServer webServer = new MockWebServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher.test;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -580,6 +582,7 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String
}

protected static class TimeWarp {
private static final Logger logger = LogManager.getLogger(TimeWarp.class);

private final List<ScheduleTriggerEngineMock> schedulers;
private final ClockMock clock;
Expand All @@ -598,9 +601,14 @@ public ClockMock clock() {
}

public void trigger(String watchId, int times, TimeValue timeValue) {
boolean isTriggered = schedulers.stream().anyMatch(scheduler -> scheduler.trigger(watchId, times, timeValue));
String msg = String.format(Locale.ROOT, "could not find watch [%s] to trigger", watchId);
assertThat(msg, isTriggered, is(true));
long triggeredCount = schedulers.stream()
.filter(scheduler -> scheduler.trigger(watchId, times, timeValue))
.count();
String msg = String.format(Locale.ROOT, "watch was triggered on [%d] schedulers, expected [1]", triggeredCount);
if (triggeredCount > 1) {
logger.warn(msg);
}
assertThat(msg, triggeredCount, greaterThanOrEqualTo(1L));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.watcher.test.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
Expand Down Expand Up @@ -63,7 +62,6 @@

@TestLogging("org.elasticsearch.xpack.watcher:DEBUG," +
"org.elasticsearch.xpack.watcher.WatcherIndexingListener:TRACE")
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35503")
public class BasicWatcherTests extends AbstractWatcherIntegrationTestCase {

public void testIndexWatch() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return super.nodeSettings(nodeOrdinal);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40587")
public void testHttpInput() throws Exception {
WatcherClient watcherClient = watcherClient();
watcherClient.preparePutWatch("_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testAckSingleAction() throws Exception {
assertThat(throttledCount, greaterThan(0L));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35506")
public void testAckAllActions() throws Exception {
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch()
.setId("_id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

public class WatchMetadataTests extends AbstractWatcherIntegrationTestCase {

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40631")
public void testWatchMetadata() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("foo", "bar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger
Expand All @@ -31,7 +33,8 @@
public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
private static final Logger logger = LogManager.getLogger(ScheduleTriggerEngineMock.class);

private final ConcurrentMap<String, Watch> watches = new ConcurrentHashMap<>();
private final AtomicReference<Map<String, Watch>> watches = new AtomicReference<>(new ConcurrentHashMap<>());
private final AtomicBoolean paused = new AtomicBoolean(false);

public ScheduleTriggerEngineMock(ScheduleRegistry scheduleRegistry, Clock clock) {
super(scheduleRegistry, clock);
Expand All @@ -49,38 +52,44 @@ public ScheduleTriggerEvent parseTriggerEvent(TriggerService service, String wat
}

@Override
public void start(Collection<Watch> jobs) {
jobs.forEach(this::add);
public synchronized void start(Collection<Watch> jobs) {
Map<String, Watch> newWatches = new ConcurrentHashMap<>();
jobs.forEach((watch) -> newWatches.put(watch.id(), watch));
watches.set(newWatches);
paused.set(false);
}

@Override
public void stop() {
watches.clear();
watches.set(new ConcurrentHashMap<>());
}

@Override
public void add(Watch watch) {
public synchronized void add(Watch watch) {
logger.debug("adding watch [{}]", watch.id());
watches.put(watch.id(), watch);
watches.get().put(watch.id(), watch);
}

@Override
public void pauseExecution() {
// No action is needed because this engine does not trigger watches on a schedule (instead
// they must be triggered manually).
paused.set(true);
}

@Override
public boolean remove(String jobId) {
return watches.remove(jobId) != null;
public synchronized boolean remove(String jobId) {
return watches.get().remove(jobId) != null;
}

public boolean trigger(String jobName) {
return trigger(jobName, 1, null);
}

public boolean trigger(String jobName, int times, TimeValue interval) {
if (watches.containsKey(jobName) == false) {
if (watches.get().containsKey(jobName) == false) {
return false;
}
if (paused.get()) {
logger.info("not executing watch [{}] on this scheduler because it is paused", jobName);
return false;
}

Expand All @@ -89,7 +98,7 @@ public boolean trigger(String jobName, int times, TimeValue interval) {
logger.debug("firing watch [{}] at [{}]", jobName, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
consumers.forEach(consumer -> consumer.accept(Collections.singletonList(event)));
if (interval != null) {
if (interval != null) {
if (clock instanceof ClockMock) {
((ClockMock) clock).fastForward(interval);
} else {
Expand Down