Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Nov 23, 2022
1 parent a1349cc commit af7a924
Show file tree
Hide file tree
Showing 24 changed files with 172 additions and 69 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ buildscript {
ext.neo4jVersion = '4.4.9'
ext.graphQLJavaVersion = '19.0'
ext.testContainersVersion = '1.17.4'
ext.elasticsearchVersion = '7.17.7'
ext.elasticsearchVersion = '7.10.2'
apply from: './repositories.gradle'
buildscript.repositories.addAll(project.repositories)
dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.xcontent.XContentType;

import static com.linkedin.metadata.graph.elastic.ESGraphQueryDAO.buildQuery;
import static com.linkedin.metadata.graph.elastic.ElasticSearchGraphService.INDEX_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder.TEXT_ANALYZER;
import static com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder.TEXT_SEARCH_ANALYZER;
import static com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder.*;


@Slf4j
Expand Down Expand Up @@ -39,7 +38,9 @@ private static Map<String, Object> getMappingsForUrn() {
Map<String, Object> subFields = new HashMap<>();
subFields.put("delimited", ImmutableMap.of(
"type", "text",
"analyzer", "urn_component"));
"analyzer", URN_ANALYZER,
"search_analyzer", URN_SEARCH_ANALYZER)
);
subFields.put("ngram", ImmutableMap.of(
"type", "search_as_you_type",
"max_shingle_size", "4"));
Expand Down Expand Up @@ -88,7 +89,8 @@ private static Map<String, Object> getMappingsForField(@Nonnull final Searchable
mappingForField.put("fielddata", true);
} else if (fieldType == FieldType.URN || fieldType == FieldType.URN_PARTIAL) {
mappingForField.put("type", "text");
mappingForField.put("analyzer", "urn_component");
mappingForField.put("analyzer", URN_ANALYZER);
mappingForField.put("search_analyzer", URN_SEARCH_ANALYZER);
Map<String, Object> subFields = new HashMap<>();
if (fieldType == FieldType.URN_PARTIAL) {
subFields.put("ngram", ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class SettingsBuilder {
public static final String KEYWORD_LOWERCASE_ANALYZER = "custom_keyword";
public static final String TEXT_ANALYZER = "word_delimited";
public static final String TEXT_SEARCH_ANALYZER = "query_word_delimited";
public static final String URN_ANALYZER = "urn_component";
public static final String URN_SEARCH_ANALYZER = "query_urn_component";
private final Map<String, Object> settings;

public SettingsBuilder(String mainTokenizer) {
Expand Down Expand Up @@ -240,15 +242,31 @@ private static Map<String, Object> buildAnalyzers(String mainTokenizer) {
.build());

// Analyzer for getting urn components
analyzers.put("urn_component", ImmutableMap.<String, Object>builder()
analyzers.put(URN_ANALYZER, ImmutableMap.<String, Object>builder()
.put("tokenizer", "main_tokenizer")
.put("filter", ImmutableList.of(
"asciifolding",
"lowercase",
"custom_delimiter",
"multifilter",
"trim_colon",
"urn_stop",
"stop",
"unique",
"stem_override",
"snowball",
"min_length_2"))
.build());

analyzers.put(URN_SEARCH_ANALYZER, ImmutableMap.<String, Object>builder()
.put("tokenizer", "keyword")
.put("filter", ImmutableList.of(
"asciifolding",
"lowercase",
"multifilter_graph",
"trim_colon",
"urn_stop",
"stop",
"unique",
"stem_override",
"snowball",
"min_length_2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand Down Expand Up @@ -55,8 +55,8 @@ public static ESBulkProcessor.ESBulkProcessorBuilder builder(RestHighLevelClient
private final BulkProcessor bulkProcessor;

private ESBulkProcessor(@NonNull RestHighLevelClient searchClient, @NonNull Boolean async, Integer bulkRequestsLimit,
Integer bulkFlushPeriod, Integer numRetries, Long retryInterval,
TimeValue defaultTimeout, WriteRequest.RefreshPolicy writeRequestRefreshPolicy,
Integer bulkFlushPeriod, Integer numRetries, Long retryInterval,
TimeValue defaultTimeout, WriteRequest.RefreshPolicy writeRequestRefreshPolicy,
BulkProcessor ignored) {
this.searchClient = searchClient;
this.async = async;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.xcontent.XContentType;


@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
Expand All @@ -29,7 +30,6 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentType;

import static com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService.INDEX_NAME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentType;


@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ stg, staging
dev, development
prod, production
glue, athena
big query => bigquery
data platform => dataplatform
bigquery, big query
data platform, dataplatform
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,28 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import org.apache.commons.io.FilenameUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.TestConfiguration;
Expand Down Expand Up @@ -210,7 +217,7 @@ protected SearchService sampleDataSearchService(

// Build indices & write fixture data
indexBuilders.buildAll();
indexFixture(fixtureName, prefix);
indexFixture(_bulkProcessor, fixtureName, prefix);

return service;
}
Expand Down Expand Up @@ -241,7 +248,7 @@ protected EntityClient sampleDataEntityClient(
null);
}

private Set<String> indexFixture(String fixture, String prefix) throws IOException {
private static Set<String> indexFixture(ESBulkProcessor bulkProcessor, String fixture, String prefix) throws IOException {
try (Stream<Path> files = Files.list(Paths.get(String.format("%s/%s", FIXTURE_BASE, fixture)))) {
return files.map(file -> {
String indexName = String.format("%s_%s", prefix, FilenameUtils.getBaseName(file.toAbsolutePath().toString()));
Expand All @@ -254,7 +261,7 @@ private Set<String> indexFixture(String fixture, String prefix) throws IOExcepti
.id(doc.urn)
.source(line.getBytes(), XContentType.JSON);

_bulkProcessor.add(request);
bulkProcessor.add(request);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand All @@ -266,39 +273,61 @@ private Set<String> indexFixture(String fixture, String prefix) throws IOExcepti
return indexName;
}).collect(Collectors.toSet());
} finally {
_bulkProcessor.flush();
bulkProcessor.flush();
}
}

private static RestClientBuilder buildEnvironmentClient() {

return RestClient.builder(
new HttpHost(System.getenv("ELASTICSEARCH_HOST"),
Integer.parseInt(System.getenv("ELASTICSEARCH_PORT")),
System.getenv("ELASTICSEARCH_PORT").equals("443") ? "https" : "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();

if (System.getenv("ELASTICSEARCH_USERNAME") != null) {
final CredentialsProvider credentialsProvider =
new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(System.getenv("ELASTICSEARCH_USERNAME"),
System.getenv("ELASTICSEARCH_PASSWORD")));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

return httpClientBuilder;
}
});
}

@Test
@Ignore("Fixture capture logic")
/*
* Run this to capture test fixtures
* 1. Update fixture name
* 2. Comment @Ignore
* 3. Run extraction
*/
* 1. Update environment variables for ELASTICSEARCH_* (see buildEnvironmentClient)
* 2. Update fixture name
* 3. Comment @Ignore
* 4. Run extraction
**/
private static void extractTestFixtureData() throws IOException {

String fixtureName = "sample_data";
String host = "localhost";
int port = 9200;

String fixtureName = "long_tail";
String prefix = "482ajm7100-longtailcompanions_";
String commonSuffix = "index_v2";
int fetchSize = 1000;

Set<String> searchIndexSuffixes = SEARCHABLE_ENTITY_TYPES.stream()
.map(entityType -> entityType.toString().toLowerCase() + commonSuffix)
.collect(Collectors.toSet());

RestClientBuilder builder = RestClient.builder(
new HttpHost(host, port, "http"));

try (RestHighLevelClient client = new RestHighLevelClient(builder)) {
try (RestHighLevelClient client = new RestHighLevelClient(buildEnvironmentClient())) {
GetMappingsResponse response = client.indices().getMapping(new GetMappingsRequest().indices("*"),
RequestOptions.DEFAULT);
response.mappings().keySet().stream()
.filter(index -> searchIndexSuffixes.stream().anyMatch(index::contains))
.filter(index -> searchIndexSuffixes.stream().anyMatch(index::contains) &&
index.startsWith(prefix))
.map(index -> index.split(commonSuffix, 2)[0] + commonSuffix)
.forEach(indexName -> {

Expand All @@ -315,7 +344,8 @@ private static void extractTestFixtureData() throws IOException {
long remainingHits = hits.getTotalHits().value;

if (remainingHits > 0) {
try (FileWriter writer = new FileWriter(String.format("%s/%s/%s.json", FIXTURE_BASE, fixtureName, indexName));
try (FileWriter writer = new FileWriter(String.format("%s/%s/%s.json", FIXTURE_BASE,
fixtureName, indexName.replaceFirst(prefix, "")));
BufferedWriter bw = new BufferedWriter(writer)) {

while (remainingHits > 0) {
Expand All @@ -340,6 +370,24 @@ private static void extractTestFixtureData() throws IOException {
}
}

@Test
@Ignore("Write capture logic to some external ES cluster for testing")
/*
* Can be used to write fixture data to external ES cluster
* 1. Set environment variables
* 2. Update fixture name and prefix
* 3. Uncomment and run test
*/
private static void writeTestFixtureData() throws IOException {
ESBulkProcessor bulkProcessor = ESBulkProcessor.builder(new RestHighLevelClient(buildEnvironmentClient()))
.async(true)
.bulkRequestsLimit(1000)
.retryInterval(1L)
.numRetries(2)
.build();
indexFixture(bulkProcessor, "long_tail", "");
}

public static class UrnDocument {
public String urn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.ElasticSearchTestConfiguration;
import com.linkedin.metadata.graph.Edge;
import com.linkedin.metadata.graph.EntityLineageResult;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.graph.GraphServiceTestBase;
import com.linkedin.metadata.graph.RelatedEntitiesResult;
Expand Down
Loading

0 comments on commit af7a924

Please sign in to comment.