Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce client feature tracking #31020

Merged
merged 11 commits into from
Jun 1, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) {
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
// now we decode the features
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
in.readStringArray();
}
// now we can decode the action name
sb.append(", action: ").append(in.readString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);

public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";

private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
Expand Down Expand Up @@ -130,8 +132,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
+ "." + "transport_client", true).build();
final Settings settings =
Settings.builder()
.put(defaultSettings)
.put(pluginsService.updatedSettings())
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
.build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
Expand Down
66 changes: 61 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
/**
* An interface that implementors use when a class requires a client to maybe have a feature.
*/
public interface FeatureAware {

/**
* An optional feature that is required for the client to have.
*
* @return an empty optional if no feature is required otherwise a string representing the required feature
*/
default Optional<String> getRequiredFeature() {
return Optional.empty();
}

/**
* Tests whether or not the custom should be serialized. The criteria are:
* <ul>
* <li>the output stream must be at least the minimum supported version of the custom</li>
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
* </ul>
* <p>
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
* for connected nodes we always require that the node has the required feature.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
if (custom.getRequiredFeature().isPresent()) {
final String requiredFeature = custom.getRequiredFeature().get();
// if it is a transport client we are lenient yet for a connected node it must have the required feature
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
}
return true;
}

}

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {

/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
Expand All @@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
default boolean isPrivate() {
return false;
}

}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -244,6 +291,15 @@ public String toString() {
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
if (metaData.customs().isEmpty() == false) {
sb.append("metadata customs:\n");
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
final String type = cursor.key;
final MetaData.Custom custom = cursor.value;
sb.append(TAB).append(type).append(": ").append(custom);
}
sb.append("\n");
}
sb.append(blocks());
sb.append(nodes());
sb.append(routingTable());
Expand Down Expand Up @@ -691,14 +747,14 @@ public void writeTo(StreamOutput out) throws IOException {
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
Expand Down Expand Up @@ -117,9 +119,10 @@ public enum XContentContext {
*/
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {

EnumSet<XContentContext> context();

}

public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
Expand Down Expand Up @@ -782,14 +785,14 @@ public void writeTo(StreamOutput out) throws IOException {
}
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
Expand Down Expand Up @@ -58,10 +60,12 @@
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -98,6 +102,7 @@ public abstract class StreamOutput extends OutputStream {
}

private Version version = Version.CURRENT;
private Set<String> features = Collections.emptySet();

/**
* The version of the node on the other side of this stream.
Expand All @@ -113,6 +118,27 @@ public void setVersion(Version version) {
this.version = version;
}

/**
* Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
* {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
*
* @param feature the feature to test
* @return true if the stream has the specified feature
*/
public boolean hasFeature(final String feature) {
return this.features.contains(feature);
}

/**
* Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
*
* @param features the features on the stream
*/
public void setFeatures(final Set<String> features) {
assert this.features.isEmpty() : this.features;
this.features = Collections.unmodifiableSet(new HashSet<>(features));
}

public long position() throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
TcpTransport.DEFAULT_FEATURES_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -56,6 +57,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

/**
Expand All @@ -79,6 +81,17 @@
*/
public abstract class Plugin implements Closeable {

/**
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
* also {@link ClusterState.FeatureAware}.
*
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
* customs
*/
protected Optional<String> getFeature() {
return Optional.empty();
}

/**
* Node level guice modules.
*/
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/elasticsearch/plugins/PluginsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -57,16 +59,17 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;

Expand Down Expand Up @@ -196,6 +199,7 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri

public Settings updatedSettings() {
Map<String, String> foundSettings = new HashMap<>();
final Map<String, String> features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
Expand All @@ -207,6 +211,23 @@ public Settings updatedSettings() {
}
}
builder.put(settings);
final Optional<String> maybeFeature = plugin.v2().getFeature();
if (maybeFeature.isPresent()) {
final String feature = maybeFeature.get();
if (features.containsKey(feature)) {
final String message = String.format(
Locale.ROOT,
"duplicate feature [%s] in plugin [%s], already added in [%s]",
feature,
plugin.v1().getName(),
features.get(feature));
throw new IllegalArgumentException(message);
}
features.put(feature, plugin.v1().getName());
}
}
for (final String feature : features.keySet()) {
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}
Expand Down
Loading