Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.18][ML] Change the auditor to write via an alias #121215

Merged
merged 4 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120064.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120064
summary: Change the auditor to write via an alias
area: Machine Learning
type: upgrade
issues: []
2 changes: 2 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,5 @@ tests:
- class: org.elasticsearch.xpack.security.profile.ProfileIntegTests
method: testActivateProfile
issue: https://github.com/elastic/elasticsearch/issues/121151
- class: org.elasticsearch.xpack.ml.integration.PyTorchModelIT
issue: https://github.com/elastic/elasticsearch/issues/121165
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,26 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

Expand All @@ -43,59 +38,36 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);
static final int MAX_BUFFER_SIZE = 1000;
static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueMinutes(1);
protected static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueMinutes(1);

private final OriginSettingClient client;
private final String nodeName;
private final String auditIndex;
private final String templateName;
private final Supplier<TransportPutComposableIndexTemplateAction.Request> templateSupplier;
private final String auditIndexWriteAlias;
private final AbstractAuditMessageFactory<T> messageFactory;
private final AtomicBoolean hasLatestTemplate;

private Queue<ToXContent> backlog;
private final ClusterService clusterService;
private final AtomicBoolean putTemplateInProgress;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndex,
IndexTemplateConfig templateConfig,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService
) {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AtomicBoolean indexAndAliasCreated;

this(client, auditIndex, templateConfig.getTemplateName(), () -> {
try (var parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, templateConfig.loadBytes())) {
return new TransportPutComposableIndexTemplateAction.Request(templateConfig.getTemplateName()).indexTemplate(
ComposableIndexTemplate.parse(parser)
).masterNodeTimeout(MASTER_TIMEOUT);
} catch (IOException e) {
throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
}
}, nodeName, messageFactory, clusterService);
}
private Queue<ToXContent> backlog;
private final AtomicBoolean indexAndAliasCreationInProgress;

protected AbstractAuditor(
OriginSettingClient client,
String auditIndex,
String templateName,
Supplier<TransportPutComposableIndexTemplateAction.Request> templateSupplier,
String auditIndexWriteAlias,
String nodeName,
AbstractAuditMessageFactory<T> messageFactory,
ClusterService clusterService
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver
) {
this.client = Objects.requireNonNull(client);
this.auditIndex = Objects.requireNonNull(auditIndex);
this.templateName = Objects.requireNonNull(templateName);
this.templateSupplier = Objects.requireNonNull(templateSupplier);
this.auditIndexWriteAlias = Objects.requireNonNull(auditIndexWriteAlias);
this.messageFactory = Objects.requireNonNull(messageFactory);
this.clusterService = Objects.requireNonNull(clusterService);
this.nodeName = Objects.requireNonNull(nodeName);
this.clusterService = Objects.requireNonNull(clusterService);
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
this.backlog = new ConcurrentLinkedQueue<>();
this.hasLatestTemplate = new AtomicBoolean();
this.putTemplateInProgress = new AtomicBoolean();
this.indexAndAliasCreated = new AtomicBoolean();
this.indexAndAliasCreationInProgress = new AtomicBoolean();
}

public void audit(Level level, String resourceId, String message) {
Expand All @@ -114,6 +86,19 @@ public void error(String resourceId, String message) {
audit(Level.ERROR, resourceId, message);
}

/**
* Calling reset will cause the auditor to check the required
* index and alias exist and recreate if necessary
*/
public void reset() {
indexAndAliasCreated.set(false);
if (backlog == null) {
// create a new backlog in case documents need
// to be temporarily stored when the new index/alias is created
backlog = new ConcurrentLinkedQueue<>();
}
}

private static void onIndexResponse(DocWriteResponse response) {
logger.trace("Successfully wrote audit message");
}
Expand All @@ -123,35 +108,24 @@ private static void onIndexFailure(Exception exception) {
}

protected void indexDoc(ToXContent toXContent) {
if (hasLatestTemplate.get()) {
if (indexAndAliasCreated.get()) {
writeDoc(toXContent);
return;
}

if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) {
// install template & create index with alias
var createListener = ActionListener.<Boolean>wrap(success -> {
indexAndAliasCreationInProgress.set(false);
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
// synchronized so nothing can be added to backlog while writing it
indexAndAliasCreated.set(true);
writeBacklog();
}
writeDoc(toXContent);
return;
}

ActionListener<Boolean> putTemplateListener = ActionListener.wrap(r -> {
synchronized (this) {
// synchronized so nothing can be added to backlog while this value changes
hasLatestTemplate.set(true);
}
logger.info("Auditor template [{}] successfully installed", templateName);
putTemplateInProgress.set(false);
writeBacklog();
}, e -> {
logger.warn(Strings.format("Error putting latest template [%s]", templateName), e);
putTemplateInProgress.set(false);
});
}, e -> { indexAndAliasCreationInProgress.set(false); });

synchronized (this) {
if (hasLatestTemplate.get() == false) {
if (indexAndAliasCreated.get() == false) {
// synchronized so that hasLatestTemplate does not change value
// between the read and adding to the backlog
assert backlog != null;
Expand All @@ -165,29 +139,22 @@ protected void indexDoc(ToXContent toXContent) {
}

// stop multiple invocations
if (putTemplateInProgress.compareAndSet(false, true)) {
MlIndexAndAlias.installIndexTemplateIfRequired(
clusterService.state(),
client,
templateSupplier.get(),
putTemplateListener
);
if (indexAndAliasCreationInProgress.compareAndSet(false, true)) {
installTemplateAndCreateIndex(createListener);
}
return;
}
}

indexDoc(toXContent);
}

private void writeDoc(ToXContent toXContent) {
client.index(indexRequest(toXContent), ActionListener.wrap(AbstractAuditor::onIndexResponse, AbstractAuditor::onIndexFailure));
}

private IndexRequest indexRequest(ToXContent toXContent) {
IndexRequest indexRequest = new IndexRequest(auditIndex);
IndexRequest indexRequest = new IndexRequest(auditIndexWriteAlias);
indexRequest.source(toXContentBuilder(toXContent));
indexRequest.timeout(TimeValue.timeValueSeconds(5));
indexRequest.setRequireAlias(true);
return indexRequest;
}

Expand All @@ -206,7 +173,7 @@ protected void clearBacklog() {
protected void writeBacklog() {
assert backlog != null;
if (backlog == null) {
logger.error("Message back log has already been written");
logger.debug("Message back log has already been written");
return;
}

Expand All @@ -221,7 +188,7 @@ protected void writeBacklog() {
if (bulkItemResponses.hasFailures()) {
logger.warn("Failures bulk indexing the message back log: {}", bulkItemResponses.buildFailureMessage());
} else {
logger.trace("Successfully wrote audit message backlog after upgrading template");
logger.trace("Successfully wrote audit message backlog");
}
backlog = null;
}, AbstractAuditor::onIndexFailure));
Expand All @@ -231,4 +198,31 @@ protected void writeBacklog() {
int backLogSize() {
return backlog.size();
}

private void installTemplateAndCreateIndex(ActionListener<Boolean> listener) {
SubscribableListener.<Boolean>newForked(l -> {
MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateVersion(), putTemplateRequest(), l);
}).<Boolean>andThen((l, success) -> {
var indexDetails = indexDetails();
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
clusterService.state(),
indexNameExpressionResolver,
indexDetails.indexPrefix(),
indexDetails.indexVersion(),
auditIndexWriteAlias,
MASTER_TIMEOUT,
l
);

}).addListener(listener);
}

protected abstract TransportPutComposableIndexTemplateAction.Request putTemplateRequest();

protected abstract int templateVersion();

protected abstract IndexDetails indexDetails();

public record IndexDetails(String indexPrefix, String indexVersion) {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

public final class NotificationsIndex {

public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000002";
public static final String NOTIFICATIONS_INDEX_PREFIX = ".ml-notifications-";
public static final String NOTIFICATIONS_INDEX_VERSION = "000002";
public static final String NOTIFICATIONS_INDEX = NOTIFICATIONS_INDEX_PREFIX + NOTIFICATIONS_INDEX_VERSION;
public static final String NOTIFICATIONS_INDEX_WRITE_ALIAS = ".ml-notifications-write";

private static final String RESOURCE_PATH = "/ml/";
private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";
public static final int NOTIFICATIONS_INDEX_MAPPINGS_VERSION = 1;
public static final int NOTIFICATIONS_INDEX_TEMPLATE_VERSION = 1;

private NotificationsIndex() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ public static void createIndexAndAliasIfNecessary(
TimeValue masterNodeTimeout,
ActionListener<Boolean> finalListener
) {
createIndexAndAliasIfNecessary(
client,
clusterState,
resolver,
indexPatternPrefix,
FIRST_INDEX_SIX_DIGIT_SUFFIX,
alias,
masterNodeTimeout,
finalListener
);
}

/**
* Same as createIndexAndAliasIfNecessary but with the first concrete
* index number specified.
*/
public static void createIndexAndAliasIfNecessary(
Client client,
ClusterState clusterState,
IndexNameExpressionResolver resolver,
String indexPatternPrefix,
String indexNumber,
String alias,
TimeValue masterNodeTimeout,
ActionListener<Boolean> finalListener
) {

final ActionListener<Boolean> loggingListener = ActionListener.wrap(finalListener::onResponse, e -> {
logger.error(() -> format("Failed to create alias and index with pattern [%s] and alias [%s]", indexPatternPrefix, alias), e);
Expand All @@ -123,7 +149,7 @@ public static void createIndexAndAliasIfNecessary(
String legacyIndexWithoutSuffix = indexPatternPrefix;
String indexPattern = indexPatternPrefix + "*";
// The initial index name must be suitable for rollover functionality.
String firstConcreteIndex = indexPatternPrefix + FIRST_INDEX_SIX_DIGIT_SUFFIX;
String firstConcreteIndex = indexPatternPrefix + indexNumber;
String[] concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandHidden(), indexPattern);
Optional<String> indexPointedByCurrentWriteAlias = clusterState.getMetadata().hasAlias(alias)
? clusterState.getMetadata().getIndicesLookup().get(alias).getIndices().stream().map(Index::getName).findFirst()
Expand Down Expand Up @@ -325,7 +351,7 @@ public static void installIndexTemplateIfRequired(
String templateName = templateConfig.getTemplateName();

// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateName)) {
if (hasIndexTemplate(clusterState, templateName, templateConfig.getVersion())) {
listener.onResponse(true);
return;
}
Expand All @@ -339,7 +365,7 @@ public static void installIndexTemplateIfRequired(
throw new ElasticsearchParseException("unable to parse composable template " + templateConfig.getTemplateName(), e);
}

installIndexTemplateIfRequired(clusterState, client, request, listener);
installIndexTemplateIfRequired(clusterState, client, templateConfig.getVersion(), request, listener);
}

/**
Expand All @@ -355,11 +381,12 @@ public static void installIndexTemplateIfRequired(
public static void installIndexTemplateIfRequired(
ClusterState clusterState,
Client client,
int templateVersion,
TransportPutComposableIndexTemplateAction.Request templateRequest,
ActionListener<Boolean> listener
) {
// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateRequest.name())) {
if (hasIndexTemplate(clusterState, templateRequest.name(), templateVersion)) {
listener.onResponse(true);
return;
}
Expand All @@ -374,8 +401,9 @@ public static void installIndexTemplateIfRequired(
executeAsyncWithOrigin(client, ML_ORIGIN, TransportPutComposableIndexTemplateAction.TYPE, templateRequest, innerListener);
}

public static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().templatesV2().containsKey(templateName);
public static boolean hasIndexTemplate(ClusterState state, String templateName, long version) {
var template = state.getMetadata().templatesV2().get(templateName);
return template != null && Long.valueOf(version).equals(template.version());
}

public static boolean has6DigitSuffix(String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class TransformInternalIndexConstants {
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "notifications-*";

public static final String AUDIT_INDEX_READ_ALIAS = TRANSFORM_PREFIX + "notifications-read";
public static final String AUDIT_INDEX_WRITE_ALIAS = TRANSFORM_PREFIX + "notifications-write";
public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION;

private TransformInternalIndexConstants() {}
Expand Down
Loading