From 54bbb721739d1853b021048c8642e2d0dde3af4a Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 30 Jan 2019 20:14:59 -0500 Subject: [PATCH] Move watcher to use seq# and primary term for concurrency control (#37977) --- .../client/RequestConverters.java | 19 ++ .../client/WatcherRequestConverters.java | 5 +- .../client/watcher/GetWatchResponse.java | 29 ++- .../client/watcher/PutWatchRequest.java | 57 ++++++ .../client/watcher/PutWatchResponse.java | 31 +++- .../client/watcher/PutWatchResponseTests.java | 6 +- .../en/rest-api/watcher/ack-watch.asciidoc | 6 + .../rest-api/watcher/activate-watch.asciidoc | 2 + .../watcher/deactivate-watch.asciidoc | 2 + .../en/rest-api/watcher/get-watch.asciidoc | 2 + .../xpack/watcher/PutWatchRequest.java | 88 ++++++++- .../xpack/watcher/PutWatchResponse.java | 45 ++++- .../actions/get/GetWatchResponse.java | 33 +++- .../xpack/core/watcher/watch/Watch.java | 25 ++- .../xpack/core/watcher/watch/WatchField.java | 1 - .../xpack/watcher/GetWatchResponseTests.java | 9 +- .../xpack/watcher/PutWatchResponseTests.java | 7 +- .../api/xpack.watcher.put_watch.json | 8 + .../80_put_get_watch_with_passwords.yml | 169 ++++++++++++++++++ .../watcher/WatcherIndexingListener.java | 3 +- .../xpack/watcher/WatcherService.java | 5 +- .../watcher/execution/ExecutionService.java | 6 +- .../rest/action/RestExecuteWatchAction.java | 5 +- .../rest/action/RestGetWatchAction.java | 18 +- .../rest/action/RestPutWatchAction.java | 10 +- .../actions/ack/TransportAckWatchAction.java | 13 +- .../TransportActivateWatchAction.java | 3 +- .../execute/TransportExecuteWatchAction.java | 8 +- .../actions/get/TransportGetWatchAction.java | 8 +- .../actions/put/TransportPutWatchAction.java | 19 +- .../xpack/watcher/watch/WatchParser.java | 37 ++-- .../watcher/WatcherIndexingListenerTests.java | 9 +- .../xpack/watcher/WatcherServiceTests.java | 3 +- .../execution/ExecutionServiceTests.java | 17 +- .../xpack/watcher/test/WatcherTestUtils.java | 4 +- .../bench/ScheduleEngineTriggerBenchmark.java | 2 +- .../test/integration/WatchAckTests.java | 9 +- .../ack/TransportAckWatchActionTests.java | 3 +- .../TransportActivateWatchActionTests.java | 4 +- .../put/TransportPutWatchActionTests.java | 4 +- .../engine/TickerScheduleEngineTests.java | 4 +- .../xpack/watcher/watch/WatchTests.java | 19 +- 42 files changed, 629 insertions(+), 128 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index dfce0f6472ab2..5af5e9ed9d825 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -75,6 +75,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.script.mustache.MultiSearchTemplateRequest; @@ -882,6 +883,24 @@ Params withWaitForActiveShards(ActiveShardCount currentActiveShardCount, ActiveS return this; } + Params withIfSeqNo(long ifSeqNo) { + if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + return putParam("if_seq_no", Long.toString(ifSeqNo)); + } + return this; + } + + Params withIfPrimaryTerm(long ifPrimaryTerm) { + if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + return putParam("if_primary_term", Long.toString(ifPrimaryTerm)); + } + return this; + } + + Params withWaitForActiveShards(ActiveShardCount activeShardCount) { + return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT); + } + Params withIndicesOptions(IndicesOptions indicesOptions) { if (indicesOptions != null) { withIgnoreUnavailable(indicesOptions.ignoreUnavailable()); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java index 248b3fa27f56d..55744c228afb8 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java @@ -71,7 +71,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) { .build(); Request request = new Request(HttpPut.METHOD_NAME, endpoint); - RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion()); + RequestConverters.Params params = new RequestConverters.Params(request) + .withVersion(putWatchRequest.getVersion()) + .withIfSeqNo(putWatchRequest.ifSeqNo()) + .withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm()); if (putWatchRequest.isActive() == false) { params.putParam("active", "false"); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java index 9f5934b33eb30..83727003106e9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java @@ -31,9 +31,14 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + public class GetWatchResponse { private final String id; private final long version; + private final long seqNo; + private final long primaryTerm; private final WatchStatus status; private final BytesReference source; @@ -43,15 +48,18 @@ public class GetWatchResponse { * Ctor for missing watch */ public GetWatchResponse(String id) { - this(id, Versions.NOT_FOUND, null, null, null); + this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null); } - public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) { + public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status, + BytesReference source, XContentType xContentType) { this.id = id; this.version = version; this.status = status; this.source = source; this.xContentType = xContentType; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; } public String getId() { @@ -62,6 +70,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isFound() { return version != Versions.NOT_FOUND; } @@ -111,6 +127,8 @@ public int hashCode() { private static final ParseField ID_FIELD = new ParseField("_id"); private static final ParseField FOUND_FIELD = new ParseField("found"); private static final ParseField VERSION_FIELD = new ParseField("_version"); + private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no"); + private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term"); private static final ParseField STATUS_FIELD = new ParseField("status"); private static final ParseField WATCH_FIELD = new ParseField("watch"); @@ -119,9 +137,10 @@ public int hashCode() { a -> { boolean isFound = (boolean) a[1]; if (isFound) { - XContentBuilder builder = (XContentBuilder) a[4]; + XContentBuilder builder = (XContentBuilder) a[6]; BytesReference source = BytesReference.bytes(builder); - return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType()); + return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5], + source, builder.contentType()); } else { return new GetWatchResponse((String) a[0]); } @@ -131,6 +150,8 @@ public int hashCode() { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (parser, context) -> WatchStatus.parse(parser), STATUS_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java index e12d1b8f609ff..8d6aaeab2fdd6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java @@ -24,10 +24,14 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.util.Objects; import java.util.regex.Pattern; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * This request class contains the data needed to create a watch along with the name of the watch. * The name of the watch will become the ID of the indexed document. @@ -42,6 +46,9 @@ public final class PutWatchRequest implements Validatable { private final XContentType xContentType; private boolean active = true; private long version = Versions.MATCH_ANY; + private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { Objects.requireNonNull(id, "watch id is missing"); @@ -98,6 +105,56 @@ public void setVersion(long version) { this.version = version; } + /** + * only performs this put request if the watch's last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the watch's last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this put request if the watch's last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the watch last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this sequence number. + * If the watch last last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this primary term. + * + * If the watch's last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifPrimaryTerm() { + return ifPrimaryTerm; + } + + public static boolean isValidId(String id) { return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches(); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java index 8f7070b2565a2..61742b84e1219 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.Objects; @@ -32,20 +33,26 @@ public class PutWatchResponse { static { PARSER.declareString(PutWatchResponse::setId, new ParseField("_id")); + PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no")); + PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term")); PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version")); PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created")); } private String id; private long version; + private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; private boolean created; public PutWatchResponse() { } - public PutWatchResponse(String id, long version, boolean created) { + public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) { this.id = id; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.created = created; } @@ -57,6 +64,14 @@ private void setVersion(long version) { this.version = version; } + private void setSeqNo(long seqNo) { + this.seqNo = seqNo; + } + + private void setPrimaryTerm(long primaryTerm) { + this.primaryTerm = primaryTerm; + } + private void setCreated(boolean created) { this.created = created; } @@ -69,6 +84,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isCreated() { return created; } @@ -80,12 +103,14 @@ public boolean equals(Object o) { PutWatchResponse that = (PutWatchResponse) o; - return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created); + return Objects.equals(id, that.id) && Objects.equals(version, that.version) + && Objects.equals(seqNo, that.seqNo) + && Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created); } @Override public int hashCode() { - return Objects.hash(id, version, created); + return Objects.hash(id, version, seqNo, primaryTerm, created); } public static PutWatchResponse fromXContent(XContentParser parser) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java index af327abc1a728..d358f5c8955ae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java @@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui return builder.startObject() .field("_id", response.getId()) .field("_version", response.getVersion()) + .field("_seq_no", response.getSeqNo()) + .field("_primary_term", response.getPrimaryTerm()) .field("created", response.isCreated()) .endObject(); } private static PutWatchResponse createTestInstance() { String id = randomAlphaOfLength(10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 200); long version = randomLongBetween(1, 10); boolean created = randomBoolean(); - return new PutWatchResponse(id, version, created); + return new PutWatchResponse(id, version, seqNo, primaryTerm, created); } } diff --git a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc index 3b3550ac61f90..dcda5509d8122 100644 --- a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc @@ -93,6 +93,8 @@ The action state of a newly-created watch is `awaits_successful_execution`: -------------------------------------------------- { "found": true, + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "_id": "my_watch", "status": { @@ -137,6 +139,8 @@ and the action is now in `ackable` state: { "found": true, "_id": "my_watch", + "_seq_no": 1, + "_primary_term": 1, "_version": 2, "status": { "version": 2, @@ -186,6 +190,8 @@ GET _xpack/watcher/watch/my_watch { "found": true, "_id": "my_watch", + "_seq_no": 2, + "_primary_term": 1, "_version": 3, "status": { "version": 3, diff --git a/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc index b1770b66aa591..8fdde13c65236 100644 --- a/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc @@ -44,6 +44,8 @@ GET _xpack/watcher/watch/my_watch { "found": true, "_id": "my_watch", + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "status": { "state" : { diff --git a/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc index 8ef501941c187..ad78742d9e1a5 100644 --- a/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc @@ -44,6 +44,8 @@ GET _xpack/watcher/watch/my_watch "found": true, "_id": "my_watch", "_version": 1, + "_seq_no": 0, + "_primary_term": 1, "status": { "state" : { "active" : true, diff --git a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc index 52bbe68ecfe7e..a045e472bc427 100644 --- a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc @@ -44,6 +44,8 @@ Response: { "found": true, "_id": "my_watch", + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "status": { <1> "version": 1, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java index abc42b149194b..bda59d90ef7d0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java @@ -7,7 +7,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -17,10 +16,15 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.regex.Pattern; +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * This request class contains the data needed to create a watch along with the name of the watch. * The name of the watch will become the ID of the indexed document. @@ -36,6 +40,9 @@ public class PutWatchRequest extends MasterNodeRequest { private boolean active = true; private long version = Versions.MATCH_ANY; + private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public PutWatchRequest() {} public PutWatchRequest(StreamInput in) throws IOException { @@ -107,20 +114,80 @@ public void setVersion(long version) { this.version = version; } + /** + * only performs this put request if the watch's last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the watch's last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this put request if the watch's last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the watch last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this sequence number. + * If the watch last last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long getIfSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this primary term. + * + * If the watch's last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long getIfPrimaryTerm() { + return ifPrimaryTerm; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; if (id == null) { - validationException = ValidateActions.addValidationError("watch id is missing", validationException); + validationException = addValidationError("watch id is missing", validationException); } else if (isValidId(id) == false) { - validationException = ValidateActions.addValidationError("watch id contains whitespace", validationException); + validationException = addValidationError("watch id contains whitespace", validationException); } if (source == null) { - validationException = ValidateActions.addValidationError("watch source is missing", validationException); + validationException = addValidationError("watch source is missing", validationException); } if (xContentType == null) { - validationException = ValidateActions.addValidationError("request body is missing", validationException); + validationException = addValidationError("request body is missing", validationException); + } + if (ifSeqNo != UNASSIGNED_SEQ_NO && version != Versions.MATCH_ANY) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); } + if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { + validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); + } + if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { + validationException = + addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); + } + return validationException; } @@ -140,6 +207,13 @@ public void readFrom(StreamInput in) throws IOException { } else { version = Versions.MATCH_ANY; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } else { + ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + } } @Override @@ -154,6 +228,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeZLong(version); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNo); + out.writeVLong(ifPrimaryTerm); + } } public static boolean isValidId(String id) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java index f6e55ff555339..ab8496f839a3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.protocol.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.Objects; @@ -24,19 +26,25 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject static { PARSER.declareString(PutWatchResponse::setId, new ParseField("_id")); PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version")); + PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no")); + PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term")); PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created")); } private String id; private long version; + private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; private boolean created; public PutWatchResponse() { } - public PutWatchResponse(String id, long version, boolean created) { + public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) { this.id = id; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.created = created; } @@ -48,6 +56,14 @@ private void setVersion(long version) { this.version = version; } + private void setSeqNo(long seqNo) { + this.seqNo = seqNo; + } + + private void setPrimaryTerm(long primaryTerm) { + this.primaryTerm = primaryTerm; + } + private void setCreated(boolean created) { this.created = created; } @@ -60,6 +76,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isCreated() { return created; } @@ -71,12 +95,14 @@ public boolean equals(Object o) { PutWatchResponse that = (PutWatchResponse) o; - return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created); + return Objects.equals(id, that.id) && Objects.equals(version, that.version) + && Objects.equals(seqNo, that.seqNo) + && Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created); } @Override public int hashCode() { - return Objects.hash(id, version, created); + return Objects.hash(id, version, seqNo, primaryTerm, created); } @Override @@ -84,6 +110,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); out.writeVLong(version); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(seqNo); + out.writeVLong(primaryTerm); + } out.writeBoolean(created); } @@ -92,6 +122,13 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readString(); version = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + } else { + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + } created = in.readBoolean(); } @@ -100,6 +137,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.startObject() .field("_id", id) .field("_version", version) + .field("_seq_no", seqNo) + .field("_primary_term", primaryTerm) .field("created", created) .endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java index ab492181c72d4..a2429a4414764 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -26,6 +27,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent { private boolean found; private XContentSource source; private long version; + private long seqNo; + private long primaryTerm; public GetWatchResponse() { } @@ -39,17 +42,21 @@ public GetWatchResponse(String id) { this.found = false; this.source = null; this.version = Versions.NOT_FOUND; + this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + this.primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } /** * ctor for found watch */ - public GetWatchResponse(String id, long version, WatchStatus status, XContentSource source) { + public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status, XContentSource source) { this.id = id; this.status = status; this.found = true; this.source = source; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; } public String getId() { @@ -72,6 +79,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -85,10 +100,16 @@ public void readFrom(StreamInput in) throws IOException { } else { version = Versions.MATCH_ANY; } + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + } } else { status = null; source = null; version = Versions.NOT_FOUND; + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } } @@ -103,6 +124,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeZLong(version); } + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(seqNo); + out.writeVLong(primaryTerm); + } } } @@ -112,6 +137,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("_id", id); if (found) { builder.field("_version", version); + builder.field("_seq_no", seqNo); + builder.field("_primary_term", primaryTerm); builder.field("status", status, params); builder.field("watch", source, params); } @@ -123,7 +150,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GetWatchResponse that = (GetWatchResponse) o; - return version == that.version && + return version == that.version && seqNo == that.seqNo && primaryTerm == that.primaryTerm && Objects.equals(id, that.id) && Objects.equals(status, that.status) && Objects.equals(source, that.source); @@ -131,7 +158,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(id, status, version); + return Objects.hash(id, status, version, seqNo, primaryTerm); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java index 75034752c3cc6..e34b0a15e7133 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; @@ -37,11 +38,12 @@ public class Watch implements ToXContentObject { @Nullable private final Map metadata; private final WatchStatus status; - private transient long version; + private final long sourceSeqNo; + private final long sourcePrimaryTerm; public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform, @Nullable TimeValue throttlePeriod, List actions, @Nullable Map metadata, - WatchStatus status, long version) { + WatchStatus status, long sourceSeqNo, long sourcePrimaryTerm) { this.id = id; this.trigger = trigger; this.input = input; @@ -51,7 +53,8 @@ public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondit this.throttlePeriod = throttlePeriod; this.metadata = metadata; this.status = status; - this.version = version; + this.sourceSeqNo = sourceSeqNo; + this.sourcePrimaryTerm = sourcePrimaryTerm; } public String id() { @@ -88,12 +91,20 @@ public WatchStatus status() { return status; } - public long version() { - return version; + /** + * The sequence number of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * if the watch wasn't read from a document + ***/ + public long getSourceSeqNo() { + return sourceSeqNo; } - public void version(long version) { - this.version = version; + /** + * The primary term of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM} + * if the watch wasn't read from a document + ***/ + public long getSourcePrimaryTerm() { + return sourcePrimaryTerm; } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java index dbc3ce76c9517..6f6a1955927d9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java @@ -17,7 +17,6 @@ public final class WatchField { public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period"); public static final ParseField METADATA = new ParseField("metadata"); public static final ParseField STATUS = new ParseField("status"); - public static final ParseField VERSION = new ParseField("_version"); public static final String ALL_ACTIONS_ID = "_all"; private WatchField() {} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java index 1c00b6fd9dc27..13f4b0fb5d3b4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java @@ -74,6 +74,7 @@ protected void assertEqualInstances(GetWatchResponse expectedInstance, GetWatchR throw new AssertionError(e); } newInstance = new GetWatchResponse(newInstance.getId(), newInstance.getVersion(), + newInstance.getSeqNo(), newInstance.getPrimaryTerm(), newInstance.getStatus(), new XContentSource(newSource, expectedInstance.getSource().getContentType())); } super.assertEqualInstances(expectedInstance, newInstance); @@ -91,9 +92,11 @@ protected GetWatchResponse createTestInstance() { return new GetWatchResponse(id); } long version = randomLongBetween(0, 10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 2000); WatchStatus status = randomWatchStatus(); BytesReference source = simpleWatch(); - return new GetWatchResponse(id, version, status, new XContentSource(source, XContentType.JSON)); + return new GetWatchResponse(id, version, seqNo, primaryTerm, status, new XContentSource(source, XContentType.JSON)); } private static BytesReference simpleWatch() { @@ -170,8 +173,8 @@ public org.elasticsearch.client.watcher.GetWatchResponse doHlrcParseInstance(XCo @Override public GetWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.GetWatchResponse instance) { if (instance.isFound()) { - return new GetWatchResponse(instance.getId(), instance.getVersion(), convertHlrcToInternal(instance.getStatus()), - new XContentSource(instance.getSource(), instance.getContentType())); + return new GetWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(), + convertHlrcToInternal(instance.getStatus()), new XContentSource(instance.getSource(), instance.getContentType())); } else { return new GetWatchResponse(instance.getId()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java index 8ea4a84daed95..975d842b79537 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java @@ -16,9 +16,11 @@ public class PutWatchResponseTests extends @Override protected PutWatchResponse createTestInstance() { String id = randomAlphaOfLength(10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 20); long version = randomLongBetween(1, 10); boolean created = randomBoolean(); - return new PutWatchResponse(id, version, created); + return new PutWatchResponse(id, version, seqNo, primaryTerm, created); } @Override @@ -33,7 +35,8 @@ public org.elasticsearch.client.watcher.PutWatchResponse doHlrcParseInstance(XCo @Override public PutWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.PutWatchResponse instance) { - return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.isCreated()); + return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(), + instance.isCreated()); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json index 7e29aeaaf43f7..0350a424a5de3 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json @@ -24,6 +24,14 @@ "version" : { "type" : "number", "description" : "Explicit version number for concurrency control" + }, + "if_seq_no" : { + "type" : "number", + "description" : "only update the watch if the last operation that has changed the watch has the specified sequence number" + }, + "if_primary_term" : { + "type" : "number", + "description" : "only update the watch if the last operation that has changed the watch has the specified primary term" } } }, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml index db1fa84370410..f8d2098b094da 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml @@ -221,6 +221,175 @@ setup: } } +--- +"Test putting a watch with a redacted password with old seq no returns an error": + - skip: + version: " - 6.99.99" + reason: seq no powered concurrency was added in 7.0.0 + + # version 1 + - do: + xpack.watcher.put_watch: + id: "watch_with_seq_no" + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "pass" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - set: { "_seq_no": seqNo } + - set: { "_primary_term" : primaryTerm } + + # using optimistic concurrency control, this one will loose + # as if two users in the watch UI tried to update the same watch + - do: + catch: conflict + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: 123034 + if_primary_term: $primaryTerm + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + catch: conflict + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: $seqNo + if_primary_term: 234242423 + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: $seqNo + if_primary_term: $primaryTerm + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "new_user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + search: + rest_total_hits_as_int: true + index: .watches + body: > + { + "query": { + "term": { + "_id": { + "value": "watch_with_seq_no" + } + } + } + } + + + - match: { hits.total: 1 } + - match: { hits.hits.0._id: "watch_with_seq_no" } + - match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" } + - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } + --- "Test putting a watch with a redacted password with current version works": diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 51f5fcb4d484f..bdbeb7e8c62fb 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -100,7 +100,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { if (isWatchDocument(shardId.getIndexName(), operation.type())) { DateTime now = new DateTime(clock.millis(), UTC); try { - Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON); + Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON, + operation.getIfSeqNo(), operation.getIfPrimaryTerm()); ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId); if (shardAllocationConfiguration == null) { logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}", diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index f26ab5a14fb06..947a62d62ceee 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -296,7 +296,7 @@ private Collection loadWatches(ClusterState clusterState) { .source(new SearchSourceBuilder() .size(scrollSize) .sort(SortBuilders.fieldSort("_doc")) - .version(true)); + .seqNoAndPrimaryTerm(true)); response = client.search(searchRequest).actionGet(defaultSearchTimeout); if (response.getTotalShards() != response.getSuccessfulShards()) { @@ -339,8 +339,7 @@ private Collection loadWatches(ClusterState clusterState) { } try { - Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON); - watch.version(hit.getVersion()); + Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON, hit.getSeqNo(), hit.getPrimaryTerm()); if (watch.status().state().isActive()) { watches.add(watch); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 03289df73fc99..93e54437e45e7 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -278,7 +278,8 @@ record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "W if (resp.isExists() == false) { throw new ResourceNotFoundException("watch [{}] does not exist", watchId); } - return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON); + return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON, + resp.getSeqNo(), resp.getPrimaryTerm()); }); } catch (ResourceNotFoundException e) { String message = "unable to find watch for record [" + ctx.id() + "]"; @@ -349,7 +350,8 @@ public void updateWatchStatus(Watch watch) throws IOException { UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id()); updateRequest.doc(source); - updateRequest.version(watch.version()); + updateRequest.setIfSeqNo(watch.getSourceSeqNo()); + updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm()); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { client.update(updateRequest).actionGet(indexDefaultTimeout); } catch (DocumentMissingException e) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java index a198eb458b18f..f8750e1b1754e 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.xpack.watcher.rest.action; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; @@ -50,8 +50,7 @@ public class RestExecuteWatchAction extends WatcherRestHandler implements RestRe WatchField.INPUT.getPreferredName(), WatchField.CONDITION.getPreferredName(), WatchField.ACTIONS.getPreferredName(), WatchField.TRANSFORM.getPreferredName(), WatchField.THROTTLE_PERIOD.getPreferredName(), WatchField.THROTTLE_PERIOD_HUMAN.getPreferredName(), - WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName(), - WatchField.VERSION.getPreferredName()); + WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName()); public RestExecuteWatchAction(Settings settings, RestController controller) { super(settings); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java index c0e4b325fb384..9e136599842b4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java @@ -5,11 +5,10 @@ */ package org.elasticsearch.xpack.watcher.rest.action; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; @@ -22,7 +21,6 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.xpack.watcher.rest.WatcherRestHandler; - import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestStatus.NOT_FOUND; import static org.elasticsearch.rest.RestStatus.OK; @@ -49,17 +47,9 @@ protected RestChannelConsumer doPrepareRequest(final RestRequest request, Watche return channel -> client.getWatch(getWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(GetWatchResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field("found", response.isFound()) - .field("_id", response.getId()); - if (response.isFound()) { - builder.field("_version", response.getVersion()); - ToXContent.MapParams xContentParams = new ToXContent.MapParams(request.params()); - builder.field("status", response.getStatus(), xContentParams); - builder.field("watch", response.getSource(), xContentParams); - } - builder.endObject(); - + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); RestStatus status = response.isFound() ? OK : NOT_FOUND; return new BytesRestResponse(status, builder); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java index 409298daecdf3..5e5515ace97ba 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.xpack.watcher.rest.action; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; @@ -57,15 +57,13 @@ protected RestChannelConsumer doPrepareRequest(final RestRequest request, Watche new PutWatchRequest(request.param("id"), request.content(), request.getXContentType()); putWatchRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putWatchRequest.masterNodeTimeout())); putWatchRequest.setVersion(request.paramAsLong("version", Versions.MATCH_ANY)); + putWatchRequest.setIfSeqNo(request.paramAsLong("if_seq_no", putWatchRequest.getIfSeqNo())); + putWatchRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", putWatchRequest.getIfPrimaryTerm())); putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive())); return channel -> client.putWatch(putWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field("_id", response.getId()) - .field("_version", response.getVersion()) - .field("created", response.isCreated()) - .endObject(); + response.toXContent(builder, request); RestStatus status = response.isCreated() ? CREATED : OK; return new BytesRestResponse(status, builder); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index 044320d9119af..2906d77337151 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -54,6 +55,7 @@ public class TransportAckWatchAction extends WatcherTransportActionwrap(getResponse -> { if (getResponse.isExists()) { Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, - XContentType.JSON); - watch.version(getResponse.getVersion()); + XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); watch.status().version(getResponse.getVersion()); // if we are not yet running in distributed mode, only call triggerservice, if we are on the master node if (localExecute(request) == false && this.clusterService.state().nodes().isLocalNodeElectedMaster()) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java index b88b8b2462142..ee058d385d90a 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -91,9 +92,8 @@ protected void masterOperation(ExecuteWatchRequest request, ClusterState state, executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest, ActionListener.wrap(response -> { if (response.isExists()) { - Watch watch = - watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), request.getXContentType()); - watch.version(response.getVersion()); + Watch watch = watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), + request.getXContentType(), response.getSeqNo(), response.getPrimaryTerm()); watch.status().version(response.getVersion()); executeWatch(request, listener, watch, true); } else { @@ -104,7 +104,7 @@ protected void masterOperation(ExecuteWatchRequest request, ClusterState state, try { assert !request.isRecordExecution(); Watch watch = watchParser.parse(ExecuteWatchRequest.INLINE_WATCH_ID, true, request.getWatchSource(), - request.getXContentType()); + request.getXContentType(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); executeWatch(request, listener, watch, false); } catch (IOException e) { logger.error(new ParameterizedMessage("failed to parse [{}]", request.getId()), e); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java index 08ac2ddad391d..34ee72c411d3d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java @@ -72,15 +72,15 @@ protected void masterOperation(GetWatchRequest request, ClusterState state, // so that it indicates the the status is managed by watcher itself. DateTime now = new DateTime(clock.millis(), UTC); Watch watch = parser.parseWithSecrets(request.getId(), true, getResponse.getSourceAsBytesRef(), now, - XContentType.JSON); + XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); watch.toXContent(builder, WatcherParams.builder() .hideSecrets(true) .includeStatus(false) .build()); - watch.version(getResponse.getVersion()); watch.status().version(getResponse.getVersion()); - listener.onResponse(new GetWatchResponse(watch.id(), getResponse.getVersion(), watch.status(), - new XContentSource(BytesReference.bytes(builder), XContentType.JSON))); + listener.onResponse(new GetWatchResponse(watch.id(), getResponse.getVersion(), + watch.getSourceSeqNo(), watch.getSourcePrimaryTerm(), + watch.status(), new XContentSource(BytesReference.bytes(builder), XContentType.JSON))); } } else { listener.onResponse(new GetWatchResponse(request.getId())); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java index 1647ab8f6b604..d54a2798ea98b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest; import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse; @@ -88,8 +89,9 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, ActionListener listener) throws Exception { try { DateTime now = new DateTime(clock.millis(), UTC); - boolean isUpdate = request.getVersion() > 0; - Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(), isUpdate); + boolean isUpdate = request.getVersion() > 0 || request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; + Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(), + isUpdate, request.getIfSeqNo(), request.getIfPrimaryTerm()); watch.setState(request.isActive(), now); // ensure we only filter for the allowed headers @@ -103,7 +105,12 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, if (isUpdate) { UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); - updateRequest.version(request.getVersion()); + if (request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + updateRequest.setIfSeqNo(request.getIfSeqNo()); + updateRequest.setIfPrimaryTerm(request.getIfPrimaryTerm()); + } else { + updateRequest.version(request.getVersion()); + } updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.doc(builder); @@ -113,7 +120,8 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, if (shouldBeTriggeredLocally(request, watch)) { triggerService.add(watch); } - listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), + response.getSeqNo(), response.getPrimaryTerm(), created)); }, listener::onFailure), client::update); } else { @@ -127,7 +135,8 @@ protected void masterOperation(PutWatchRequest request, ClusterState state, if (shouldBeTriggeredLocally(request, watch)) { triggerService.add(watch); } - listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), + response.getSeqNo(), response.getPrimaryTerm(), created)); }, listener::onFailure), client::index); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java index bc65335e844a0..45b39f5c359b4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java @@ -9,12 +9,12 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.common.time.HaltedClock; import org.elasticsearch.xpack.core.watcher.actions.ActionRegistry; import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; @@ -33,7 +33,6 @@ import org.elasticsearch.xpack.watcher.input.InputRegistry; import org.elasticsearch.xpack.watcher.input.none.ExecutableNoneInput; import org.elasticsearch.xpack.watcher.trigger.TriggerService; -import org.elasticsearch.xpack.common.time.HaltedClock; import org.joda.time.DateTime; import java.io.IOException; @@ -72,13 +71,15 @@ public WatchParser(TriggerService triggerService, ActionRegistry actionRegistry, this.clock = clock; } - public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType) throws IOException { - return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false); + public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType, + long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false, + sourceSeqNo, sourcePrimaryTerm); } public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType) throws IOException { - return parse(name, includeStatus, false, source, now, xContentType, false); + XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(name, includeStatus, false, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm); } /** @@ -93,17 +94,20 @@ public Watch parse(String name, boolean includeStatus, BytesReference source, Da * */ public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType, boolean allowRedactedPasswords) throws IOException { - return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords); + XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm + ) throws IOException { + return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords, sourceSeqNo, sourcePrimaryTerm); } + public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType) throws IOException { - return parse(id, includeStatus, true, source, now, xContentType, false); + XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(id, includeStatus, true, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm); } private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source, DateTime now, - XContentType xContentType, boolean allowRedactedPasswords) throws IOException { + XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm) + throws IOException { if (logger.isTraceEnabled()) { logger.trace("parsing watch [{}] ", source.utf8ToString()); } @@ -113,13 +117,14 @@ private Watch parse(String id, boolean includeStatus, boolean withSecrets, Bytes .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream), new HaltedClock(now), withSecrets ? cryptoService : null, allowRedactedPasswords)) { parser.nextToken(); - return parse(id, includeStatus, parser); + return parse(id, includeStatus, parser, sourceSeqNo, sourcePrimaryTerm); } catch (IOException ioe) { throw ioException("could not parse watch [{}]", ioe, id); } } - public Watch parse(String id, boolean includeStatus, XContentParser parser) throws IOException { + public Watch parse(String id, boolean includeStatus, WatcherXContentParser parser, long sourceSeqNo, long sourcePrimaryTerm) + throws IOException { Trigger trigger = null; ExecutableInput input = defaultInput; ExecutableCondition condition = defaultCondition; @@ -128,7 +133,6 @@ public Watch parse(String id, boolean includeStatus, XContentParser parser) thro TimeValue throttlePeriod = null; Map metatdata = null; WatchStatus status = null; - long version = Versions.MATCH_ANY; String currentFieldName = null; XContentParser.Token token; @@ -161,8 +165,6 @@ public Watch parse(String id, boolean includeStatus, XContentParser parser) thro actions = actionRegistry.parseActions(id, parser); } else if (WatchField.METADATA.match(currentFieldName, parser.getDeprecationHandler())) { metatdata = parser.map(); - } else if (WatchField.VERSION.match(currentFieldName, parser.getDeprecationHandler())) { - version = parser.longValue(); } else if (WatchField.STATUS.match(currentFieldName, parser.getDeprecationHandler())) { if (includeStatus) { status = WatchStatus.parse(id, parser, clock); @@ -197,6 +199,7 @@ public Watch parse(String id, boolean includeStatus, XContentParser parser) thro } - return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, version); + return new Watch( + id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, sourceSeqNo, sourcePrimaryTerm); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index aca6fb8d03b1b..2f23df0a56013 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -64,6 +64,7 @@ import static org.hamcrest.core.Is.is; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -135,13 +136,13 @@ public void testPreIndex() throws Exception { boolean watchActive = randomBoolean(); boolean isNewWatch = randomBoolean(); Watch watch = mockWatch("_id", watchActive, isNewWatch); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch); + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); Engine.Index returnedOperation = listener.preIndex(shardId, operation); assertThat(returnedOperation, is(operation)); DateTime now = new DateTime(clock.millis(), UTC); - verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject()); + verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong()); if (isNewWatch) { if (watchActive) { @@ -163,7 +164,7 @@ public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Excep when(shardId.getIndexName()).thenReturn(Watch.INDEX); when(operation.type()).thenReturn(Watch.DOC_TYPE); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch); + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); for (int idx = 0; idx < totalShardCount; idx++) { final Map localShards = new HashMap<>(); @@ -208,7 +209,7 @@ public void testPreIndexCheckParsingException() throws Exception { when(operation.id()).thenReturn(id); when(operation.source()).thenReturn(BytesArray.EMPTY); when(shardId.getIndexName()).thenReturn(Watch.INDEX); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())) + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())) .thenThrow(new IOException("self thrown")); ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class, diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 0f670ea4cde9e..2ef3dcda598ca 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -68,6 +68,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -192,7 +193,7 @@ void stopExecutor() { Watch watch = mock(Watch.class); when(watchStatus.state()).thenReturn(state); when(watch.status()).thenReturn(watchStatus); - when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON))).thenReturn(watch); + when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON), anyLong(), anyLong())).thenReturn(watch); } SearchHits searchHits = new SearchHits(hits, count, 1.0f); SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index b409ffd28cae8..7dd87be42cf10 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -100,6 +100,7 @@ import static org.joda.time.DateTime.now; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -165,7 +166,7 @@ public void testExecute() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE; ExecutableCondition condition = mock(ExecutableCondition.class); @@ -259,7 +260,7 @@ public void testExecuteFailedInput() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); input = mock(ExecutableInput.class); Input.Result inputResult = mock(Input.Result.class); @@ -328,7 +329,7 @@ public void testExecuteFailedCondition() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); ExecutableCondition condition = mock(ExecutableCondition.class); Condition.Result conditionResult = mock(Condition.Result.class); @@ -393,7 +394,7 @@ public void testExecuteFailedWatchTransform() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE; ExecutableCondition condition = mock(ExecutableCondition.class); @@ -453,7 +454,7 @@ public void testExecuteFailedActionTransform() throws Exception { GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); mockGetWatchResponse(client, "_id", getResponse); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); @@ -827,7 +828,7 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc when(getResponse.isExists()).thenReturn(true); when(getResponse.getId()).thenReturn("foo"); mockGetWatchResponse(client, "foo", getResponse); - when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); // execute needs to fail as well as storing the history doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); @@ -960,7 +961,7 @@ public void testWatchInactive() throws Exception { GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); mockGetWatchResponse(client, "_id", getResponse); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class); when(record.state()).thenReturn(ExecutionState.EXECUTION_NOT_NEEDED); @@ -973,7 +974,7 @@ public void testWatchInactive() throws Exception { public void testUpdateWatchStatusDoesNotUpdateState() throws Exception { WatchStatus status = new WatchStatus(DateTime.now(UTC), Collections.emptyMap()); Watch watch = new Watch("_id", new ManualTrigger(), new ExecutableNoneInput(), InternalAlwaysCondition.INSTANCE, null, null, - Collections.emptyList(), null, status, 1L); + Collections.emptyList(), null, status, 1L, 1L); final AtomicBoolean assertionsTriggered = new AtomicBoolean(false); doAnswer(invocation -> { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index 318d9078bc667..5694b81dea398 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -168,7 +168,7 @@ public static WatchExecutionContext createWatchExecutionContext() throws Excepti null, new ArrayList<>(), null, - new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L); + new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L, 1L); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), @@ -214,7 +214,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient new TimeValue(0), actions, Collections.singletonMap("foo", "bar"), - new WatchStatus(now, statuses), 1L); + new WatchStatus(now, statuses), 1L, 1L); } public static ScriptService createScriptService(ThreadPool tp) throws Exception { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java index d1b8963950dd8..183e80bf2528d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java @@ -57,7 +57,7 @@ public static void main(String[] args) throws Exception { List watches = new ArrayList<>(numWatches); for (int i = 0; i < numWatches; i++) { watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(), - InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L)); + InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L, 1L)); } ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet()); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java index 1ae49265352f7..d26ef467d33f6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java @@ -110,7 +110,8 @@ public void testAckSingleAction() throws Exception { GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); assertThat(getWatchResponse.isFound(), is(true)); - Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON); + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), + XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm()); assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), @@ -178,7 +179,8 @@ public void testAckAllActions() throws Exception { GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); assertThat(getWatchResponse.isFound(), is(true)); - Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON); + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, + getWatchResponse.getSource().getBytes(), XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm()); assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), @@ -219,7 +221,8 @@ public void testAckWithRestart() throws Exception { refresh(); GetResponse getResponse = client().get(new GetRequest(Watch.INDEX, Watch.DOC_TYPE, "_name")).actionGet(); - Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON); + Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON, + getResponse.getSeqNo(), getResponse.getPrimaryTerm()); assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(), equalTo(indexedWatch.status().actionStatus("_id").ackStatus().state())); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java index 307bf8f9fd0f2..a419bc03945be 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -60,7 +61,7 @@ public void setupAction() { ThreadContext threadContext = new ThreadContext(Settings.EMPTY); when(threadPool.getThreadContext()).thenReturn(threadContext); WatchParser watchParser = mock(WatchParser.class); - ClusterService clusterService = mock(ClusterService.class); + ClusterService clusterService = createClusterService(threadPool); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); action = new TransportAckWatchAction(Settings.EMPTY, transportService, threadPool, diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchActionTests.java index e95af60e9bcc9..82959099e78db 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchActionTests.java @@ -47,6 +47,7 @@ import static java.util.Arrays.asList; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -71,7 +72,8 @@ public void setupAction() throws Exception { TransportService transportService = mock(TransportService.class); WatchParser parser = mock(WatchParser.class); - when(parser.parseWithSecrets(eq("watch_id"), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch); + when(parser.parseWithSecrets(eq("watch_id"), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())) + .thenReturn(watch); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index 5e7df1987805e..61e7e06e5cc3a 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -51,6 +51,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -76,7 +77,8 @@ public void setupAction() throws Exception { TransportService transportService = mock(TransportService.class); WatchParser parser = mock(WatchParser.class); - when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch); + when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean(), anyLong(), anyLong())) + .thenReturn(watch); WatchStatus status = mock(WatchStatus.class); WatchStatus.State state = new WatchStatus.State(true, DateTime.now(DateTimeZone.UTC)); when(status.state()).thenReturn(state); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 898ae8ac9aa55..0f6ca200cb4ab 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.xpack.watcher.trigger.schedule.engine; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; @@ -273,6 +273,6 @@ public void testAddOnlyWithNewSchedule() { private Watch createWatch(String name, Schedule schedule) { return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(), InternalAlwaysCondition.INSTANCE, null, null, - Collections.emptyList(), null, null, Versions.MATCH_ANY); + Collections.emptyList(), null, null, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index 745f1520fe0c6..41d6c289e7470 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -201,13 +201,16 @@ public void testParserSelfGenerated() throws Exception { TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10000)); - Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus, 1L); + final long sourceSeqNo = randomNonNegativeLong(); + final long sourcePrimaryTerm = randomLongBetween(1, 200); + Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus, + sourceSeqNo, sourcePrimaryTerm); BytesReference bytes = BytesReference.bytes(jsonBuilder().value(watch)); logger.info("{}", bytes.utf8ToString()); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); - Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON); + Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON, sourceSeqNo, sourcePrimaryTerm); if (includeStatus) { assertThat(parsedWatch.status(), equalTo(watchStatus)); @@ -220,6 +223,8 @@ public void testParserSelfGenerated() throws Exception { } assertThat(parsedWatch.metadata(), equalTo(metadata)); assertThat(parsedWatch.actions(), equalTo(actions)); + assertThat(parsedWatch.getSourceSeqNo(), equalTo(sourceSeqNo)); + assertThat(parsedWatch.getSourcePrimaryTerm(), equalTo(sourcePrimaryTerm)); } public void testThatBothStatusFieldsCanBeRead() throws Exception { @@ -250,7 +255,7 @@ public Trigger parseTrigger(String jobName, XContentParser parser) throws IOExce WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); XContentBuilder builder = jsonBuilder().startObject().startObject("trigger").endObject().field("status", watchStatus).endObject(); - Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch.status().state().getTimestamp().getMillis(), is(clock.millis())); for (ActionWrapper action : actions) { assertThat(watch.status().actionStatus(action.id()), is(actionsStatuses.get(action.id()))); @@ -278,7 +283,7 @@ public void testParserBadActions() throws Exception { .endObject(); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); try { - watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON); + watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON, 1L, 1L); fail("This watch should fail to parse as actions is an array"); } catch (ElasticsearchParseException pe) { assertThat(pe.getMessage().contains("could not parse actions for watch [failure]"), is(true)); @@ -303,7 +308,7 @@ public void testParserDefaults() throws Exception { .endObject(); builder.endObject(); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, Clock.systemUTC()); - Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch, notNullValue()); assertThat(watch.trigger(), instanceOf(ScheduleTrigger.class)); assertThat(watch.input(), instanceOf(ExecutableNoneInput.class)); @@ -369,13 +374,13 @@ public void testParseWatch_verifyScriptLangDefault() throws Exception { builder.endObject(); // parse in default mode: - Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(((ScriptCondition) watch.condition()).getScript().getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG)); WatcherSearchTemplateRequest request = ((SearchInput) watch.input().input()).getRequest(); SearchRequest searchRequest = searchTemplateService.toSearchRequest(request); assertThat(((ScriptQueryBuilder) searchRequest.source().query()).script().getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG)); } - + private static Schedule randomSchedule() { String type = randomFrom(CronSchedule.TYPE, HourlySchedule.TYPE, DailySchedule.TYPE, WeeklySchedule.TYPE, MonthlySchedule.TYPE, YearlySchedule.TYPE, IntervalSchedule.TYPE);