Skip to content

Commit

Permalink
Un-hardcode SecurityIndexManager to handle generic indices (#40064)
Browse files Browse the repository at this point in the history
`SecurityIndexManager` is hardcoded to handle only the `.security`-`.security-7` alias-index pair.
This commit removes the hardcoded bits, so that the `SecurityIndexManager` can be reused
for other indices, such as the planned security tokens index (`.security-tokens-7`).
  • Loading branch information
albertzaharovits authored Mar 17, 2019
1 parent 97ef295 commit 3844125
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ Collection<Object> createComponents(Client client, ThreadPool threadPool, Cluste
components.add(auditTrailService);
this.auditTrailService.set(auditTrailService);

securityIndex.set(new SecurityIndexManager(client, SecurityIndexManager.SECURITY_INDEX_NAME, clusterService));
securityIndex.set(SecurityIndexManager.buildSecurityIndexManager(client, clusterService));

final TokenService tokenService = new TokenService(settings, Clock.systemUTC(), client, securityIndex.get(), clusterService);
this.tokenService.set(tokenService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -72,7 +73,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Manages the lifecycle of a single index, its template, mapping and and data upgrades/migrations.
* Manages the lifecycle of a single index, mapping and and data upgrades/migrations.
*/
public class SecurityIndexManager implements ClusterStateListener {

Expand All @@ -82,28 +83,41 @@ public class SecurityIndexManager implements ClusterStateListener {
public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
public static final String SECURITY_INDEX_NAME = ".security";
private static final Logger LOGGER = LogManager.getLogger(SecurityIndexManager.class);
private static final Logger logger = LogManager.getLogger(SecurityIndexManager.class);

private final String indexName;
private final String aliasName;
private final String internalIndexName;
private final int internalIndexFormat;
private final Supplier<byte[]> mappingSourceSupplier;
private final Client client;

private final List<BiConsumer<State, State>> stateChangeListeners = new CopyOnWriteArrayList<>();

private volatile State indexState;

public SecurityIndexManager(Client client, String indexName, ClusterService clusterService) {
this(client, indexName, State.UNRECOVERED_STATE);
public static SecurityIndexManager buildSecurityIndexManager(Client client, ClusterService clusterService) {
return new SecurityIndexManager(client, SECURITY_INDEX_NAME, INTERNAL_SECURITY_INDEX, INTERNAL_INDEX_FORMAT,
SecurityIndexManager::readSecurityTemplateAsBytes, clusterService);
}

private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
Supplier<byte[]> mappingSourceSupplier, ClusterService clusterService) {
this(client, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, State.UNRECOVERED_STATE);
clusterService.addListener(this);
}

private SecurityIndexManager(Client client, String indexName, State indexState) {
this.client = client;
this.indexName = indexName;
private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat,
Supplier<byte[]> mappingSourceSupplier, State indexState) {
this.aliasName = aliasName;
this.internalIndexName = internalIndexName;
this.internalIndexFormat = internalIndexFormat;
this.mappingSourceSupplier = mappingSourceSupplier;
this.indexState = indexState;
this.client = client;
}

public SecurityIndexManager freeze() {
return new SecurityIndexManager(null, indexName, indexState);
return new SecurityIndexManager(null, aliasName, internalIndexName, internalIndexFormat, mappingSourceSupplier, indexState);
}

public boolean checkMappingVersion(Predicate<Version> requiredVersion) {
Expand Down Expand Up @@ -143,9 +157,10 @@ public ElasticsearchException getUnavailableReason() {
}

if (localState.indexExists) {
return new UnavailableShardsException(null, "at least one primary shard for the security index is unavailable");
return new UnavailableShardsException(null,
"at least one primary shard for the index [" + localState.concreteIndexName + "] is unavailable");
} else {
return new IndexNotFoundException(SECURITY_INDEX_NAME);
return new IndexNotFoundException(localState.concreteIndexName);
}
}

Expand All @@ -163,20 +178,20 @@ public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think we don't have the
// .security index but they may not have been restored from the cluster state on disk
LOGGER.debug("security index manager waiting until state has been recovered");
logger.debug("security index manager waiting until state has been recovered");
return;
}
final State previousState = indexState;
final IndexMetaData indexMetaData = resolveConcreteIndex(indexName, event.state().metaData());
final IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, event.state().metaData());
final boolean indexExists = indexMetaData != null;
final boolean isIndexUpToDate = indexExists == false ||
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == INTERNAL_INDEX_FORMAT;
INDEX_FORMAT_SETTING.get(indexMetaData.getSettings()).intValue() == internalIndexFormat;
final boolean indexAvailable = checkIndexAvailable(event.state());
final boolean mappingIsUpToDate = indexExists == false || checkIndexMappingUpToDate(event.state());
final Version mappingVersion = oldestIndexMappingVersion(event.state());
final ClusterHealthStatus indexStatus = indexMetaData == null ? null :
new ClusterIndexHealth(indexMetaData, event.state().getRoutingTable().index(indexMetaData.getIndex())).getStatus();
final String concreteIndexName = indexMetaData == null ? INTERNAL_SECURITY_INDEX : indexMetaData.getIndex().getName();
final String concreteIndexName = indexMetaData == null ? internalIndexName : indexMetaData.getIndex().getName();
final State newState = new State(indexExists, isIndexUpToDate, indexAvailable, mappingIsUpToDate, mappingVersion, concreteIndexName,
indexStatus);
this.indexState = newState;
Expand All @@ -193,61 +208,55 @@ private boolean checkIndexAvailable(ClusterState state) {
if (routingTable != null && routingTable.allPrimaryShardsActive()) {
return true;
}
LOGGER.debug("Security index [{}] is not yet active", indexName);
logger.debug("Index [{}] is not yet active", aliasName);
return false;
}

/**
* Returns the routing-table for this index, or <code>null</code> if the index does not exist.
*/
private IndexRoutingTable getIndexRoutingTable(ClusterState clusterState) {
IndexMetaData metaData = resolveConcreteIndex(indexName, clusterState.metaData());
IndexMetaData metaData = resolveConcreteIndex(aliasName, clusterState.metaData());
if (metaData == null) {
return null;
} else {
return clusterState.routingTable().index(metaData.getIndex());
}
}

public static boolean checkTemplateExistsAndVersionMatches(
String templateName, ClusterState state, Logger logger, Predicate<Version> predicate) {

return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING,
state, logger, predicate);
public static boolean checkTemplateExistsAndVersionMatches(String templateName, ClusterState state, Logger logger,
Predicate<Version> predicate) {
return TemplateUtils.checkTemplateExistsAndVersionMatches(templateName, SECURITY_VERSION_STRING, state, logger, predicate);
}

private boolean checkIndexMappingUpToDate(ClusterState clusterState) {
return checkIndexMappingVersionMatches(clusterState, Version.CURRENT::equals);
}

private boolean checkIndexMappingVersionMatches(ClusterState clusterState,
Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(indexName, clusterState, LOGGER, predicate);
private boolean checkIndexMappingVersionMatches(ClusterState clusterState, Predicate<Version> predicate) {
return checkIndexMappingVersionMatches(aliasName, clusterState, logger, predicate);
}

public static boolean checkIndexMappingVersionMatches(String indexName,
ClusterState clusterState, Logger logger,
public static boolean checkIndexMappingVersionMatches(String indexName, ClusterState clusterState, Logger logger,
Predicate<Version> predicate) {
return loadIndexMappingVersions(indexName, clusterState, logger)
.stream().allMatch(predicate);
return loadIndexMappingVersions(indexName, clusterState, logger).stream().allMatch(predicate);
}

private Version oldestIndexMappingVersion(ClusterState clusterState) {
final Set<Version> versions = loadIndexMappingVersions(indexName, clusterState, LOGGER);
final Set<Version> versions = loadIndexMappingVersions(aliasName, clusterState, logger);
return versions.stream().min(Version::compareTo).orElse(null);
}

private static Set<Version> loadIndexMappingVersions(String indexName,
ClusterState clusterState, Logger logger) {
private static Set<Version> loadIndexMappingVersions(String aliasName, ClusterState clusterState, Logger logger) {
Set<Version> versions = new HashSet<>();
IndexMetaData indexMetaData = resolveConcreteIndex(indexName, clusterState.metaData());
IndexMetaData indexMetaData = resolveConcreteIndex(aliasName, clusterState.metaData());
if (indexMetaData != null) {
for (Object object : indexMetaData.getMappings().values().toArray()) {
MappingMetaData mappingMetaData = (MappingMetaData) object;
if (mappingMetaData.type().equals(MapperService.DEFAULT_MAPPING)) {
continue;
}
versions.add(readMappingVersion(indexName, mappingMetaData, logger));
versions.add(readMappingVersion(aliasName, mappingMetaData, logger));
}
}
return versions;
Expand All @@ -270,8 +279,7 @@ private static IndexMetaData resolveConcreteIndex(final String indexOrAliasName,
return null;
}

private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData,
Logger logger) {
private static Version readMappingVersion(String indexName, MappingMetaData mappingMetaData, Logger logger) {
try {
Map<String, Object> meta =
(Map<String, Object>) mappingMetaData.sourceAsMap().get("_meta");
Expand All @@ -289,17 +297,17 @@ private static Version readMappingVersion(String indexName, MappingMetaData mapp
}

/**
* Validates the security index is up to date and does not need to migrated. If it is not, the
* consumer is called with an exception. If the security index is up to date, the runnable will
* Validates that the index is up to date and does not need to be migrated. If it is not, the
* consumer is called with an exception. If the index is up to date, the runnable will
* be executed. <b>NOTE:</b> this method does not check the availability of the index; this check
* is left to the caller so that this condition can be handled appropriately.
*/
public void checkIndexVersionThenExecute(final Consumer<Exception> consumer, final Runnable andThen) {
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
"Index [" + indexState.concreteIndexName + "] is not on the current version. Security features relying on the index"
+ " will not be available until the upgrade API is run on the index"));
} else {
andThen.run();
}
Expand All @@ -313,17 +321,20 @@ public void prepareIndexIfNeededThenExecute(final Consumer<Exception> consumer,
final State indexState = this.indexState; // use a local copy so all checks execute against the same state!
// TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings)
if (indexState == State.UNRECOVERED_STATE) {
consumer.accept(new ElasticsearchStatusException("Cluster state has not been recovered yet, cannot write to the security index",
consumer.accept(new ElasticsearchStatusException(
"Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index",
RestStatus.SERVICE_UNAVAILABLE));
} else if (indexState.indexExists && indexState.isIndexUpToDate == false) {
consumer.accept(new IllegalStateException(
"Security index is not on the current version. Security features relying on the index will not be available until " +
"the upgrade API is run on the security index"));
"Index [" + indexState.concreteIndexName + "] is not on the current version."
+ "Security features relying on the index will not be available until the upgrade API is run on the index"));
} else if (indexState.indexExists == false) {
LOGGER.info("security index does not exist. Creating [{}] with alias [{}]", INTERNAL_SECURITY_INDEX, SECURITY_INDEX_NAME);
Tuple<String, Settings> mappingAndSettings = loadMappingAndSettingsSourceFromTemplate();
CreateIndexRequest request = new CreateIndexRequest(INTERNAL_SECURITY_INDEX)
.alias(new Alias(SECURITY_INDEX_NAME))
assert indexState.concreteIndexName != null;
logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName);
final byte[] mappingSource = mappingSourceSupplier.get();
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName)
.alias(new Alias(this.aliasName))
.mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON)
.waitForActiveShards(ActiveShardCount.ALL)
.settings(mappingAndSettings.v2());
Expand Down Expand Up @@ -351,11 +362,11 @@ public void onFailure(Exception e) {
}
}, client.admin().indices()::create);
} else if (indexState.mappingUpToDate == false) {
LOGGER.info(
"security index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, SECURITY_INDEX_NAME);

logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName);
final byte[] mappingSource = mappingSourceSupplier.get();
final Tuple<String, Settings> mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource);
PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName)
.source(loadMappingAndSettingsSourceFromTemplate().v1(), XContentType.JSON)
.source(mappingAndSettings.v1(), XContentType.JSON)
.type(MapperService.SINGLE_MAPPING_NAME);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request,
ActionListener.<AcknowledgedResponse>wrap(putMappingResponse -> {
Expand All @@ -370,11 +381,28 @@ public void onFailure(Exception e) {
}
}

private Tuple<String, Settings> loadMappingAndSettingsSourceFromTemplate() {
final byte[] template = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
/**
* Return true if the state moves from an unhealthy ("RED") index state to a healthy ("non-RED") state.
*/
public static boolean isMoveFromRedToNonRed(State previousState, State currentState) {
return (previousState.indexStatus == null || previousState.indexStatus == ClusterHealthStatus.RED)
&& currentState.indexStatus != null && currentState.indexStatus != ClusterHealthStatus.RED;
}

/**
* Return true if the state moves from the index existing to the index not existing.
*/
public static boolean isIndexDeleted(State previousState, State currentState) {
return previousState.indexStatus != null && currentState.indexStatus == null;
}

private static byte[] readSecurityTemplateAsBytes() {
return TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json", Version.CURRENT.toString(),
SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(SECURITY_TEMPLATE_NAME).source(template, XContentType.JSON);
}

private static Tuple<String, Settings> parseMappingAndSettingsFromTemplateBytes(byte[] template) {
final PutIndexTemplateRequest request = new PutIndexTemplateRequest("name_is_not_important").source(template, XContentType.JSON);
final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME);
try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappingSource)) {
Expand All @@ -391,21 +419,6 @@ private Tuple<String, Settings> loadMappingAndSettingsSourceFromTemplate() {
}
}

/**
* Return true if the state moves from an unhealthy ("RED") index state to a healthy ("non-RED") state.
*/
public static boolean isMoveFromRedToNonRed(State previousState, State currentState) {
return (previousState.indexStatus == null || previousState.indexStatus == ClusterHealthStatus.RED)
&& currentState.indexStatus != null && currentState.indexStatus != ClusterHealthStatus.RED;
}

/**
* Return true if the state moves from the index existing to the index not existing.
*/
public static boolean isIndexDeleted(State previousState, State currentState) {
return previousState.indexStatus != null && currentState.indexStatus == null;
}

/**
* State of the security index.
*/
Expand Down
Loading

0 comments on commit 3844125

Please sign in to comment.