-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimistic concurrency control for updating ingest pipelines #78551
Optimistic concurrency control for updating ingest pipelines #78551
Conversation
Pinging @elastic/es-data-management (Team:Data Management) |
server/src/main/java/org/elasticsearch/ingest/IngestService.java
Outdated
Show resolved
Hide resolved
@elasticmachine update branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just leaving a drive-by comment, but why allow setting the version to null
explicitly? If we enforced it being a number in the request then we don't need the version integer and the versionedUpdate boolean flag, we can just use the presence of the number as an indication?
if (request.isVersionedUpdate()) { | ||
var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null; | ||
if (currentPipeline == null) { | ||
throw new IllegalStateException(String.format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be more like an IllegalArgumentException
, since the required parameter is illegal if there is no pre-existing pipeline?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we do have an exception for something like this, a VersionConflictEngineException
, which will return back to the user RestStatus.CONFLICT
(a 409), what do you think about using that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would be great, especially if it resolves the problem with the HTTP error code, but VersionConflictingEngineException
requires a shard ID and some other index-related values that don't apply to ingest pipelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't do that because we currently allow the version to be set to |
Pinging @elastic/clients-team (Team:Clients) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of comments:
rest-api-spec/src/main/resources/rest-api-spec/api/ingest.put_pipeline.json
Outdated
Show resolved
Hide resolved
.../ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/290_versioned_update.yml
Outdated
Show resolved
Hide resolved
Sorry for the useless comments, didn't get that |
No worries. It is an unusual situation and we're still discussing the best way of handling it. If there's a particular way or particular considerations that would be best for clients, that would be helpful information, too. |
@elasticmachine update branch |
@elasticmachine run elasticsearch-ci/part-2 |
@sethmlarson, we've gone back and changed this feature to work only with pipelines that have a numeric version so we no longer have to accommodate both integer values and |
@danhermann Excellent, thanks for the update! That'll be a lot simpler to represent in the specification. |
@elasticmachine run elasticsearch-ci/packaging-tests-windows-sample |
server/src/main/java/org/elasticsearch/action/ingest/PutPipelineRequest.java
Outdated
Show resolved
Hide resolved
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
final Integer specifiedVersion = (Integer) pipelineConfig.get("version"); | ||
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) { | ||
throw new IllegalArgumentException(String.format( | ||
Locale.ROOT, | ||
"cannot update pipeline [%s] with the same version [%s]", | ||
request.getId(), | ||
request.getVersion() | ||
)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we pull out a version from the pipeline configuration when we know that one has been specified in the request already? (the if (request.getVersion() != null)
check above)
Seems like if the request specifies the version, it should be part of the request validation that request.getVersion()
matches the pipelineConfig.get("version")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify, it needs to check that request.getVersion()
does not match pipelineConfig.get("version")
. I like that check where it is because it defers a heavier check due to the XContentHelper.convertToMap
call to the latest possible point, keeps all the OCC-related checks in the same place, and keeps no-op pipeline updates as very lightweight.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And just in case it's not clear, one of those versions is the value of the if_version
parameter and the other is the value of the version attribute in the pipeline definition. The latter doesn't have to be specified, but if specified, must be different from the former.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh okay, I see my confusion now, thanks for clarifying this!
@elasticmachine update branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks Dan!
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); | ||
final Integer specifiedVersion = (Integer) pipelineConfig.get("version"); | ||
if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) { | ||
throw new IllegalArgumentException(String.format( | ||
Locale.ROOT, | ||
"cannot update pipeline [%s] with the same version [%s]", | ||
request.getId(), | ||
request.getVersion() | ||
)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh okay, I see my confusion now, thanks for clarifying this!
Thanks for the review, @dakrone! |
Pinging @elastic/kibana-stack-management since this adds a new option to the pipeline API |
* upstream/master: (34 commits) Add extensionName() to security extension (elastic#79329) More robust and consistent allowAll indicesAccessControl (elastic#79415) Fix circuit breaker leak in MultiTerms aggregation (elastic#79362) guard geoline aggregation from parents aggegator that emit empty buckets (elastic#79129) Vector tiles: increase the size of the envelope used to clip geometries (elastic#79030) Revert "[ML] Add queue_capacity setting to start deployment API (elastic#79369)" (elastic#79374) Convert token service license object to LicensedFeature (elastic#79284) [TEST] Fix ShardPathTests for MDP (elastic#79393) Fix fleet search API with no checkpints (elastic#79400) Reduce BWC version for transient settings (elastic#79396) EQL: Rename a test class for eclipse (elastic#79254) Use search_coordination threadpool in field caps (elastic#79378) Use query param instead of a system property for opting in for new cluster health response code (elastic#79351) Add new kNN search endpoint (elastic#79013) Disable BWC tests Convert auditing license object to LicensedFeature (elastic#79280) Update BWC versions after backport of elastic#78551 Enable InstantiatingObjectParser to pass context as a first argument (elastic#79206) Move xcontent filtering tests (elastic#79298) Update links to Fleet/Agent docs (elastic#79303) ...
This commit adds optional optimistic concurrency control to the API for updating ingest pipelines. This is achieved through the addition of an
if_version
query parameter similar to theif_seq_no
andif_primary_term
parameters used for optimistic concurrency control in Elasticsearch. When this parameter is specified, the pipeline update succeeds only if there is an existing pipeline with a version that matches the specified version.In the example above, the update succeeds only if "my_pipeline" exists with a version of 5. Any version can be specified in the body of the update so long as it is not the existing version. If no version is specified in the body of the update, the version is automatically set to
<currentVersion> + 1
. Pipelines that do not have aversion
attribute with a numeric value cannot be updated with theif_version
parameter because checking for a null or missing version is not a meaningful guard against concurrent updates.Note that any pipeline updates that do not include an
if_version
parameter will be processed unconditionally. Note also that there are no optimizations possible for no-op pipeline updates when theif_version
parameter is specified because such an update will always change, at the very least, the version attribute of the pipeline and will therefore never be a no-op update.Fixes #77031