Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Un-hardcode SecurityIndexManager to handle generic indices #40064

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
components.add(auditTrailService);
this.auditTrailService.set(auditTrailService);

securityIndex.set(new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService));
securityIndex.set(SecurityIndexManager.buildSecurityIndexManager(client, clusterService));

final TokenService tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex.get(), clusterService);
this.tokenService.set(tokenService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -72,7 +73,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations.
* Manages the lifecycle of a single index, mapping and and data upgrades/migrations.
*/
public class SecurityIndexManager implements ClusterStateListener {

Expand All @@ -82,28 +83,41 @@ public class SecurityIndexManager implements ClusterStateListener {
public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
public static final String SECURITY_INDEX_NAME = ".security";
private static final Logger LOGGER = LogManager.getLogger(SecurityIndexManager.class);
private static final Logger logger = LogManager.getLogger(SecurityIndexManager.class);

private final String indexName;
private final String aliasName;
private final String internalIndexName;
private final int internalIndexFormat;
private final Supplier<byte[]> mappingSourceSupplier;
private final Client client;

private final List<BiConsumer<State, State>> stateChangeListeners = new CopyOnWriteArrayList<>();

private volatile State indexState;

public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
this(client, indexName, State.UNRECOVERED_STATE);
public static SecurityIndexManager buildSecurityIndexManager(Client client, ClusterService clusterService) {
return new SecurityIndexManager(client, SECURITY_INDEX_NAME, INTERNAL_SECURITY_INDEX, INTERNAL_INDEX_FORMAT,
SecurityIndexManager::readSecurityTemplateAsBytes, clusterService);
}

private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
Supplier<byte[]> mappingSourceSupplier, ClusterService clusterService) {
this(client, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, State.UNRECOVERED_STATE);
clusterService.addListener(this);
}

private SecurityIndexManager(Client client, String indexName, State indexState) {
this.client = client;
this.indexName = indexName;
private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
Supplier<byte[]> mappingSourceSupplier, State indexState) {
this.aliasName = aliasName;
this.internalIndexName = internalIndexName;
this.internalIndexFormat = internalIndexFormat;
this.mappingSourceSupplier = mappingSourceSupplier;
this.indexState = indexState;
this.client = client;
}

public SecurityIndexManager freeze() {
return new SecurityIndexManager(null, indexName, indexState);
return new SecurityIndexManager(null, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, indexState);
}

public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
Expand Down Expand Up @@ -143,9 +157,10 @@ public ElasticsearchException getUnavailableReason() {
}

if (localState.indexExists) {
return new UnavailableShardsException(null, "at least one primary shard for the security index is unavailable");
return new UnavailableShardsException(null,
"at least one primary shard for the index [" + localState.concreteIndexName + "] is unavailable");
} else {
return new IndexNotFoundException(SECURITY_INDEX_NAME);
return new IndexNotFoundException(localState.concreteIndexName);
}
}

Expand All @@ -163,20 +178,20 @@ public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think we don't have the
// .security index but they may not have been restored from the cluster state on disk
LOGGER.debug("security index manager waiting until state has been recovered");
logger.debug("security index manager waiting until state has been recovered");
return;
}
final State previousState = indexState;
final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, event.state().metaData());
final IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, event.state().metaData());
final boolean indexExists = indexMetaData != null;
final boolean isIndexUpToDate = indexExists == false ||
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == INTERNAL_INDEX_FORMAT;
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == internalIndexFormat;
final boolean indexAvailable = checkIndexAvailable(event.state());
final boolean mappingIsUpToDate = indexExists == false || checkIndexMappingUpToDate(event.state());
final Version mappingVersion = oldestIndexMappingVersion(event.state());
final ClusterHealthStatus indexStatus = indexMetaData == null ? null :
new ClusterIndexHealth(indexMetaData, event.state().getRoutingTable().index(indexMetaData.getIndex())).getStatus();
final String concreteIndexName = indexMetaData == null ? INTERNAL_SECURITY_INDEX : indexMetaData.getIndex().getName();
final String concreteIndexName = indexMetaData == null ? internalIndexName : indexMetaData.getIndex().getName();
final State newState = new State(indexExists, isIndexUpToDate, indexAvailable, mappingIsUpToDate, mappingVersion, concreteIndexName,
indexStatus);
this.indexState = newState;
Expand All @@ -193,61 +208,55 @@ private boolean checkIndexAvailable(ClusterState state) {
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
return true;
}
LOGGER.debug("Security index [{}] is not yet active", indexName);
logger.debug("Index [{}] is not yet active", aliasName);
return false;
}

/**
* Returns the routing-table for this index, or <code>null</code> if the index does not exist.
*/
private IndexRoutingTable getIndexRoutingTable(ClusterState clusterState) {
IndexMetaData metaData = resolveConcreteIndex(indexName, clusterState.metaData());
IndexMetaData metaData = resolveConcreteIndex(aliasName, clusterState.metaData());
if (metaData == null) {
return null;
} else {
return clusterState.routingTable().index(metaData.getIndex());
}
}

public static boolean checkTemplateExistsAndVersionMatches(
String templateName, ClusterState state, Logger logger, Predicate<Version> predicate) {

return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING,
state, logger, predicate);
public static boolean checkTemplateExistsAndVersionMatches(String templateName, ClusterState state, Logger logger,
Predicate<Version> predicate) {
return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING, state, logger, predicate);
}

private boolean checkIndexMappingUpToDate(ClusterState clusterState) {
return checkIndexMappingVersionMatches(clusterState, Version.CURRENT::equals);
}

private boolean checkIndexMappingVersionMatches(ClusterState clusterState,
Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(indexName, clusterState, LOGGER, predicate);
private boolean checkIndexMappingVersionMatches(ClusterState clusterState, Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(aliasName, clusterState, logger, predicate);
}

public static boolean checkIndexMappingVersionMatches(String indexName,
ClusterState clusterState, Logger logger,
public static boolean checkIndexMappingVersionMatches(String indexName, ClusterState clusterState, Logger logger,
Predicate<Version> predicate) {
return loadIndexMappingVersions(indexName, clusterState, logger)
.stream().allMatch(predicate);
return loadIndexMappingVersions(indexName, clusterState, logger).stream().allMatch(predicate);
}

private Version oldestIndexMappingVersion(ClusterState clusterState) {
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, LOGGER);
final Set<Version> versions = loadIndexMappingVersions(aliasName, clusterState, logger);
return versions.stream().min(Version::compareTo).orElse(null);
}

private static Set<Version> loadIndexMappingVersions(String indexName,
ClusterState clusterState, Logger logger) {
private static Set<Version> loadIndexMappingVersions(String aliasName, ClusterState clusterState, Logger logger) {
Set<Version> versions = new HashSet<>();
IndexMetaData indexMetaData = resolveConcreteIndex(indexName, clusterState.metaData());
IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, clusterState.metaData());
if (indexMetaData != null) {
for (Object object : indexMetaData.getMappings().values().toArray()) {
MappingMetaData mappingMetaData = (MappingMetaData) object;
if (mappingMetaData.type().equals(MapperService.DEFAULT_MAPPING)) {
continue;
}
versions.add(readMappingVersion(indexName, mappingMetaData, logger));
versions.add(readMappingVersion(aliasName, mappingMetaData, logger));
}
}
return versions;
Expand All @@ -270,8 +279,7 @@ private static IndexMetaData resolveConcreteIndex(final String indexOrAliasName,
return null;
}

private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData,
Logger logger) {
private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData, Logger logger) {
try {
Map<String, Object> meta =
(Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
Expand All @@ -289,17 +297,17 @@ private static Version readMappingVersion(String indexName, MappingMetaData mapp
}

/**
* Validates the security index is up to date and does not need to migrated. If it is not, the
* consumer is called with an exception. If the security index is up to date, the runnable will
* Validates that the index is up to date and does not need to be migrated. If it is not, the
* consumer is called with an exception. If the index is up to date, the runnable will
* be executed. <b>NOTE:</b> this method does not check the availability of the index; this check
* is left to the caller so that this condition can be handled appropriately.
*/
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
"Index [" + indexState.concreteIndexName + "] is not on the current version. Security features relying on the index"
+ " will not be available until the upgrade API is run on the index"));
} else {
andThen.run();
}
Expand All @@ -313,17 +321,20 @@ public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer,
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
if (indexState == State.UNRECOVERED_STATE) {
consumer.accept(new ElasticsearchStatusException("Cluster state has not been recovered yet, cannot write to the security index",
consumer.accept(new ElasticsearchStatusException(
"Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index",
RestStatus.SERVICE_UNAVAILABLE));
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
"Index [" + indexState.concreteIndexName + "] is not on the current version."
+ "Security features relying on the index will not be available until the upgrade API is run on the index"));
} else if (indexState.indexExists == false) {
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", INTERNAL_SECURITY_INDEX, SECURITY_INDEX_NAME);
Tuple<String, Settings> mappingAndSettings = loadMappingAndSettingsSourceFromTemplate();
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX)
.alias(new Alias(SECURITY_INDEX_NAME))
assert indexState.concreteIndexName != null;
logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName);
final byte[] mappingSource = mappingSourceSupplier.get();
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
.alias(new Alias(this.aliasName))
.mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON)
.waitForActiveShards(ActiveShardCount.ALL)
.settings(mappingAndSettings.v2());
Expand Down Expand Up @@ -351,11 +362,11 @@ public void onFailure(Exception e) {
}
}, client.admin().indices()::create);
} else if (indexState.mappingUpToDate == false) {
LOGGER.info(
"security index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, SECURITY_INDEX_NAME);

logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName);
final byte[] mappingSource = mappingSourceSupplier.get();
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName)
.source(loadMappingAndSettingsSourceFromTemplate().v1(), XContentType.JSON)
.source(mappingAndSettings.v1(), XContentType.JSON)
.type(MapperService.SINGLE_MAPPING_NAME);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
ActionListener.<AcknowledgedResponse>wrap(putMappingResponse -> {
Expand All @@ -370,11 +381,28 @@ public void onFailure(Exception e) {
}
}

private Tuple<String, Settings> loadMappingAndSettingsSourceFromTemplate() {
final byte[] template = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
/**
* Return true if the state moves from an unhealthy ("RED") index state to a healthy ("non-RED") state.
*/
public static boolean isMoveFromRedToNonRed(State previousState, State currentState) {
return (previousState.indexStatus == null || previousState.indexStatus == ClusterHealthStatus.RED)
&& currentState.indexStatus != null && currentState.indexStatus != ClusterHealthStatus.RED;
}

/**
* Return true if the state moves from the index existing to the index not existing.
*/
public static boolean isIndexDeleted(State previousState, State currentState) {
return previousState.indexStatus != null && currentState.indexStatus == null;
}

private static byte[] readSecurityTemplateAsBytes() {
return TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(SECURITY_TEMPLATE_NAME).source(template, XContentType.JSON);
}

private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(byte[] template) {
final PutIndexTemplateRequest request = new PutIndexTemplateRequest("name_is_not_important").source(template, XContentType.JSON);
final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME);
try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingSource)) {
Expand All @@ -391,21 +419,6 @@ private Tuple<String, Settings> loadMappingAndSettingsSourceFromTemplate() {
}
}

/**
* Return true if the state moves from an unhealthy ("RED") index state to a healthy ("non-RED") state.
*/
public static boolean isMoveFromRedToNonRed(State previousState, State currentState) {
return (previousState.indexStatus == null || previousState.indexStatus == ClusterHealthStatus.RED)
&& currentState.indexStatus != null && currentState.indexStatus != ClusterHealthStatus.RED;
}

/**
* Return true if the state moves from the index existing to the index not existing.
*/
public static boolean isIndexDeleted(State previousState, State currentState) {
return previousState.indexStatus != null && currentState.indexStatus == null;
}

/**
* State of the security index.
*/
Expand Down
Loading