From e1fbfaf55e79585d7edf2e3af6145e650eea7067 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 30 May 2018 14:32:28 -0400 Subject: [PATCH 01/11] Introduce client feature tracking This commit introduces the ability for a client to communicate to the server features that it can support and for these features to be used in influencing the decisions that the server makes when communicating with the client. To this end we carry the features from the client to the underlying stream as we carry the version of the client today. This enables us to enhance the logic where we make protocol decisions on the basis of the version on the stream to also make protocol decisions on the basis of the features on the stream. With such functionality, the client can communicate to the server if it is a transport client, or if it has, for example, X-Pack installed. This enables us to support rolling upgrades from the OSS distribution to the default distribution without breaking client connectivity as we can now elect to serialize customs in the cluster state depending on whether or not the client reports to us using the feature capabilities that it can under these customs. This means that we would avoid sending a client pieces of the cluster state that it can not understand. However, we want to take care and always send the full cluster state during node-to-node communication as otherwise we would end up with different understanding of what is in the cluster state across nodes depending on which features they reported to have. This is why when deciding whether or not to write out a custom we always send the custom if the client is not a transport client and otherwise do not send the custom if the client is transport client that does not report to have the feature required by the custom. Co-authored-by: Yannick Welsch --- .../transport/netty4/ESLoggingHandler.java | 2 + .../client/transport/TransportClient.java | 10 +- .../elasticsearch/cluster/ClusterState.java | 66 +++- .../cluster/metadata/MetaData.java | 13 +- .../common/io/stream/StreamOutput.java | 11 + .../common/settings/ClusterSettings.java | 1 + .../PersistentTasksCustomMetaData.java | 3 +- .../org/elasticsearch/plugins/Plugin.java | 12 + .../elasticsearch/plugins/PluginsService.java | 25 +- .../elasticsearch/transport/TcpTransport.java | 74 +++- .../transport/TcpTransportChannel.java | 14 +- .../transport/TransportClientTests.java | 23 +- .../elasticsearch/cluster/ClusterStateIT.java | 351 ++++++++++++++++++ .../cluster/FeatureAwareTests.java | 172 +++++++++ .../transport/TcpTransportTests.java | 1 + .../elasticsearch/test/ESIntegTestCase.java | 98 ++++- .../test/client/RandomizingClient.java | 4 + .../AbstractSimpleTransportTestCase.java | 3 +- .../license/LicensesMetaData.java | 3 +- .../xpack/core/XPackClientPlugin.java | 27 +- .../elasticsearch/xpack/core/XPackPlugin.java | 25 +- .../xpack/core/ml/MlMetadata.java | 3 +- .../core/security/authc/TokenMetaData.java | 3 +- .../xpack/core/watcher/WatcherMetaData.java | 3 +- 24 files changed, 879 insertions(+), 68 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java index 47a31f268a6a8..18ab5cc0bd169 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java @@ -104,6 +104,8 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) { try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { context.readHeaders(in); } + // now we decode the features + in.readStringArray(); // now we can decode the action name sb.append(", action: ").append(in.readString()); } diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index f6d3a87f10da9..40904e9a8248b 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient { public static final Setting CLIENT_TRANSPORT_SNIFF = Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope); + public static final String TRANSPORT_CLIENT_FEATURE = "transport_client"; + private static PluginsService newPluginService(final Settings settings, Collection> plugins) { final Settings.Builder settingsBuilder = Settings.builder() .put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval @@ -130,8 +132,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build(); } final PluginsService pluginsService = newPluginService(providedSettings, plugins); - final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX - + "." + "transport_client", true).build(); + final Settings settings = + Settings.builder() + .put(defaultSettings) + .put(pluginsService.updatedSettings()) + .put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true) + .build(); final List resourcesToClose = new ArrayList<>(); final ThreadPool threadPool = new ThreadPool(settings); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java index 2b991d1dc611a..6bc555eae0bd9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -61,6 +62,7 @@ import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - public interface Custom extends NamedDiffable, ToXContentFragment { + /** + * An interface that implementors use when a class requires a client to maybe have a feature. + */ + public interface FeatureAware { + + /** + * An optional feature that is required for the client to have. + * + * @return an empty optional if no feature is required otherwise a string representing the required feature + */ + default Optional getRequiredFeature() { + return Optional.empty(); + } + + /** + * Tests whether or not the custom should be serialized. The criteria are: + *
    + *
  • the output stream must be at least the minimum supported version of the custom
  • + *
  • the output stream must have the feature required by the custom (if any) or not be a transport client
  • + *
+ *

+ * That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features + * that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but + * for connected nodes we always require that the node has the required feature. + * + * @param out the output stream + * @param custom the custom to serialize + * @param the type of the custom + * @return true if the custom should be serialized and false otherwise + */ + static boolean shouldSerializeCustom(final StreamOutput out, final T custom) { + if (out.getVersion().before(custom.getMinimalSupportedVersion())) { + return false; + } + if (custom.getRequiredFeature().isPresent()) { + final String requiredFeature = custom.getRequiredFeature().get(); + // if it is a transport client we are lenient yet for a connected node it must have the required feature + return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false; + } + return true; + } + + } + + public interface Custom extends NamedDiffable, ToXContentFragment, FeatureAware { /** * Returns true iff this {@link Custom} is private to the cluster and should never be send to a client. @@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable, ToXContentFragment { default boolean isPrivate() { return false; } + } private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class); @@ -244,6 +291,15 @@ public String toString() { sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n"); } } + if (metaData.customs().isEmpty() == false) { + sb.append("metadata customs:\n"); + for (final ObjectObjectCursor cursor : metaData.customs()) { + final String type = cursor.key; + final MetaData.Custom custom = cursor.value; + sb.append(TAB).append(type).append(": ").append(custom); + } + sb.append("\n"); + } sb.append(blocks()); sb.append(nodes()); sb.append(routingTable()); @@ -691,14 +747,14 @@ public void writeTo(StreamOutput out) throws IOException { blocks.writeTo(out); // filter out custom states not supported by the other node int numberOfCustoms = 0; - for (ObjectCursor cursor : customs.values()) { - if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) { + for (final ObjectCursor cursor : customs.values()) { + if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); - for (ObjectCursor cursor : customs.values()) { - if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) { + for (final ObjectCursor cursor : customs.values()) { + if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index b18c82712b37e..8c73222519502 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -24,6 +24,8 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterState.FeatureAware; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.Diffable; import org.elasticsearch.cluster.DiffableUtils; @@ -117,9 +119,10 @@ public enum XContentContext { */ public static EnumSet ALL_CONTEXTS = EnumSet.allOf(XContentContext.class); - public interface Custom extends NamedDiffable, ToXContentFragment { + public interface Custom extends NamedDiffable, ToXContentFragment, ClusterState.FeatureAware { EnumSet context(); + } public static final Setting SETTING_READ_ONLY_SETTING = @@ -782,14 +785,14 @@ public void writeTo(StreamOutput out) throws IOException { } // filter out custom states not supported by the other node int numberOfCustoms = 0; - for (ObjectCursor cursor : customs.values()) { - if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) { + for (final ObjectCursor cursor : customs.values()) { + if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { numberOfCustoms++; } } out.writeVInt(numberOfCustoms); - for (ObjectCursor cursor : customs.values()) { - if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) { + for (final ObjectCursor cursor : customs.values()) { + if (FeatureAware.shouldSerializeCustom(out, cursor.value)) { out.writeNamedWriteable(cursor.value); } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index ab0f47bf14d6d..f9617be4b56ed 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -58,10 +58,12 @@ import java.util.EnumMap; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.IntFunction; @@ -98,6 +100,7 @@ public abstract class StreamOutput extends OutputStream { } private Version version = Version.CURRENT; + private Set features = Collections.emptySet(); /** * The version of the node on the other side of this stream. @@ -113,6 +116,14 @@ public void setVersion(Version version) { this.version = version; } + public boolean hasFeature(final String feature) { + return this.features.contains(feature); + } + + public void setFeatures(final Set features) { + this.features = Collections.unmodifiableSet(new HashSet<>(features)); + } + public long position() throws IOException { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index d9cf0f630c0f2..e616613a425a9 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -379,6 +379,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING, EsExecutors.PROCESSORS_SETTING, ThreadContext.DEFAULT_HEADERS_SETTING, + TcpTransport.DEFAULT_FEATURES_SETTING, Loggers.LOG_DEFAULT_LEVEL_SETTING, Loggers.LOG_LEVEL_SETTING, NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING, diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index 6d2c21a764ad5..b9c5b32306397 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -189,7 +190,7 @@ public long getNumberOfTasksOnNode(String nodeId, String taskName) { @Override public Version getMinimalSupportedVersion() { - return Version.V_5_4_0; + return Version.V_6_3_0; } @Override diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 82c8bf1bbcb18..6294f942d08cb 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -56,6 +56,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.UnaryOperator; /** @@ -79,6 +80,17 @@ */ public abstract class Plugin implements Closeable { + /** + * A feature exposed by the plugin. This should be used if a plugin exposes {@link org.elasticsearch.cluster.ClusterState.Custom} or + * {@link MetaData.Custom}; see also {@link org.elasticsearch.cluster.ClusterState.FeatureAware}. + * + * @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata + * customs + */ + protected Optional getFeature() { + return Optional.empty(); + } + /** * Node level guice modules. */ diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java index 68a19bb9bca9b..5b64b5be6390d 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -41,8 +41,10 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexModule; import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.transport.TcpTransport; import java.io.IOException; import java.lang.reflect.Constructor; @@ -57,16 +59,17 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory; @@ -196,6 +199,7 @@ private static void logPluginInfo(final List pluginInfos, final Stri public Settings updatedSettings() { Map foundSettings = new HashMap<>(); + final Map features = new TreeMap<>(); final Settings.Builder builder = Settings.builder(); for (Tuple plugin : plugins) { Settings settings = plugin.v2().additionalSettings(); @@ -207,6 +211,23 @@ public Settings updatedSettings() { } } builder.put(settings); + final Optional maybeFeature = plugin.v2().getFeature(); + if (maybeFeature.isPresent()) { + final String feature = maybeFeature.get(); + if (features.containsKey(feature)) { + final String message = String.format( + Locale.ROOT, + "duplicate feature [%s] in plugin [%s], already added in [%s]", + feature, + plugin.v1().getName(), + features.get(feature)); + throw new IllegalArgumentException(message); + } + features.put(feature, plugin.v1().getName()); + } + } + for (final String feature : features.keySet()) { + builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true); } return builder.put(this.settings).build(); } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0b3d4e1b0a1ef..82b7aa8aed428 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -21,6 +21,7 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Booleans; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -93,6 +94,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -189,6 +191,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]); + public static final String FEATURE_PREFIX = "client.features"; + public static final Setting DEFAULT_FEATURES_SETTING = Setting.groupSetting(FEATURE_PREFIX + ".", Setting.Property.NodeScope); + private final String[] features; + private final CircuitBreakerService circuitBreakerService; // package visibility for tests protected final ScheduledPing scheduledPing; @@ -240,6 +246,17 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo this.networkService = networkService; this.transportName = transportName; defaultConnectionProfile = buildDefaultConnectionProfile(settings); + final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings); + if (defaultFeatures == null) { + this.features = new String[0]; + } else { + defaultFeatures.names().forEach(key -> { + if (Booleans.parseBoolean(defaultFeatures.get(key)) == false) { + throw new IllegalArgumentException("feature settings must have default [true] value"); + } + }); + this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]); + } } static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { @@ -1103,6 +1120,9 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha stream.setVersion(version); threadPool.getThreadContext().writeTo(stream); + if (version.onOrAfter(Version.V_7_0_0_alpha1)) { + stream.writeStringArray(features); + } stream.writeString(action); BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream); final TransportRequestOptions finalOptions = options; @@ -1135,15 +1155,22 @@ private void internalSendMessage(TcpChannel channel, BytesReference message, Sen * Sends back an error response to the caller via the given channel * * @param nodeVersion the caller node version + * @param features the caller features * @param channel the channel to send the response to * @param error the error to return * @param requestId the request ID this response replies to * @param action the action this response replies to */ - public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exception error, final long requestId, - final String action) throws IOException { + public void sendErrorResponse( + final Version nodeVersion, + final Set features, + final TcpChannel channel, + final Exception error, + final long requestId, + final String action) throws IOException { try (BytesStreamOutput stream = new BytesStreamOutput()) { stream.setVersion(nodeVersion); + stream.setFeatures(features); RemoteTransportException tx = new RemoteTransportException( nodeName(), new TransportAddress(channel.getLocalAddress()), action, error); threadPool.getThreadContext().writeTo(stream); @@ -1163,15 +1190,28 @@ public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exc /** * Sends the response to the given channel. This method should be used to send {@link TransportResponse} objects back to the caller. * - * @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller + * @see #sendErrorResponse(Version, Set, TcpChannel, Exception, long, String) for sending back errors to the caller */ - public void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId, - final String action, TransportResponseOptions options) throws IOException { - sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0); - } - - private void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId, - final String action, TransportResponseOptions options, byte status) throws IOException { + public void sendResponse( + final Version nodeVersion, + final Set features, + final TcpChannel channel, + final TransportResponse response, + final long requestId, + final String action, + final TransportResponseOptions options) throws IOException { + sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0); + } + + private void sendResponse( + final Version nodeVersion, + final Set features, + final TcpChannel channel, + final TransportResponse response, + final long requestId, + final String action, + TransportResponseOptions options, + byte status) throws IOException { if (compress) { options = TransportResponseOptions.builder(options).withCompress(true).build(); } @@ -1185,6 +1225,7 @@ private void sendResponse(Version nodeVersion, TcpChannel channel, final Transpo } threadPool.getThreadContext().writeTo(stream); stream.setVersion(nodeVersion); + stream.setFeatures(features); BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream); final TransportResponseOptions finalOptions = options; @@ -1546,13 +1587,19 @@ private void handleException(final TransportResponseHandler handler, Throwable e protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException { + final Set features; + if (version.onOrAfter(Version.V_7_0_0_alpha1)) { + features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(stream.readStringArray()))); + } else { + features = Collections.emptySet(); + } final String action = stream.readString(); transportService.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { if (TransportStatus.isHandshake(status)) { final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion()); - sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, + sendResponse(version, features, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0)); } else { final RequestHandlerRegistry reg = transportService.getRequestHandler(action); @@ -1564,7 +1611,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str } else { getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); } - transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, + transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, messageLengthBytes); final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(remoteAddress)); @@ -1575,7 +1622,8 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str } catch (Exception e) { // the circuit breaker tripped if (transportChannel == null) { - transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, 0); + transportChannel = + new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0); } try { transportChannel.sendResponse(e); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index eb4c244c7a920..1bf1d027329b5 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -16,16 +16,20 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.transport; import org.elasticsearch.Version; import java.io.IOException; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; public final class TcpTransportChannel implements TransportChannel { private final TcpTransport transport; private final Version version; + private final Set features; private final String action; private final long requestId; private final String profileName; @@ -34,9 +38,10 @@ public final class TcpTransportChannel implements TransportChannel { private final String channelType; private final TcpChannel channel; - TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, - long requestId, Version version, String profileName, long reservedBytes) { + TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version, + Set features, String profileName, long reservedBytes) { this.version = version; + this.features = features; this.channel = channel; this.transport = transport; this.action = action; @@ -59,7 +64,7 @@ public void sendResponse(TransportResponse response) throws IOException { @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { try { - transport.sendResponse(version, channel, response, requestId, action, options); + transport.sendResponse(version, features, channel, response, requestId, action, options); } finally { release(false); } @@ -68,7 +73,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op @Override public void sendResponse(Exception exception) throws IOException { try { - transport.sendErrorResponse(version, channel, exception, requestId, action); + transport.sendErrorResponse(version, features, channel, exception, requestId, action); } finally { release(true); } @@ -100,5 +105,6 @@ public Version getVersion() { public TcpChannel getChannel() { return channel; } + } diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java index 1830698d90c6f..583dca95f65bd 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.MockTransportClient; +import org.elasticsearch.transport.TcpTransport; import java.io.IOException; import java.util.Arrays; @@ -38,6 +39,8 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.object.HasToString.hasToString; public class TransportClientTests extends ESTestCase { @@ -64,13 +67,25 @@ public void testPluginNamedWriteablesRegistered() { } } - public void testDefaultHeaderContainsPlugins() { - Settings baseSettings = Settings.builder() + public void testSettingsContainsTransportClient() { + final Settings baseSettings = Settings.builder() .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) .build(); try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) { - ThreadContext threadContext = client.threadPool().getThreadContext(); - assertEquals("true", threadContext.getHeader("transport_client")); + final Settings settings = TcpTransport.DEFAULT_FEATURES_SETTING.get(client.settings()); + assertThat(settings.keySet(), hasItem("transport_client")); + assertThat(settings.get("transport_client"), equalTo("true")); + final ThreadContext threadContext = client.threadPool().getThreadContext(); + assertEquals("true", threadContext.getHeader("test")); + } + } + + public void testDefaultHeader() { + final Settings baseSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .build(); + try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) { + final ThreadContext threadContext = client.threadPool().getThreadContext(); assertEquals("true", threadContext.getHeader("test")); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java new file mode 100644 index 0000000000000..a66136c8f8234 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -0,0 +1,351 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexGraveyard; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpTransport; +import org.elasticsearch.watcher.ResourceWatcherService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; + +/** + * This test suite sets up a situation where the cluster has two plugins installed (node, and node-and-transport-client), and a transport + * client only has node-and-transport-client plugin installed. Each of these plugins inject customs into the cluster state and we want to + * check that the client can de-serialize a cluster state response based on the fact that the response should not contain customs that the + * transport client does not understand based on the fact that it only presents the node-and-transport-client-feature. + */ +@ESIntegTestCase.ClusterScope(scope = TEST) +public class ClusterStateIT extends ESIntegTestCase { + + public abstract static class Custom implements MetaData.Custom { + + private static final ParseField VALUE = new ParseField("value"); + + private final int value; + + int value() { + return value; + } + + Custom(final int value) { + this.value = value; + } + + Custom(final StreamInput in) throws IOException { + value = in.readInt(); + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public Diff diff(final MetaData.Custom previousState) { + return null; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeInt(value); + } + + public static int fromXContent(final XContentParser parser) throws IOException { + XContentParser.Token token; + int value = 0; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + switch (token) { + case FIELD_NAME: + currentFieldName = parser.currentName(); + break; + case VALUE_BOOLEAN: + if (VALUE.match(currentFieldName, parser.getDeprecationHandler())) { + value = parser.intValue(); + } + break; + } + } + return value; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(VALUE.getPreferredName(), value); + return builder; + } + + } + + public static class NodeCustom extends Custom { + + public static final String TYPE = "node"; + + NodeCustom(final int value) { + super(value); + } + + NodeCustom(final StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Optional getRequiredFeature() { + return Optional.of("node"); + } + + } + + public static class NodeAndTransportClientCustom extends Custom { + + public static final String TYPE = "node-and-transport-client"; + + NodeAndTransportClientCustom(final int value) { + super(value); + } + + public NodeAndTransportClientCustom(final StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Optional getRequiredFeature() { + return Optional.of("node-and-transport-client"); + } + + } + + public abstract static class CustomPlugin extends Plugin { + + private final List namedWritables = new ArrayList<>(); + private final List namedXContents = new ArrayList<>(); + + public CustomPlugin() { + registerBuiltinWritables(); + } + + protected void registerMetaDataCustom( + final String name, final Writeable.Reader reader, final CheckedFunction parser) { + namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, name, reader)); + namedXContents.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(name), parser)); + } + + protected abstract void registerBuiltinWritables(); + + protected abstract String getType(); + + protected abstract Custom getInstance(); + + @Override + public List getNamedWriteables() { + return namedWritables; + } + + @Override + public List getNamedXContent() { + return namedXContents; + } + + private final AtomicBoolean installed = new AtomicBoolean(); + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + clusterService.addListener(event -> { + final ClusterState state = event.state(); + if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { + return; + } + + final MetaData metaData = state.metaData(); + if (state.nodes().isLocalNodeElectedMaster()) { + if (metaData.custom(getType()) == null) { + if (installed.compareAndSet(false, true)) { + clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + if (currentState.custom(getType()) == null) { + final MetaData.Builder builder = MetaData.builder(currentState.metaData()); + builder.putCustom(getType(), getInstance()); + return ClusterState.builder(currentState).metaData(builder).build(); + } else { + return currentState; + } + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(e); + } + + }); + } + } + } + + }); + return Collections.emptyList(); + } + } + + public static class NodePlugin extends CustomPlugin { + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + static final Optional NODE_PLUGIN_FEATURE = Optional.of("node"); + + public Optional getFeature() { + return NODE_PLUGIN_FEATURE; + } + + static final int VALUE = randomInt(); + + @Override + protected void registerBuiltinWritables() { + registerMetaDataCustom(NodeCustom.TYPE, NodeCustom::new, parser -> new NodeCustom(NodeCustom.fromXContent(parser))); + } + + @Override + protected String getType() { + return NodeCustom.TYPE; + } + + @Override + protected Custom getInstance() { + return new NodeCustom(VALUE); + } + + } + + public static class NodeAndTransportClientPlugin extends CustomPlugin { + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + static final Optional FEATURE = Optional.of("node-and-transport-client"); + + @Override + protected Optional getFeature() { + return FEATURE; + } + + static final int VALUE = randomInt(); + + @Override + protected void registerBuiltinWritables() { + registerMetaDataCustom( + NodeAndTransportClientCustom.TYPE, + NodeAndTransportClientCustom::new, + parser -> new NodeAndTransportClientCustom(NodeAndTransportClientCustom.fromXContent(parser))); + } + + @Override + protected String getType() { + return NodeAndTransportClientCustom.TYPE; + } + + @Override + protected Custom getInstance() { + return new NodeAndTransportClientCustom(VALUE); + } + + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(NodePlugin.class, NodeAndTransportClientPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Collections.singletonList(NodeAndTransportClientPlugin.class); + } + + public void testOptionalCustoms() throws Exception { + // ensure that the customs are injected into the cluster state + assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeCustom.TYPE))); + assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeAndTransportClientCustom.TYPE))); + final ClusterStateResponse state = internalCluster().transportClient().admin().cluster().prepareState().get(); + final ImmutableOpenMap customs = state.getState().metaData().customs(); + final Set keys = new HashSet<>(Arrays.asList(customs.keys().toArray(String.class))); + assertThat(keys, hasItem(IndexGraveyard.TYPE)); + assertThat(keys, not(hasItem(NodeCustom.TYPE))); + assertThat(keys, hasItem(NodeAndTransportClientCustom.TYPE)); + final MetaData.Custom actual = customs.get(NodeAndTransportClientCustom.TYPE); + assertThat(actual, instanceOf(NodeAndTransportClientCustom.class)); + assertThat(((NodeAndTransportClientCustom)actual).value(), equalTo(NodeAndTransportClientPlugin.VALUE)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java new file mode 100644 index 0000000000000..c4ecca1516e82 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.ClusterState.FeatureAware; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Optional; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; + +public class FeatureAwareTests extends ESTestCase { + + abstract static class Custom implements MetaData.Custom { + + private final Version version; + + Custom(final Version version) { + this.version = version; + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public Diff diff(final MetaData.Custom previousState) { + return null; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + return builder; + } + + @Override + public Version getMinimalSupportedVersion() { + return version; + } + + } + + static class NoRequiredFeatureCustom extends Custom { + + NoRequiredFeatureCustom(final Version version) { + super(version); + } + + @Override + public String getWriteableName() { + return "no-required-feature"; + } + + } + + static class RequiredFeatureCustom extends Custom { + + RequiredFeatureCustom(final Version version) { + super(version); + } + + @Override + public String getWriteableName() { + return null; + } + + @Override + public Optional getRequiredFeature() { + return Optional.of("required-feature"); + } + + } + + public void testVersion() { + final Version version = VersionUtils.randomVersion(random()); + for (final Custom custom : Arrays.asList(new NoRequiredFeatureCustom(version), new RequiredFeatureCustom(version))) { + { + final BytesStreamOutput out = new BytesStreamOutput(); + final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT); + out.setVersion(afterVersion); + if (custom.getRequiredFeature().isPresent()) { + out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); + } + assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + } + { + final BytesStreamOutput out = new BytesStreamOutput(); + final Version beforeVersion = + randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version)); + out.setVersion(beforeVersion); + assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + } + } + } + + public void testFeature() { + final Version version = VersionUtils.randomVersion(random()); + final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT); + final Custom custom = new RequiredFeatureCustom(version); + { + // the feature is present and the client is not a transport client + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(afterVersion); + assertTrue(custom.getRequiredFeature().isPresent()); + out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); + assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + } + { + // the feature is present and the client is a transport client + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(afterVersion); + assertTrue(custom.getRequiredFeature().isPresent()); + out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE))); + assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + } + } + + public void testMissingFeature() { + final Version version = VersionUtils.randomVersion(random()); + final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT); + final Custom custom = new RequiredFeatureCustom(version); + { + // the feature is missing but we should serialize it anyway because the client is not a transport client + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(afterVersion); + assertTrue(FeatureAware.shouldSerializeCustom(out, custom)); + } + { + // the feature is missing and we should not serialize it because the client is a transport client + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(afterVersion); + out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE)); + assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 2cedb5419e08e..7e83e1cdc0bdf 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -227,6 +227,7 @@ public NodeChannels getConnection(DiscoveryNode node) { .streamInput(streamIn); } threadPool.getThreadContext().readHeaders(streamIn); + assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features assertEquals("foobar", streamIn.readString()); Req readReq = new Req(""); readReq.readFrom(streamIn); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 505a5937d290b..8b58cea4d0a54 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -26,7 +26,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.http.HttpHost; import org.apache.lucene.search.Sort; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -68,12 +67,18 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -105,6 +110,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -130,9 +136,11 @@ import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.store.IndicesStore; +import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.ScriptMetaData; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchHit; @@ -1108,7 +1116,8 @@ protected void ensureClusterSizeConsistency() { protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) { final NamedWriteableRegistry namedWriteableRegistry = cluster().getNamedWriteableRegistry(); - ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + final Client masterClient = client(); + ClusterState masterClusterState = masterClient.admin().cluster().prepareState().all().get().getState(); byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); // remove local node reference masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry); @@ -1124,16 +1133,37 @@ protected void ensureClusterStateConsistency() throws IOException { final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length; // Check that the non-master node has the same version of the cluster state as the master and // that the master node matches the master (otherwise there is no requirement for the cluster state to match) - if (masterClusterState.version() == localClusterState.version() && masterId.equals(localClusterState.nodes().getMasterNodeId())) { + if (masterClusterState.version() == localClusterState.version() + && masterId.equals(localClusterState.nodes().getMasterNodeId())) { try { - assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); - // We cannot compare serialization bytes since serialization order of maps is not guaranteed - // but we can compare serialization sizes - they should be the same - assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize); - // Compare JSON serialization - assertNull("clusterstate JSON serialization does not match", differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); - } catch (AssertionError error) { - logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", masterClusterState.toString(), localClusterState.toString()); + assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); + /* + * The cluster state received by the transport client can miss customs that the client does not understand. This + * means that we only expect equality in the cluster state including customs if the master client and the local + * client are of the same type (both or neither are transport clients). Otherwise, we can only assert equality + * modulo non-core customs. + */ + if (isTransportClient(masterClient) == isTransportClient(client)) { + // We cannot compare serialization bytes since serialization order of maps is not guaranteed + // but we can compare serialization sizes - they should be the same + assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize); + // Compare JSON serialization + assertNull( + "cluster state JSON serialization does not match", + differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); + } else { + // remove non-core customs and compare the cluster states + assertNull( + "cluster state JSON serialization does not match (after removing some customs)", + differenceBetweenMapsIgnoringArrayOrder( + convertToMap(removePluginCustoms(masterClusterState)), + convertToMap(removePluginCustoms(localClusterState)))); + } + } catch (final AssertionError error) { + logger.error( + "Cluster state from master:\n{}\nLocal cluster state:\n{}", + masterClusterState.toString(), + localClusterState.toString()); throw error; } } @@ -1142,6 +1172,52 @@ protected void ensureClusterStateConsistency() throws IOException { } + /** + * Tests if the client is a transport client or wraps a transport client. + * + * @param client the client to test + * @return true if the client is a transport client or a wrapped transport client + */ + private boolean isTransportClient(final Client client) { + if (TransportClient.class.isAssignableFrom(client.getClass())) { + return true; + } else if (client instanceof RandomizingClient) { + return isTransportClient(((RandomizingClient) client).in()); + } + return false; + } + + private static final Set SAFE_METADATA_CUSTOMS = + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetaData.TYPE, ScriptMetaData.TYPE))); + + private static final Set SAFE_CUSTOMS = + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE))); + + /** + * Remove any customs except for customs that we know all clients understand. + * + * @param clusterState the cluster state to remove possibly-unknown customs from + * @return the cluster state with possibly-unknown customs removed + */ + private ClusterState removePluginCustoms(final ClusterState clusterState) { + final ClusterState.Builder builder = ClusterState.builder(clusterState); + clusterState.customs().keysIt().forEachRemaining(key -> { + if (SAFE_CUSTOMS.contains(key) == false) { + builder.removeCustom(key); + } + }); + final MetaData.Builder mdBuilder = MetaData.builder(clusterState.metaData()); + clusterState.metaData().customs().keysIt().forEachRemaining(key -> { + if (SAFE_METADATA_CUSTOMS.contains(key) == false) { + mdBuilder.removeCustom(key); + } + }); + builder.metaData(mdBuilder); + return builder.build(); + } + /** * Ensures the cluster is in a searchable state for the given indices. This means a searchable copy of each * shard is available on the cluster. diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java index e1a6ba030fde8..4c826101780a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java @@ -93,4 +93,8 @@ public String toString() { return "randomized(" + super.toString() + ")"; } + public Client in() { + return super.in(); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 9a44f99c7c6b4..6c53ca6edb387 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.Constants; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -32,7 +31,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.network.NetworkService; @@ -45,6 +43,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.node.Node; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java index 56475de123f3c..d9f7068b2181e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensesMetaData.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.license.License.OperationMode; +import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; import java.util.EnumSet; @@ -23,7 +24,7 @@ /** * Contains metadata about registered licenses */ -public class LicensesMetaData extends AbstractNamedDiffable implements MetaData.Custom, +public class LicensesMetaData extends AbstractNamedDiffable implements XPackPlugin.XPackMetaDataCustom, MergableCustomMetaData { public static final String TYPE = "licenses"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 0b22cd86fe6a0..a96de96fd4f44 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.license.DeleteLicenseAction; @@ -28,6 +27,7 @@ import org.elasticsearch.license.PostStartBasicAction; import org.elasticsearch.license.PostStartTrialAction; import org.elasticsearch.license.PutLicenseAction; +import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -61,7 +61,6 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetFiltersAction; import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction; -import org.elasticsearch.xpack.core.ml.action.MlInfoAction; import org.elasticsearch.xpack.core.ml.action.GetJobsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; @@ -69,6 +68,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; +import org.elasticsearch.xpack.core.ml.action.MlInfoAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction; @@ -91,7 +91,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage; -import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; @@ -133,6 +132,8 @@ import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction; +import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction; +import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeInfoAction; import org.elasticsearch.xpack.core.watcher.WatcherFeatureSetUsage; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.transport.actions.ack.AckWatchAction; @@ -143,18 +144,25 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchAction; import org.elasticsearch.xpack.core.watcher.transport.actions.service.WatcherServiceAction; import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsAction; -import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeAction; -import org.elasticsearch.xpack.core.upgrade.actions.IndexUpgradeInfoAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Supplier; public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPlugin { + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + static Optional X_PACK_FEATURE = Optional.of("x-pack"); + + @Override + protected Optional getFeature() { + return X_PACK_FEATURE; + } + private final Settings settings; public XPackClientPlugin(final Settings settings) { @@ -185,11 +193,10 @@ public Settings additionalSettings() { static Settings additionalSettings(final Settings settings, final boolean enabled, final boolean transportClientMode) { if (enabled && transportClientMode) { - final Settings.Builder builder = Settings.builder(); - builder.put(SecuritySettings.addTransportSettings(settings)); - builder.put(SecuritySettings.addUserSettings(settings)); - builder.put(ThreadContext.PREFIX + "." + "has_xpack", true); - return builder.build(); + return Settings.builder() + .put(SecuritySettings.addTransportSettings(settings)) + .put(SecuritySettings.addUserSettings(settings)) + .build(); } else { return Settings.EMPTY; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index 9568a36551c83..602f4bdbc079b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -59,19 +59,15 @@ import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; -import javax.security.auth.DestroyFailedException; - -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.security.AccessController; -import java.security.GeneralSecurityException; import java.security.PrivilegedAction; import java.time.Clock; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -316,4 +312,23 @@ public static Path resolveConfigFile(Environment env, String name) { } return config; } + + public interface XPackClusterStateCustom extends ClusterState.Custom { + + @Override + default Optional getRequiredFeature() { + return XPackClientPlugin.X_PACK_FEATURE; + } + + } + + public interface XPackMetaDataCustom extends MetaData.Custom { + + @Override + default Optional getRequiredFeature() { + return XPackClientPlugin.X_PACK_FEATURE; + } + + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 6af323f1510e4..861f386a90966 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -27,6 +27,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; @@ -53,7 +54,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -public class MlMetadata implements MetaData.Custom { +public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java index 6bd6228f2efe1..46111b9b16cd1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authc/TokenMetaData.java @@ -12,13 +12,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; -public final class TokenMetaData extends AbstractNamedDiffable implements ClusterState.Custom { +public final class TokenMetaData extends AbstractNamedDiffable implements XPackPlugin.XPackClusterStateCustom { /** * The type of {@link ClusterState} data. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java index 3a490f08b79e5..9f014dee843c5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/WatcherMetaData.java @@ -13,12 +13,13 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.XPackPlugin; import java.io.IOException; import java.util.EnumSet; import java.util.Objects; -public class WatcherMetaData extends AbstractNamedDiffable implements MetaData.Custom { +public class WatcherMetaData extends AbstractNamedDiffable implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "watcher"; From 82ed4d4ecb7a3bf842f1acddec6a267592e1c57e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 05:59:47 -0400 Subject: [PATCH 02/11] Remove irrelevant lines from test --- .../elasticsearch/client/transport/TransportClientTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java index 583dca95f65bd..1dc30e951b6d3 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java +++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -75,8 +75,6 @@ public void testSettingsContainsTransportClient() { final Settings settings = TcpTransport.DEFAULT_FEATURES_SETTING.get(client.settings()); assertThat(settings.keySet(), hasItem("transport_client")); assertThat(settings.get("transport_client"), equalTo("true")); - final ThreadContext threadContext = client.threadPool().getThreadContext(); - assertEquals("true", threadContext.getHeader("test")); } } From 6d529fee20a88a87592337e9a49fd24a3bb06b05 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:02:41 -0400 Subject: [PATCH 03/11] Add comment on use of TreeSet --- .../src/main/java/org/elasticsearch/transport/TcpTransport.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 82b7aa8aed428..d3d1b93810d68 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -255,6 +255,7 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo throw new IllegalArgumentException("feature settings must have default [true] value"); } }); + // use a sorted set to present the features in a consistent order this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]); } } From 66f2ea95dc29438b1772321fc94afd7e13cae9d7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:07:49 -0400 Subject: [PATCH 04/11] Remove unneeded parser in test --- .../elasticsearch/cluster/ClusterStateIT.java | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java index a66136c8f8234..aed3417939dae 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -106,25 +106,6 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeInt(value); } - public static int fromXContent(final XContentParser parser) throws IOException { - XContentParser.Token token; - int value = 0; - String currentFieldName = null; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - switch (token) { - case FIELD_NAME: - currentFieldName = parser.currentName(); - break; - case VALUE_BOOLEAN: - if (VALUE.match(currentFieldName, parser.getDeprecationHandler())) { - value = parser.intValue(); - } - break; - } - } - return value; - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(VALUE.getPreferredName(), value); @@ -276,7 +257,12 @@ public Optional getFeature() { @Override protected void registerBuiltinWritables() { - registerMetaDataCustom(NodeCustom.TYPE, NodeCustom::new, parser -> new NodeCustom(NodeCustom.fromXContent(parser))); + registerMetaDataCustom( + NodeCustom.TYPE, + NodeCustom::new, + parser -> { + throw new IOException(new UnsupportedOperationException()); + }); } @Override @@ -308,7 +294,9 @@ protected void registerBuiltinWritables() { registerMetaDataCustom( NodeAndTransportClientCustom.TYPE, NodeAndTransportClientCustom::new, - parser -> new NodeAndTransportClientCustom(NodeAndTransportClientCustom.fromXContent(parser))); + parser -> { + throw new IOException(new UnsupportedOperationException()); + }); } @Override From 25a861c2a24bcf2ebf96daf1f12d80c45b9e9f28 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:08:43 -0400 Subject: [PATCH 05/11] Remove unneeded fields --- .../java/org/elasticsearch/cluster/ClusterStateIT.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java index aed3417939dae..d93a0680fbf98 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -246,11 +246,8 @@ public void onFailure(String source, Exception e) { public static class NodePlugin extends CustomPlugin { - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - static final Optional NODE_PLUGIN_FEATURE = Optional.of("node"); - public Optional getFeature() { - return NODE_PLUGIN_FEATURE; + return Optional.of("node"); } static final int VALUE = randomInt(); @@ -279,12 +276,9 @@ protected Custom getInstance() { public static class NodeAndTransportClientPlugin extends CustomPlugin { - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - static final Optional FEATURE = Optional.of("node-and-transport-client"); - @Override protected Optional getFeature() { - return FEATURE; + return Optional.of("node-and-transport-client"); } static final int VALUE = randomInt(); From 0b39ce997889035d44100c3a7cb8394eee155d76 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:11:07 -0400 Subject: [PATCH 06/11] Guard feature de-serialization by version --- .../org/elasticsearch/transport/netty4/ESLoggingHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java index 18ab5cc0bd169..62e52a8726fa2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java @@ -105,7 +105,9 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) { context.readHeaders(in); } // now we decode the features - in.readStringArray(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + in.readStringArray(); + } // now we can decode the action name sb.append(", action: ").append(in.readString()); } From 26071ff630e5b20be17a4f909416ce2e06691d59 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:20:01 -0400 Subject: [PATCH 07/11] Add Javadocs --- .../common/io/stream/StreamOutput.java | 15 +++++++++++++++ .../java/org/elasticsearch/plugins/Plugin.java | 5 +++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index f9617be4b56ed..e8c4d197fda62 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -30,6 +30,8 @@ import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; @@ -116,11 +118,24 @@ public void setVersion(Version version) { this.version = version; } + /** + * Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or + * {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}. + * + * @param feature the feature to test + * @return true if the stream has the specified feature + */ public boolean hasFeature(final String feature) { return this.features.contains(feature); } + /** + * Set the features on the stream. See {@link StreamOutput#hasFeature(String)}. + * + * @param features the features on the stream + */ public void setFeatures(final Set features) { + assert this.features.isEmpty() : this.features; this.features = Collections.unmodifiableSet(new HashSet<>(features)); } diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 6294f942d08cb..0ef703448b799 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -81,8 +82,8 @@ public abstract class Plugin implements Closeable { /** - * A feature exposed by the plugin. This should be used if a plugin exposes {@link org.elasticsearch.cluster.ClusterState.Custom} or - * {@link MetaData.Custom}; see also {@link org.elasticsearch.cluster.ClusterState.FeatureAware}. + * A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see + * also {@link ClusterState.FeatureAware}. * * @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata * customs From 8be176c24b6aa9e838e51b7c4a1ee0daeffaaecb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:28:33 -0400 Subject: [PATCH 08/11] Randomly add feature to test --- .../test/java/org/elasticsearch/cluster/FeatureAwareTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java index c4ecca1516e82..696bf2f82faaa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/FeatureAwareTests.java @@ -123,6 +123,9 @@ public void testVersion() { final Version beforeVersion = randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version)); out.setVersion(beforeVersion); + if (custom.getRequiredFeature().isPresent() && randomBoolean()) { + out.setFeatures(Collections.singleton(custom.getRequiredFeature().get())); + } assertFalse(FeatureAware.shouldSerializeCustom(out, custom)); } } From 1f6682970001ac2430ece164630e74493ffc820b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 06:35:52 -0400 Subject: [PATCH 09/11] Change prefix of feature setting --- .../src/main/java/org/elasticsearch/transport/TcpTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index d3d1b93810d68..df46c945540bf 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -191,7 +191,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]); - public static final String FEATURE_PREFIX = "client.features"; + public static final String FEATURE_PREFIX = "transport.features"; public static final Setting DEFAULT_FEATURES_SETTING = Setting.groupSetting(FEATURE_PREFIX + ".", Setting.Property.NodeScope); private final String[] features; From 546fe414e5af5ad8858d54a7a202c6196524fa36 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 08:48:56 -0400 Subject: [PATCH 10/11] Randomize one or none features --- .../java/org/elasticsearch/cluster/ClusterStateIT.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java index d93a0680fbf98..07a974a2ca771 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java @@ -155,9 +155,16 @@ public String getWriteableName() { return TYPE; } + /* + * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have + * versus not requiring any feature. We use a field to make the random choice exactly once. + */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private final Optional requiredFeature = randomBoolean() ? Optional.empty() : Optional.of("node-and-transport-client"); + @Override public Optional getRequiredFeature() { - return Optional.of("node-and-transport-client"); + return requiredFeature; } } From a18f16628f6045dd57e0b3e5dce790b8afd623f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Jun 2018 08:49:46 -0400 Subject: [PATCH 11/11] Revert version change for persistent tasks --- .../elasticsearch/persistent/PersistentTasksCustomMetaData.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index b9c5b32306397..bdee87cc77c51 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -190,7 +190,7 @@ public long getNumberOfTasksOnNode(String nodeId, String taskName) { @Override public Version getMinimalSupportedVersion() { - return Version.V_6_3_0; + return Version.V_5_4_0; } @Override