diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index d365733eac0b4..f873d8699b9b4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -514,7 +514,7 @@ public void onTimeout(TimeValue timeout) { private void clearJobFinishedTime(String jobId, ActionListener listener) { JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); - jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( + jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), e -> { logger.error("[" + jobId + "] Failed to clear finished_time", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java index 09a8f219afcf4..443e66b81e785 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java @@ -87,7 +87,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat CheckedConsumer updateConsumer = ok -> { datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers, - jobConfigProvider::validateDatafeedJob, + jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)), listener::onFailure diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java index f464b86d29822..d8d5fe216b2a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; @@ -17,6 +18,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -52,14 +54,16 @@ public class TransportUpdateFilterAction extends HandledTransportAction) UpdateFilterAction.Request::new); this.client = client; this.jobManager = jobManager; + this.clusterService = clusterService; } @Override @@ -95,13 +99,20 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio } MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build(); - indexUpdatedFilter(updatedFilter, filterWithVersion.version, request, listener); + indexUpdatedFilter( + updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener); } - private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterAction.Request request, + private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm, + UpdateFilterAction.Request request, ActionListener listener) { IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId()); - indexRequest.version(version); + if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.version(version); + } indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -146,7 +157,7 @@ public void onResponse(GetResponse getDocResponse) { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build(); - listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion())); + listener.onResponse(new FilterWithVersion(filter, getDocResponse)); } } else { this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId))); @@ -167,10 +178,15 @@ private static class FilterWithVersion { private final MlFilter filter; private final long version; + private final long seqNo; + private final long primaryTerm; - private FilterWithVersion(MlFilter filter, long version) { + private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) { this.filter = filter; - this.version = version; + this.version = getDocResponse.getVersion(); + this.seqNo = getDocResponse.getSeqNo(); + this.primaryTerm = getDocResponse.getPrimaryTerm(); + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java index 36e71de8bcba1..7d11173e258b1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -19,6 +20,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -262,10 +264,12 @@ public void onFailure(Exception e) { * @param headers Datafeed headers applied with the update * @param validator BiConsumer that accepts the updated config and can perform * extra validations. {@code validator} must call the passed listener + * @param minClusterNodeVersion minimum version of nodes in cluster * @param updatedConfigListener Updated datafeed config listener */ public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map headers, BiConsumer> validator, + Version minClusterNodeVersion, ActionListener updatedConfigListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId)); @@ -277,7 +281,9 @@ public void onResponse(GetResponse getResponse) { updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId)); return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); DatafeedConfig.Builder configBuilder; try { @@ -298,7 +304,7 @@ public void onResponse(GetResponse getResponse) { ActionListener validatedListener = ActionListener.wrap( ok -> { - indexUpdatedConfig(updatedConfig, version, ActionListener.wrap( + indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedConfigListener.onResponse(updatedConfig); @@ -318,17 +324,23 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener listener) { + private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm, + Version minClusterNodeVersion, ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); - IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId())) .setSource(updatedSource) - .setVersion(version) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .request(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.setVersion(version); + } - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener); + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener); } catch (IOException e) { listener.onFailure( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 53559aee4701b..6696bfe1ad96a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -333,7 +333,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener { jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit, - this::validate, ActionListener.wrap( + this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap( updatedJob -> postJobUpdate(request, updatedJob, actionListener), actionListener::onFailure )); @@ -603,8 +603,8 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList .setModelSnapshotId(modelSnapshot.getSnapshotId()) .build(); - jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap( - job -> { + jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(), + ActionListener.wrap(job -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription())); updateHandler.accept(Boolean.TRUE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index 9019dc2032ccf..e5ee8855969a3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; @@ -21,6 +22,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -225,9 +227,12 @@ public void onFailure(Exception e) { * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} * are not changed. + * @param minClusterNodeVersion the minimum version of nodes in the cluster * @param updatedJobListener Updated job listener */ - public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { + public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, + Version minClusterNodeVersion, + ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -239,7 +244,9 @@ public void onResponse(GetResponse getResponse) { return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); Job.Builder jobBuilder; try { @@ -259,7 +266,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, updatedJobListener); + indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); } @Override @@ -280,17 +287,18 @@ public interface UpdateValidator { } /** - * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but + * Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but * with an extra validation step which is called before the updated is applied. * * @param jobId The Id of the job to update * @param update The job update * @param maxModelMemoryLimit The maximum model memory allowed * @param validator The job update validator + * @param minClusterNodeVersion the minimum version of a node ifn the cluster * @param updatedJobListener Updated job listener */ public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, - UpdateValidator validator, ActionListener updatedJobListener) { + UpdateValidator validator, Version minClusterNodeVersion, ActionListener updatedJobListener) { GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); @@ -302,7 +310,9 @@ public void onResponse(GetResponse getResponse) { return; } - long version = getResponse.getVersion(); + final long version = getResponse.getVersion(); + final long seqNo = getResponse.getSeqNo(); + final long primaryTerm = getResponse.getPrimaryTerm(); BytesReference source = getResponse.getSourceAsBytesRef(); Job originalJob; try { @@ -324,7 +334,7 @@ public void onResponse(GetResponse getResponse) { return; } - indexUpdatedJob(updatedJob, version, updatedJobListener); + indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener); }, updatedJobListener::onFailure )); @@ -337,17 +347,22 @@ public void onFailure(Exception e) { }); } - private void indexUpdatedJob(Job updatedJob, long version, ActionListener updatedJobListener) { + private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion, + ActionListener updatedJobListener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS); - IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId())) .setSource(updatedSource) - .setVersion(version) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .request(); + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) { + indexRequest.setIfSeqNo(seqNo); + indexRequest.setIfPrimaryTerm(primaryTerm); + } else { + indexRequest.setVersion(version); + } - executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap( + executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap( indexResponse -> { assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED; updatedJobListener.onResponse(updatedJob); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java index 9496f4ca0d8f2..00d62b7e0a933 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedConfigProviderIT.java @@ -7,6 +7,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; @@ -86,7 +87,7 @@ public void testCrud() throws InterruptedException { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), updateHeaders, - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(configHolder.get().getIndices(), equalTo(updateIndices)); @@ -167,7 +168,7 @@ public void testUpdateWhenApplyingTheUpdateThrows() throws Exception { AtomicReference configHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), actionListener), + (updatedConfig, listener) -> listener.onResponse(Boolean.TRUE), Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); assertNotNull(exceptionHolder.get()); @@ -193,7 +194,7 @@ public void testUpdateWithValidatorFunctionThatErrors() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.updateDatefeedConfig(datafeedId, update.build(), Collections.emptyMap(), - validateErrorFunction, actionListener), + validateErrorFunction, Version.CURRENT, actionListener), configHolder, exceptionHolder); assertNull(configHolder.get()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java index 0266247714dfe..3e20bdd73de07 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; @@ -147,8 +148,8 @@ public void testCrud() throws InterruptedException { JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setDescription("This job has been updated").build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, jobUpdate, new ByteSizeValue(32), actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJob + (jobId, jobUpdate, new ByteSizeValue(32), Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertEquals("This job has been updated", updateJobResponseHolder.get().getDescription()); @@ -205,8 +206,8 @@ public void testUpdateWithAValidationError() throws Exception { .build(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); - blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJob(jobId, invalidUpdate, new ByteSizeValue(32), Version.CURRENT, + actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get()); assertNotNull(exceptionHolder.get()); assertThat(exceptionHolder.get(), instanceOf(ElasticsearchStatusException.class)); @@ -229,9 +230,8 @@ public void testUpdateWithValidator() throws Exception { AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference updateJobResponseHolder = new AtomicReference<>(); // update with the no-op validator - blockingCall(actionListener -> - jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), validator, actionListener), - updateJobResponseHolder, exceptionHolder); + blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation( + jobId, jobUpdate, new ByteSizeValue(32), validator, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertNotNull(updateJobResponseHolder.get()); @@ -244,7 +244,7 @@ public void testUpdateWithValidator() throws Exception { updateJobResponseHolder.set(null); // Update with a validator that errors blockingCall(actionListener -> jobConfigProvider.updateJobWithValidation(jobId, jobUpdate, new ByteSizeValue(32), - validatorWithAnError, actionListener), + validatorWithAnError, Version.CURRENT, actionListener), updateJobResponseHolder, exceptionHolder); assertNull(updateJobResponseHolder.get());