Skip to content

Commit

Permalink
[7.x][ML] Validate dest pipeline exists on transform update (elastic#…
Browse files Browse the repository at this point in the history
…63494)

Adds validation that the dest pipeline exists when a transform
is updated. Refactors the pipeline check into the `SourceDestValidator`.

Fixes elastic#59587

Backport of elastic#63494
  • Loading branch information
dimitris-athanasiou committed Oct 12, 2020
1 parent 858951d commit 6418855
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
Expand Down Expand Up @@ -62,13 +63,15 @@ public final class SourceDestValidator {
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
+ "alias [{0}], license is not active";
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";

// workaround for 7.x: remoteClusterAliases does not throw
private static final ClusterNameExpressionResolver clusterNameExpressionResolver = new ClusterNameExpressionResolver();

private final IndexNameExpressionResolver indexNameExpressionResolver;
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final String nodeName;
private final String license;

Expand All @@ -80,8 +83,10 @@ static class Context {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final String[] source;
private final String dest;
private final String destIndex;
private final String destPipeline;
private final String nodeName;
private final String license;

Expand All @@ -95,17 +100,21 @@ static class Context {
final IndexNameExpressionResolver indexNameExpressionResolver,
final RemoteClusterService remoteClusterService,
final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
final IngestService ingestService,
final String[] source,
final String dest,
final String destIndex,
final String destPipeline,
final String nodeName,
final String license
) {
this.state = state;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.source = source;
this.dest = dest;
this.destIndex = destIndex;
this.destPipeline = destPipeline;
this.nodeName = nodeName;
this.license = license;
}
Expand All @@ -126,6 +135,10 @@ public IndexNameExpressionResolver getIndexNameExpressionResolver() {
return indexNameExpressionResolver;
}

public IngestService getIngestService() {
return ingestService;
}

public boolean isRemoteSearchEnabled() {
return remoteClusterLicenseChecker != null;
}
Expand All @@ -134,8 +147,8 @@ public String[] getSource() {
return source;
}

public String getDest() {
return dest;
public String getDestIndex() {
return destIndex;
}

public String getNodeName() {
Expand Down Expand Up @@ -168,11 +181,11 @@ public String resolveDest() {
Index singleWriteIndex = indexNameExpressionResolver.concreteWriteIndex(
state,
IndicesOptions.lenientExpandOpen(),
dest,
destIndex,
true,
false);

resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : dest;
resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : destIndex;
} catch (IllegalArgumentException e) {
// stop here as we can not return a single dest index
addValidationError(e.getMessage());
Expand Down Expand Up @@ -236,6 +249,7 @@ public interface SourceDestValidation {
public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
public static final SourceDestValidation DESTINATION_PIPELINE_MISSING_VALIDATION = new DestinationPipelineMissingValidation();

/**
* Create a new Source Dest Validator
Expand All @@ -250,29 +264,33 @@ public SourceDestValidator(
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteClusterService remoteClusterService,
RemoteClusterLicenseChecker remoteClusterLicenseChecker,
IngestService ingestService,
String nodeName,
String license
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.nodeName = nodeName;
this.license = license;
}

/**
* Run validation against source and dest.
* Run validation against source and destIndex.
*
* @param clusterState The current ClusterState
* @param source an array of source indexes
* @param dest destination index
* @param destIndex destination index
* @param destPipeline destination pipeline
* @param validations list of of validations to run
* @param listener result listener
*/
public void validate(
final ClusterState clusterState,
final String[] source,
final String dest,
final String destIndex,
@Nullable final String destPipeline,
final List<SourceDestValidation> validations,
final ActionListener<Boolean> listener
) {
Expand All @@ -281,8 +299,10 @@ public void validate(
indexNameExpressionResolver,
remoteClusterService,
remoteClusterLicenseChecker,
ingestService,
source,
dest,
destIndex,
destPipeline,
nodeName,
license
);
Expand All @@ -306,25 +326,25 @@ public void validate(
}

/**
* Validate dest request.
* Validate request.
*
* This runs a couple of simple validations at request time, to be executed from a {@link ActionRequest}}
* implementation.
*
* Note: Source can not be validated at request time as it might contain expressions.
*
* @param validationException an ActionRequestValidationException for collection validation problem, can be null
* @param dest destination index, null if validation shall be skipped
* @param destIndex destination index, null if validation shall be skipped
*/
public static ActionRequestValidationException validateRequest(
@Nullable ActionRequestValidationException validationException,
@Nullable String dest
@Nullable String destIndex
) {
try {
if (dest != null) {
validateIndexOrAliasName(dest, InvalidIndexNameException::new);
if (dest.toLowerCase(Locale.ROOT).equals(dest) == false) {
validationException = addValidationError(getMessage(DEST_LOWERCASE, dest), validationException);
if (destIndex != null) {
validateIndexOrAliasName(destIndex, InvalidIndexNameException::new);
if (destIndex.toLowerCase(Locale.ROOT).equals(destIndex) == false) {
validationException = addValidationError(getMessage(DEST_LOWERCASE, destIndex), validationException);
}
}
} catch (InvalidIndexNameException ex) {
Expand Down Expand Up @@ -408,7 +428,7 @@ static class DestinationInSourceValidation implements SourceDestValidation {

@Override
public void validate(Context context, ActionListener<Context> listener) {
final String destIndex = context.getDest();
final String destIndex = context.getDestIndex();
boolean foundSourceInDest = false;

for (String src : context.getSource()) {
Expand Down Expand Up @@ -462,6 +482,19 @@ public void validate(Context context, ActionListener<Context> listener) {
}
}

static class DestinationPipelineMissingValidation implements SourceDestValidation {

@Override
public void validate(Context context, ActionListener<Context> listener) {
if (context.destPipeline != null) {
if (context.ingestService.getPipeline(context.destPipeline) == null) {
context.addValidationError(PIPELINE_MISSING, context.destPipeline);
}
}
listener.onResponse(context);
}
}

private static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class TransformMessages {
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";

public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";

Expand Down
Loading

0 comments on commit 6418855

Please sign in to comment.