Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimistic concurrency control for updating ingest pipelines #78551

Merged
merged 12 commits into from
Oct 15, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
---
"Test pipeline versioned updates":
- skip:
version: " - 7.99.99"
reason: "re-enable in 7.16+ when backported"

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

# conditionally update based on missing version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: "null"
sethmlarson marked this conversation as resolved.
Show resolved Hide resolved
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 1 }

# required version does not match specified version
- do:
catch: /.*version conflict, required version \[99\] for pipeline \[my_pipeline\] but current version is \[1\].*/
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# may not update to same version
- do:
catch: /.*cannot update pipeline \[my_pipeline\] with the same version \[1\].*/
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# cannot conditionally update non-existent pipeline
- do:
catch: /.*version conflict, required version \[1\] for pipeline \[my_pipeline2\] but no pipeline was found.*/
ingest.put_pipeline:
id: "my_pipeline2"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# conditionally update to specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 99,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 99 }

# conditionally update without specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 100 }

- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
]
},
"params":{
"if_version":{
"type":"string",
sethmlarson marked this conversation as resolved.
Show resolved Hide resolved
"description":"Required version for optimistic concurrency control for pipeline updates"
},
"master_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to master node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -23,27 +24,43 @@

public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {

private String id;
private BytesReference source;
private XContentType xContentType;
private final String id;
private final BytesReference source;
private final XContentType xContentType;
private boolean versionedUpdate;
private Integer version;
danhermann marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a new pipeline request with the id and source along with the content type of the source
*/
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, boolean versionedUpdate, Integer version) {
this.id = Objects.requireNonNull(id);
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.versionedUpdate = versionedUpdate;
this.version = version;
}

public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
this(id, source, xContentType, false, null);
}

public PutPipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readString();
source = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
versionedUpdate = in.readBoolean();
version = in.readOptionalInt();
} else {
versionedUpdate = false;
version = -1;
}
}

PutPipelineRequest() {
this(null, null, null, false, null);
}

@Override
Expand All @@ -63,12 +80,32 @@ public XContentType getXContentType() {
return xContentType;
}

public boolean isVersionedUpdate() {
return versionedUpdate;
}

public Integer getVersion() {
return version;
}

public void setVersion(Integer version) {
this.version = version;
}

public void setVersionedUpdate(boolean versionedUpdate) {
this.versionedUpdate = versionedUpdate;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBytesReference(source);
XContentHelper.writeTo(out, xContentType);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(versionedUpdate);
out.writeOptionalInt(version);
}
}

@Override
Expand Down
58 changes: 56 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand All @@ -52,12 +54,14 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -341,7 +345,9 @@ public void putPipeline(

Map<String, Object> pipelineConfig = null;
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
if (request.isVersionedUpdate() == false &&
currentIngestMetadata != null &&
currentIngestMetadata.getPipelines().containsKey(request.getId())) {
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
Expand Down Expand Up @@ -432,16 +438,64 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
return processorMetrics;
}

// visible for testing
static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);

BytesReference pipelineSource = request.getSource();
if (request.isVersionedUpdate()) {
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
if (currentPipeline == null) {
throw new IllegalStateException(String.format(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be more like an IllegalArgumentException, since the required parameter is illegal if there is no pre-existing pipeline?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we do have an exception for something like this, a VersionConflictEngineException, which will return back to the user RestStatus.CONFLICT (a 409), what do you think about using that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be great, especially if it resolves the problem with the HTTP error code, but VersionConflictingEngineException requires a shard ID and some other index-related values that don't apply to ingest pipelines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dakrone, I changed the instances of IllegalStateException to IllegalArgumentException in 19d60f5 which I agree better characterizes the problem and is also reported as an HTTP 400 error to the client. Let me know if you think there's a better approach for that.

Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
request.getVersion(),
request.getId()
danhermann marked this conversation as resolved.
Show resolved Hide resolved
));
}

final Integer currentVersion = currentPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
throw new IllegalStateException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
request.getVersion(),
request.getId(),
currentVersion
));
}

var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalStateException(String.format(
Locale.ROOT,
"cannot update pipeline [%s] with the same version [%s]",
request.getId(),
request.getVersion()
));
}

// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}

Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
} else {
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metadata(Metadata.builder(currentState.getMetadata())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ BytesReference getConfig() {
return config;
}

public Integer getVersion() {
var configMap = getConfigAsMap();
if (configMap.containsKey("version")) {
Object o = configMap.get("version");
if (o == null) {
return null;
} else if (o instanceof Number) {
return ((Number) o).intValue();
} else {
throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]");
}
} else {
return null;
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Loading