Skip to content

Commit

Permalink
[ML] Change the auditor to write via an alias (#120064)
Browse files Browse the repository at this point in the history
Changes the ml and transform auditor classes to write through an alias. 
The alias is required to rollover the index which required for upgrades
  • Loading branch information
davidkyle authored Jan 29, 2025
1 parent ceb0dc7 commit a52c26a
Show file tree
Hide file tree
Showing 36 changed files with 1,015 additions and 581 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/120064.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120064
summary: Change the auditor to write via an alias
area: Machine Learning
type: upgrade
issues: []
2 changes: 2 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ tests:
- class: org.elasticsearch.xpack.security.profile.ProfileIntegTests
method: testGetUsersWithProfileUidWhenProfileIndexDoesNotExists
issue: https://github.com/elastic/elasticsearch/issues/121179
- class: org.elasticsearch.xpack.ml.integration.PyTorchModelIT
issue: https://github.com/elastic/elasticsearch/issues/121165
- class: org.elasticsearch.xpack.security.profile.ProfileIntegTests
method: testSetEnabled
issue: https://github.com/elastic/elasticsearch/issues/121183
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,27 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

Expand All @@ -43,59 +39,36 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
static final int MAX_BUFFER_SIZE = 1000;
static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueMinutes(1);
protected static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueMinutes(1);

private final OriginSettingClient client;
private final String nodeName;
private final String auditIndex;
private final String templateName;
private final Supplier<TransportPutComposableIndexTemplateAction.Request> templateSupplier;
private final String auditIndexWriteAlias;
private final AbstractAuditMessageFactory<T> messageFactory;
private final AtomicBoolean hasLatestTemplate;

private Queue<ToXContent> backlog;
private final ClusterService clusterService;
private final AtomicBoolean putTemplateInProgress;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndex,
IndexTemplateConfig templateConfig,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService
) {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AtomicBoolean indexAndAliasCreated;

this(client, auditIndex, templateConfig.getTemplateName(), () -> {
try (var parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, templateConfig.loadBytes())) {
return new TransportPutComposableIndexTemplateAction.Request(templateConfig.getTemplateName()).indexTemplate(
ComposableIndexTemplate.parse(parser)
).masterNodeTimeout(MASTER_TIMEOUT);
} catch (IOException e) {
throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
}
}, nodeName, messageFactory, clusterService);
}
private Queue<ToXContent> backlog;
private final AtomicBoolean indexAndAliasCreationInProgress;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndex,
String templateName,
Supplier<TransportPutComposableIndexTemplateAction.Request> templateSupplier,
String auditIndexWriteAlias,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this.client = Objects.requireNonNull(client);
this.auditIndex = Objects.requireNonNull(auditIndex);
this.templateName = Objects.requireNonNull(templateName);
this.templateSupplier = Objects.requireNonNull(templateSupplier);
this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias);
this.messageFactory = Objects.requireNonNull(messageFactory);
this.clusterService = Objects.requireNonNull(clusterService);
this.nodeName = Objects.requireNonNull(nodeName);
this.clusterService = Objects.requireNonNull(clusterService);
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
this.backlog = new ConcurrentLinkedQueue<>();
this.hasLatestTemplate = new AtomicBoolean();
this.putTemplateInProgress = new AtomicBoolean();
this.indexAndAliasCreated = new AtomicBoolean();
this.indexAndAliasCreationInProgress = new AtomicBoolean();
}

public void audit(Level level, String resourceId, String message) {
Expand All @@ -114,6 +87,19 @@ public void error(String resourceId, String message) {
audit(Level.ERROR, resourceId, message);
}

/**
* Calling reset will cause the auditor to check the required
* index and alias exist and recreate if necessary
*/
public void reset() {
indexAndAliasCreated.set(false);
if (backlog == null) {
// create a new backlog in case documents need
// to be temporarily stored when the new index/alias is created
backlog = new ConcurrentLinkedQueue<>();
}
}

private static void onIndexResponse(DocWriteResponse response) {
logger.trace("Successfully wrote audit message");
}
Expand All @@ -123,35 +109,24 @@ private static void onIndexFailure(Exception exception) {
}

protected void indexDoc(ToXContent toXContent) {
if (hasLatestTemplate.get()) {
if (indexAndAliasCreated.get()) {
writeDoc(toXContent);
return;
}

if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) {
// install template & create index with alias
var createListener = ActionListener.<Boolean>wrap(success -> {
indexAndAliasCreationInProgress.set(false);
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
// synchronized so nothing can be added to backlog while writing it
indexAndAliasCreated.set(true);
writeBacklog();
}
writeDoc(toXContent);
return;
}

ActionListener<Boolean> putTemplateListener = ActionListener.wrap(r -> {
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
}
logger.info("Auditor template [{}] successfully installed", templateName);
putTemplateInProgress.set(false);
writeBacklog();
}, e -> {
logger.warn(Strings.format("Error putting latest template [%s]", templateName), e);
putTemplateInProgress.set(false);
});
}, e -> { indexAndAliasCreationInProgress.set(false); });

synchronized (this) {
if (hasLatestTemplate.get() == false) {
if (indexAndAliasCreated.get() == false) {
// synchronized so that hasLatestTemplate does not change value
// between the read and adding to the backlog
assert backlog != null;
Expand All @@ -165,29 +140,22 @@ protected void indexDoc(ToXContent toXContent) {
}

// stop multiple invocations
if (putTemplateInProgress.compareAndSet(false, true)) {
MlIndexAndAlias.installIndexTemplateIfRequired(
clusterService.state(),
client,
templateSupplier.get(),
putTemplateListener
);
if (indexAndAliasCreationInProgress.compareAndSet(false, true)) {
installTemplateAndCreateIndex(createListener);
}
return;
}
}

indexDoc(toXContent);
}

private void writeDoc(ToXContent toXContent) {
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure));
}

private IndexRequest indexRequest(ToXContent toXContent) {
IndexRequest indexRequest = new IndexRequest(auditIndex);
IndexRequest indexRequest = new IndexRequest(auditIndexWriteAlias);
indexRequest.source(toXContentBuilder(toXContent));
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.setRequireAlias(true);
return indexRequest;
}

Expand All @@ -206,7 +174,7 @@ protected void clearBacklog() {
protected void writeBacklog() {
assert backlog != null;
if (backlog == null) {
logger.error("Message back log has already been written");
logger.debug("Message back log has already been written");
return;
}

Expand All @@ -221,7 +189,7 @@ protected void writeBacklog() {
if (bulkItemResponses.hasFailures()) {
logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage());
} else {
logger.trace("Successfully wrote audit message backlog after upgrading template");
logger.trace("Successfully wrote audit message backlog");
}
backlog = null;
}, AbstractAuditor::onIndexFailure));
Expand All @@ -231,4 +199,32 @@ protected void writeBacklog() {
int backLogSize() {
return backlog.size();
}

private void installTemplateAndCreateIndex(ActionListener<Boolean> listener) {
SubscribableListener.<Boolean>newForked(l -> {
MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateVersion(), putTemplateRequest(), l);
}).<Boolean>andThen((l, success) -> {
var indexDetails = indexDetails();
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
clusterService.state(),
indexNameExpressionResolver,
indexDetails.indexPrefix(),
indexDetails.indexVersion(),
auditIndexWriteAlias,
MASTER_TIMEOUT,
ActiveShardCount.DEFAULT,
l
);

}).addListener(listener);
}

protected abstract TransportPutComposableIndexTemplateAction.Request putTemplateRequest();

protected abstract int templateVersion();

protected abstract IndexDetails indexDetails();

public record IndexDetails(String indexPrefix, String indexVersion) {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

public final class NotificationsIndex {

public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000002";
public static final String NOTIFICATIONS_INDEX_PREFIX = ".ml-notifications-";
public static final String NOTIFICATIONS_INDEX_VERSION = "000002";
public static final String NOTIFICATIONS_INDEX = NOTIFICATIONS_INDEX_PREFIX + NOTIFICATIONS_INDEX_VERSION;
public static final String NOTIFICATIONS_INDEX_WRITE_ALIAS = ".ml-notifications-write";

private static final String RESOURCE_PATH = "/ml/";
private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
public static final int NOTIFICATIONS_INDEX_MAPPINGS_VERSION = 1;
public static final int NOTIFICATIONS_INDEX_TEMPLATE_VERSION = 1;

private NotificationsIndex() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ public static void createIndexAndAliasIfNecessary(
ActiveShardCount waitForShardCount,
ActionListener<Boolean> finalListener
) {
createIndexAndAliasIfNecessary(
client,
clusterState,
resolver,
indexPatternPrefix,
FIRST_INDEX_SIX_DIGIT_SUFFIX,
alias,
masterNodeTimeout,
waitForShardCount,
finalListener
);
}

/**
* Same as createIndexAndAliasIfNecessary but with the first concrete
* index number specified.
*/
public static void createIndexAndAliasIfNecessary(
Client client,
ClusterState clusterState,
IndexNameExpressionResolver resolver,
String indexPatternPrefix,
String indexNumber,
String alias,
TimeValue masterNodeTimeout,
ActiveShardCount waitForShardCount,
ActionListener<Boolean> finalListener
) {

final ActionListener<Boolean> loggingListener = ActionListener.wrap(finalListener::onResponse, e -> {
logger.error(() -> format("Failed to create alias and index with pattern [%s] and alias [%s]", indexPatternPrefix, alias), e);
Expand All @@ -125,7 +153,7 @@ public static void createIndexAndAliasIfNecessary(
String legacyIndexWithoutSuffix = indexPatternPrefix;
String indexPattern = indexPatternPrefix + "*";
// The initial index name must be suitable for rollover functionality.
String firstConcreteIndex = indexPatternPrefix + FIRST_INDEX_SIX_DIGIT_SUFFIX;
String firstConcreteIndex = indexPatternPrefix + indexNumber;
String[] concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandHidden(), indexPattern);
Optional<String> indexPointedByCurrentWriteAlias = clusterState.getMetadata().hasAlias(alias)
? clusterState.getMetadata().getIndicesLookup().get(alias).getIndices().stream().map(Index::getName).findFirst()
Expand Down Expand Up @@ -330,7 +358,7 @@ public static void installIndexTemplateIfRequired(
String templateName = templateConfig.getTemplateName();

// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateName)) {
if (hasIndexTemplate(clusterState, templateName, templateConfig.getVersion())) {
listener.onResponse(true);
return;
}
Expand All @@ -344,7 +372,7 @@ public static void installIndexTemplateIfRequired(
throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
}

installIndexTemplateIfRequired(clusterState, client, request, listener);
installIndexTemplateIfRequired(clusterState, client, templateConfig.getVersion(), request, listener);
}

/**
Expand All @@ -360,11 +388,12 @@ public static void installIndexTemplateIfRequired(
public static void installIndexTemplateIfRequired(
ClusterState clusterState,
Client client,
int templateVersion,
TransportPutComposableIndexTemplateAction.Request templateRequest,
ActionListener<Boolean> listener
) {
// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateRequest.name())) {
if (hasIndexTemplate(clusterState, templateRequest.name(), templateVersion)) {
listener.onResponse(true);
return;
}
Expand All @@ -379,8 +408,9 @@ public static void installIndexTemplateIfRequired(
executeAsyncWithOrigin(client, ML_ORIGIN, TransportPutComposableIndexTemplateAction.TYPE, templateRequest, innerListener);
}

public static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().templatesV2().containsKey(templateName);
public static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
var template = state.getMetadata().templatesV2().get(templateName);
return template != null && Long.valueOf(version).equals(template.version());
}

public static boolean has6DigitSuffix(String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public final class TransformInternalIndexConstants {
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "notifications-*";

public static final String AUDIT_INDEX_READ_ALIAS = TRANSFORM_PREFIX + "notifications-read";
public static final String AUDIT_INDEX_WRITE_ALIAS = TRANSFORM_PREFIX + "notifications-write";
public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION;

private TransformInternalIndexConstants() {}
Expand Down
Loading

0 comments on commit a52c26a

Please sign in to comment.