From 8f4e7bfe3c812ffd59c814ca4e6be31a4f1fed44 Mon Sep 17 00:00:00 2001 From: David Leifker Date: Sat, 22 Oct 2022 19:27:32 -0500 Subject: [PATCH] feat(elasticsearch): Updates to elasticsearch configuration, dao, and tests --- build.gradle | 1 + .../client/kafka/KafkaEmitterTest.java | 3 +- metadata-io/build.gradle | 1 + .../graph/elastic/ESGraphWriteDAO.java | 11 +- .../elastic/ElasticSearchGraphService.java | 4 +- .../indexbuilder/ESIndexBuilder.java | 123 +++++++++-- .../elasticsearch/update/BulkListener.java | 53 ++++- .../elasticsearch/update/ESBulkProcessor.java | 60 ++++++ .../elasticsearch/update/ESWriteDAO.java | 10 +- .../systemmetadata/ESSystemMetadataDAO.java | 16 +- .../ElasticSearchSystemMetadataService.java | 4 +- .../ElasticSearchTimeseriesAspectService.java | 16 +- .../ElasticSearchTestConfiguration.java | 91 ++++++++ .../metadata/ElasticSearchTestUtils.java | 116 ---------- .../linkedin/metadata/ElasticTestUtils.java | 48 ----- .../update/BulkListenerTest.java | 39 ++++ .../update/ESBulkProcessorTest.java | 19 ++ .../metadata/graph/GraphServiceTestBase.java | 34 +-- .../graph/dgraph/DgraphGraphServiceTest.java | 3 +- .../ElasticSearchGraphServiceTest.java | 38 ++-- .../graph/neo4j/Neo4jGraphServiceTest.java | 3 +- .../search/LineageSearchServiceTest.java | 43 ++-- .../metadata/search/SearchServiceTest.java | 48 ++--- .../ElasticSearchServiceTest.java | 58 ++--- .../indexbuilder/ESIndexBuilderTest.java | 199 ++++++++++++++++++ .../elasticsearch/query/ESBrowseDAOTest.java | 2 +- ...lasticSearchSystemMetadataServiceTest.java | 52 ++--- ...sticSearchTimeseriesAspectServiceTest.java | 41 ++-- .../elasticsearch/ElasticsearchConnector.java | 50 +---- .../ElasticsearchConnectorFactory.java | 18 +- metadata-service/factories/build.gradle | 1 + .../ElasticSearchGraphServiceFactory.java | 2 +- ...ticSearchSystemMetadataServiceFactory.java | 2 +- .../BaseElasticSearchComponentsFactory.java | 10 +- .../ElasticSearchBulkProcessorFactory.java | 30 ++- .../ElasticSearchIndexBuilderFactory.java | 55 ++++- .../search/ElasticSearchServiceFactory.java | 2 +- ...cSearchTimeseriesAspectServiceFactory.java | 2 +- .../src/main/resources/application.yml | 9 +- ...SearchIndexBuilderFactoryDefaultsTest.java | 26 +++ ...ticSearchIndexBuilderFactoryEmptyTest.java | 32 +++ ...earchIndexBuilderFactoryOverridesTest.java | 28 +++ .../metadata/utils/metrics/MetricUtils.java | 10 + 43 files changed, 910 insertions(+), 503 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestConfiguration.java delete mode 100644 metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java delete mode 100644 metadata-io/src/test/java/com/linkedin/metadata/ElasticTestUtils.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/ESBulkProcessorTest.java create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilderTest.java create mode 100644 metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryDefaultsTest.java create mode 100644 metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryEmptyTest.java create mode 100644 metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryOverridesTest.java diff --git a/build.gradle b/build.gradle index d96da3f8e3f05f..6d1dfa365f0292 100644 --- a/build.gradle +++ b/build.gradle @@ -149,6 +149,7 @@ project.ext.externalDependency = [ 'springJdbc': "org.springframework:spring-jdbc:$springVersion", 'springWeb': "org.springframework:spring-web:$springVersion", 'springWebMVC': "org.springframework:spring-webmvc:$springVersion", + 'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion", 'springBoot': "org.springframework.boot:spring-boot:$springBootVersion", 'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion", 'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion", diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java index 8940d951bfc10a..213e987e74d887 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java @@ -15,7 +15,6 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.TimeoutException; import org.junit.BeforeClass; import org.junit.Test; import org.testcontainers.containers.Network; @@ -106,7 +105,7 @@ private static String createTopics(Stream bootstraps) { try { createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get(); return bootstrap; - } catch (TimeoutException | InterruptedException | ExecutionException ex) { + } catch (RuntimeException | InterruptedException | ExecutionException ex) { return null; } }).filter(Objects::nonNull).findFirst().get(); diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 131c457293cb44..3c0edff7ac4221 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -55,6 +55,7 @@ dependencies { testCompile externalDependency.testContainersCassandra testCompile externalDependency.lombok testCompile project(':test-models') + testImplementation externalDependency.springBootTest testAnnotationProcessor externalDependency.lombok diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java index ab77ca8e2d205d..5fd0af80fbd0a4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphWriteDAO.java @@ -30,6 +30,7 @@ public class ESGraphWriteDAO { private final RestHighLevelClient client; private final IndexConvention indexConvention; private final BulkProcessor bulkProcessor; + private final int numRetries; /** * Updates or inserts the given search document. @@ -42,8 +43,7 @@ public void upsertDocument(@Nonnull String docId, @Nonnull String document) { new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON); final UpdateRequest updateRequest = new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON) - .detectNoop(false) - .upsert(indexRequest); + .detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest); bulkProcessor.add(updateRequest); } @@ -55,10 +55,9 @@ public BulkByScrollResponse deleteByQuery(@Nullable final String sourceType, @No destinationType == null ? ImmutableList.of() : ImmutableList.of(destinationType), destinationEntityFilter, relationshipTypes, relationshipFilter); - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - - deleteByQueryRequest.setQuery(finalQuery); - + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest() + .setQuery(finalQuery) + .setRefresh(true); deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); try { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 41787d5dd9b761..203cd805b3761f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -251,7 +251,9 @@ public void configure() { @Override public void clear() { DeleteByQueryRequest deleteRequest = - new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)).setQuery(QueryBuilders.matchAllQuery()); + new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)) + .setQuery(QueryBuilders.matchAllQuery()) + .setRefresh(true); try { _searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); } catch (Exception e) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java index 46c8c8334028b7..2bd55f9edc4678 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilder.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.search.elasticsearch.indexbuilder; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.MapDifference; @@ -10,7 +11,11 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; + +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; @@ -32,6 +37,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; @Slf4j @@ -39,11 +45,35 @@ public class ESIndexBuilder { private final RestHighLevelClient searchClient; + @Getter private final int numShards; + + @Getter private final int numReplicas; + + @Getter private final int numRetries; - private static final List SETTINGS_TO_COMPARE = ImmutableList.of("number_of_shards", "number_of_replicas"); + @Getter + private final int refreshIntervalSeconds; + + @Getter + private final Map> indexSettingOverrides; + + @Getter + private final boolean enableIndexSettingsReindex; + + private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /* + Most index settings are default values and populated by Elastic. This list is an include list to determine which + settings we care about when a difference is present. + */ + private static final List SETTINGS_DYNAMIC = ImmutableList.of("number_of_replicas", "refresh_interval"); + // These setting require reindex + private static final List SETTINGS_STATIC = ImmutableList.of("number_of_shards"); + private static final List SETTINGS = Stream.concat( + SETTINGS_DYNAMIC.stream(), SETTINGS_STATIC.stream()).collect(Collectors.toList()); public void buildIndex(String indexName, Map mappings, Map settings) throws IOException { @@ -53,6 +83,8 @@ public void buildIndex(String indexName, Map mappings, Map baseSettings = new HashMap<>(settings); baseSettings.put("number_of_shards", numShards); baseSettings.put("number_of_replicas", numReplicas); + baseSettings.put("refresh_interval", String.format("%ss", refreshIntervalSeconds)); + baseSettings.putAll(indexSettingOverrides.getOrDefault(indexName, Map.of())); Map finalSettings = ImmutableMap.of("index", baseSettings); // If index doesn't exist, create index @@ -70,26 +102,30 @@ public void buildIndex(String indexName, Map mappings, Map mappingsDiff = Maps.difference((Map) oldMappings.get("properties"), - (Map) mappings.get("properties")); + MapDifference mappingsDiff = Maps.difference( + (Map) oldMappings.getOrDefault("properties", Map.of()), + (Map) mappings.getOrDefault("properties", Map.of())); Settings oldSettings = searchClient.indices() .getSettings(new GetSettingsRequest().indices(indexName), RequestOptions.DEFAULT) .getIndexToSettings() .valuesIt() .next(); - boolean isSettingsEqual = equals(finalSettings, oldSettings); + + final boolean isAnalysisEqual = isAnalysisEqual(finalSettings, oldSettings); + final boolean isSettingsEqual = isSettingsEqual(finalSettings, oldSettings); + final boolean isSettingsReindexRequired = isSettingsReindexRequired(finalSettings, oldSettings); // If there are no updates to mappings and settings, return - if (mappingsDiff.areEqual() && isSettingsEqual) { + if (mappingsDiff.areEqual() && isAnalysisEqual && isSettingsEqual) { log.info("No updates to index {}", indexName); return; } // If there are no updates to settings, and there are only pure additions to mappings (no updates to existing fields), // there is no need to reindex. Just update mappings - if (isSettingsEqual && isPureAddition(mappingsDiff)) { - log.info("New fields have been added to index {}. Updating index in place", indexName); + if (isAnalysisEqual && isPureAddition(mappingsDiff) && isSettingsEqual) { + log.info("New fields have been added to index {}. Updating index in place. Adding: {}", indexName, mappingsDiff); PutMappingRequest request = new PutMappingRequest(indexName).source(mappings); searchClient.indices().putMapping(request, RequestOptions.DEFAULT); log.info("Updated index {} with new mappings", indexName); @@ -97,11 +133,49 @@ public void buildIndex(String indexName, Map mappings, Map indexSettings = ((Map) finalSettings.get("index")) + .entrySet().stream() + .filter(e -> SETTINGS_DYNAMIC.contains(e.getKey())) + .collect(Collectors.toMap(e -> "index." + e.getKey(), Map.Entry::getValue)); + + /* + We might not have any changes that can be applied without reindex. This is the case when a reindex + is needed due to a setting, but not allowed. We don't want to apply empty settings for no reason. + */ + if (!indexSettings.isEmpty()) { + request.settings(indexSettings); + boolean ack = searchClient.indices().putSettings(request, RequestOptions.DEFAULT).isAcknowledged(); + log.info("Updated index {} with new settings. Settings: {}, Acknowledged: {}", indexName, + OBJECT_MAPPER.writeValueAsString(indexSettings), ack); + } + } } + } + private void reindex(String indexName, Map mappings, Map finalSettings) + throws IOException { String tempIndexName = indexName + "_" + System.currentTimeMillis(); createIndex(tempIndexName, mappings, finalSettings); try { @@ -205,12 +279,12 @@ private void createIndex(String indexName, Map mappings, Map mapDifference) { + private static boolean isPureAddition(MapDifference mapDifference) { return !mapDifference.areEqual() && mapDifference.entriesDiffering().isEmpty() && !mapDifference.entriesOnlyOnRight().isEmpty(); } - private boolean equals(Map newSettings, Settings oldSettings) { + private static boolean isAnalysisEqual(Map newSettings, Settings oldSettings) { if (!newSettings.containsKey("index")) { return true; } @@ -221,15 +295,34 @@ private boolean equals(Map newSettings, Settings oldSettings) { // Compare analysis section Map newAnalysis = (Map) indexSettings.get("analysis"); Settings oldAnalysis = oldSettings.getByPrefix("index.analysis."); - if (!equalsGroup(newAnalysis, oldAnalysis)) { + return equalsGroup(newAnalysis, oldAnalysis); + } + + private static boolean isSettingsEqual(Map newSettings, Settings oldSettings) { + if (!newSettings.containsKey("index")) { + return true; + } + Map indexSettings = (Map) newSettings.get("index"); + return SETTINGS.stream() + .allMatch(settingKey -> Objects.equals(indexSettings.get(settingKey).toString(), oldSettings.get("index." + settingKey))); + } + + private static boolean isSettingsReindexRequired(Map newSettings, Settings oldSettings) { + if (!newSettings.containsKey("index")) { return false; } - // Compare remaining settings - return SETTINGS_TO_COMPARE.stream() - .noneMatch(settingKey -> Objects.equals(indexSettings.get(settingKey), oldSettings.get("index." + settingKey))); + Map indexSettings = (Map) newSettings.get("index"); + + if (SETTINGS_STATIC.stream().anyMatch(settingKey -> + !Objects.equals(indexSettings.get(settingKey).toString(), oldSettings.get("index." + settingKey)))) { + return true; + } + + return indexSettings.containsKey("analysis") + && !equalsGroup((Map) indexSettings.get("analysis"), oldSettings.getByPrefix("index.analysis.")); } - private boolean equalsGroup(Map newSettings, Settings oldSettings) { + private static boolean equalsGroup(Map newSettings, Settings oldSettings) { if (!newSettings.keySet().equals(oldSettings.names())) { return false; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java index 5420a497411378..1e4751459a48c6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/BulkListener.java @@ -1,22 +1,41 @@ package com.linkedin.metadata.search.elasticsearch.update; +import com.linkedin.metadata.utils.metrics.MetricUtils; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j public class BulkListener implements BulkProcessor.Listener { - private static final BulkListener INSTANCE = new BulkListener(); + private static final Map INSTANCES = new HashMap<>(); public static BulkListener getInstance() { - return INSTANCE; + return INSTANCES.computeIfAbsent(null, BulkListener::new); + } + public static BulkListener getInstance(WriteRequest.RefreshPolicy refreshPolicy) { + return INSTANCES.computeIfAbsent(refreshPolicy, BulkListener::new); + } + + private final WriteRequest.RefreshPolicy refreshPolicy; + + public BulkListener(WriteRequest.RefreshPolicy policy) { + refreshPolicy = policy; } @Override public void beforeBulk(long executionId, BulkRequest request) { - + if (refreshPolicy != null) { + request.setRefreshPolicy(refreshPolicy); + } } @Override @@ -28,10 +47,36 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon log.info("Successfully fed bulk request. Number of events: " + response.getItems().length + " Took time ms: " + response.getIngestTookInMillis()); } + incrementMetrics(response); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error("Error feeding bulk request. No retries left", failure); + // Exception raised outside this method + log.error("Error feeding bulk request. No retries left. Request: {}", buildBulkRequestSummary(request), failure); + incrementMetrics(request, failure); + } + + private static void incrementMetrics(BulkResponse response) { + Arrays.stream(response.getItems()) + .map(req -> buildMetricName(req.getOpType(), req.status().name())) + .forEach(metricName -> MetricUtils.counter(BulkListener.class, metricName).inc()); + } + + private static void incrementMetrics(BulkRequest request, Throwable failure) { + request.requests().stream() + .map(req -> buildMetricName(req.opType(), "exception")) + .forEach(metricName -> MetricUtils.exceptionCounter(BulkListener.class, metricName, failure)); + } + + private static String buildMetricName(DocWriteRequest.OpType opType, String status) { + return opType.getLowercase() + MetricUtils.DELIMITER + status.toLowerCase(); + } + + private static String buildBulkRequestSummary(BulkRequest request) { + return request.requests().stream().map(req -> String.format( + "Failed to perform bulk request: index [%s], optype: [%s], type [%s], id [%s]", + req.index(), req.opType(), req.type(), req.id()) + ).collect(Collectors.joining(";")); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java new file mode 100644 index 00000000000000..3618c55f6c4515 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESBulkProcessor.java @@ -0,0 +1,60 @@ +package com.linkedin.metadata.search.elasticsearch.update; + +import lombok.Builder; +import lombok.NonNull; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.IOException; + +@Builder(builderMethodName = "hiddenBuilder") +public class ESBulkProcessor { + + public static ESBulkProcessor.ESBulkProcessorBuilder builder(RestHighLevelClient searchClient) { + return hiddenBuilder().searchClient(searchClient); + } + @NonNull + private RestHighLevelClient searchClient; + @Builder.Default + private Integer bulkRequestsLimit = 500; + @Builder.Default + private Integer bulkFlushPeriod = 1; + @Builder.Default + private Integer numRetries = 3; + @Builder.Default + private Long retryInterval = 1L; + private WriteRequest.RefreshPolicy writeRequestRefreshPolicy; + + public BulkProcessor toBulkProcessor() { + return BulkProcessor.builder((request, bulkListener) -> { + try { + BulkResponse response = searchClient.bulk(request, RequestOptions.DEFAULT); + bulkListener.onResponse(response); + } catch (IOException e) { + bulkListener.onFailure(e); + throw new RuntimeException(e); + } + }, BulkListener.getInstance(writeRequestRefreshPolicy)) + .setBulkActions(bulkRequestsLimit) + .setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod)) + // This retry is ONLY for "resource constraints", i.e. 429 errors (each request has other retry methods) + .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(retryInterval), numRetries)) + .build(); + } + + public BulkProcessor toAsyncBulkProcessor() { + return BulkProcessor.builder((request, bulkListener) -> { + searchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + }, BulkListener.getInstance(writeRequestRefreshPolicy)) + .setBulkActions(bulkRequestsLimit) + .setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod)) + // This retry is ONLY for "resource constraints", i.e. 429 errors (each request has other retry methods) + .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(retryInterval), numRetries)) + .build(); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index cda77412baebac..0439068ed55653 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -28,6 +28,7 @@ public class ESWriteDAO { private final RestHighLevelClient searchClient; private final IndexConvention indexConvention; private final BulkProcessor bulkProcessor; + private final int numRetries; /** * Updates or inserts the given search document. @@ -40,7 +41,8 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document, final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON); final UpdateRequest updateRequest = - new UpdateRequest(indexName, docId).doc(document, XContentType.JSON).detectNoop(false).upsert(indexRequest); + new UpdateRequest(indexName, docId).doc(document, XContentType.JSON) + .detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest); bulkProcessor.add(updateRequest); } @@ -60,7 +62,7 @@ public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { */ public void applyScriptUpdate(@Nonnull String entityName, @Nonnull String docId, @Nonnull String script) { final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); - bulkProcessor.add(new UpdateRequest(indexName, docId).script(new Script(script))); + bulkProcessor.add(new UpdateRequest(indexName, docId).retryOnConflict(numRetries).script(new Script(script))); } /** @@ -68,7 +70,9 @@ public void applyScriptUpdate(@Nonnull String entityName, @Nonnull String docId, */ public void clear() { String[] indices = getIndices(indexConvention.getAllEntityIndicesPattern()); - DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indices).setQuery(QueryBuilders.matchAllQuery()); + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indices) + .setQuery(QueryBuilders.matchAllQuery()) + .setRefresh(true); try { searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); } catch (Exception e) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java index c5a01278c546bd..463160ac759303 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ESSystemMetadataDAO.java @@ -40,6 +40,7 @@ public class ESSystemMetadataDAO { private final RestHighLevelClient client; private final IndexConvention indexConvention; private final BulkProcessor bulkProcessor; + private final int numRetries; /** * Updates or inserts the given search document. @@ -52,8 +53,7 @@ public void upsertDocument(@Nonnull String docId, @Nonnull String document) { new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON); final UpdateRequest updateRequest = new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON) - .detectNoop(false) - .upsert(indexRequest); + .detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest); bulkProcessor.add(updateRequest); } @@ -74,9 +74,9 @@ public BulkByScrollResponse deleteByUrn(@Nonnull final String urn) { BoolQueryBuilder finalQuery = QueryBuilders.boolQuery(); finalQuery.must(QueryBuilders.termQuery("urn", urn)); - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - - deleteByQueryRequest.setQuery(finalQuery); + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest() + .setQuery(finalQuery) + .setRefresh(true); deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); @@ -95,9 +95,9 @@ public BulkByScrollResponse deleteByUrnAspect(@Nonnull final String urn, @Nonnul finalQuery.must(QueryBuilders.termQuery("urn", urn)); finalQuery.must(QueryBuilders.termQuery("aspect", aspect)); - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); - - deleteByQueryRequest.setQuery(finalQuery); + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest() + .setQuery(finalQuery) + .setRefresh(true); deleteByQueryRequest.indices(indexConvention.getIndexName(INDEX_NAME)); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java index 6f6c625666cf80..df0c23ceb8687c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java @@ -207,7 +207,9 @@ public void configure() { @Override public void clear() { DeleteByQueryRequest deleteRequest = - new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)).setQuery(QueryBuilders.matchAllQuery()); + new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)) + .setQuery(QueryBuilders.matchAllQuery()) + .setRefresh(true); try { _searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); } catch (Exception e) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java index 978ab524b05ce0..996aed6137a02e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectService.java @@ -67,6 +67,7 @@ public class ElasticSearchTimeseriesAspectService implements TimeseriesAspectSer private final IndexConvention _indexConvention; private final BulkProcessor _bulkProcessor; + private final int _numRetries; private final TimeseriesAspectIndexBuilders _indexBuilders; private final RestHighLevelClient _searchClient; private final ESAggregatedStatsDAO _esAggregatedStatsDAO; @@ -74,12 +75,13 @@ public class ElasticSearchTimeseriesAspectService implements TimeseriesAspectSer public ElasticSearchTimeseriesAspectService(@Nonnull RestHighLevelClient searchClient, @Nonnull IndexConvention indexConvention, @Nonnull TimeseriesAspectIndexBuilders indexBuilders, - @Nonnull EntityRegistry entityRegistry, @Nonnull BulkProcessor bulkProcessor) { + @Nonnull EntityRegistry entityRegistry, @Nonnull BulkProcessor bulkProcessor, int numRetries) { _indexConvention = indexConvention; _indexBuilders = indexBuilders; _searchClient = searchClient; _bulkProcessor = bulkProcessor; _entityRegistry = entityRegistry; + _numRetries = numRetries; _esAggregatedStatsDAO = new ESAggregatedStatsDAO(indexConvention, searchClient, entityRegistry); } @@ -122,8 +124,7 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String aspectNam final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document.toString(), XContentType.JSON); final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document.toString(), XContentType.JSON) - .detectNoop(false) - .upsert(indexRequest); + .detectNoop(false).retryOnConflict(_numRetries).upsert(indexRequest); _bulkProcessor.add(updateRequest); } @@ -202,10 +203,11 @@ public DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @ @Nonnull Filter filter) { final String indexName = _indexConvention.getTimeseriesAspectIndexName(entityName, aspectName); final BoolQueryBuilder filterQueryBuilder = ESUtils.buildFilterQuery(filter); - final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName).setQuery(filterQueryBuilder) - .setBatchSize(DEFAULT_LIMIT) - .setRefresh(true) - .setTimeout(TimeValue.timeValueMinutes(10)); + final DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName) + .setQuery(filterQueryBuilder) + .setBatchSize(DEFAULT_LIMIT) + .setRefresh(true) + .setTimeout(TimeValue.timeValueMinutes(10)); try { final BulkByScrollResponse response = _searchClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); return new DeleteAspectValuesResult().setNumDocsDeleted(response.getDeleted()); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestConfiguration.java b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestConfiguration.java new file mode 100644 index 00000000000000..ffccfc8603d05d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestConfiguration.java @@ -0,0 +1,91 @@ +package com.linkedin.metadata; + +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import org.apache.http.HttpHost; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.utility.DockerImageName; + +import javax.annotation.Nonnull; + +import java.util.Map; + +import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; + +@TestConfiguration +public class ElasticSearchTestConfiguration { + private static final String ELASTIC_VERSION = "7.9.3"; + private static final String ELASTIC_IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch"; + private static final String ENV_ELASTIC_IMAGE_FULL_NAME = System.getenv("ELASTIC_IMAGE_FULL_NAME"); + private static final String ELASTIC_IMAGE_FULL_NAME = ENV_ELASTIC_IMAGE_FULL_NAME != null + ? ENV_ELASTIC_IMAGE_FULL_NAME : ELASTIC_IMAGE_NAME + ":" + ELASTIC_VERSION; + private static final DockerImageName DOCKER_IMAGE_NAME = DockerImageName.parse(ELASTIC_IMAGE_FULL_NAME) + .asCompatibleSubstituteFor(ELASTIC_IMAGE_NAME); + + private static final int HTTP_PORT = 9200; + + @Primary + @Scope("singleton") + @Bean(name = "elasticSearchRestHighLevelClient") + @Nonnull + public RestHighLevelClient getElasticsearchClient() { + ElasticsearchContainer esContainer = new ElasticsearchContainer(DOCKER_IMAGE_NAME); + checkContainerEngine(esContainer.getDockerClient()); + esContainer.start(); + return buildRestClient(esContainer); + } + + /* + Cannot use the factory class without circular dependencies + */ + @Primary + @Bean(name = "elasticSearchBulkProcessor") + @Nonnull + public BulkProcessor getBulkProcessor(@Qualifier("elasticSearchRestHighLevelClient") RestHighLevelClient searchClient) { + ESBulkProcessor builder = ESBulkProcessor.builder(searchClient) + // For testing, immediately return write from search query + .writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .bulkRequestsLimit(1) + .bulkFlushPeriod(-1) + .retryInterval(10L) + .numRetries(1) + .build(); + + // Using the default sync processor + return builder.toBulkProcessor(); + } + + @Primary + @Bean(name = "elasticSearchIndexBuilder") + @Nonnull + public ESIndexBuilder getIndexBuilder(@Qualifier("elasticSearchRestHighLevelClient") RestHighLevelClient searchClient) { + return new ESIndexBuilder(searchClient, 1, 1, 3, 1, Map.of(), false); + } + + // A helper method to create an ElasticseachContainer defaulting to the current image and version, with the ability + // within firewalled environments to override with an environment variable to point to the offline repository. + // A helper method to construct a standard rest client for Elastic search. + @Nonnull + private static RestHighLevelClient buildRestClient(ElasticsearchContainer elasticsearchContainer) { + final RestClientBuilder builder = + RestClient.builder(new HttpHost("localhost", elasticsearchContainer.getMappedPort(HTTP_PORT), "http")) + .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig( + IOReactorConfig.custom().setIoThreadCount(1).build())); + + builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder. + setConnectionRequestTimeout(30000)); + + return new RestHighLevelClient(builder); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java deleted file mode 100644 index c8db02ab3ea10d..00000000000000 --- a/metadata-io/src/test/java/com/linkedin/metadata/ElasticSearchTestUtils.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.linkedin.metadata; - -import io.github.resilience4j.retry.Retry; -import io.github.resilience4j.retry.RetryConfig; -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.admin.indices.flush.FlushResponse; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.get.GetRequest; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.testng.TestException; - -import java.net.SocketTimeoutException; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -public class ElasticSearchTestUtils { - - // request options for all requests - private static final RequestOptions OPTIONS = RequestOptions.DEFAULT; - - // retry logic for ES requests - private static final Retry RETRY = Retry.of("ElasticSearchTestUtils", RetryConfig.custom() - .retryExceptions(SocketTimeoutException.class, ElasticsearchStatusException.class) - .failAfterMaxAttempts(true) - .maxAttempts(3) - .build() - ); - - // allow for Supplier that throw exceptions - private interface ThrowingSupplier { - T get() throws E; - } - - // We are retrying requests, otherwise concurrency tests will see exceptions like these: - // java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-1 [ACTIVE] - private static T retry(ThrowingSupplier func) { - return RETRY.executeSupplier(() -> { - try { - return func.get(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - private ElasticSearchTestUtils() { - } - - public static void syncAfterWrite(RestHighLevelClient client) throws Exception { - syncAfterWrite(client, "test-sync-flag"); - } - - public static void syncAfterWrite(RestHighLevelClient searchClient, String indexName) throws Exception { - // we add some more data (a sync flag) and wait for it to appear - // we pick a random flag so that this can be used concurrently - String syncFlag = UUID.randomUUID().toString(); - - // add the flag and wait for it to appear, preferably to the indexed modified outside - addSyncFlag(searchClient, syncFlag, indexName); - waitForSyncFlag(searchClient, syncFlag, indexName, true); - - // flush changes for all indices in ES to disk - FlushResponse fResponse = retry(() -> searchClient.indices().flush(new FlushRequest(), OPTIONS)); - if (fResponse.getFailedShards() > 0) { - throw new RuntimeException("Failed to flush " + fResponse.getFailedShards() + " of " + fResponse.getTotalShards() + " shards"); - } - - // wait for all indices to be refreshed - RefreshResponse rResponse = retry(() -> searchClient.indices().refresh(new RefreshRequest(), OPTIONS)); - if (rResponse.getFailedShards() > 0) { - throw new RuntimeException("Failed to refresh " + rResponse.getFailedShards() + " of " + rResponse.getTotalShards() + " shards"); - } - - // remove the flag again and wait for it to disappear - removeSyncFlag(searchClient, syncFlag, indexName); - waitForSyncFlag(searchClient, syncFlag, indexName, false); - } - - private static void addSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) { - String document = "{ }"; - final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON); - final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON) - .detectNoop(false) - .retryOnConflict(3) - .upsert(indexRequest); - retry(() -> searchClient.update(updateRequest, OPTIONS)); - } - - private static void removeSyncFlag(RestHighLevelClient searchClient, String docId, String indexName) { - final DeleteRequest deleteRequest = new DeleteRequest(indexName).id(docId); - retry(() -> searchClient.delete(deleteRequest, OPTIONS)); - } - - private static void waitForSyncFlag(RestHighLevelClient searchClient, String docId, String indexName, boolean toExist) - throws InterruptedException { - GetRequest request = new GetRequest(indexName).id(docId); - long timeout = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS); - while (System.currentTimeMillis() < timeout) { - GetResponse response = retry(() -> searchClient.get(request, OPTIONS)); - if (response.isExists() == toExist) { - return; - } - TimeUnit.MILLISECONDS.sleep(50); - } - throw new TestException("Waiting for sync timed out"); - } - -} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/ElasticTestUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/ElasticTestUtils.java deleted file mode 100644 index 9f2c2f82d634c3..00000000000000 --- a/metadata-io/src/test/java/com/linkedin/metadata/ElasticTestUtils.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.linkedin.metadata; - -import org.apache.http.HttpHost; -import org.apache.http.impl.nio.reactor.IOReactorConfig; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; -import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testcontainers.utility.DockerImageName; - -import javax.annotation.Nonnull; - -public class ElasticTestUtils { - private ElasticTestUtils() { - } - - private static final String ELASTIC_VERSION = "7.9.3"; - private static final String ELASTIC_IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch"; - private static final String ENV_ELASTIC_IMAGE_FULL_NAME = System.getenv("ELASTIC_IMAGE_FULL_NAME"); - private static final String ELASTIC_IMAGE_FULL_NAME = ENV_ELASTIC_IMAGE_FULL_NAME != null - ? ENV_ELASTIC_IMAGE_FULL_NAME : ELASTIC_IMAGE_NAME + ":" + ELASTIC_VERSION; - private static final DockerImageName DOCKER_IMAGE_NAME = DockerImageName.parse(ELASTIC_IMAGE_FULL_NAME) - .asCompatibleSubstituteFor(ELASTIC_IMAGE_NAME); - - private static final int HTTP_PORT = 9200; - - // A helper method to create an ElasticseachContainer defaulting to the current image and version, with the ability - // within firewalled environments to override with an environment variable to point to the offline repository. - @Nonnull - public static final ElasticsearchContainer getNewElasticsearchContainer() { - return new ElasticsearchContainer(DOCKER_IMAGE_NAME); - } - - // A helper method to construct a standard rest client for Elastic search. - @Nonnull - public static RestHighLevelClient buildRestClient(ElasticsearchContainer elasticsearchContainer) { - final RestClientBuilder builder = - RestClient.builder(new HttpHost("localhost", elasticsearchContainer.getMappedPort(HTTP_PORT), "http")) - .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultIOReactorConfig( - IOReactorConfig.custom().setIoThreadCount(1).build())); - - builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder. - setConnectionRequestTimeout(3000)); - - return new RestHighLevelClient(builder); - } - -} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java new file mode 100644 index 00000000000000..154131ceb6fee5 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/BulkListenerTest.java @@ -0,0 +1,39 @@ +package com.linkedin.metadata.elasticsearch.update; + +import com.linkedin.metadata.search.elasticsearch.update.BulkListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.ArgumentMatchers.any; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +public class BulkListenerTest { + + @Test + public void testConstructor() { + BulkListener test = BulkListener.getInstance(); + assertNotNull(test); + assertEquals(test, BulkListener.getInstance()); + assertNotEquals(test, BulkListener.getInstance(WriteRequest.RefreshPolicy.IMMEDIATE)); + } + + @Test + public void testDefaultPolicy() { + BulkListener test = BulkListener.getInstance(); + + BulkRequest mockRequest1 = Mockito.mock(BulkRequest.class); + test.beforeBulk(0L, mockRequest1); + verify(mockRequest1, times(0)).setRefreshPolicy(any(WriteRequest.RefreshPolicy.class)); + + BulkRequest mockRequest2 = Mockito.mock(BulkRequest.class); + test = BulkListener.getInstance(WriteRequest.RefreshPolicy.IMMEDIATE); + test.beforeBulk(0L, mockRequest2); + verify(mockRequest2, times(1)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/ESBulkProcessorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/ESBulkProcessorTest.java new file mode 100644 index 00000000000000..f025a981b05acb --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/elasticsearch/update/ESBulkProcessorTest.java @@ -0,0 +1,19 @@ +package com.linkedin.metadata.elasticsearch.update; + +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertNotNull; + +public class ESBulkProcessorTest { + + @Test + public void testESBulkProcessorBuilder() { + RestHighLevelClient mock = Mockito.mock(RestHighLevelClient.class); + BulkProcessor test = ESBulkProcessor.builder(mock).build().toBulkProcessor(); + assertNotNull(test); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index 5647b828c37cba..619fc5e1611458 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -27,6 +27,8 @@ import java.util.stream.IntStream; import javax.annotation.Nonnull; import javax.annotation.Nullable; + +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -54,7 +56,7 @@ * Feel free to add a test to your test implementation that calls `getPopulatedGraphService` and * asserts the state of the graph in an implementation specific way. */ -abstract public class GraphServiceTestBase { +abstract public class GraphServiceTestBase extends AbstractTestNGSpringContextTests { private static class RelatedEntityComparator implements Comparator { @Override @@ -181,15 +183,7 @@ public void testStaticUrns() { @Nonnull abstract protected GraphService getGraphService() throws Exception; - /** - * Allows the specific GraphService test implementation to wait for GraphService writes to - * be synced / become available to reads. - * - * @throws Exception on failure - */ - abstract protected void syncAfterWrite() throws Exception; - - /** + /** * Calls getGraphService to retrieve the test GraphService and populates it * with edges via `GraphService.addEdge`. * @@ -214,8 +208,6 @@ protected GraphService getPopulatedGraphService() throws Exception { ); edges.forEach(service::addEdge); - syncAfterWrite(); - return service; } @@ -245,7 +237,6 @@ protected GraphService getLineagePopulatedGraphService() throws Exception { ); edges.forEach(service::addEdge); - syncAfterWrite(); return service; } @@ -341,7 +332,6 @@ public void testAddEdge(List edges, List expectedOutgoing, GraphService service = getGraphService(); edges.forEach(service::addEdge); - syncAfterWrite(); RelatedEntitiesResult relatedOutgoing = service.findRelatedEntities( anyType, EMPTY_FILTER, @@ -921,12 +911,10 @@ public void testFindRelatedEntitiesNullSourceType() throws Exception { doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service); service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf)); - syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity); service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf)); - syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity); } @@ -943,12 +931,10 @@ public void testFindRelatedEntitiesNullDestinationType() throws Exception { doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service); service.addEdge(new Edge(datasetTwoUrn, datasetOneUrn, downstreamOf)); - syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, downstreamOfDatasetOneRelatedEntity); service.addEdge(new Edge(datasetOneUrn, nullUrn, downstreamOf)); - syncAfterWrite(); doTestFindRelatedEntitiesEntityType(anyType, ImmutableList.of("null"), downstreamOf, outgoingRelationships, service, nullRelatedEntity); doTestFindRelatedEntitiesEntityType(anyType, null, downstreamOf, outgoingRelationships, service, nullRelatedEntity, downstreamOfDatasetOneRelatedEntity); } @@ -1212,7 +1198,6 @@ anyType, newFilter("urn", nodeToRemoveFrom.toString()), relationTypes, relationshipFilter ); - syncAfterWrite(); RelatedEntitiesResult actualOutgoingRelatedUrnsAfterRemove = service.findRelatedEntities( anyType, newFilter("urn", nodeToRemoveFrom.toString()), @@ -1265,7 +1250,6 @@ anyType, newFilter("urn", nodeToRemoveFrom.toString()), Collections.emptyList(), incomingRelationships ); - syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemove = service.findRelatedEntities( anyType, newFilter("urn", nodeToRemoveFrom.toString()), @@ -1285,7 +1269,6 @@ anyType, newFilter("urn", nodeToRemoveFrom.toString()), Arrays.asList(downstreamOf, hasOwner, knowsUser), incomingRelationships ); - syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemoveAll = service.findRelatedEntities( anyType, newFilter("urn", nodeToRemoveFrom.toString()), @@ -1318,7 +1301,6 @@ public void testRemoveEdgesFromUnknownNode() throws Exception { Arrays.asList(downstreamOf, hasOwner, knowsUser), incomingRelationships ); - syncAfterWrite(); RelatedEntitiesResult relatedOutgoingEntitiesAfterRemove = service.findRelatedEntities( anyType, EMPTY_FILTER, @@ -1333,7 +1315,6 @@ public void testRemoveNode() throws Exception { GraphService service = getPopulatedGraphService(); service.removeNode(datasetTwoUrn); - syncAfterWrite(); // assert the modified graph assertEqualsAnyOrder( @@ -1362,7 +1343,6 @@ public void testRemoveUnknownNode() throws Exception { 0, 100); service.removeNode(unknownUrn); - syncAfterWrite(); RelatedEntitiesResult entitiesAfterRemove = service.findRelatedEntities( anyType, EMPTY_FILTER, @@ -1379,7 +1359,6 @@ public void testClear() throws Exception { // populated graph asserted in testPopulatedGraphService service.clear(); - syncAfterWrite(); // assert the modified graph: check all nodes related to upstreamOf and nextVersionOf edges again assertEqualsAnyOrder( @@ -1449,7 +1428,6 @@ public void run() { }).collect(Collectors.toList()); doTestConcurrentOp(operations); - syncAfterWrite(); RelatedEntitiesResult relatedEntities = service.findRelatedEntities( null, EMPTY_FILTER, @@ -1475,7 +1453,6 @@ public void testConcurrentRemoveEdgesFromNode() throws Exception { // add fully connected graph edges.forEach(service::addEdge); - syncAfterWrite(); // assert the graph is there RelatedEntitiesResult relatedEntities = service.findRelatedEntities( @@ -1494,7 +1471,6 @@ public void run() { } }).collect(Collectors.toList()); doTestConcurrentOp(operations); - syncAfterWrite(); // assert the graph is gone RelatedEntitiesResult relatedEntitiesAfterDeletion = service.findRelatedEntities( @@ -1519,7 +1495,6 @@ public void testConcurrentRemoveNodes() throws Exception { // add fully connected graph edges.forEach(service::addEdge); - syncAfterWrite(); // assert the graph is there RelatedEntitiesResult relatedEntities = service.findRelatedEntities( @@ -1539,7 +1514,6 @@ public void run() { } }).collect(Collectors.toList()); doTestConcurrentOp(operations); - syncAfterWrite(); // assert the graph is gone RelatedEntitiesResult relatedEntitiesAfterDeletion = service.findRelatedEntities( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java index a851b62527707f..bd0ca12a7a8438 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java @@ -109,8 +109,7 @@ protected GraphService getGraphService() { return _service; } - @Override - protected void syncAfterWrite() { } + private void syncAfterWrite() { } @Test public void testGetSchema() { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java index 104f7c06ec8863..2a2053fe01d254 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphServiceTest.java @@ -1,8 +1,7 @@ package com.linkedin.metadata.graph.elastic; import com.linkedin.common.urn.Urn; -import com.linkedin.metadata.ElasticSearchTestUtils; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; import com.linkedin.metadata.graph.GraphServiceTestBase; @@ -15,13 +14,14 @@ import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; import com.linkedin.metadata.query.filter.RelationshipFilter; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest; +import org.elasticsearch.action.bulk.BulkProcessor; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; import org.testng.SkipException; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -34,26 +34,26 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - +@Import(ElasticSearchTestConfiguration.class) public class ElasticSearchGraphServiceTest extends GraphServiceTestBase { - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; + private final IndexConvention _indexConvention = new IndexConventionImpl(null); private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchGraphService _client; @BeforeClass public void setup() { - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); _client = buildService(); _client.configure(); } @@ -68,15 +68,9 @@ public void wipe() throws Exception { private ElasticSearchGraphService buildService() { LineageRegistry lineageRegistry = new LineageRegistry(SnapshotEntityRegistry.getInstance()); ESGraphQueryDAO readDAO = new ESGraphQueryDAO(_searchClient, lineageRegistry, _indexConvention); - ESGraphWriteDAO writeDAO = - new ESGraphWriteDAO(_searchClient, _indexConvention, ElasticSearchServiceTest.getBulkProcessor(_searchClient)); + ESGraphWriteDAO writeDAO = new ESGraphWriteDAO(_searchClient, _indexConvention, _bulkProcessor, 1); return new ElasticSearchGraphService(lineageRegistry, _searchClient, _indexConvention, writeDAO, readDAO, - ElasticSearchServiceTest.getIndexBuilder(_searchClient)); - } - - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); + _esIndexBuilder); } @Override @@ -85,9 +79,7 @@ protected GraphService getGraphService() { return _client; } - @Override - protected void syncAfterWrite() throws Exception { - ElasticSearchTestUtils.syncAfterWrite(_searchClient, _indexName); + private void syncAfterWrite() throws Exception { } @Override diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index 409c6d71685d43..4de91867efe416 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -47,8 +47,7 @@ GraphService getGraphService() { return _client; } - @Override - protected void syncAfterWrite() { + private void syncAfterWrite() { } @Override diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java index 00302bd06a99c6..7ac19780041c2a 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/LineageSearchServiceTest.java @@ -7,7 +7,7 @@ import com.linkedin.common.urn.TestEntityUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.TestEntityUtil; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; @@ -21,7 +21,7 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache; import com.linkedin.metadata.search.client.CachingEntitySearchService; import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder; import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO; @@ -31,11 +31,13 @@ import com.linkedin.metadata.search.utils.QueryUtils; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testng.annotations.AfterClass; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -44,19 +46,22 @@ import java.util.Collections; import java.util.List; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; -import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +@Import(ElasticSearchTestConfiguration.class) +public class LineageSearchServiceTest extends AbstractTestNGSpringContextTests { -public class LineageSearchServiceTest { - - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; + private EntityRegistry _entityRegistry; private IndexConvention _indexConvention; private SettingsBuilder _settingsBuilder; @@ -77,12 +82,8 @@ public void disableAssert() { @BeforeClass public void setup() { _entityRegistry = new SnapshotEntityRegistry(new Snapshot()); - _indexConvention = new IndexConventionImpl(null); - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); + _indexConvention = new IndexConventionImpl("lineage_search_service_test"); _settingsBuilder = new SettingsBuilder(Collections.emptyList(), null); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); _elasticSearchService = buildEntitySearchService(); _elasticSearchService.configure(); _cacheManager = new ConcurrentMapCacheManager(); @@ -109,18 +110,16 @@ private void resetService(boolean withCache) { public void wipe() throws Exception { _elasticSearchService.clear(); clearCache(); - syncAfterWrite(_searchClient); } @Nonnull private ElasticSearchService buildEntitySearchService() { EntityIndexBuilders indexBuilders = - new EntityIndexBuilders(ElasticSearchServiceTest.getIndexBuilder(_searchClient), _entityRegistry, + new EntityIndexBuilders(_esIndexBuilder, _entityRegistry, _indexConvention, _settingsBuilder); ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, _searchClient, _indexConvention); ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, _searchClient, _indexConvention); - ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, _searchClient, _indexConvention, - ElasticSearchServiceTest.getBulkProcessor(_searchClient)); + ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, _searchClient, _indexConvention, _bulkProcessor, 1); return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO); } @@ -129,11 +128,6 @@ private void clearCache() { resetService(true); } - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); - } - private EntityLineageResult mockResult(List lineageRelationships) { return new EntityLineageResult().setRelationships(new LineageRelationshipArray(lineageRelationships)) .setStart(0) @@ -175,7 +169,6 @@ public void testSearchService() throws Exception { document.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride")); document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString()); - syncAfterWrite(_searchClient); when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), anyInt())).thenReturn(mockResult(Collections.emptyList())); @@ -217,7 +210,6 @@ public void testSearchService() throws Exception { document2.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride2")); document2.set("browsePaths", JsonNodeFactory.instance.textNode("/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString()); - syncAfterWrite(_searchClient); searchResult = _lineageSearchService.searchAcrossLineage(TEST_URN, LineageDirection.DOWNSTREAM, ImmutableList.of(), "test", @@ -238,7 +230,6 @@ public void testSearchService() throws Exception { _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString()); _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString()); - syncAfterWrite(_searchClient); when(_graphService.getLineage(eq(TEST_URN), eq(LineageDirection.DOWNSTREAM), anyInt(), anyInt(), anyInt())).thenReturn( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java index f62c008ddf46d6..961d8513b08e83 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/SearchServiceTest.java @@ -7,7 +7,7 @@ import com.linkedin.common.urn.TestEntityUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.StringArray; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import com.linkedin.metadata.query.filter.Condition; @@ -21,7 +21,7 @@ import com.linkedin.metadata.search.cache.EntityDocCountCache; import com.linkedin.metadata.search.client.CachingEntitySearchService; import com.linkedin.metadata.search.elasticsearch.ElasticSearchService; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders; import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder; import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO; @@ -30,11 +30,13 @@ import com.linkedin.metadata.search.ranker.SimpleRanker; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testng.annotations.AfterClass; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -42,15 +44,17 @@ import javax.annotation.Nonnull; import java.util.Collections; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; -import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; import static org.testng.Assert.assertEquals; +@Import(ElasticSearchTestConfiguration.class) +public class SearchServiceTest extends AbstractTestNGSpringContextTests { -public class SearchServiceTest { - - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; private EntityRegistry _entityRegistry; private IndexConvention _indexConvention; private SettingsBuilder _settingsBuilder; @@ -63,12 +67,8 @@ public class SearchServiceTest { @BeforeClass public void setup() { _entityRegistry = new SnapshotEntityRegistry(new Snapshot()); - _indexConvention = new IndexConventionImpl(null); - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); + _indexConvention = new IndexConventionImpl("search_service_test"); _settingsBuilder = new SettingsBuilder(Collections.emptyList(), null); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); _elasticSearchService = buildEntitySearchService(); _elasticSearchService.configure(); _cacheManager = new ConcurrentMapCacheManager(); @@ -99,18 +99,17 @@ private void resetSearchService() { @BeforeMethod public void wipe() throws Exception { _elasticSearchService.clear(); - syncAfterWrite(_searchClient); } @Nonnull private ElasticSearchService buildEntitySearchService() { EntityIndexBuilders indexBuilders = - new EntityIndexBuilders(ElasticSearchServiceTest.getIndexBuilder(_searchClient), _entityRegistry, + new EntityIndexBuilders(_esIndexBuilder, _entityRegistry, _indexConvention, _settingsBuilder); ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, _searchClient, _indexConvention); ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, _searchClient, _indexConvention); ESWriteDAO writeDAO = new ESWriteDAO(_entityRegistry, _searchClient, _indexConvention, - ElasticSearchServiceTest.getBulkProcessor(_searchClient)); + _bulkProcessor, 1); return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO); } @@ -119,11 +118,6 @@ private void clearCache() { resetSearchService(); } - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); - } - @Test public void testSearchService() throws Exception { SearchResult searchResult = @@ -140,7 +134,6 @@ public void testSearchService() throws Exception { document.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride")); document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString()); - syncAfterWrite(_searchClient); searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", null, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 1); @@ -154,7 +147,6 @@ public void testSearchService() throws Exception { document2.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride2")); document2.set("browsePaths", JsonNodeFactory.instance.textNode("/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString()); - syncAfterWrite(_searchClient); searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", null, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 1); @@ -163,7 +155,7 @@ public void testSearchService() throws Exception { _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString()); _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString()); - syncAfterWrite(_searchClient); + searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", null, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 0); } @@ -227,8 +219,6 @@ public void testAdvancedSearchOr() throws Exception { document3.set("platform", JsonNodeFactory.instance.textNode("snowflake")); _elasticSearchService.upsertDocument(ENTITY_NAME, document3.toString(), urn3.toString()); - syncAfterWrite(_searchClient); - searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", filterWithCondition, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 2); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); @@ -296,8 +286,6 @@ public void testAdvancedSearchSoftDelete() throws Exception { document.set("removed", JsonNodeFactory.instance.booleanNode(false)); _elasticSearchService.upsertDocument(ENTITY_NAME, document3.toString(), urn3.toString()); - syncAfterWrite(_searchClient); - searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", filterWithCondition, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn); @@ -359,8 +347,6 @@ public void testAdvancedSearchNegated() throws Exception { document.set("removed", JsonNodeFactory.instance.booleanNode(false)); _elasticSearchService.upsertDocument(ENTITY_NAME, document3.toString(), urn3.toString()); - syncAfterWrite(_searchClient); - searchResult = _searchService.searchAcrossEntities(ImmutableList.of(), "test", filterWithCondition, null, 0, 10, null); assertEquals(searchResult.getNumEntities().intValue(), 1); assertEquals(searchResult.getEntities().get(0).getEntity(), urn3); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java index 5c2f0e651734ef..2c73c95500f085 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchServiceTest.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.urn.TestEntityUrn; import com.linkedin.common.urn.Urn; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.browse.BrowseResult; import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; @@ -15,18 +15,15 @@ import com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder; import com.linkedin.metadata.search.elasticsearch.query.ESBrowseDAO; import com.linkedin.metadata.search.elasticsearch.query.ESSearchDAO; -import com.linkedin.metadata.search.elasticsearch.update.BulkListener; import com.linkedin.metadata.search.elasticsearch.update.ESWriteDAO; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; -import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.TimeValue; -import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; -import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -34,15 +31,18 @@ import javax.annotation.Nonnull; import java.util.Collections; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; -import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; import static org.testng.Assert.assertEquals; +@Import(ElasticSearchTestConfiguration.class) +public class ElasticSearchServiceTest extends AbstractTestNGSpringContextTests { -public class ElasticSearchServiceTest { - - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; + private EntityRegistry _entityRegistry; private IndexConvention _indexConvention; private SettingsBuilder _settingsBuilder; @@ -53,12 +53,8 @@ public class ElasticSearchServiceTest { @BeforeClass public void setup() { _entityRegistry = new SnapshotEntityRegistry(new Snapshot()); - _indexConvention = new IndexConventionImpl(null); - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); + _indexConvention = new IndexConventionImpl("es_service_test"); _settingsBuilder = new SettingsBuilder(Collections.emptyList(), null); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); _elasticSearchService = buildService(); _elasticSearchService.configure(); } @@ -66,39 +62,19 @@ public void setup() { @BeforeMethod public void wipe() throws Exception { _elasticSearchService.clear(); - syncAfterWrite(_searchClient); - } - - public static BulkProcessor getBulkProcessor(RestHighLevelClient searchClient) { - return BulkProcessor.builder((request, bulkListener) -> { - searchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); - }, BulkListener.getInstance()) - .setBulkActions(1) - .setFlushInterval(TimeValue.timeValueSeconds(1)) - .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1000), 1)) - .build(); - } - - public static ESIndexBuilder getIndexBuilder(RestHighLevelClient searchClient) { - return new ESIndexBuilder(searchClient, 1, 1, 3); } @Nonnull private ElasticSearchService buildService() { EntityIndexBuilders indexBuilders = - new EntityIndexBuilders(getIndexBuilder(_searchClient), _entityRegistry, _indexConvention, _settingsBuilder); + new EntityIndexBuilders(_esIndexBuilder, _entityRegistry, _indexConvention, _settingsBuilder); ESSearchDAO searchDAO = new ESSearchDAO(_entityRegistry, _searchClient, _indexConvention); ESBrowseDAO browseDAO = new ESBrowseDAO(_entityRegistry, _searchClient, _indexConvention); ESWriteDAO writeDAO = - new ESWriteDAO(_entityRegistry, _searchClient, _indexConvention, getBulkProcessor(_searchClient)); + new ESWriteDAO(_entityRegistry, _searchClient, _indexConvention, _bulkProcessor, 1); return new ElasticSearchService(indexBuilders, searchDAO, browseDAO, writeDAO); } - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); - } - @Test public void testElasticSearchService() throws Exception { SearchResult searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); @@ -116,7 +92,6 @@ public void testElasticSearchService() throws Exception { document.set("browsePaths", JsonNodeFactory.instance.textNode("/a/b/c")); document.set("foreignKey", JsonNodeFactory.instance.textNode("urn:li:tag:Node.Value")); _elasticSearchService.upsertDocument(ENTITY_NAME, document.toString(), urn.toString()); - syncAfterWrite(_searchClient); searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 1); @@ -141,7 +116,6 @@ public void testElasticSearchService() throws Exception { document2.set("textFieldOverride", JsonNodeFactory.instance.textNode("textFieldOverride2")); document2.set("browsePaths", JsonNodeFactory.instance.textNode("/b/c")); _elasticSearchService.upsertDocument(ENTITY_NAME, document2.toString(), urn2.toString()); - syncAfterWrite(_searchClient); searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 1); @@ -159,7 +133,7 @@ public void testElasticSearchService() throws Exception { _elasticSearchService.deleteDocument(ENTITY_NAME, urn.toString()); _elasticSearchService.deleteDocument(ENTITY_NAME, urn2.toString()); - syncAfterWrite(_searchClient); + searchResult = _elasticSearchService.search(ENTITY_NAME, "test", null, null, 0, 10); assertEquals(searchResult.getNumEntities().intValue(), 0); browseResult = _elasticSearchService.browse(ENTITY_NAME, "", null, 0, 10); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilderTest.java new file mode 100644 index 00000000000000..407c8f7f16b40d --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/indexbuilder/ESIndexBuilderTest.java @@ -0,0 +1,199 @@ +package com.linkedin.metadata.search.elasticsearch.indexbuilder; + +import com.google.common.collect.ImmutableMap; +import com.linkedin.metadata.ElasticSearchTestConfiguration; +import com.linkedin.metadata.systemmetadata.SystemMetadataMappingsBuilder; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.IndicesClient; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.indices.GetIndexResponse; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.rest.RestStatus; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +@Import(ElasticSearchTestConfiguration.class) +public class ESIndexBuilderTest extends AbstractTestNGSpringContextTests { + + @Autowired + private RestHighLevelClient _searchClient; + private static IndicesClient _indexClient; + private static final String TEST_INDEX_NAME = "esindex_builder_test"; + private static ESIndexBuilder testDefaultBuilder; + + + @BeforeClass + public void setup() { + _indexClient = _searchClient.indices(); + testDefaultBuilder = new ESIndexBuilder(_searchClient, 1, 0, 0, 0, Map.of(), false); + } + + @BeforeMethod + public static void wipe() throws Exception { + try { + _indexClient.getAlias(new GetAliasesRequest(TEST_INDEX_NAME), RequestOptions.DEFAULT) + .getAliases().keySet().forEach(index -> { + try { + _indexClient.delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + _indexClient.delete(new DeleteIndexRequest(TEST_INDEX_NAME), RequestOptions.DEFAULT); + } catch (ElasticsearchException exception) { + if (exception.status() != RestStatus.NOT_FOUND) { + throw exception; + } + } + } + + public static GetIndexResponse getTestIndex() throws IOException { + return _indexClient.get(new GetIndexRequest(TEST_INDEX_NAME).includeDefaults(true), RequestOptions.DEFAULT); + } + + @Test + public void testESIndexBuilderCreation() throws Exception { + ESIndexBuilder customIndexBuilder = new ESIndexBuilder(_searchClient, 2, 0, 1, 0, Map.of(), false); + customIndexBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + GetIndexResponse resp = getTestIndex(); + + assertEquals("2", resp.getSetting(TEST_INDEX_NAME, "index.number_of_shards")); + assertEquals("0", resp.getSetting(TEST_INDEX_NAME, "index.number_of_replicas")); + assertEquals("0s", resp.getSetting(TEST_INDEX_NAME, "index.refresh_interval")); + } + + @Test + public void testMappingReindex() throws Exception { + // No mappings + testDefaultBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + String beforeCreationDate = getTestIndex().getSetting(TEST_INDEX_NAME, "index.creation_date"); + + // add new mappings + testDefaultBuilder.buildIndex(TEST_INDEX_NAME, SystemMetadataMappingsBuilder.getMappings(), Map.of()); + + String afterAddedMappingCreationDate = getTestIndex().getSetting(TEST_INDEX_NAME, "index.creation_date"); + assertEquals(beforeCreationDate, afterAddedMappingCreationDate, "Expected no reindex on *adding* mappings"); + + // change mappings + Map newProps = ((Map) SystemMetadataMappingsBuilder.getMappings().get("properties")) + .entrySet().stream() + .map(m -> !m.getKey().equals("urn") ? m + : Map.entry("urn", ImmutableMap.builder().put("type", "wildcard").build())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + testDefaultBuilder.buildIndex(TEST_INDEX_NAME, Map.of("properties", newProps), Map.of()); + + assertTrue(Arrays.stream(getTestIndex().getIndices()).noneMatch(name -> name.equals(TEST_INDEX_NAME)), + "Expected original index to be replaced with alias"); + + Map.Entry> newIndex = getTestIndex().getAliases().entrySet().stream() + .filter(e -> e.getValue().stream().anyMatch(aliasMeta -> aliasMeta.alias().equals(TEST_INDEX_NAME))) + .findFirst().get(); + String afterChangedMappingCreationDate = getTestIndex().getSetting(newIndex.getKey(), "index.creation_date"); + assertNotEquals(beforeCreationDate, afterChangedMappingCreationDate, "Expected reindex on *changing* mappings"); + } + + @Test + public void testSettingsNumberOfShardsReindex() throws Exception { + // Set test defaults + testDefaultBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + assertEquals("1", getTestIndex().getSetting(TEST_INDEX_NAME, "index.number_of_shards")); + String beforeCreationDate = getTestIndex().getSetting(TEST_INDEX_NAME, "index.creation_date"); + + String expectedShards = "5"; + ESIndexBuilder changedShardBuilder = new ESIndexBuilder(_searchClient, + Integer.parseInt(expectedShards), + testDefaultBuilder.getNumReplicas(), + testDefaultBuilder.getNumRetries(), + testDefaultBuilder.getRefreshIntervalSeconds(), + Map.of(), + true); + + // add new shard setting + changedShardBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + assertTrue(Arrays.stream(getTestIndex().getIndices()).noneMatch(name -> name.equals(TEST_INDEX_NAME)), + "Expected original index to be replaced with alias"); + + Map.Entry> newIndex = getTestIndex().getAliases().entrySet().stream() + .filter(e -> e.getValue().stream().anyMatch(aliasMeta -> aliasMeta.alias().equals(TEST_INDEX_NAME))) + .findFirst().get(); + + String afterCreationDate = getTestIndex().getSetting(newIndex.getKey(), "index.creation_date"); + assertNotEquals(beforeCreationDate, afterCreationDate, "Expected reindex to result in different timestamp"); + assertEquals(expectedShards, getTestIndex().getSetting(newIndex.getKey(), "index.number_of_shards"), + "Expected number of shards: " + expectedShards); + } + + @Test + public void testSettingsNoReindex() throws Exception { + List noReindexBuilders = List.of( + new ESIndexBuilder(_searchClient, + testDefaultBuilder.getNumShards(), + testDefaultBuilder.getNumReplicas() + 1, + testDefaultBuilder.getNumRetries(), + testDefaultBuilder.getRefreshIntervalSeconds(), + Map.of(), + true), + new ESIndexBuilder(_searchClient, + testDefaultBuilder.getNumShards(), + testDefaultBuilder.getNumReplicas(), + testDefaultBuilder.getNumRetries(), + testDefaultBuilder.getRefreshIntervalSeconds() + 10, + Map.of(), + true), + new ESIndexBuilder(_searchClient, + testDefaultBuilder.getNumShards() + 1, + testDefaultBuilder.getNumReplicas(), + testDefaultBuilder.getNumRetries(), + testDefaultBuilder.getRefreshIntervalSeconds(), + Map.of(), + false), + new ESIndexBuilder(_searchClient, + testDefaultBuilder.getNumShards(), + testDefaultBuilder.getNumReplicas() + 1, + testDefaultBuilder.getNumRetries(), + testDefaultBuilder.getRefreshIntervalSeconds(), + Map.of(), + false) + ); + + for (ESIndexBuilder builder : noReindexBuilders) { + // Set test defaults + testDefaultBuilder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + assertEquals("0", getTestIndex().getSetting(TEST_INDEX_NAME, "index.number_of_replicas")); + assertEquals("0s", getTestIndex().getSetting(TEST_INDEX_NAME, "index.refresh_interval")); + String beforeCreationDate = getTestIndex().getSetting(TEST_INDEX_NAME, "index.creation_date"); + + // build index with builder + builder.buildIndex(TEST_INDEX_NAME, Map.of(), Map.of()); + assertTrue(Arrays.asList(getTestIndex().getIndices()).contains(TEST_INDEX_NAME), + "Expected original index to remain"); + String afterCreationDate = getTestIndex().getSetting(TEST_INDEX_NAME, "index.creation_date"); + + assertEquals(beforeCreationDate, afterCreationDate, "Expected no difference in index timestamp"); + assertEquals(String.valueOf(builder.getNumReplicas()), getTestIndex().getSetting(TEST_INDEX_NAME, "index.number_of_replicas")); + assertEquals(builder.getRefreshIntervalSeconds() + "s", getTestIndex().getSetting(TEST_INDEX_NAME, "index.refresh_interval")); + + wipe(); + } + } + +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java index 1b2259626a9da4..d8c28ec34d213c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAOTest.java @@ -30,7 +30,7 @@ public class ESBrowseDAOTest { @BeforeMethod public void setup() { _mockClient = mock(RestHighLevelClient.class); - _browseDAO = new ESBrowseDAO(new TestEntityRegistry(), _mockClient, new IndexConventionImpl(null)); + _browseDAO = new ESBrowseDAO(new TestEntityRegistry(), _mockClient, new IndexConventionImpl("es_browse_dao_test")); } public static Urn makeUrn(Object id) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java index 4b4401545b73b5..48db13c75d7b88 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataServiceTest.java @@ -1,16 +1,18 @@ package com.linkedin.metadata.systemmetadata; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.run.AspectRowSummary; import com.linkedin.metadata.run.IngestionRunSummary; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.utils.ESUtils; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import com.linkedin.mxe.SystemMetadata; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testng.annotations.AfterClass; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -18,26 +20,24 @@ import javax.annotation.Nonnull; import java.util.List; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; -import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; import static com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService.INDEX_NAME; import static org.testng.Assert.assertEquals; +@Import(ElasticSearchTestConfiguration.class) +public class ElasticSearchSystemMetadataServiceTest extends AbstractTestNGSpringContextTests { -public class ElasticSearchSystemMetadataServiceTest { - - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; - private final IndexConvention _indexConvention = new IndexConventionImpl(null); + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; + private final IndexConvention _indexConvention = new IndexConventionImpl("es_system_metadata_service_test"); private final String _indexName = _indexConvention.getIndexName(INDEX_NAME); private ElasticSearchSystemMetadataService _client; @BeforeClass public void setup() { - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); _client = buildService(); _client.configure(); } @@ -45,20 +45,12 @@ public void setup() { @BeforeMethod public void wipe() throws Exception { _client.clear(); - syncAfterWrite(_searchClient, _indexName); } @Nonnull private ElasticSearchSystemMetadataService buildService() { - ESSystemMetadataDAO dao = new ESSystemMetadataDAO(_searchClient, _indexConvention, - ElasticSearchServiceTest.getBulkProcessor(_searchClient)); - return new ElasticSearchSystemMetadataService(_searchClient, _indexConvention, dao, - ElasticSearchServiceTest.getIndexBuilder(_searchClient)); - } - - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); + ESSystemMetadataDAO dao = new ESSystemMetadataDAO(_searchClient, _indexConvention, _bulkProcessor, 1); + return new ElasticSearchSystemMetadataService(_searchClient, _indexConvention, dao, _esIndexBuilder); } @Test @@ -78,8 +70,6 @@ public void testListRuns() throws Exception { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - syncAfterWrite(_searchClient, _indexName); - List runs = _client.listRuns(0, 20, false); assertEquals(runs.size(), 2); @@ -107,8 +97,6 @@ public void testOverwriteRuns() throws Exception { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - syncAfterWrite(_searchClient, _indexName); - List runs = _client.listRuns(0, 20, false); assertEquals(runs.size(), 2); @@ -136,8 +124,6 @@ public void testFindByRunId() throws Exception { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - syncAfterWrite(_searchClient, _indexName); - List rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE); assertEquals(rows.size(), 4); @@ -164,12 +150,8 @@ public void testDelete() throws Exception { _client.insert(metadata2, "urn:li:chart:2", "chartKey"); _client.insert(metadata2, "urn:li:chart:2", "Ownership"); - syncAfterWrite(_searchClient, _indexName); - _client.deleteUrn("urn:li:chart:1"); - syncAfterWrite(_searchClient, _indexName); - List rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE); assertEquals(rows.size(), 2); @@ -180,8 +162,6 @@ public void testDelete() throws Exception { public void testInsertNullData() throws Exception { _client.insert(null, "urn:li:chart:1", "chartKey"); - syncAfterWrite(_searchClient, _indexName); - List runs = _client.listRuns(0, 20, false); assertEquals(runs.size(), 0); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java index 3c175834c99465..31fd4918287d66 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeseries/elastic/ElasticSearchTimeseriesAspectServiceTest.java @@ -15,7 +15,7 @@ import com.linkedin.data.template.StringArrayArray; import com.linkedin.data.template.StringMap; import com.linkedin.data.template.StringMapArray; -import com.linkedin.metadata.ElasticTestUtils; +import com.linkedin.metadata.ElasticSearchTestConfiguration; import com.linkedin.metadata.aspect.EnvelopedAspect; import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.models.DataSchemaFactory; @@ -26,7 +26,7 @@ import com.linkedin.metadata.query.filter.Criterion; import com.linkedin.metadata.query.filter.CriterionArray; import com.linkedin.metadata.query.filter.Filter; -import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest; +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.search.utils.QueryUtils; import com.linkedin.metadata.timeseries.elastic.indexbuilder.TimeseriesAspectIndexBuilders; import com.linkedin.metadata.timeseries.transformer.TimeseriesAspectTransformer; @@ -41,9 +41,11 @@ import com.linkedin.timeseries.GroupingBucket; import com.linkedin.timeseries.GroupingBucketType; import com.linkedin.timeseries.TimeWindowSize; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.RestHighLevelClient; -import org.testcontainers.elasticsearch.ElasticsearchContainer; -import org.testng.annotations.AfterClass; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -55,14 +57,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static com.linkedin.metadata.DockerTestUtils.checkContainerEngine; -import static com.linkedin.metadata.ElasticSearchTestUtils.syncAfterWrite; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; - -public class ElasticSearchTimeseriesAspectServiceTest { +@Import(ElasticSearchTestConfiguration.class) +public class ElasticSearchTimeseriesAspectServiceTest extends AbstractTestNGSpringContextTests { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String ENTITY_NAME = "testEntity"; @@ -75,8 +75,12 @@ public class ElasticSearchTimeseriesAspectServiceTest { private static final String ES_FILED_TIMESTAMP = "timestampMillis"; private static final String ES_FILED_STAT = "stat"; - private ElasticsearchContainer _elasticsearchContainer; + @Autowired private RestHighLevelClient _searchClient; + @Autowired + private BulkProcessor _bulkProcessor; + @Autowired + private ESIndexBuilder _esIndexBuilder; private EntityRegistry _entityRegistry; private IndexConvention _indexConvention; private ElasticSearchTimeseriesAspectService _elasticSearchTimeseriesAspectService; @@ -93,11 +97,7 @@ public class ElasticSearchTimeseriesAspectServiceTest { public void setup() { _entityRegistry = new ConfigEntityRegistry(new DataSchemaFactory("com.datahub.test"), TestEntityProfile.class.getClassLoader().getResourceAsStream("test-entity-registry.yml")); - _indexConvention = new IndexConventionImpl(null); - _elasticsearchContainer = ElasticTestUtils.getNewElasticsearchContainer(); - checkContainerEngine(_elasticsearchContainer.getDockerClient()); - _elasticsearchContainer.start(); - _searchClient = ElasticTestUtils.buildRestClient(_elasticsearchContainer); + _indexConvention = new IndexConventionImpl("es_timeseries_aspect_service_test"); _elasticSearchTimeseriesAspectService = buildService(); _elasticSearchTimeseriesAspectService.configure(); EntitySpec entitySpec = _entityRegistry.getEntitySpec(ENTITY_NAME); @@ -107,13 +107,8 @@ public void setup() { @Nonnull private ElasticSearchTimeseriesAspectService buildService() { return new ElasticSearchTimeseriesAspectService(_searchClient, _indexConvention, - new TimeseriesAspectIndexBuilders(ElasticSearchServiceTest.getIndexBuilder(_searchClient), _entityRegistry, - _indexConvention), _entityRegistry, ElasticSearchServiceTest.getBulkProcessor(_searchClient)); - } - - @AfterClass - public void tearDown() { - _elasticsearchContainer.stop(); + new TimeseriesAspectIndexBuilders(_esIndexBuilder, _entityRegistry, + _indexConvention), _entityRegistry, _bulkProcessor, 1); } /* @@ -184,8 +179,6 @@ public void testUpsertProfiles() throws Exception { jsonProcessingException.printStackTrace(); } }); - - syncAfterWrite(_searchClient); } @Test(groups = "upsertUniqueMessageId") @@ -211,8 +204,6 @@ public void testUpsertProfilesWithUniqueMessageIds() throws Exception { } }); - syncAfterWrite(_searchClient); - List resultAspects = _elasticSearchTimeseriesAspectService.getAspectValues(urn, ENTITY_NAME, ASPECT_NAME, null, null, testEntityProfiles.size(), false, null); diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java index 496333cd2d95ab..3a2513187cc05e 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnector.java @@ -4,58 +4,21 @@ import javax.annotation.Nonnull; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.TimeValue; @Slf4j public class ElasticsearchConnector { - private BulkProcessor _bulkProcessor; - private static final int DEFAULT_NUMBER_OF_RETRIES = 3; // TODO: Test and also add these into config - private static final long DEFAULT_RETRY_INTERVAL = 1L; + private final BulkProcessor _bulkProcessor; + private final int _numRetries; - public ElasticsearchConnector(RestHighLevelClient elasticSearchRestClient, Integer bulkRequestsLimit, - Integer bulkFlushPeriod) { - initBulkProcessor(elasticSearchRestClient, bulkRequestsLimit, bulkFlushPeriod); - } - - private void initBulkProcessor(RestHighLevelClient elasticSearchRestClient, Integer bulkRequestsLimit, - Integer bulkFlushPeriod) { - BulkProcessor.Listener listener = new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - log.info("Successfully feeded bulk request. Number of events: " + response.getItems().length + " Took time ms: " - + response.getIngestTookInMillis()); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error("Error feeding bulk request. No retries left", failure); - } - }; - - _bulkProcessor = BulkProcessor.builder( - (request, bulkListener) -> elasticSearchRestClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), - listener) - .setBulkActions(bulkRequestsLimit) - .setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod)) - .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(DEFAULT_RETRY_INTERVAL), - DEFAULT_NUMBER_OF_RETRIES)) - .build(); + public ElasticsearchConnector(BulkProcessor bulkProcessor, int numRetries) { + _bulkProcessor = bulkProcessor; + _numRetries = numRetries; } public void feedElasticEvent(@Nonnull ElasticEvent event) { @@ -81,10 +44,11 @@ private static DeleteRequest createDeleteRequest(@Nonnull ElasticEvent event) { } @Nonnull - private static UpdateRequest createUpsertRequest(@Nonnull ElasticEvent event) { + private UpdateRequest createUpsertRequest(@Nonnull ElasticEvent event) { final IndexRequest indexRequest = new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson()); return new UpdateRequest(event.getIndex(), event.getId()).doc(event.buildJson()) .detectNoop(false) + .retryOnConflict(_numRetries) .upsert(indexRequest); } } diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java index a83f754cb91e3f..8f711a7b01b781 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/elasticsearch/ElasticsearchConnectorFactory.java @@ -1,7 +1,9 @@ package com.linkedin.metadata.kafka.elasticsearch; import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,17 +15,17 @@ @Slf4j @Configuration public class ElasticsearchConnectorFactory { + @Autowired + @Qualifier("elasticSearchBulkProcessor") + private BulkProcessor bulkProcessor; - @Value("${ES_BULK_REQUESTS_LIMIT:1}") - private Integer bulkRequestsLimit; - - @Value("${ES_BULK_FLUSH_PERIOD:1}") - private Integer bulkFlushPeriod; + @Value("${elasticsearch.bulkProcessor.numRetries}") + private Integer numRetries; @Bean(name = "elasticsearchConnector") @Nonnull - public ElasticsearchConnector createInstance(@Nonnull RestHighLevelClient elasticSearchRestHighLevelClient) { - return new ElasticsearchConnector(elasticSearchRestHighLevelClient, bulkRequestsLimit, bulkFlushPeriod); + public ElasticsearchConnector createInstance() { + return new ElasticsearchConnector(bulkProcessor, numRetries); } } \ No newline at end of file diff --git a/metadata-service/factories/build.gradle b/metadata-service/factories/build.gradle index 6ad9fc6b89b169..1a82f4d52f210f 100644 --- a/metadata-service/factories/build.gradle +++ b/metadata-service/factories/build.gradle @@ -30,6 +30,7 @@ dependencies { compile spec.product.pegasus.restliSpringBridge + testImplementation externalDependency.springBootTest testCompile externalDependency.mockito testCompile externalDependency.testng diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java index 12ede1ef940b57..d757371ccbe1a8 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchGraphServiceFactory.java @@ -35,7 +35,7 @@ protected ElasticSearchGraphService getInstance() { LineageRegistry lineageRegistry = new LineageRegistry(entityRegistry); return new ElasticSearchGraphService(lineageRegistry, components.getSearchClient(), components.getIndexConvention(), new ESGraphWriteDAO(components.getSearchClient(), components.getIndexConvention(), - components.getBulkProcessor()), + components.getBulkProcessor(), components.getNumRetries()), new ESGraphQueryDAO(components.getSearchClient(), lineageRegistry, components.getIndexConvention()), components.getIndexBuilder()); } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java index 25afaef5e8eb76..818f01b8f64c79 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/common/ElasticSearchSystemMetadataServiceFactory.java @@ -26,6 +26,6 @@ public class ElasticSearchSystemMetadataServiceFactory { protected ElasticSearchSystemMetadataService getInstance() { return new ElasticSearchSystemMetadataService(components.getSearchClient(), components.getIndexConvention(), new ESSystemMetadataDAO(components.getSearchClient(), components.getIndexConvention(), - components.getBulkProcessor()), components.getIndexBuilder()); + components.getBulkProcessor(), components.getNumRetries()), components.getIndexBuilder()); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java index 9c84b743639668..9828f350fc9fd3 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/BaseElasticSearchComponentsFactory.java @@ -6,7 +6,7 @@ import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import javax.annotation.Nonnull; -import lombok.Value; +import org.springframework.beans.factory.annotation.Value; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; @@ -25,14 +25,18 @@ ElasticSearchIndexBuilderFactory.class}) @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) public class BaseElasticSearchComponentsFactory { - @Value + @lombok.Value public static class BaseElasticSearchComponents { RestHighLevelClient searchClient; IndexConvention indexConvention; BulkProcessor bulkProcessor; ESIndexBuilder indexBuilder; + int numRetries; } + @Value("${elasticsearch.bulkProcessor.numRetries}") + private Integer numRetries; + @Autowired @Qualifier("elasticSearchRestHighLevelClient") private RestHighLevelClient searchClient; @@ -52,6 +56,6 @@ public static class BaseElasticSearchComponents { @Bean(name = "baseElasticSearchComponents") @Nonnull protected BaseElasticSearchComponents getInstance() { - return new BaseElasticSearchComponents(searchClient, indexConvention, bulkProcessor, indexBuilder); + return new BaseElasticSearchComponents(searchClient, indexConvention, bulkProcessor, indexBuilder, numRetries); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java index 0a7877acce8cff..8ee133ac6bb1fc 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchBulkProcessorFactory.java @@ -2,13 +2,12 @@ import com.linkedin.gms.factory.common.RestHighLevelClientFactory; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; -import com.linkedin.metadata.search.elasticsearch.update.BulkListener; import javax.annotation.Nonnull; -import org.elasticsearch.action.bulk.BackoffPolicy; + +import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor; +import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.TimeValue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -18,6 +17,7 @@ import org.springframework.context.annotation.PropertySource; +@Slf4j @Configuration @Import({RestHighLevelClientFactory.class}) @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) @@ -38,15 +38,23 @@ public class ElasticSearchBulkProcessorFactory { @Value("${elasticsearch.bulkProcessor.retryInterval}") private Long retryInterval; + @Value("#{new Boolean('${elasticsearch.bulkProcessor.async}')}") + private boolean async; + @Bean(name = "elasticSearchBulkProcessor") @Nonnull protected BulkProcessor getInstance() { - return BulkProcessor.builder((request, bulkListener) -> { - searchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); - }, BulkListener.getInstance()) - .setBulkActions(bulkRequestsLimit) - .setFlushInterval(TimeValue.timeValueSeconds(bulkFlushPeriod)) - .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(retryInterval), numRetries)) - .build(); + ESBulkProcessor builder = ESBulkProcessor.builder(searchClient) + .bulkFlushPeriod(bulkFlushPeriod) + .bulkRequestsLimit(bulkRequestsLimit) + .retryInterval(retryInterval) + .numRetries(numRetries) + .build(); + + if (async) { + return builder.toAsyncBulkProcessor(); + } else { + return builder.toBulkProcessor(); + } } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactory.java index d43603b328c17d..224f865a114827 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactory.java @@ -1,9 +1,15 @@ package com.linkedin.gms.factory.search; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.linkedin.gms.factory.common.IndexConventionFactory; import com.linkedin.gms.factory.common.RestHighLevelClientFactory; import com.linkedin.gms.factory.spring.YamlPropertySourceFactory; import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -13,11 +19,19 @@ import org.springframework.context.annotation.Import; import org.springframework.context.annotation.PropertySource; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.linkedin.gms.factory.common.IndexConventionFactory.INDEX_CONVENTION_BEAN; + @Configuration -@Import({RestHighLevelClientFactory.class}) +@Import({RestHighLevelClientFactory.class, IndexConventionFactory.class}) @PropertySource(value = "classpath:/application.yml", factory = YamlPropertySourceFactory.class) public class ElasticSearchIndexBuilderFactory { + @Autowired @Qualifier("elasticSearchRestHighLevelClient") private RestHighLevelClient searchClient; @@ -31,9 +45,44 @@ public class ElasticSearchIndexBuilderFactory { @Value("${elasticsearch.index.numRetries}") private Integer numRetries; + @Value("${elasticsearch.index.refreshIntervalSeconds}") + private Integer refreshIntervalSeconds; + + @Value("${elasticsearch.index.settingsOverrides}") + private String indexSettingOverrides; + + @Value("${elasticsearch.index.entitySettingsOverrides}") + private String entityIndexSettingOverrides; + + @Value("#{new Boolean('${elasticsearch.index.enableSettingsReindex}')}") + private boolean enableSettingsReindex; + + @Bean(name = "elasticSearchIndexSettingsOverrides") + @Nonnull + protected Map> getIndexSettingsOverrides( + @Qualifier(INDEX_CONVENTION_BEAN) IndexConvention indexConvention) { + + return Stream.concat( + parseIndexSettingsMap(indexSettingOverrides).entrySet().stream() + .map(e -> Map.entry(indexConvention.getIndexName(e.getKey()), e.getValue())), + parseIndexSettingsMap(entityIndexSettingOverrides).entrySet().stream() + .map(e -> Map.entry(indexConvention.getEntityIndexName(e.getKey()), e.getValue()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + @Bean(name = "elasticSearchIndexBuilder") @Nonnull - protected ESIndexBuilder getInstance() { - return new ESIndexBuilder(searchClient, numShards, numReplicas, numRetries); + protected ESIndexBuilder getInstance( + @Qualifier("elasticSearchIndexSettingsOverrides") Map> overrides) { + return new ESIndexBuilder(searchClient, numShards, numReplicas, numRetries, refreshIntervalSeconds, overrides, + enableSettingsReindex); + } + + @Nonnull + private static Map> parseIndexSettingsMap(@Nullable String json) { + Optional>> parseOpt = Optional.ofNullable( + new Gson().fromJson(json, + new TypeToken>>() { }.getType())); + return parseOpt.orElse(Map.of()); } } \ No newline at end of file diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java index 551085fd7e363e..5c6f8a0476e61d 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/search/ElasticSearchServiceFactory.java @@ -44,6 +44,6 @@ protected ElasticSearchService getInstance() { settingsBuilder), esSearchDAO, new ESBrowseDAO(entityRegistry, components.getSearchClient(), components.getIndexConvention()), new ESWriteDAO(entityRegistry, components.getSearchClient(), components.getIndexConvention(), - components.getBulkProcessor())); + components.getBulkProcessor(), components.getNumRetries())); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java index 06d9cf951025e2..717adf7d559b79 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/timeseries/ElasticSearchTimeseriesAspectServiceFactory.java @@ -32,6 +32,6 @@ public class ElasticSearchTimeseriesAspectServiceFactory { protected ElasticSearchTimeseriesAspectService getInstance() { return new ElasticSearchTimeseriesAspectService(components.getSearchClient(), components.getIndexConvention(), new TimeseriesAspectIndexBuilders(components.getIndexBuilder(), entityRegistry, - components.getIndexConvention()), entityRegistry, components.getBulkProcessor()); + components.getIndexConvention()), entityRegistry, components.getBulkProcessor(), components.getNumRetries()); } } \ No newline at end of file diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml index a54a16d59d408a..6f8393957de837 100644 --- a/metadata-service/factories/src/main/resources/application.yml +++ b/metadata-service/factories/src/main/resources/application.yml @@ -141,8 +141,9 @@ elasticsearch: keyStorePassword: ${ELASTICSEARCH_SSL_KEYSTORE_PASSWORD:#{null}} keyPassword: ${ELASTICSEARCH_SSL_KEY_PASSWORD:#{null}} bulkProcessor: - requestsLimit: ${ES_BULK_REQUESTS_LIMIT:1000} - flushPeriod: ${ES_BULK_FLUSH_PERIOD:1} + async: ${ES_BULK_ASYNC:false} + requestsLimit: ${ES_BULK_REQUESTS_LIMIT:500} + flushPeriod: ${ES_BULK_FLUSH_PERIOD:10} numRetries: ${ES_BULK_NUM_RETRIES:3} retryInterval: ${ES_BULK_RETRY_INTERVAL:1} index: @@ -150,9 +151,13 @@ elasticsearch: numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:1} numReplicas: ${ELASTICSEARCH_NUM_REPLICAS_PER_INDEX:1} numRetries: ${ELASTICSEARCH_INDEX_BUILDER_NUM_RETRIES:3} + refreshIntervalSeconds: ${ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS:1} # increase to 30 if expected indexing rates to be greater than 100/s maxArrayLength: ${SEARCH_DOCUMENT_MAX_ARRAY_LENGTH:1000} maxObjectKeys: ${SEARCH_DOCUMENT_MAX_OBJECT_KEYS:1000} mainTokenizer: ${ELASTICSEARCH_MAIN_TOKENIZER:#{null}} + enableSettingsReindex: ${ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX:false} + settingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_SETTINGS_OVERRIDES:#{null}} + entitySettingsOverrides: ${ELASTICSEARCH_INDEX_BUILDER_ENTITY_SETTINGS_OVERRIDES:#{null}} # TODO: Kafka topic convention kafka: diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryDefaultsTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryDefaultsTest.java new file mode 100644 index 00000000000000..27707441a198e3 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryDefaultsTest.java @@ -0,0 +1,26 @@ +package com.linkedin.gms.factory.search; + +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; + +@TestPropertySource(locations = "classpath:/application.yml") +@SpringBootTest(classes = {ElasticSearchIndexBuilderFactory.class}) +public class ElasticSearchIndexBuilderFactoryDefaultsTest extends AbstractTestNGSpringContextTests { + @Autowired + ESIndexBuilder test; + + @Test + void testInjection() { + assertNotNull(test); + assertEquals(Map.of(), test.getIndexSettingOverrides()); + } +} diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryEmptyTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryEmptyTest.java new file mode 100644 index 00000000000000..3022308b42faaa --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryEmptyTest.java @@ -0,0 +1,32 @@ +package com.linkedin.gms.factory.search; + +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.PropertySource; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +@PropertySource("classpath:/test-empty-application.yml") +@SpringBootTest( + properties = { + "elasticsearch.index.settingsOverrides=", + "elasticsearch.index.entitySettingsOverrides=", + "elasticsearch.index.prefix=test_prefix" + }, + classes = {ElasticSearchIndexBuilderFactory.class}) +public class ElasticSearchIndexBuilderFactoryEmptyTest extends AbstractTestNGSpringContextTests { + @Autowired + ESIndexBuilder test; + + @Test + void testInjection() { + assertNotNull(test); + assertEquals(Map.of(), test.getIndexSettingOverrides()); + } +} diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryOverridesTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryOverridesTest.java new file mode 100644 index 00000000000000..2f14507371f190 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/search/ElasticSearchIndexBuilderFactoryOverridesTest.java @@ -0,0 +1,28 @@ +package com.linkedin.gms.factory.search; + +import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.testng.Assert.*; + +@SpringBootTest( + properties = { + "elasticsearch.index.settingsOverrides={\"my_index\":{\"number_of_shards\":\"10\"}}", + "elasticsearch.index.entitySettingsOverrides={\"my_entity\":{\"number_of_shards\":\"5\"}}", + "elasticsearch.index.prefix=test_prefix" + }, + classes = {ElasticSearchIndexBuilderFactory.class}) +public class ElasticSearchIndexBuilderFactoryOverridesTest extends AbstractTestNGSpringContextTests { + @Autowired + ESIndexBuilder test; + + @Test + void testInjection() { + assertNotNull(test); + assertEquals("10", test.getIndexSettingOverrides().get("test_prefix_my_index").get("number_of_shards")); + assertEquals("5", test.getIndexSettingOverrides().get("test_prefix_my_entityindex_v2").get("number_of_shards")); + } +} diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java index 2ba29b0007f5fe..3d90cba85b0fb1 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java @@ -11,6 +11,8 @@ public class MetricUtils { private MetricUtils() { } + public static final String DELIMITER = "_"; + public static final String NAME = "default"; private static final MetricRegistry REGISTRY = SharedMetricRegistries.getOrCreate(NAME); @@ -27,6 +29,14 @@ public static Counter counter(Class klass, String metricName) { return REGISTRY.counter(MetricRegistry.name(klass, metricName)); } + public static void exceptionCounter(Class klass, String metricName, Throwable t) { + String[] splitClassName = t.getClass().getName().split("[.]"); + String snakeCase = splitClassName[splitClassName.length - 1].replaceAll("([A-Z][a-z])", DELIMITER + "$1"); + + counter(klass, metricName).inc(); + counter(klass, metricName + DELIMITER + snakeCase).inc(); + } + public static Counter counter(String metricName) { return REGISTRY.counter(MetricRegistry.name(metricName)); }