Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.8] Fix delete_expired_data/nightly maintenance when many model snapshots need deleting #57174

Merged
merged 3 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,36 @@ public static Date parseTimeField(XContentParser parser, String fieldName) throw
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
}

/**
* Safely parses a string epoch representation to a Long
*
* Commonly this function is used for parsing Date fields from doc values
* requested with the format "epoch_millis".
*
* Since nanosecond support was added epoch_millis timestamps may have a fractional component.
* We discard this, taking just whole milliseconds. Arguably it would be better to retain the
* precision here and let the downstream component decide whether it wants the accuracy, but
* that makes it hard to pass around the value as a number. The double type doesn't have
* enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
* work, but that isn't supported by the JSON parser if the number gets round-tripped through
* JSON. So String is really the only format that could be used, but the consumers of time
* are expecting a number.
*
* @param epoch The epoch value as a string. This may contain a fractional component.
* @return The epoch value.
*/
public static long parseToEpochMs(String epoch) {
int dotPos = epoch.indexOf('.');
if (dotPos == -1) {
return Long.parseLong(epoch);
} else if (dotPos > 0) {
return Long.parseLong(epoch.substring(0, dotPos));
} else {
// The first character is '.' so round down to 0
return 0L;
}
}

/**
* First tries to parse the date first as a Long and convert that to an
* epoch time. If the long number has more than 10 digits it is considered a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -35,12 +30,11 @@
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -83,6 +77,10 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
.filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName())));
source.size(MAX_FORECASTS);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null);
source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis");

// _doc is the most efficient sort order and will also disable scoring
source.sort(ElasticsearchMappings.ES_DOC);
Expand All @@ -94,11 +92,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
}

private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
List<ForecastRequestStats> forecastsToDelete;
try {
forecastsToDelete = findForecastsToDelete(searchResponse);
} catch (IOException e) {
listener.onFailure(e);
List<JobForecastId> forecastsToDelete = findForecastsToDelete(searchResponse);
if (forecastsToDelete.isEmpty()) {
listener.onResponse(true);
return;
}

Expand Down Expand Up @@ -129,39 +125,49 @@ public void onFailure(Exception e) {
});
}

private List<ForecastRequestStats> findForecastsToDelete(SearchResponse searchResponse) throws IOException {
List<ForecastRequestStats> forecastsToDelete = new ArrayList<>();
private List<JobForecastId> findForecastsToDelete(SearchResponse searchResponse) {
List<JobForecastId> forecastsToDelete = new ArrayList<>();

SearchHits hits = searchResponse.getHits();
if (hits.getTotalHits() > MAX_FORECASTS) {
LOGGER.info("More than [{}] forecasts were found. This run will only delete [{}] of them", MAX_FORECASTS, MAX_FORECASTS);
}

for (SearchHit hit : hits.getHits()) {
try (InputStream stream = hit.getSourceRef().streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) {
forecastsToDelete.add(forecastRequestStats);
String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 6.8 the doc value could be a Long rather than a String. It's why TimeField has this case:

} else if (value[0] instanceof Long == false) { // pre-6.0 field

What this means in practice is that a user running 6.8 who first used ML in 5.x will end up seeing the warning on lines 139-140 repeatedly and won't get any cleanup.

It's still OK to use stringFieldValueOrNull() to extract fields mapped as keyword or text from hits, but for fields mapped as date in 6.8 the code needs to handle both Long and String.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point thanks.

I'm curious though why does this code throw if the object is a Long? This is from the 6.8 branch

throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tricky part is the condition checks the value is not a long. Thus, the logic there is that prior to 6.0, we expect a long. If it's not, then something's gone wrong. Otherwise, we fall through the last return of the method. Pretty confusing, I know.

if (expiryTime == null) {
LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(),
ForecastRequestStats.EXPIRY_TIME.getPreferredName());
continue;
}
long expiryMs = TimeUtils.parseToEpochMs(expiryTime);
if (expiryMs < cutoffEpochMs) {
JobForecastId idPair = new JobForecastId(
stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
forecastsToDelete.add(idPair);
}
}
}
return forecastsToDelete;
}

private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forecastsToDelete) {
private DeleteByQueryRequest buildDeleteByQuery(List<JobForecastId> ids) {
DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(5);

request.indices(RESULTS_INDEX_PATTERN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1);
boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),
ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE));
for (ForecastRequestStats forecastToDelete : forecastsToDelete) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId()))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId())));
for (JobForecastId jobForecastId : ids) {
if (jobForecastId.hasNullValue() == false) {
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId))
.must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId)));
}
}
QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery);
request.setQuery(query);
Expand All @@ -171,4 +177,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec

return request;
}

private static class JobForecastId {
private final String jobId;
private final String forecastId;

private JobForecastId(String jobId, String forecastId) {
this.jobId = jobId;
this.forecastId = forecastId;
}

boolean hasNullValue() {
return jobId == null || forecastId == null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Bool
.mustNot(activeSnapshotFilter)
.mustNot(retainFilter);

searchRequest.source(new SearchSourceBuilder().query(query).size(MODEL_SNAPSHOT_SEARCH_SIZE).sort(ElasticsearchMappings.ES_DOC));
SearchSourceBuilder source = new SearchSourceBuilder();
source.query(query);
source.size(MODEL_SNAPSHOT_SEARCH_SIZE);
source.sort(ElasticsearchMappings.ES_DOC);
source.fetchSource(false);
source.docValueField(Job.ID.getPreferredName(), null);
source.docValueField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), null);
searchRequest.source(source);

getClient().execute(SearchAction.INSTANCE, searchRequest, new ThreadedActionListener<>(LOGGER, threadPool,
MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false));
Expand All @@ -99,11 +106,18 @@ private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, Ac
@Override
public void onResponse(SearchResponse searchResponse) {
try {
List<ModelSnapshot> modelSnapshots = new ArrayList<>();
List<JobSnapshotId> snapshotIds = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef()));
JobSnapshotId idPair = new JobSnapshotId(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is different to the later branches as it doesn't have the new model snapshot retention options added in #56125

stringFieldValueOrNull(hit, Job.ID.getPreferredName()),
stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName()));

if (idPair.hasNullValue() == false) {
snapshotIds.add(idPair);
}
}
deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener);

deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener);
} catch (Exception e) {
onFailure(e);
}
Expand All @@ -116,14 +130,14 @@ public void onFailure(Exception e) {
};
}

private void deleteModelSnapshots(Iterator<ModelSnapshot> modelSnapshotIterator, ActionListener<Boolean> listener) {
private void deleteModelSnapshots(Iterator<JobSnapshotId> modelSnapshotIterator, ActionListener<Boolean> listener) {
if (modelSnapshotIterator.hasNext() == false) {
listener.onResponse(true);
return;
}
ModelSnapshot modelSnapshot = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request(
modelSnapshot.getJobId(), modelSnapshot.getSnapshotId());
JobSnapshotId idPair = modelSnapshotIterator.next();
DeleteModelSnapshotAction.Request deleteSnapshotRequest =
new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId);
getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
Expand All @@ -136,9 +150,23 @@ public void onResponse(AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot ["
+ modelSnapshot.getSnapshotId() + "]", e));
listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot ["
+ idPair.snapshotId + "]", e));
}
});
}

static class JobSnapshotId {
private final String jobId;
private final String snapshotId;

JobSnapshotId(String jobId, String snapshotId) {
this.jobId = jobId;
this.snapshotId = snapshotId;
}

boolean hasNullValue() {
return jobId == null || snapshotId == null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,29 @@
package org.elasticsearch.xpack.ml.job.retention;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;

import java.util.function.Supplier;

public interface MlDataRemover {
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);

/**
* Extract {@code fieldName} from {@code hit} and if it is a string
* return the string else {@code null}.
* @param hit The search hit
* @param fieldName Field to find
* @return value iff the docfield is present and it is a string. Otherwise {@code null}
*/
default String stringFieldValueOrNull(SearchHit hit, String fieldName) {
DocumentField docField = hit.field(fieldName);
if (docField != null) {
Object value = docField.getValue();
if (value instanceof String) {
return (String)value;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ private static SearchResponse createSearchResponse(List<? extends ToXContent> to
return searchResponse;
}

static SearchResponse createSearchResponseFromHits(List<SearchHit> hits) {
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), hits.size(), 1.0f);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getHits()).thenReturn(searchHits);
return searchResponse;
}

public void testRemoveGivenNoJobs() throws IOException {
SearchResponse response = createSearchResponse(Collections.emptyList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -25,7 +26,9 @@
import org.elasticsearch.xpack.core.ml.job.config.JobTests;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -118,11 +121,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(
AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_1 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1)));

createExpiredModelSnapshotsRemover().remove(listener, () -> false);

Expand Down Expand Up @@ -203,12 +208,13 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
));

List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
createModelSnapshot("snapshots-1", "snapshots-1_2"));
List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1");
SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(
Arrays.asList(snapshot1_1, snapshot1_2)));

SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1");
searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2)));
createExpiredModelSnapshotsRemover().remove(listener, () -> false);

listener.waitToCompletion();
Expand All @@ -224,6 +230,17 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio
assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
}

public void testJobSnapshotId() {
ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b");
assertFalse(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b");
assertTrue(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null);
assertTrue(id.hasNullValue());
id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null);
assertTrue(id.hasNullValue());
}

@SuppressWarnings("unchecked")
private void givenJobs(List<Job> jobs) throws IOException {
SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
Expand Down Expand Up @@ -287,4 +304,10 @@ public Void answer(InvocationOnMock invocationOnMock) {
}).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
}

private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId) {
SearchHitBuilder hitBuilder = new SearchHitBuilder(0);
hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId));
hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId));
return hitBuilder.build();
}
}
Loading