ES|QL CCS uses skip_unavailable setting for handling disconnected remote clusters #115266

Merged Oct 29, 2024 (35 commits)

Commits
da778ee
Intmd commit in order to work on backports
quux00 Oct 18, 2024
c784ebb
Added LogicalPlanActionListener and improved edge case handling
quux00 Oct 21, 2024
737ca7c
Placeholder to start on enrich policy pathways
quux00 Oct 21, 2024
43cf559
Intmd commit - very basic testing shows that this handles skip_un cor…
quux00 Oct 22, 2024
6f73b4d
Added Map<String, Exception> unavailableClusters to EnrichResolution
quux00 Oct 23, 2024
a79a32b
Completed first pass handling of remote unavailable errors during enr…
quux00 Oct 23, 2024
e84e0c8
Remove ConnectionErrorLookupResponse and add errorConnection to Looku…
quux00 Oct 23, 2024
52ce899
Non-working version of CrossClusterQueryUnavailableIT added to get fe…
quux00 Oct 23, 2024
e8626c1
Working version of CrossClusterQueryUnavailableRemotesIT
quux00 Oct 24, 2024
3027b58
Renamed to CrossClusterQueryUnavailableIT to CrossClusterQueryUnavail…
quux00 Oct 24, 2024
21e849c
Enhancements to CrossClusterQueryUnavailableRemotesIT - not yet doing…
quux00 Oct 24, 2024
a7ca28d
Fixed bug in EsqlSession found by existing tests and patched RemoteCl…
quux00 Oct 24, 2024
2d4bc98
Added CrossClusterEnrichUnavailableClustersIT
quux00 Oct 25, 2024
ac1cb85
Removed two tests from CrossClusterEnrichUnavailableClustersIT
quux00 Oct 25, 2024
22871a8
Updated RemoteClusterSecurityEsqlIT to match new skip_unavailable=tru…
quux00 Oct 25, 2024
b3956f4
Minor cleanup before PR review
quux00 Oct 25, 2024
67454fd
Moved updateExecutionInfoWithClustersWithNoMatchingIndices out of the…
quux00 Oct 25, 2024
e4fa48c
Update docs/changelog/115266.yaml
quux00 Oct 25, 2024
ab24843
Removed change to jdk-deprecated.txt
quux00 Oct 25, 2024
1622065
Reversed the decision to pull updateExecutionInfoWithClustersWithNoMa…
quux00 Oct 25, 2024
5fbb373
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 25, 2024
3c95dfe
Adjusted EsqlSessionTests to new logic of zeroing out shard counts fo…
quux00 Oct 26, 2024
bb6b96c
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 26, 2024
06aa298
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 28, 2024
5fd33f0
PR feedback
quux00 Oct 28, 2024
59fe3e9
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 28, 2024
dc122c3
PR feedback. Added additional test to CrossClusterQueryUnavailableRem…
quux00 Oct 28, 2024
cd56a2e
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 28, 2024
dc2a75b
PR feedback: Empty Results (skip_un=true scenarios) now return Analyz…
quux00 Oct 28, 2024
bcdaad3
Added checks for no-fields columns to CrossClusterQueryUnavailableRem…
quux00 Oct 28, 2024
9369d29
Fix failing test and spotless check
quux00 Oct 28, 2024
4b5c72b
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 29, 2024
db2a441
Minor cleanup
quux00 Oct 29, 2024
8eda8c8
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 29, 2024
2a4c06c
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_unavai…
quux00 Oct 29, 2024
6 changes: 6 additions & 0 deletions docs/changelog/115266.yaml
@@ -0,0 +1,6 @@
pr: 115266
summary: ES|QL CCS uses `skip_unavailable` setting for handling disconnected remote
clusters
area: ES|QL
type: enhancement
issues: [ 114531 ]
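
The setting this changelog refers to is the existing per-remote flag cluster.remote.<alias>.skip_unavailable (the same key the integration test below builds via Strings.format). As a minimal sketch, assuming a remote alias of "remote1" (the alias and the snippet are illustrative, not part of this PR): with the flag set to true an unreachable remote is marked SKIPPED and the ES|QL query continues; with it set to false a disconnected remote fails the whole request.

import org.elasticsearch.common.settings.Settings;

// Sketch only: enabling skip_unavailable for a hypothetical remote alias "remote1".
Settings remoteSettings = Settings.builder()
    .put("cluster.remote.remote1.skip_unavailable", true)
    .build();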
@@ -185,6 +185,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0);
public static final TransportVersion CPU_STAT_STRING_PARSING = def(8_781_00_0);
public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Large diffs are not rendered by default (2 files).

@@ -25,6 +25,7 @@
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -246,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() {

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("no_such_index"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
// TODO: a follow on PR will change this to throw an Exception when the local cluster requests a concrete index that is missing
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(localCluster.getTotalShards(), equalTo(0));
@@ -499,7 +501,7 @@ public void testCCSExecutionOnSearchesWithLimit0() {

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
assertThat(remoteCluster.getTotalShards(), equalTo(0));
@@ -803,6 +805,14 @@ Map<String, Object> setupTwoClusters() {
clusterInfo.put("local.index", localIndex);
clusterInfo.put("remote.num_shards", numShardsRemote);
clusterInfo.put("remote.index", remoteIndex);

String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER);
Setting<?> skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey);
boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService()
.getClusterSettings()
.get(skipUnavailableSetting);
clusterInfo.put("remote.skip_unavailable", skipUnavailable);

return clusterInfo;
}

@@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable {
private final Integer successfulShards;
private final Integer skippedShards;
private final Integer failedShards;
private final List<ShardSearchFailure> failures;
private final TimeValue took; // search latency for this cluster sub-search

/**
@@ -300,7 +302,7 @@ public String toString() {
}

public Cluster(String clusterAlias, String indexExpression) {
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
@@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) {
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) {
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null);
}

/**
@@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila
* @param status current status of the search on this Cluster
*/
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) {
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null);
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null);
}

public Cluster(
@@ -336,6 +338,7 @@ public Cluster(
Integer successfulShards,
Integer skippedShards,
Integer failedShards,
List<ShardSearchFailure> failures,
TimeValue took
) {
assert clusterAlias != null : "clusterAlias cannot be null";
@@ -349,6 +352,11 @@ public Cluster(
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
this.failedShards = failedShards;
if (failures == null) {
this.failures = List.of();
} else {
this.failures = failures;
}
Comment on lines +355 to +359 (Member):

this.failures = failures == null ? emptyList() : failures

this.took = took;
}

@@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException {
this.failedShards = in.readOptionalInt();
this.took = in.readOptionalTimeValue();
this.skipUnavailable = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure));
Member: Any reason why the collection is unmodifiable? Within the ESQL code we're not using the defensive style, since the collections are never modified, only copied.

Contributor Author (quux00): The EsqlExecutionInfo.Cluster class, like the SearchResponse.Cluster class it is based on, is immutable. You can only change state by copying an existing Cluster and swapping it in. I can change line 374 to use a mutable list, but it won't make any difference, since the coding practice for this class is never to modify in place. In fact, the line below where I do this.failures = List.of(); should likely be changed to an immutable list as well to enforce this model.

} else {
this.failures = List.of();
}
}
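
As a brief illustration of the copy-and-swap convention described in the review exchange above, a sketch only (not code from this PR; existingCluster and connectionError are placeholder names): a caller never mutates a Cluster, it builds a replacement through the Builder defined further down in this file.

// Sketch: produce a replacement Cluster instead of modifying the existing one in place.
EsqlExecutionInfo.Cluster replacement = new EsqlExecutionInfo.Cluster.Builder(existingCluster)
    .setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)           // e.g. an unavailable skip_unavailable=true remote
    .setFailures(List.of(new ShardSearchFailure(connectionError))) // surfaced later in the per-cluster failures array
    .build();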

@Override
@@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalInt(failedShards);
out.writeOptionalTimeValue(took);
out.writeBoolean(skipUnavailable);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) {
out.writeCollection(failures);
}
}

/**
@@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException {
* All other fields can be set and override the value in the "copyFrom" Cluster.
*/
public static class Builder {
private String indexExpression;
private Cluster.Status status;
private Integer totalShards;
private Integer successfulShards;
private Integer skippedShards;
private Integer failedShards;
private List<ShardSearchFailure> failures;
private TimeValue took;
private final Cluster original;

@@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) {
public Cluster build() {
return new Cluster(
original.getClusterAlias(),
indexExpression == null ? original.getIndexExpression() : indexExpression,
original.getIndexExpression(),
original.isSkipUnavailable(),
status != null ? status : original.getStatus(),
totalShards != null ? totalShards : original.getTotalShards(),
successfulShards != null ? successfulShards : original.getSuccessfulShards(),
skippedShards != null ? skippedShards : original.getSkippedShards(),
failedShards != null ? failedShards : original.getFailedShards(),
failures != null ? failures : original.getFailures(),
took != null ? took : original.getTook()
);
}

public Cluster.Builder setIndexExpression(String indexExpression) {
this.indexExpression = indexExpression;
return this;
}

public Cluster.Builder setStatus(Cluster.Status status) {
this.status = status;
return this;
@@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) {
return this;
}

public Cluster.Builder setFailures(List<ShardSearchFailure> failures) {
this.failures = failures;
return this;
}

public Cluster.Builder setTook(TimeValue took) {
this.took = took;
return this;
@@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
if (took != null) {
// TODO: change this to took_nanos and call took.nanos?
builder.field(TOOK.getPreferredName(), took.millis());
}
if (totalShards != null) {
Expand All @@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (failures != null && failures.size() > 0) {
builder.startArray(RestActions.FAILURES_FIELD.getPreferredName());
for (ShardSearchFailure failure : failures) {
failure.toXContent(builder, params);
}
builder.endArray();
}
}
builder.endObject();
return builder;
@@ -529,6 +552,10 @@ public Integer getFailedShards() {
return failedShards;
}

public List<ShardSearchFailure> getFailures() {
return failures;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -23,6 +23,7 @@ public final class EnrichResolution {

private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();

public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
return resolvedPolicies.get(new Key(policyName, mode));
@@ -51,6 +52,14 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
errors.putIfAbsent(new Key(policyName, mode), reason);
}

public void addUnavailableCluster(String clusterAlias, Exception e) {
unavailableClusters.put(clusterAlias, e);
}

public Map<String, Exception> getUnavailableClusters() {
return unavailableClusters;
}

private record Key(String policyName, Enrich.Mode mode) {

}
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.search.SearchRequest;
@@ -50,6 +51,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -113,12 +115,27 @@ public void resolvePolicies(
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();

Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();

for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
String clusterAlias = entry.getKey();
if (entry.getValue().connectionError != null) {
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
remoteClusters.remove(clusterAlias);
} else {
lookupResponsesToProcess.put(clusterAlias, entry.getValue());
}
}

for (UnresolvedPolicy unresolved : unresolvedPolicies) {
Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults(
unresolved,
calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters),
lookupResponses
lookupResponsesToProcess
);

if (resolved.v1() != null) {
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1());
} else {
@@ -149,13 +166,16 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
Collection<String> targetClusters,
Map<String, LookupResponse> lookupResults
) {
assert targetClusters.isEmpty() == false;
String policyName = unresolved.name;
if (targetClusters.isEmpty()) {
return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable");
Contributor: I couldn't find a test for this situation. Does it make sense to add one now? (I know there are several other follow-up PRs in this area.)

Contributor Author (quux00): This code was added in response to one of the IT tests I added failing: CrossClusterEnrichUnavailableClustersIT.testEnrichWithHostsPolicyAndDisconnectedRemotesWithSkipUnavailableTrue, and in particular when the 2nd cluster is taken offline: https://github.com/elastic/elasticsearch/pull/115266/files#diff-67efeda55a8a497d595e7ce7737ee3ae3acc3ec966db031190bd6954a610f88aR240

In that case invariants in the code below are not met and assertion failures are thrown, so I added this to leave the code immediately when there are no targetClusters, which can happen when the Enrich Mode type is "REMOTE" and the local cluster was not included in the query.

}
final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>();
final List<String> failures = new ArrayList<>();
for (String cluster : targetClusters) {
LookupResponse lookupResult = lookupResults.get(cluster);
if (lookupResult != null) {
assert lookupResult.connectionError == null : "Should never have a non-null connectionError here";
ResolvedEnrichPolicy policy = lookupResult.policies.get(policyName);
if (policy != null) {
policies.put(cluster, policy);
@@ -261,22 +281,34 @@ private void lookupPolicies(
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
Contributor: By this point, as I understand, we have already run field-caps resolution, so we may have some skipped clusters. Do we exclude them somewhere from the list of clusters for which we're going to resolve policies? It looks pointless to try to resolve those policies if the cluster will be skipped anyway.

Contributor Author (quux00, Oct 28, 2024): No, the enrich policy resolution is done before the field-caps call. I'm not sure why, or whether that is necessary for some reason or just a historical accident. IMO the field-caps call seems the logical one to do first, but that's not how it is, so we need to track unavailable clusters here and then pass that list to the callback listener, which is the one that does the field-caps lookup: https://github.com/elastic/elasticsearch/pull/115266/files#diff-40060e2ec9003953a228c4a03bdc80a301949b0d4e3dccc1978798e33a992a73R345

Contributor: Oh OK, I was for some reason assuming field-caps goes first... then of course yes, enrich should be telling field-caps which clusters are skipped, not the other way around.

ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp));
getRemoteConnection(
cluster,
lookupListener.delegateFailureAndWrap(
(delegate, connection) -> transportService.sendRequest(
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
transportService.sendRequest(
connection,
RESOLVE_ACTION_NAME,
new LookupRequest(cluster, remotePolicies),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
delegate,
LookupResponse::new,
threadPool.executor(ThreadPool.Names.SEARCH)
)
)
)
);
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> {
if (ExceptionsHelper.isRemoteUnavailableException(e)
&& remoteClusterService.isSkipUnavailable(cluster)) {
l.onResponse(new LookupResponse(e));
} else {
l.onFailure(e);
}
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH))
);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
lookupListener.onResponse(new LookupResponse(e));
} else {
lookupListener.onFailure(e);
}
}
});
Comment on lines +292 to +311 (Member): Since the code is similar between onFailure and onResponse, pull it out into a separate method.
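
One possible shape for that extraction, as a sketch under the reviewer's suggestion (the method name failOrSkip is illustrative, not from this PR), callable from both the delegateResponse handler in onResponse and from onFailure:

// Sketch: shared handling for a failed remote enrich-policy lookup.
private void failOrSkip(String cluster, Exception e, ActionListener<LookupResponse> listener) {
    if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) {
        // skip_unavailable=true: record the connection error and let the query continue
        listener.onResponse(new LookupResponse(e));
    } else {
        // otherwise propagate the failure and fail the ES|QL request
        listener.onFailure(e);
    }
}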

}
}
// local cluster
@@ -323,16 +355,30 @@ public void writeTo(StreamOutput out) throws IOException {
private static class LookupResponse extends TransportResponse {
final Map<String, ResolvedEnrichPolicy> policies;
final Map<String, String> failures;
// does not need to be Writable since this indicates a failure to contact a remote cluster, so only set on querying cluster
final transient Exception connectionError;

LookupResponse(Map<String, ResolvedEnrichPolicy> policies, Map<String, String> failures) {
this.policies = policies;
this.failures = failures;
this.connectionError = null;
}

/**
* Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup
* @param connectionError Exception received when trying to connect to a remote cluster
*/
LookupResponse(Exception connectionError) {
this.policies = Collections.emptyMap();
this.failures = Collections.emptyMap();
this.connectionError = connectionError;
}

LookupResponse(StreamInput in) throws IOException {
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
this.policies = planIn.readMap(StreamInput::readString, ResolvedEnrichPolicy::new);
this.failures = planIn.readMap(StreamInput::readString, StreamInput::readString);
this.connectionError = null;
}

@Override