Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimistic concurrency control for updating ingest pipelines #78551

Merged
merged 12 commits into from
Oct 15, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
---
"Test pipeline versioned updates":
- skip:
version: " - 7.99.99"
reason: "re-enable in 7.16+ when backported"

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

# conditional update fails because of missing version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"version": 1,
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 1 }

# required version does not match specified version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# may not update to same version
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# cannot conditionally update non-existent pipeline
- do:
catch: bad_request
ingest.put_pipeline:
id: "my_pipeline2"
if_version: 1
body: >
{
"version": 1,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}

# conditionally update to specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 1
body: >
{
"version": 99,
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 99 }

# conditionally update without specified version
- do:
ingest.put_pipeline:
id: "my_pipeline"
if_version: 99
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value": "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.get_pipeline:
id: "my_pipeline"
- match: { my_pipeline.version: 100 }

- do:
ingest.delete_pipeline:
id: "my_pipeline"
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
]
},
"params":{
"if_version":{
"type":"int",
"description":"Required version for optimistic concurrency control for pipeline updates"
},
"master_timeout":{
"type":"time",
"description":"Explicit operation timeout for connection to master node"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -23,27 +24,39 @@

public class PutPipelineRequest extends AcknowledgedRequest<PutPipelineRequest> implements ToXContentObject {

private String id;
private BytesReference source;
private XContentType xContentType;
private final String id;
private final BytesReference source;
private final XContentType xContentType;
private final Integer version;

/**
* Create a new pipeline request with the id and source along with the content type of the source
*/
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
public PutPipelineRequest(String id, BytesReference source, XContentType xContentType, Integer version) {
this.id = Objects.requireNonNull(id);
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.version = version;
}

public PutPipelineRequest(String id, BytesReference source, XContentType xContentType) {
this(id, source, xContentType, null);
}

public PutPipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readString();
source = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
version = in.readOptionalInt();
} else {
version = null;
}
}

PutPipelineRequest() {
this(null, null, null, null);
}

@Override
Expand All @@ -63,12 +76,19 @@ public XContentType getXContentType() {
return xContentType;
}

public Integer getVersion() {
return version;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBytesReference(source);
XContentHelper.writeTo(out, xContentType);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalInt(version);
}
}

@Override
Expand Down
57 changes: 55 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -54,7 +55,9 @@
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -395,7 +398,9 @@ public void putPipeline(

Map<String, Object> pipelineConfig = null;
IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
if (request.getVersion() == null &&
currentIngestMetadata != null &&
currentIngestMetadata.getPipelines().containsKey(request.getId())) {
pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
Expand Down Expand Up @@ -486,16 +491,64 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(Compound
return processorMetrics;
}

// visible for testing
static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);

BytesReference pipelineSource = request.getSource();
if (request.getVersion() != null) {
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
if (currentPipeline == null) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but no pipeline was found",
request.getVersion(),
request.getId()
));
}

final Integer currentVersion = currentPipeline.getVersion();
if (Objects.equals(request.getVersion(), currentVersion) == false) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"version conflict, required version [%s] for pipeline [%s] but current version is [%s]",
request.getVersion(),
request.getId(),
currentVersion
));
}

var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
throw new IllegalArgumentException(String.format(
Locale.ROOT,
"cannot update pipeline [%s] with the same version [%s]",
request.getId(),
request.getVersion()
));
}
Comment on lines +521 to +530
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we pull out a version from the pipeline configuration when we know that one has been specified in the request already? (the if (request.getVersion() != null) check above)

Seems like if the request specifies the version, it should be part of the request validation that request.getVersion() matches the pipelineConfig.get("version")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, it needs to check that request.getVersion() does not match pipelineConfig.get("version"). I like that check where it is because it defers a heavier check due to the XContentHelper.convertToMap call to the latest possible point, keeps all the OCC-related checks in the same place, and keeps no-op pipeline updates as very lightweight.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just in case it's not clear, one of those versions is the value of the if_version parameter and the other is the value of the version attribute in the pipeline definition. The latter doesn't have to be specified, but if specified, must be different from the former.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh okay, I see my confusion now, thanks for clarifying this!


// if no version specified in the pipeline definition, inject a version of [request.getVersion() + 1]
if (specifiedVersion == null) {
pipelineConfig.put("version", request.getVersion() == null ? 1 : request.getVersion() + 1);
try {
var builder = XContentBuilder.builder(request.getXContentType().xContent()).map(pipelineConfig);
pipelineSource = BytesReference.bytes(builder);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
}

Map<String, PipelineConfiguration> pipelines;
if (currentIngestMetadata != null) {
pipelines = new HashMap<>(currentIngestMetadata.getPipelines());
} else {
pipelines = new HashMap<>();
}

pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType()));
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
ClusterState.Builder newState = ClusterState.builder(currentState);
newState.metadata(Metadata.builder(currentState.getMetadata())
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ BytesReference getConfig() {
return config;
}

public Integer getVersion() {
var configMap = getConfigAsMap();
if (configMap.containsKey("version")) {
Object o = configMap.get("version");
if (o == null) {
return null;
} else if (o instanceof Number) {
return ((Number) o).intValue();
} else {
throw new IllegalStateException("unexpected version type [" + o.getClass().getName() + "]");
}
} else {
return null;
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Loading