Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Don't install empty ML metadata on startup #30751

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
Expand Down Expand Up @@ -467,6 +468,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
}
}

public static MlMetadata getMlMetadata(ClusterState state) {
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
return EMPTY_METADATA;
}
return mlMetadata;
}

public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;

/**
Expand Down Expand Up @@ -47,8 +46,7 @@ public static String resultsWriteAlias(String jobId) {
* @return The index name
*/
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE);
return meta.getJobs().get(jobId).getResultsIndexName();
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
Expand Down Expand Up @@ -132,15 +131,7 @@ public Map<String, Object> nativeCodeInfo() {
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
ClusterState state = clusterService.state();
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);

// Handle case when usage is called but MlMetadata has not been installed yet
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was never really worth handling this case differently to ML metadata existing but being empty, as this state only existed for a fraction of a second on first cluster startup. Now this state will exist for longer - until the first job is created - but handling it like empty ML metadata is actually closer to the historical behaviour.

if (mlMetadata == null) {
listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled,
Collections.emptyMap(), Collections.emptyMap()));
} else {
new Retriever(client, mlMetadata, available(), enabled()).execute(listener);
}
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener);
}

public static class Retriever {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
Expand Down Expand Up @@ -90,8 +89,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
} else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE);
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,20 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;

import java.util.concurrent.atomic.AtomicBoolean;

class MlInitializationService extends AbstractComponent implements ClusterStateListener {

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client;

private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);

private volatile MlDailyMaintenanceService mlDailyMaintenanceService;

MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
Expand All @@ -48,45 +39,12 @@ public void clusterChanged(ClusterChangedEvent event) {
}

if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData();
installMlMetadata(metaData);
installDailyMaintenanceService();
} else {
uninstallDailyMaintenanceService();
}
}

private void installMlMetadata(MetaData metaData) {
if (metaData.custom(MLMetadataField.TYPE) == null) {
if (installMlMetadataCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() ->
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// If the metadata has been added already don't try to update
if (currentState.metaData().custom(MLMetadataField.TYPE) != null) {
return currentState;
}
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA);
builder.metaData(metadataBuilder.build());
return builder.build();
}

@Override
public void onFailure(String source, Exception e) {
installMlMetadataCheck.set(false);
logger.error("unable to install ml metadata", e);
}
})
);
}
} else {
installMlMetadataCheck.set(false);
}
}

private void installDailyMaintenanceService() {
if (mlDailyMaintenanceService == null) {
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
Expand Down Expand Up @@ -92,8 +91,7 @@ public TransportCloseJobAction(Settings settings, TransportService transportServ
static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List<String> openJobIds,
List<String> closingJobIds) {
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE);
final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull;
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);

List<String> failedJobs = new ArrayList<>();

Expand All @@ -107,7 +105,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState
};

Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
expandedJobIds.stream().forEach(jobIdProcessor::accept);
expandedJobIds.forEach(jobIdProcessor::accept);
if (request.isForce() == false && failedJobs.size() > 0) {
if (expandedJobIds.size() == 1) {
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ protected DeleteDatafeedAction.Response newResponse(boolean acknowledged) {
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
PersistentTasksCustomMetaData persistentTasks =
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
Expand Down Expand Up @@ -60,8 +59,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener<Dele

final String filterId = request.getFilterId();
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
Map<String, Job> jobs = currentMlMetadata.getJobs();
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
List<String> currentlyUsedBy = new ArrayList<>();
for (Job job : jobs.values()) {
List<Detector> detectors = job.getAnalysisConfig().getDetectors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,9 @@ public void onFailure(Exception e) {
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
builder.markJobAsDeleted(jobId, tasks, force);
return buildNewClusterState(currentState, builder);
}
Expand Down Expand Up @@ -248,11 +247,7 @@ public void onTimeout(TimeValue timeout) {
}

static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
MlMetadata metadata = clusterState.metaData().custom(MLMetadataField.TYPE);
if (metadata == null) {
return true;
}
return !metadata.getJobs().containsKey(jobId);
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
}

private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
public ClusterState execute(ClusterState currentState) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetCalendarEventsAction;
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
Expand Down Expand Up @@ -70,7 +69,7 @@ protected void doExecute(GetCalendarEventsAction.Request request,

if (request.getJobId() != null) {
ClusterState state = clusterService.state();
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(state);

List<String> jobGroups;
String requestId = request.getJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down Expand Up @@ -52,10 +51,7 @@ protected void masterOperation(GetDatafeedsAction.Request request, ClusterState
ActionListener<GetDatafeedsAction.Response> listener) throws Exception {
logger.debug("Get datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());
List<DatafeedConfig> datafeedConfigs = new ArrayList<>();
for (String expandedDatafeedId : expandedDatafeedIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand Down Expand Up @@ -56,11 +55,7 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = state.metaData().custom(MLMetadataField.TYPE);
if (mlMetadata == null) {
mlMetadata = MlMetadata.EMPTY_METADATA;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());

PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand Down Expand Up @@ -69,8 +68,7 @@ public TransportGetJobsStatsAction(Settings settings, TransportService transport

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
MlMetadata clusterMlMetadata = clusterService.state().metaData().custom(MLMetadataField.TYPE);
MlMetadata mlMetadata = (clusterMlMetadata == null) ? MlMetadata.EMPTY_METADATA : clusterMlMetadata;
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
Expand Down
Loading