diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 7a6a562e4fb7f..1286e083d8203 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -76,6 +76,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.script.mustache.MultiSearchTemplateRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -885,6 +886,20 @@ Params withVersionType(VersionType versionType) { return this; } + Params withIfSeqNo(long ifSeqNo) { + if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + return putParam("if_seq_no", Long.toString(ifSeqNo)); + } + return this; + } + + Params withIfPrimaryTerm(long ifPrimaryTerm) { + if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + return putParam("if_primary_term", Long.toString(ifPrimaryTerm)); + } + return this; + } + Params withWaitForActiveShards(ActiveShardCount activeShardCount) { return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java index a5c6d0dd1810e..34fb826d62382 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java @@ -69,7 +69,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) { .build(); Request request = new Request(HttpPut.METHOD_NAME, endpoint); - RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion()); + RequestConverters.Params params = new RequestConverters.Params(request) + .withVersion(putWatchRequest.getVersion()) + .withIfSeqNo(putWatchRequest.ifSeqNo()) + .withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm()); if (putWatchRequest.isActive() == false) { params.putParam("active", "false"); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java index 9f5934b33eb30..83727003106e9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java @@ -31,9 +31,14 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + public class GetWatchResponse { private final String id; private final long version; + private final long seqNo; + private final long primaryTerm; private final WatchStatus status; private final BytesReference source; @@ -43,15 +48,18 @@ public class GetWatchResponse { * Ctor for missing watch */ public GetWatchResponse(String id) { - this(id, Versions.NOT_FOUND, null, null, null); + this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null); } - public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) { + public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status, + BytesReference source, XContentType xContentType) { this.id = id; this.version = version; this.status = status; this.source = source; this.xContentType = xContentType; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; } public String getId() { @@ -62,6 +70,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isFound() { return version != Versions.NOT_FOUND; } @@ -111,6 +127,8 @@ public int hashCode() { private static final ParseField ID_FIELD = new ParseField("_id"); private static final ParseField FOUND_FIELD = new ParseField("found"); private static final ParseField VERSION_FIELD = new ParseField("_version"); + private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no"); + private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term"); private static final ParseField STATUS_FIELD = new ParseField("status"); private static final ParseField WATCH_FIELD = new ParseField("watch"); @@ -119,9 +137,10 @@ public int hashCode() { a -> { boolean isFound = (boolean) a[1]; if (isFound) { - XContentBuilder builder = (XContentBuilder) a[4]; + XContentBuilder builder = (XContentBuilder) a[6]; BytesReference source = BytesReference.bytes(builder); - return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType()); + return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5], + source, builder.contentType()); } else { return new GetWatchResponse((String) a[0]); } @@ -131,6 +150,8 @@ public int hashCode() { PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (parser, context) -> WatchStatus.parse(parser), STATUS_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java index 88f47aeaeee7a..8b83970723dd2 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java @@ -23,10 +23,14 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.util.Objects; import java.util.regex.Pattern; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * This request class contains the data needed to create a watch along with the name of the watch. * The name of the watch will become the ID of the indexed document. @@ -40,6 +44,9 @@ public final class PutWatchRequest implements Validatable { private final XContentType xContentType; private boolean active = true; private long version = Versions.MATCH_ANY; + private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { Objects.requireNonNull(id, "watch id is missing"); @@ -96,6 +103,56 @@ public void setVersion(long version) { this.version = version; } + /** + * only performs this put request if the watch's last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the watch's last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this put request if the watch's last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the watch last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this sequence number. + * If the watch last last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this primary term. + * + * If the watch's last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long ifPrimaryTerm() { + return ifPrimaryTerm; + } + + public static boolean isValidId(String id) { return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches(); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java index 8f7070b2565a2..61742b84e1219 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.Objects; @@ -32,20 +33,26 @@ public class PutWatchResponse { static { PARSER.declareString(PutWatchResponse::setId, new ParseField("_id")); + PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no")); + PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term")); PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version")); PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created")); } private String id; private long version; + private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; private boolean created; public PutWatchResponse() { } - public PutWatchResponse(String id, long version, boolean created) { + public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) { this.id = id; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.created = created; } @@ -57,6 +64,14 @@ private void setVersion(long version) { this.version = version; } + private void setSeqNo(long seqNo) { + this.seqNo = seqNo; + } + + private void setPrimaryTerm(long primaryTerm) { + this.primaryTerm = primaryTerm; + } + private void setCreated(boolean created) { this.created = created; } @@ -69,6 +84,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isCreated() { return created; } @@ -80,12 +103,14 @@ public boolean equals(Object o) { PutWatchResponse that = (PutWatchResponse) o; - return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created); + return Objects.equals(id, that.id) && Objects.equals(version, that.version) + && Objects.equals(seqNo, that.seqNo) + && Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created); } @Override public int hashCode() { - return Objects.hash(id, version, created); + return Objects.hash(id, version, seqNo, primaryTerm, created); } public static PutWatchResponse fromXContent(XContentParser parser) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java index af327abc1a728..d358f5c8955ae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java @@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui return builder.startObject() .field("_id", response.getId()) .field("_version", response.getVersion()) + .field("_seq_no", response.getSeqNo()) + .field("_primary_term", response.getPrimaryTerm()) .field("created", response.isCreated()) .endObject(); } private static PutWatchResponse createTestInstance() { String id = randomAlphaOfLength(10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 200); long version = randomLongBetween(1, 10); boolean created = randomBoolean(); - return new PutWatchResponse(id, version, created); + return new PutWatchResponse(id, version, seqNo, primaryTerm, created); } } diff --git a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc index a1704e9acc329..44b22f713d2a6 100644 --- a/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc @@ -93,6 +93,8 @@ The action state of a newly-created watch is `awaits_successful_execution`: -------------------------------------------------- { "found": true, + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "_id": "my_watch", "status": { @@ -137,6 +139,8 @@ and the action is now in `ackable` state: { "found": true, "_id": "my_watch", + "_seq_no": 1, + "_primary_term": 1, "_version": 2, "status": { "version": 2, @@ -186,6 +190,8 @@ GET _watcher/watch/my_watch { "found": true, "_id": "my_watch", + "_seq_no": 2, + "_primary_term": 1, "_version": 3, "status": { "version": 3, diff --git a/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc index ae49ba1f369c8..74a98a00fa423 100644 --- a/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc @@ -44,6 +44,8 @@ GET _watcher/watch/my_watch { "found": true, "_id": "my_watch", + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "status": { "state" : { diff --git a/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc index 96fad5a702854..59625c1391119 100644 --- a/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc @@ -44,6 +44,8 @@ GET _watcher/watch/my_watch "found": true, "_id": "my_watch", "_version": 1, + "_seq_no": 0, + "_primary_term": 1, "status": { "state" : { "active" : true, diff --git a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc index 7d62b5c76c41d..421cf7e5efff0 100644 --- a/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/get-watch.asciidoc @@ -44,6 +44,8 @@ Response: { "found": true, "_id": "my_watch", + "_seq_no": 0, + "_primary_term": 1, "_version": 1, "status": { <1> "version": 1, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java index 7997d853db37a..fbee2963ad92d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java @@ -5,19 +5,24 @@ */ package org.elasticsearch.protocol.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ValidateActions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.regex.Pattern; +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + /** * This request class contains the data needed to create a watch along with the name of the watch. * The name of the watch will become the ID of the indexed document. @@ -32,6 +37,9 @@ public final class PutWatchRequest extends ActionRequest { private boolean active = true; private long version = Versions.MATCH_ANY; + private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + public PutWatchRequest() {} public PutWatchRequest(StreamInput in) throws IOException { @@ -52,6 +60,13 @@ public void readFrom(StreamInput in) throws IOException { active = in.readBoolean(); xContentType = in.readEnum(XContentType.class); version = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + } else { + ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; + } } @Override @@ -62,6 +77,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(active); out.writeEnum(xContentType); out.writeZLong(version); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(ifSeqNo); + out.writeVLong(ifPrimaryTerm); + } } /** @@ -122,20 +141,80 @@ public void setVersion(long version) { this.version = version; } + /** + * only performs this put request if the watch's last modification was assigned the given + * sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)} + * + * If the watch's last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfSeqNo(long seqNo) { + if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) { + throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "]."); + } + ifSeqNo = seqNo; + return this; + } + + /** + * only performs this put request if the watch's last modification was assigned the given + * primary term. Must be used in combination with {@link #setIfSeqNo(long)} + * + * If the watch last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public PutWatchRequest setIfPrimaryTerm(long term) { + if (term < 0) { + throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]"); + } + ifPrimaryTerm = term; + return this; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this sequence number. + * If the watch last last modification was assigned a different sequence number a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long getIfSeqNo() { + return ifSeqNo; + } + + /** + * If set, only perform this put watch request if the watch's last modification was assigned this primary term. + * + * If the watch's last modification was assigned a different term a + * {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown. + */ + public long getIfPrimaryTerm() { + return ifPrimaryTerm; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; if (id == null) { - validationException = ValidateActions.addValidationError("watch id is missing", validationException); + validationException = addValidationError("watch id is missing", validationException); } else if (isValidId(id) == false) { - validationException = ValidateActions.addValidationError("watch id contains whitespace", validationException); + validationException = addValidationError("watch id contains whitespace", validationException); } if (source == null) { - validationException = ValidateActions.addValidationError("watch source is missing", validationException); + validationException = addValidationError("watch source is missing", validationException); } if (xContentType == null) { - validationException = ValidateActions.addValidationError("request body is missing", validationException); + validationException = addValidationError("request body is missing", validationException); + } + if (ifSeqNo != UNASSIGNED_SEQ_NO && version != Versions.MATCH_ANY) { + validationException = addValidationError("compare and write operations can not use versioning", validationException); } + if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) { + validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException); + } + if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) { + validationException = + addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException); + } + return validationException; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java index f6e55ff555339..ab8496f839a3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.protocol.xpack.watcher; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -13,6 +14,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.IOException; import java.util.Objects; @@ -24,19 +26,25 @@ public class PutWatchResponse extends ActionResponse implements ToXContentObject static { PARSER.declareString(PutWatchResponse::setId, new ParseField("_id")); PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version")); + PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no")); + PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term")); PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created")); } private String id; private long version; + private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; private boolean created; public PutWatchResponse() { } - public PutWatchResponse(String id, long version, boolean created) { + public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) { this.id = id; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.created = created; } @@ -48,6 +56,14 @@ private void setVersion(long version) { this.version = version; } + private void setSeqNo(long seqNo) { + this.seqNo = seqNo; + } + + private void setPrimaryTerm(long primaryTerm) { + this.primaryTerm = primaryTerm; + } + private void setCreated(boolean created) { this.created = created; } @@ -60,6 +76,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + public boolean isCreated() { return created; } @@ -71,12 +95,14 @@ public boolean equals(Object o) { PutWatchResponse that = (PutWatchResponse) o; - return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created); + return Objects.equals(id, that.id) && Objects.equals(version, that.version) + && Objects.equals(seqNo, that.seqNo) + && Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created); } @Override public int hashCode() { - return Objects.hash(id, version, created); + return Objects.hash(id, version, seqNo, primaryTerm, created); } @Override @@ -84,6 +110,10 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); out.writeVLong(version); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(seqNo); + out.writeVLong(primaryTerm); + } out.writeBoolean(created); } @@ -92,6 +122,13 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); id = in.readString(); version = in.readVLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + } else { + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; + } created = in.readBoolean(); } @@ -100,6 +137,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.startObject() .field("_id", id) .field("_version", version) + .field("_seq_no", seqNo) + .field("_primary_term", primaryTerm) .field("created", created) .endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java index d92ae1dcc4626..e61632f7d1d4c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/transport/actions/get/GetWatchResponse.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.watcher.transport.actions.get; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -12,6 +13,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.core.watcher.watch.WatchStatus; @@ -25,6 +27,8 @@ public class GetWatchResponse extends ActionResponse implements ToXContent { private boolean found; private XContentSource source; private long version; + private long seqNo; + private long primaryTerm; public GetWatchResponse() { } @@ -38,17 +42,21 @@ public GetWatchResponse(String id) { this.found = false; this.source = null; this.version = Versions.NOT_FOUND; + this.seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + this.primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } /** * ctor for found watch */ - public GetWatchResponse(String id, long version, WatchStatus status, XContentSource source) { + public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status, XContentSource source) { this.id = id; this.status = status; this.found = true; this.source = source; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; } public String getId() { @@ -71,6 +79,14 @@ public long getVersion() { return version; } + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -80,10 +96,16 @@ public void readFrom(StreamInput in) throws IOException { status = WatchStatus.read(in); source = XContentSource.readFrom(in); version = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + } } else { status = null; source = null; version = Versions.NOT_FOUND; + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM; } } @@ -96,6 +118,10 @@ public void writeTo(StreamOutput out) throws IOException { status.writeTo(out); XContentSource.writeTo(source, out); out.writeZLong(version); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeZLong(seqNo); + out.writeVLong(primaryTerm); + } } } @@ -105,6 +131,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("_id", id); if (found) { builder.field("_version", version); + builder.field("_seq_no", seqNo); + builder.field("_primary_term", primaryTerm); builder.field("status", status, params); builder.field("watch", source, params); } @@ -116,7 +144,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; GetWatchResponse that = (GetWatchResponse) o; - return version == that.version && + return version == that.version && seqNo == that.seqNo && primaryTerm == that.primaryTerm && Objects.equals(id, that.id) && Objects.equals(status, that.status) && Objects.equals(source, that.source); @@ -124,7 +152,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(id, status, version); + return Objects.hash(id, status, version, seqNo, primaryTerm); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java index 75034752c3cc6..e34b0a15e7133 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java @@ -9,6 +9,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.core.watcher.actions.ActionStatus; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.condition.ExecutableCondition; @@ -37,11 +38,12 @@ public class Watch implements ToXContentObject { @Nullable private final Map metadata; private final WatchStatus status; - private transient long version; + private final long sourceSeqNo; + private final long sourcePrimaryTerm; public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform, @Nullable TimeValue throttlePeriod, List actions, @Nullable Map metadata, - WatchStatus status, long version) { + WatchStatus status, long sourceSeqNo, long sourcePrimaryTerm) { this.id = id; this.trigger = trigger; this.input = input; @@ -51,7 +53,8 @@ public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondit this.throttlePeriod = throttlePeriod; this.metadata = metadata; this.status = status; - this.version = version; + this.sourceSeqNo = sourceSeqNo; + this.sourcePrimaryTerm = sourcePrimaryTerm; } public String id() { @@ -88,12 +91,20 @@ public WatchStatus status() { return status; } - public long version() { - return version; + /** + * The sequence number of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * if the watch wasn't read from a document + ***/ + public long getSourceSeqNo() { + return sourceSeqNo; } - public void version(long version) { - this.version = version; + /** + * The primary term of the document that was used to create this watch, {@link SequenceNumbers#UNASSIGNED_PRIMARY_TERM} + * if the watch wasn't read from a document + ***/ + public long getSourcePrimaryTerm() { + return sourcePrimaryTerm; } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java index dbc3ce76c9517..6f6a1955927d9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/WatchField.java @@ -17,7 +17,6 @@ public final class WatchField { public static final ParseField THROTTLE_PERIOD_HUMAN = new ParseField("throttle_period"); public static final ParseField METADATA = new ParseField("metadata"); public static final ParseField STATUS = new ParseField("status"); - public static final ParseField VERSION = new ParseField("_version"); public static final String ALL_ACTIONS_ID = "_all"; private WatchField() {} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java index 1c00b6fd9dc27..13f4b0fb5d3b4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/GetWatchResponseTests.java @@ -74,6 +74,7 @@ protected void assertEqualInstances(GetWatchResponse expectedInstance, GetWatchR throw new AssertionError(e); } newInstance = new GetWatchResponse(newInstance.getId(), newInstance.getVersion(), + newInstance.getSeqNo(), newInstance.getPrimaryTerm(), newInstance.getStatus(), new XContentSource(newSource, expectedInstance.getSource().getContentType())); } super.assertEqualInstances(expectedInstance, newInstance); @@ -91,9 +92,11 @@ protected GetWatchResponse createTestInstance() { return new GetWatchResponse(id); } long version = randomLongBetween(0, 10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 2000); WatchStatus status = randomWatchStatus(); BytesReference source = simpleWatch(); - return new GetWatchResponse(id, version, status, new XContentSource(source, XContentType.JSON)); + return new GetWatchResponse(id, version, seqNo, primaryTerm, status, new XContentSource(source, XContentType.JSON)); } private static BytesReference simpleWatch() { @@ -170,8 +173,8 @@ public org.elasticsearch.client.watcher.GetWatchResponse doHlrcParseInstance(XCo @Override public GetWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.GetWatchResponse instance) { if (instance.isFound()) { - return new GetWatchResponse(instance.getId(), instance.getVersion(), convertHlrcToInternal(instance.getStatus()), - new XContentSource(instance.getSource(), instance.getContentType())); + return new GetWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(), + convertHlrcToInternal(instance.getStatus()), new XContentSource(instance.getSource(), instance.getContentType())); } else { return new GetWatchResponse(instance.getId()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java index 8ea4a84daed95..975d842b79537 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/protocol/xpack/watcher/PutWatchResponseTests.java @@ -16,9 +16,11 @@ public class PutWatchResponseTests extends @Override protected PutWatchResponse createTestInstance() { String id = randomAlphaOfLength(10); + long seqNo = randomNonNegativeLong(); + long primaryTerm = randomLongBetween(1, 20); long version = randomLongBetween(1, 10); boolean created = randomBoolean(); - return new PutWatchResponse(id, version, created); + return new PutWatchResponse(id, version, seqNo, primaryTerm, created); } @Override @@ -33,7 +35,8 @@ public org.elasticsearch.client.watcher.PutWatchResponse doHlrcParseInstance(XCo @Override public PutWatchResponse convertHlrcToInternal(org.elasticsearch.client.watcher.PutWatchResponse instance) { - return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.isCreated()); + return new PutWatchResponse(instance.getId(), instance.getVersion(), instance.getSeqNo(), instance.getPrimaryTerm(), + instance.isCreated()); } @Override diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json index aabbc8aef7f4c..438f2e4ee7637 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.watcher.put_watch.json @@ -20,6 +20,14 @@ "version" : { "type" : "number", "description" : "Explicit version number for concurrency control" + }, + "if_seq_no" : { + "type" : "number", + "description" : "only update the watch if the last operation that has changed the watch has the specified sequence number" + }, + "if_primary_term" : { + "type" : "number", + "description" : "only update the watch if the last operation that has changed the watch has the specified primary term" } } }, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml index 03bc40c9372ba..f61bc91f2ce35 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/watcher/put_watch/80_put_get_watch_with_passwords.yml @@ -221,6 +221,175 @@ setup: } } +--- +"Test putting a watch with a redacted password with old seq no returns an error": + - skip: + version: " - 6.99.99" + reason: seq no powered concurrency was added in 7.0.0 + + # version 1 + - do: + xpack.watcher.put_watch: + id: "watch_with_seq_no" + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "pass" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - set: { "_seq_no": seqNo } + - set: { "_primary_term" : primaryTerm } + + # using optimistic concurrency control, this one will loose + # as if two users in the watch UI tried to update the same watch + - do: + catch: conflict + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: 123034 + if_primary_term: $primaryTerm + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + catch: conflict + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: $seqNo + if_primary_term: 234242423 + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + xpack.watcher.put_watch: + id: "watch_with_seq_no" + if_seq_no: $seqNo + if_primary_term: $primaryTerm + body: > + { + "trigger": { + "schedule" : { "cron" : "0 0 0 1 * ? 2099" } + }, + "input": { + "http" : { + "request" : { + "host" : "host.domain", + "port" : 9200, + "path" : "/myservice", + "auth" : { + "basic" : { + "username" : "new_user", + "password" : "::es_redacted::" + } + } + } + } + }, + "actions": { + "logging": { + "logging": { + "text": "Log me Amadeus!" + } + } + } + } + + - do: + search: + rest_total_hits_as_int: true + index: .watches + body: > + { + "query": { + "term": { + "_id": { + "value": "watch_with_seq_no" + } + } + } + } + + + - match: { hits.total: 1 } + - match: { hits.hits.0._id: "watch_with_seq_no" } + - match: { hits.hits.0._source.input.http.request.auth.basic.username: "new_user" } + - match: { hits.hits.0._source.input.http.request.auth.basic.password: "pass" } + --- "Test putting a watch with a redacted password with current version works": diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 1974dd055ddc8..1f2ddb4ed07a6 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -103,7 +103,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { if (isWatchDocument(shardId.getIndexName(), operation.type())) { DateTime now = new DateTime(clock.millis(), UTC); try { - Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON); + Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON, + operation.getIfSeqNo(), operation.getIfPrimaryTerm()); ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId); if (shardAllocationConfiguration == null) { logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}", diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java index 796d06b0b81ea..c96203bd64222 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java @@ -298,7 +298,7 @@ private Collection loadWatches(ClusterState clusterState) { .source(new SearchSourceBuilder() .size(scrollSize) .sort(SortBuilders.fieldSort("_doc")) - .version(true)); + .seqNoAndPrimaryTerm(true)); response = client.search(searchRequest).actionGet(defaultSearchTimeout); if (response.getTotalShards() != response.getSuccessfulShards()) { @@ -341,8 +341,7 @@ private Collection loadWatches(ClusterState clusterState) { } try { - Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON); - watch.version(hit.getVersion()); + Watch watch = parser.parse(id, true, hit.getSourceRef(), XContentType.JSON, hit.getSeqNo(), hit.getPrimaryTerm()); if (watch.status().state().isActive()) { watches.add(watch); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 60f9c02eb297c..3950b05034eb4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -282,7 +282,8 @@ record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "W if (resp.isExists() == false) { throw new ResourceNotFoundException("watch [{}] does not exist", watchId); } - return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON); + return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON, + resp.getSeqNo(), resp.getPrimaryTerm()); }); } catch (ResourceNotFoundException e) { String message = "unable to find watch for record [" + ctx.id() + "]"; @@ -353,7 +354,8 @@ public void updateWatchStatus(Watch watch) throws IOException { UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id()); updateRequest.doc(source); - updateRequest.version(watch.version()); + updateRequest.setIfSeqNo(watch.getSourceSeqNo()); + updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm()); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { client.update(updateRequest).actionGet(indexDefaultTimeout); } catch (DocumentMissingException e) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java index 4b79c72d417a5..2c85dca35c0e4 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestExecuteWatchAction.java @@ -50,8 +50,7 @@ public class RestExecuteWatchAction extends WatcherRestHandler implements RestRe WatchField.INPUT.getPreferredName(), WatchField.CONDITION.getPreferredName(), WatchField.ACTIONS.getPreferredName(), WatchField.TRANSFORM.getPreferredName(), WatchField.THROTTLE_PERIOD.getPreferredName(), WatchField.THROTTLE_PERIOD_HUMAN.getPreferredName(), - WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName(), - WatchField.VERSION.getPreferredName()); + WatchField.METADATA.getPreferredName(), WatchField.STATUS.getPreferredName()); public RestExecuteWatchAction(Settings settings, RestController controller) { super(settings); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java index 1ed6b0d795e91..15cb9612445e1 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestGetWatchAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; @@ -22,7 +21,6 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchResponse; import org.elasticsearch.xpack.watcher.rest.WatcherRestHandler; - import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestStatus.NOT_FOUND; import static org.elasticsearch.rest.RestStatus.OK; @@ -50,17 +48,9 @@ protected RestChannelConsumer doPrepareRequest(final RestRequest request, Watche return channel -> client.getWatch(getWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(GetWatchResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field("found", response.isFound()) - .field("_id", response.getId()); - if (response.isFound()) { - builder.field("_version", response.getVersion()); - ToXContent.MapParams xContentParams = new ToXContent.MapParams(request.params()); - builder.field("status", response.getStatus(), xContentParams); - builder.field("watch", response.getSource(), xContentParams); - } - builder.endObject(); - + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); RestStatus status = response.isFound() ? OK : NOT_FOUND; return new BytesRestResponse(status, builder); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java index 31a5162499641..edbfa3cf1fe28 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java @@ -58,15 +58,13 @@ protected RestChannelConsumer doPrepareRequest(final RestRequest request, Watche PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), request.content(), request.getXContentType()); putWatchRequest.setVersion(request.paramAsLong("version", Versions.MATCH_ANY)); + putWatchRequest.setIfSeqNo(request.paramAsLong("if_seq_no", putWatchRequest.getIfSeqNo())); + putWatchRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", putWatchRequest.getIfPrimaryTerm())); putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive())); return channel -> client.putWatch(putWatchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception { - builder.startObject() - .field("_id", response.getId()) - .field("_version", response.getVersion()) - .field("created", response.isCreated()) - .endObject(); + response.toXContent(builder, request); RestStatus status = response.isCreated() ? CREATED : OK; return new BytesRestResponse(status, builder); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index 9db741ecf4773..81788d7af01bf 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; @@ -16,6 +17,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -49,15 +51,17 @@ public class TransportAckWatchAction extends WatcherTransportActionwrap(getResponse -> { if (getResponse.isExists()) { Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, - XContentType.JSON); - watch.version(getResponse.getVersion()); + XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); watch.status().version(getResponse.getVersion()); listener.onResponse(new ActivateWatchResponse(watch.status())); } else { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java index eca30cfca8c56..6a25808c40e36 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -86,9 +87,8 @@ protected void doExecute(ExecuteWatchRequest request, ActionListenerwrap(response -> { if (response.isExists()) { - Watch watch = - watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), request.getXContentType()); - watch.version(response.getVersion()); + Watch watch = watchParser.parse(request.getId(), true, response.getSourceAsBytesRef(), + request.getXContentType(), response.getSeqNo(), response.getPrimaryTerm()); watch.status().version(response.getVersion()); executeWatch(request, listener, watch, true); } else { @@ -99,7 +99,7 @@ protected void doExecute(ExecuteWatchRequest request, ActionListener listener) { try { DateTime now = new DateTime(clock.millis(), UTC); - boolean isUpdate = request.getVersion() > 0; - Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(), isUpdate); + boolean isUpdate = request.getVersion() > 0 || request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; + Watch watch = parser.parseWithSecrets(request.getId(), false, request.getSource(), now, request.xContentType(), + isUpdate, request.getIfSeqNo(), request.getIfPrimaryTerm()); watch.setState(request.isActive(), now); // ensure we only filter for the allowed headers @@ -92,14 +94,20 @@ protected void doExecute(PutWatchRequest request, ActionListenerwrap(response -> { boolean created = response.getResult() == DocWriteResponse.Result.CREATED; - listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), + response.getSeqNo(), response.getPrimaryTerm(), created)); }, listener::onFailure), client::update); } else { @@ -109,7 +117,8 @@ protected void doExecute(PutWatchRequest request, ActionListenerwrap(response -> { boolean created = response.getResult() == DocWriteResponse.Result.CREATED; - listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), + response.getSeqNo(), response.getPrimaryTerm(), created)); }, listener::onFailure), client::index); } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java index ab316c3dd1001..55f65943a1381 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchParser.java @@ -10,7 +10,6 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -74,13 +73,15 @@ public WatchParser(TriggerService triggerService, ActionRegistry actionRegistry, this.defaultActions = Collections.emptyList(); } - public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType) throws IOException { - return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false); + public Watch parse(String name, boolean includeStatus, BytesReference source, XContentType xContentType, + long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(name, includeStatus, false, source, new DateTime(clock.millis(), UTC), xContentType, false, + sourceSeqNo, sourcePrimaryTerm); } public Watch parse(String name, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType) throws IOException { - return parse(name, includeStatus, false, source, now, xContentType, false); + XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(name, includeStatus, false, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm); } /** @@ -95,17 +96,20 @@ public Watch parse(String name, boolean includeStatus, BytesReference source, Da * */ public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType, boolean allowRedactedPasswords) throws IOException { - return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords); + XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm + ) throws IOException { + return parse(id, includeStatus, true, source, now, xContentType, allowRedactedPasswords, sourceSeqNo, sourcePrimaryTerm); } + public Watch parseWithSecrets(String id, boolean includeStatus, BytesReference source, DateTime now, - XContentType xContentType) throws IOException { - return parse(id, includeStatus, true, source, now, xContentType, false); + XContentType xContentType, long sourceSeqNo, long sourcePrimaryTerm) throws IOException { + return parse(id, includeStatus, true, source, now, xContentType, false, sourceSeqNo, sourcePrimaryTerm); } private Watch parse(String id, boolean includeStatus, boolean withSecrets, BytesReference source, DateTime now, - XContentType xContentType, boolean allowRedactedPasswords) throws IOException { + XContentType xContentType, boolean allowRedactedPasswords, long sourceSeqNo, long sourcePrimaryTerm) + throws IOException { if (logger.isTraceEnabled()) { logger.trace("parsing watch [{}] ", source.utf8ToString()); } @@ -115,13 +119,14 @@ private Watch parse(String id, boolean includeStatus, boolean withSecrets, Bytes LoggingDeprecationHandler.INSTANCE, stream), now, withSecrets ? cryptoService : null, allowRedactedPasswords)) { parser.nextToken(); - return parse(id, includeStatus, parser); + return parse(id, includeStatus, parser, sourceSeqNo, sourcePrimaryTerm); } catch (IOException ioe) { throw ioException("could not parse watch [{}]", ioe, id); } } - public Watch parse(String id, boolean includeStatus, WatcherXContentParser parser) throws IOException { + public Watch parse(String id, boolean includeStatus, WatcherXContentParser parser, long sourceSeqNo, long sourcePrimaryTerm) + throws IOException { Trigger trigger = null; ExecutableInput input = defaultInput; ExecutableCondition condition = defaultCondition; @@ -130,7 +135,6 @@ public Watch parse(String id, boolean includeStatus, WatcherXContentParser parse TimeValue throttlePeriod = null; Map metatdata = null; WatchStatus status = null; - long version = Versions.MATCH_ANY; String currentFieldName = null; XContentParser.Token token; @@ -163,8 +167,6 @@ public Watch parse(String id, boolean includeStatus, WatcherXContentParser parse actions = actionRegistry.parseActions(id, parser); } else if (WatchField.METADATA.match(currentFieldName, parser.getDeprecationHandler())) { metatdata = parser.map(); - } else if (WatchField.VERSION.match(currentFieldName, parser.getDeprecationHandler())) { - version = parser.longValue(); } else if (WatchField.STATUS.match(currentFieldName, parser.getDeprecationHandler())) { if (includeStatus) { status = WatchStatus.parse(id, parser); @@ -198,6 +200,7 @@ public Watch parse(String id, boolean includeStatus, WatcherXContentParser parse } - return new Watch(id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, version); + return new Watch( + id, trigger, input, condition, transform, throttlePeriod, actions, metatdata, status, sourceSeqNo, sourcePrimaryTerm); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index f351ed2e154ed..7a37f71e22f28 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -63,6 +63,7 @@ import static org.hamcrest.core.Is.is; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -134,13 +135,13 @@ public void testPreIndex() throws Exception { boolean watchActive = randomBoolean(); boolean isNewWatch = randomBoolean(); Watch watch = mockWatch("_id", watchActive, isNewWatch); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch); + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); Engine.Index returnedOperation = listener.preIndex(shardId, operation); assertThat(returnedOperation, is(operation)); DateTime now = new DateTime(clock.millis(), UTC); - verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject()); + verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong()); if (isNewWatch) { if (watchActive) { @@ -162,7 +163,7 @@ public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Excep when(shardId.getIndexName()).thenReturn(Watch.INDEX); when(operation.type()).thenReturn(Watch.DOC_TYPE); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch); + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch); for (int idx = 0; idx < totalShardCount; idx++) { final Map localShards = new HashMap<>(); @@ -207,7 +208,7 @@ public void testPreIndexCheckParsingException() throws Exception { when(operation.id()).thenReturn(id); when(operation.source()).thenReturn(BytesArray.EMPTY); when(shardId.getIndexName()).thenReturn(Watch.INDEX); - when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject())) + when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())) .thenThrow(new IOException("self thrown")); ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class, diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index ab9fff7dbe587..96186905c26ad 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -70,6 +70,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -194,7 +195,7 @@ void stopExecutor() { Watch watch = mock(Watch.class); when(watchStatus.state()).thenReturn(state); when(watch.status()).thenReturn(watchStatus); - when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON))).thenReturn(watch); + when(parser.parse(eq(id), eq(true), any(), eq(XContentType.JSON), anyLong(), anyLong())).thenReturn(watch); } SearchHits searchHits = new SearchHits(hits, new TotalHits(count, TotalHits.Relation.EQUAL_TO), 1.0f); SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 287b3976dea3a..5f2b807f10757 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -101,6 +101,7 @@ import static org.joda.time.DateTime.now; import static org.joda.time.DateTimeZone.UTC; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -166,7 +167,7 @@ public void testExecute() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE; ExecutableCondition condition = mock(ExecutableCondition.class); @@ -270,7 +271,7 @@ public void testExecuteFailedInput() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); input = mock(ExecutableInput.class); Input.Result inputResult = mock(Input.Result.class); @@ -339,7 +340,7 @@ public void testExecuteFailedCondition() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); ExecutableCondition condition = mock(ExecutableCondition.class); Condition.Result conditionResult = mock(Condition.Result.class); @@ -404,7 +405,7 @@ public void testExecuteFailedWatchTransform() throws Exception { DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), now, event, timeValueSeconds(5)); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); Condition.Result conditionResult = InternalAlwaysCondition.RESULT_INSTANCE; ExecutableCondition condition = mock(ExecutableCondition.class); @@ -464,7 +465,7 @@ public void testExecuteFailedActionTransform() throws Exception { GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); mockGetWatchResponse(client, "_id", getResponse); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); DateTime now = new DateTime(clock.millis()); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); @@ -838,7 +839,7 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc when(getResponse.isExists()).thenReturn(true); when(getResponse.getId()).thenReturn("foo"); mockGetWatchResponse(client, "foo", getResponse); - when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); // execute needs to fail as well as storing the history doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); @@ -971,7 +972,7 @@ public void testWatchInactive() throws Exception { GetResponse getResponse = mock(GetResponse.class); when(getResponse.isExists()).thenReturn(true); mockGetWatchResponse(client, "_id", getResponse); - when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any())).thenReturn(watch); + when(parser.parseWithSecrets(eq(watch.id()), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); WatchRecord.MessageWatchRecord record = mock(WatchRecord.MessageWatchRecord.class); when(record.state()).thenReturn(ExecutionState.EXECUTION_NOT_NEEDED); @@ -1017,7 +1018,7 @@ public void testQueuedWatches() throws Exception { public void testUpdateWatchStatusDoesNotUpdateState() throws Exception { WatchStatus status = new WatchStatus(DateTime.now(UTC), Collections.emptyMap()); Watch watch = new Watch("_id", new ManualTrigger(), new ExecutableNoneInput(), InternalAlwaysCondition.INSTANCE, null, null, - Collections.emptyList(), null, status, 1L); + Collections.emptyList(), null, status, 1L, 1L); final AtomicBoolean assertionsTriggered = new AtomicBoolean(false); doAnswer(invocation -> { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java index e22d3d6f0837c..f2a9941ec35ed 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/WatcherTestUtils.java @@ -55,7 +55,6 @@ import org.joda.time.DateTime; import javax.mail.internet.AddressException; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -134,7 +133,7 @@ public static WatchExecutionContext createWatchExecutionContext() throws Excepti null, new ArrayList<>(), null, - new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L); + new WatchStatus(new DateTime(0, UTC), emptyMap()), 1L, 1L); TriggeredExecutionContext context = new TriggeredExecutionContext(watch.id(), new DateTime(0, UTC), new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)), @@ -181,7 +180,7 @@ public static Watch createTestWatch(String watchName, Client client, HttpClient new TimeValue(0), actions, Collections.singletonMap("foo", "bar"), - new WatchStatus(now, statuses), 1L); + new WatchStatus(now, statuses), 1L, 1L); } public static SearchType getRandomSupportedSearchType() { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java index d1b8963950dd8..183e80bf2528d 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/bench/ScheduleEngineTriggerBenchmark.java @@ -57,7 +57,7 @@ public static void main(String[] args) throws Exception { List watches = new ArrayList<>(numWatches); for (int i = 0; i < numWatches; i++) { watches.add(new Watch("job_" + i, new ScheduleTrigger(interval(interval + "s")), new ExecutableNoneInput(), - InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L)); + InternalAlwaysCondition.INSTANCE, null, null, Collections.emptyList(), null, null, 1L, 1L)); } ScheduleRegistry scheduleRegistry = new ScheduleRegistry(emptySet()); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java index cb8fc9edb6f14..c1a8623b44719 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/WatchAckTests.java @@ -110,7 +110,8 @@ public void testAckSingleAction() throws Exception { GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); assertThat(getWatchResponse.isFound(), is(true)); - Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON); + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), + XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm()); assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), @@ -178,7 +179,8 @@ public void testAckAllActions() throws Exception { GetWatchResponse getWatchResponse = watcherClient().prepareGetWatch("_id").get(); assertThat(getWatchResponse.isFound(), is(true)); - Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, getWatchResponse.getSource().getBytes(), XContentType.JSON); + Watch parsedWatch = watchParser().parse(getWatchResponse.getId(), true, + getWatchResponse.getSource().getBytes(), XContentType.JSON, getWatchResponse.getSeqNo(), getWatchResponse.getPrimaryTerm()); assertThat(parsedWatch.status().actionStatus("_a1").ackStatus().state(), is(ActionStatus.AckStatus.State.AWAITS_SUCCESSFUL_EXECUTION)); assertThat(parsedWatch.status().actionStatus("_a2").ackStatus().state(), @@ -220,7 +222,8 @@ public void testAckWithRestart() throws Exception { refresh(); GetResponse getResponse = client().get(new GetRequest(Watch.INDEX, Watch.DOC_TYPE, "_name")).actionGet(); - Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON); + Watch indexedWatch = watchParser().parse("_name", true, getResponse.getSourceAsBytesRef(), XContentType.JSON, + getResponse.getSeqNo(), getResponse.getPrimaryTerm()); assertThat(watchResponse.getStatus().actionStatus("_id").ackStatus().state(), equalTo(indexedWatch.status().actionStatus("_id").ackStatus().state())); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java index 57c189d328e8a..956cce95f84c2 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchActionTests.java @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -61,7 +62,7 @@ public void setupAction() { client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); action = new TransportAckWatchAction(transportService, new ActionFilters(Collections.emptySet()), - Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client); + Clock.systemUTC(), new XPackLicenseState(Settings.EMPTY), watchParser, client, createClusterService(threadPool)); } public void testWatchNotFound() { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java index 5ae7c19f625c5..411bee5e2f9d9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/actions/put/TransportPutWatchActionTests.java @@ -36,6 +36,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -57,7 +58,8 @@ public void setupAction() throws Exception { TransportService transportService = mock(TransportService.class); WatchParser parser = mock(WatchParser.class); - when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean())).thenReturn(watch); + when(parser.parseWithSecrets(eq("_id"), eq(false), anyObject(), anyObject(), anyObject(), anyBoolean(), anyLong(), anyLong())) + .thenReturn(watch); Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java index 898ae8ac9aa55..0f6ca200cb4ab 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/trigger/schedule/engine/TickerScheduleEngineTests.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.xpack.watcher.trigger.schedule.engine; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; @@ -273,6 +273,6 @@ public void testAddOnlyWithNewSchedule() { private Watch createWatch(String name, Schedule schedule) { return new Watch(name, new ScheduleTrigger(schedule), new ExecutableNoneInput(), InternalAlwaysCondition.INSTANCE, null, null, - Collections.emptyList(), null, null, Versions.MATCH_ANY); + Collections.emptyList(), null, null, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java index cb32c3eebec09..95a63c97f4dfe 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchTests.java @@ -208,13 +208,16 @@ public void testParserSelfGenerated() throws Exception { TimeValue throttlePeriod = randomBoolean() ? null : TimeValue.timeValueSeconds(randomIntBetween(5, 10000)); - Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus, 1L); + final long sourceSeqNo = randomNonNegativeLong(); + final long sourcePrimaryTerm = randomLongBetween(1, 200); + Watch watch = new Watch("_name", trigger, input, condition, transform, throttlePeriod, actions, metadata, watchStatus, + sourceSeqNo, sourcePrimaryTerm); BytesReference bytes = BytesReference.bytes(jsonBuilder().value(watch)); logger.info("{}", bytes.utf8ToString()); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); - Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON); + Watch parsedWatch = watchParser.parse("_name", includeStatus, bytes, XContentType.JSON, sourceSeqNo, sourcePrimaryTerm); if (includeStatus) { assertThat(parsedWatch.status(), equalTo(watchStatus)); @@ -227,6 +230,8 @@ public void testParserSelfGenerated() throws Exception { } assertThat(parsedWatch.metadata(), equalTo(metadata)); assertThat(parsedWatch.actions(), equalTo(actions)); + assertThat(parsedWatch.getSourceSeqNo(), equalTo(sourceSeqNo)); + assertThat(parsedWatch.getSourcePrimaryTerm(), equalTo(sourcePrimaryTerm)); } public void testThatBothStatusFieldsCanBeRead() throws Exception { @@ -256,7 +261,7 @@ public Trigger parseTrigger(String jobName, XContentParser parser) throws IOExce WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); XContentBuilder builder = jsonBuilder().startObject().startObject("trigger").endObject().field("status", watchStatus).endObject(); - Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("foo", true, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch.status().state().getTimestamp().getMillis(), is(clock.millis())); for (ActionWrapper action : actions) { assertThat(watch.status().actionStatus(action.id()), is(actionsStatuses.get(action.id()))); @@ -284,7 +289,7 @@ public void testParserBadActions() throws Exception { .endObject(); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, clock); try { - watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON); + watchParser.parse("failure", false, BytesReference.bytes(jsonBuilder), XContentType.JSON, 1L, 1L); fail("This watch should fail to parse as actions is an array"); } catch (ElasticsearchParseException pe) { assertThat(pe.getMessage().contains("could not parse actions for watch [failure]"), is(true)); @@ -309,7 +314,7 @@ public void testParserDefaults() throws Exception { .endObject(); builder.endObject(); WatchParser watchParser = new WatchParser(triggerService, actionRegistry, inputRegistry, null, Clock.systemUTC()); - Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("failure", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch, notNullValue()); assertThat(watch.trigger(), instanceOf(ScheduleTrigger.class)); assertThat(watch.input(), instanceOf(ExecutableNoneInput.class)); @@ -375,7 +380,7 @@ public void testParseWatch_verifyScriptLangDefault() throws Exception { builder.endObject(); // parse in default mode: - Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = watchParser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(((ScriptCondition) watch.condition()).getScript().getLang(), equalTo(Script.DEFAULT_SCRIPT_LANG)); WatcherSearchTemplateRequest request = ((SearchInput) watch.input().input()).getRequest(); SearchRequest searchRequest = searchTemplateService.toSearchRequest(request); @@ -394,7 +399,7 @@ public void testParseWatchWithoutInput() throws Exception { builder.endObject(); WatchParser parser = createWatchparser(); - Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch, is(notNullValue())); assertThat(watch.input().type(), is(NoneInput.TYPE)); } @@ -410,7 +415,7 @@ public void testParseWatchWithoutAction() throws Exception { builder.endObject(); WatchParser parser = createWatchparser(); - Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON); + Watch watch = parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L); assertThat(watch, is(notNullValue())); assertThat(watch.actions(), hasSize(0)); } @@ -429,7 +434,7 @@ public void testParseWatchWithoutTriggerDoesNotWork() throws Exception { WatchParser parser = createWatchparser(); ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, - () -> parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON)); + () -> parser.parse("_id", false, BytesReference.bytes(builder), XContentType.JSON, 1L, 1L)); assertThat(e.getMessage(), is("could not parse watch [_id]. missing required field [trigger]")); } }