Skip to content

Commit

Permalink
feat(elasticsearch): Updates to elasticsearch configuration, dao, and…
Browse files Browse the repository at this point in the history
… tests
  • Loading branch information
david-leifker committed Oct 23, 2022
1 parent 775d2b0 commit d87a802
Show file tree
Hide file tree
Showing 43 changed files with 911 additions and 503 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ project.ext.externalDependency = [
'springJdbc': "org.springframework:spring-jdbc:$springVersion",
'springWeb': "org.springframework:spring-web:$springVersion",
'springWebMVC': "org.springframework:spring-webmvc:$springVersion",
'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion",
'springBoot': "org.springframework.boot:spring-boot:$springBootVersion",
'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion",
'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
Expand Down Expand Up @@ -106,7 +105,7 @@ private static String createTopics(Stream<String> bootstraps) {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
Expand Down
1 change: 1 addition & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
testCompile externalDependency.testContainersCassandra
testCompile externalDependency.lombok
testCompile project(':test-models')
testImplementation externalDependency.springBootTest

testAnnotationProcessor externalDependency.lombok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class ESGraphWriteDAO {
private final RestHighLevelClient client;
private final IndexConvention indexConvention;
private final BulkProcessor bulkProcessor;
private final int numRetries;

/**
* Updates or inserts the given search document.
Expand All @@ -42,8 +43,7 @@ public void upsertDocument(@Nonnull String docId, @Nonnull String document) {
new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON)
.detectNoop(false)
.upsert(indexRequest);
.detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest);
bulkProcessor.add(updateRequest);
}

Expand All @@ -55,10 +55,9 @@ public BulkByScrollResponse deleteByQuery(@Nullable final String sourceType, @No
destinationType == null ? ImmutableList.of() : ImmutableList.of(destinationType), destinationEntityFilter,
relationshipTypes, relationshipFilter);

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();

deleteByQueryRequest.setQuery(finalQuery);

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest()
.setQuery(finalQuery)
.setRefresh(true);
deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ public void configure() {
@Override
public void clear() {
DeleteByQueryRequest deleteRequest =
new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)).setQuery(QueryBuilders.matchAllQuery());
new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME))
.setQuery(QueryBuilders.matchAllQuery())
.setRefresh(true);
try {
_searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.search.elasticsearch.indexbuilder;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapDifference;
Expand All @@ -10,7 +11,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
Expand All @@ -32,18 +37,43 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;


@Slf4j
@RequiredArgsConstructor
public class ESIndexBuilder {

private final RestHighLevelClient searchClient;
@Getter
private final int numShards;

@Getter
private final int numReplicas;

@Getter
private final int numRetries;

private static final List<String> SETTINGS_TO_COMPARE = ImmutableList.of("number_of_shards", "number_of_replicas");
@Getter
private final int refreshIntervalSeconds;

@Getter
private final Map<String, Map<String, String>> indexSettingOverrides;

@Getter
private final boolean enableIndexSettingsReindex;

private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/*
Most index settings are default values and populated by Elastic. This list is an include list to determine which
settings we care about when a difference is present.
*/
private static final List<String> SETTINGS_DYNAMIC = ImmutableList.of("number_of_replicas", "refresh_interval");
// These settings require a reindex
private static final List<String> SETTINGS_STATIC = ImmutableList.of("number_of_shards");
private static final List<String> SETTINGS = Stream.concat(
SETTINGS_DYNAMIC.stream(), SETTINGS_STATIC.stream()).collect(Collectors.toList());

public void buildIndex(String indexName, Map<String, Object> mappings, Map<String, Object> settings)
throws IOException {
Expand All @@ -53,6 +83,8 @@ public void buildIndex(String indexName, Map<String, Object> mappings, Map<Strin
Map<String, Object> baseSettings = new HashMap<>(settings);
baseSettings.put("number_of_shards", numShards);
baseSettings.put("number_of_replicas", numReplicas);
baseSettings.put("refresh_interval", String.format("%ss", refreshIntervalSeconds));
baseSettings.putAll(indexSettingOverrides.getOrDefault(indexName, Map.of()));
Map<String, Object> finalSettings = ImmutableMap.of("index", baseSettings);

// If index doesn't exist, create index
Expand All @@ -70,38 +102,80 @@ public void buildIndex(String indexName, Map<String, Object> mappings, Map<Strin
.get()
.getSourceAsMap();

MapDifference<String, Object> mappingsDiff = Maps.difference((Map<String, Object>) oldMappings.get("properties"),
(Map<String, Object>) mappings.get("properties"));
MapDifference<String, Object> mappingsDiff = Maps.difference(
(Map<String, Object>) oldMappings.getOrDefault("properties", Map.of()),
(Map<String, Object>) mappings.getOrDefault("properties", Map.of()));

Settings oldSettings = searchClient.indices()
.getSettings(new GetSettingsRequest().indices(indexName), RequestOptions.DEFAULT)
.getIndexToSettings()
.valuesIt()
.next();
boolean isSettingsEqual = equals(finalSettings, oldSettings);

final boolean isAnalysisEqual = isAnalysisEqual(finalSettings, oldSettings);
final boolean isSettingsEqual = isSettingsEqual(finalSettings, oldSettings);
final boolean isSettingsReindexRequired = isSettingsReindexRequired(finalSettings, oldSettings);

// If there are no updates to mappings and settings, return
if (mappingsDiff.areEqual() && isSettingsEqual) {
if (mappingsDiff.areEqual() && isAnalysisEqual && isSettingsEqual) {
log.info("No updates to index {}", indexName);
return;
}

// If there are no updates to settings, and there are only pure additions to mappings (no updates to existing fields),
// there is no need to reindex. Just update mappings
if (isSettingsEqual && isPureAddition(mappingsDiff)) {
log.info("New fields have been added to index {}. Updating index in place", indexName);
if (isAnalysisEqual && isPureAddition(mappingsDiff) && isSettingsEqual) {
log.info("New fields have been added to index {}. Updating index in place. Adding: {}", indexName, mappingsDiff);
PutMappingRequest request = new PutMappingRequest(indexName).source(mappings);
searchClient.indices().putMapping(request, RequestOptions.DEFAULT);
log.info("Updated index {} with new mappings", indexName);
return;
}

if (!mappingsDiff.entriesDiffering().isEmpty()) {
log.info("There's diff between new mappings (left) and old mappings (right): {}", mappingsDiff.toString());
log.info("There's diff between new mappings (left) and old mappings (right): {}", mappingsDiff);
reindex(indexName, mappings, finalSettings);
} else {
log.info("There's an update to settings");
if (isSettingsReindexRequired) {
if (enableIndexSettingsReindex) {
log.info("There's an update to settings that requires reindexing. Target: {}",
OBJECT_MAPPER.writeValueAsString(finalSettings));
reindex(indexName, mappings, finalSettings);
} else {
log.warn("There's an update to settings that requires reindexing, however reindexing is disabled. Existing: {} Target: {}",
oldSettings, OBJECT_MAPPER.writeValueAsString(finalSettings));
}
}

/*
If we allow reindexing, then any setting that doesn't require reindexing is also
applied above and our equality is out of date. We don't want to apply them again for no reason.
*/
boolean settingsApplied = isSettingsReindexRequired && enableIndexSettingsReindex;
if (!isSettingsEqual && !settingsApplied) {
UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
Map<String, Object> indexSettings = ((Map<String, Object>) finalSettings.get("index"))
.entrySet().stream()
.filter(e -> SETTINGS_DYNAMIC.contains(e.getKey()))
.collect(Collectors.toMap(e -> "index." + e.getKey(), Map.Entry::getValue));

/*
We might not have any changes that can be applied without reindex. This is the case when a reindex
is needed due to a setting, but not allowed. We don't want to apply empty settings for no reason.
*/
if (!indexSettings.isEmpty()) {
request.settings(indexSettings);
boolean ack = searchClient.indices().putSettings(request, RequestOptions.DEFAULT).isAcknowledged();
log.info("Updated index {} with new settings. Settings: {}, Acknowledged: {}", indexName,
OBJECT_MAPPER.writeValueAsString(indexSettings), ack);
}
}
}
}

private void reindex(String indexName, Map<String, Object> mappings, Map<String, Object> finalSettings)
throws IOException {
String tempIndexName = indexName + "_" + System.currentTimeMillis();
createIndex(tempIndexName, mappings, finalSettings);
try {
Expand Down Expand Up @@ -205,12 +279,12 @@ private void createIndex(String indexName, Map<String, Object> mappings, Map<Str
log.info("Created index {}", indexName);
}

private boolean isPureAddition(MapDifference<String, Object> mapDifference) {
private static boolean isPureAddition(MapDifference<String, Object> mapDifference) {
return !mapDifference.areEqual() && mapDifference.entriesDiffering().isEmpty()
&& !mapDifference.entriesOnlyOnRight().isEmpty();
}

private boolean equals(Map<String, Object> newSettings, Settings oldSettings) {
private static boolean isAnalysisEqual(Map<String, Object> newSettings, Settings oldSettings) {
if (!newSettings.containsKey("index")) {
return true;
}
Expand All @@ -221,15 +295,34 @@ private boolean equals(Map<String, Object> newSettings, Settings oldSettings) {
// Compare analysis section
Map<String, Object> newAnalysis = (Map<String, Object>) indexSettings.get("analysis");
Settings oldAnalysis = oldSettings.getByPrefix("index.analysis.");
if (!equalsGroup(newAnalysis, oldAnalysis)) {
return equalsGroup(newAnalysis, oldAnalysis);
}

/**
 * Checks whether every tracked index setting in the target configuration matches the existing index.
 *
 * <p>Only the keys listed in {@code SETTINGS} are compared. Each target value is converted to a string and
 * compared with the value stored under {@code "index." + key} in the existing settings.
 *
 * @param newSettings target settings; the index-level values are read from its {@code "index"} entry
 * @param oldSettings settings currently reported for the index
 * @return {@code true} when {@code newSettings} has no {@code "index"} entry or no tracked setting differs
 */
private static boolean isSettingsEqual(Map<String, Object> newSettings, Settings oldSettings) {
  if (!newSettings.containsKey("index")) {
    // Nothing to compare against, so treat the settings as unchanged.
    return true;
  }
  final Map<String, Object> targetIndexSettings = (Map<String, Object>) newSettings.get("index");
  for (String trackedKey : SETTINGS) {
    final String wanted = targetIndexSettings.get(trackedKey).toString();
    final String current = oldSettings.get("index." + trackedKey);
    if (!Objects.equals(wanted, current)) {
      return false;
    }
  }
  return true;
}

private static boolean isSettingsReindexRequired(Map<String, Object> newSettings, Settings oldSettings) {
if (!newSettings.containsKey("index")) {
return false;
}
// Compare remaining settings
return SETTINGS_TO_COMPARE.stream()
.noneMatch(settingKey -> Objects.equals(indexSettings.get(settingKey), oldSettings.get("index." + settingKey)));
Map<String, Object> indexSettings = (Map<String, Object>) newSettings.get("index");

if (SETTINGS_STATIC.stream().anyMatch(settingKey ->
!Objects.equals(indexSettings.get(settingKey).toString(), oldSettings.get("index." + settingKey)))) {
return true;
}

return indexSettings.containsKey("analysis")
&& !equalsGroup((Map<String, Object>) indexSettings.get("analysis"), oldSettings.getByPrefix("index.analysis."));
}

private boolean equalsGroup(Map<String, Object> newSettings, Settings oldSettings) {
private static boolean equalsGroup(Map<String, Object> newSettings, Settings oldSettings) {
if (!newSettings.keySet().equals(oldSettings.names())) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,41 @@
package com.linkedin.metadata.search.elasticsearch.update;

import com.linkedin.metadata.utils.metrics.MetricUtils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.WriteRequest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;


@Slf4j
public class BulkListener implements BulkProcessor.Listener {
private static final BulkListener INSTANCE = new BulkListener();
private static final Map<WriteRequest.RefreshPolicy, BulkListener> INSTANCES = new HashMap<>();

public static BulkListener getInstance() {
return INSTANCE;
return INSTANCES.computeIfAbsent(null, BulkListener::new);
}
/**
 * Returns the listener cached for the given refresh policy, creating and caching one if none exists yet.
 *
 * @param refreshPolicy refresh policy used as the cache key and passed to the new listener's constructor
 * @return the listener associated with {@code refreshPolicy}
 */
public static BulkListener getInstance(WriteRequest.RefreshPolicy refreshPolicy) {
  final BulkListener listener = INSTANCES.computeIfAbsent(refreshPolicy, policy -> new BulkListener(policy));
  return listener;
}

private final WriteRequest.RefreshPolicy refreshPolicy;

/**
 * Creates a listener bound to the given refresh policy.
 *
 * @param policy refresh policy stored on the listener as-is; no validation is performed here
 */
public BulkListener(WriteRequest.RefreshPolicy policy) {
  this.refreshPolicy = policy;
}

/**
 * Applies this listener's refresh policy to the bulk request, when a policy is configured.
 *
 * @param executionId identifier of the bulk execution (unused here)
 * @param request bulk request that is about to be executed
 */
@Override
public void beforeBulk(long executionId, BulkRequest request) {
  if (refreshPolicy == null) {
    // No policy configured: leave the request untouched.
    return;
  }
  request.setRefreshPolicy(refreshPolicy);
}

@Override
Expand All @@ -28,10 +47,36 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
log.info("Successfully fed bulk request. Number of events: " + response.getItems().length + " Took time ms: "
+ response.getIngestTookInMillis());
}
incrementMetrics(response);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
log.error("Error feeding bulk request. No retries left", failure);
// Exception raised outside this method
log.error("Error feeding bulk request. No retries left. Request: {}", buildBulkRequestSummary(request), failure);
incrementMetrics(request, failure);
}

/**
 * Increments one counter per item in the bulk response, named from the item's operation type and status name.
 *
 * @param response bulk response whose items are counted
 */
private static void incrementMetrics(BulkResponse response) {
  Arrays.stream(response.getItems()).forEach(item -> {
    final String metricName = buildMetricName(item.getOpType(), item.status().name());
    MetricUtils.counter(BulkListener.class, metricName).inc();
  });
}

/**
 * Records the failure once per request contained in the bulk request, under an "exception" metric name
 * derived from each request's operation type.
 *
 * @param request bulk request that failed
 * @param failure the error passed on to the exception counter
 */
private static void incrementMetrics(BulkRequest request, Throwable failure) {
  for (DocWriteRequest<?> pending : request.requests()) {
    final String metricName = buildMetricName(pending.opType(), "exception");
    MetricUtils.exceptionCounter(BulkListener.class, metricName, failure);
  }
}

/**
 * Builds a metric name from the lowercase operation type, the metric delimiter and the lowercase status.
 *
 * @param opType operation type of the bulk item
 * @param status status label, e.g. the item's status name or {@code "exception"}
 * @return the composed metric name
 */
private static String buildMetricName(DocWriteRequest.OpType opType, String status) {
  // Lowercase with a fixed locale so metric names do not depend on the JVM default locale
  // (under tr-TR, for instance, "I" would otherwise become a dotless "ı").
  return opType.getLowercase() + MetricUtils.DELIMITER + status.toLowerCase(java.util.Locale.ROOT);
}

/**
 * Produces a one-line summary of a bulk request: one entry per contained request, separated by {@code ';'},
 * each listing the index, operation type, type and id.
 *
 * @param request bulk request to summarize
 * @return the joined summary, or an empty string when the bulk request holds no requests
 */
private static String buildBulkRequestSummary(BulkRequest request) {
  final StringBuilder summary = new StringBuilder();
  for (DocWriteRequest<?> item : request.requests()) {
    if (summary.length() > 0) {
      summary.append(';');
    }
    summary.append(String.format(
        "Failed to perform bulk request: index [%s], optype: [%s], type [%s], id [%s]",
        item.index(), item.opType(), item.type(), item.id()));
  }
  return summary.toString();
}
}
Loading

0 comments on commit d87a802

Please sign in to comment.