Skip to content

Commit

Permalink
Fix: Ensure correct index mapping in Elasticsearch for clusterAlias (#…
Browse files Browse the repository at this point in the history
…16589)

* Fix: Ensure correct index mapping in Elasticsearch for clusterAlias

* Fix: Ensure correct index mapping in Elasticsearch for clusterAlias
  • Loading branch information
sonika-shah authored Jun 10, 2024
1 parent ff00175 commit 8723b8c
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ public String getIndexOrAliasName(String name) {
: name;
}

public String getIndexNameWithoutAlias(String fullIndexName) {
if (clusterAlias != null
&& !clusterAlias.isEmpty()
&& fullIndexName.startsWith(clusterAlias + indexNameSeparator)) {
return fullIndexName.substring((clusterAlias + indexNameSeparator).length());
}
return fullIndexName;
}

public boolean indexExists(IndexMapping indexMapping) {
return searchClient.indexExists(indexMapping.getIndexName(clusterAlias));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT;
import static org.openmetadata.service.Entity.DOMAIN;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_NAME;
import static org.openmetadata.service.Entity.GLOSSARY_TERM;
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.COLUMNS_NAME_KEYWORD;
import static org.openmetadata.service.search.EntityBuilderConstant.DATA_MODEL_COLUMNS_NAME_KEYWORD;
Expand Down Expand Up @@ -347,8 +353,12 @@ public Response search(SearchRequest request) throws IOException {
}

/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("all")
|| request.getIndex().equalsIgnoreCase("dataAsset")) {
if (request
.getIndex()
.equalsIgnoreCase(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))
|| request
.getIndex()
.equalsIgnoreCase(Entity.getSearchRepository().getIndexOrAliasName("dataAsset"))) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
Expand All @@ -360,12 +370,36 @@ public Response search(SearchRequest request) throws IOException {
.must(searchSourceBuilder.query())
.mustNot(QueryBuilders.existsQuery("deleted")));
searchSourceBuilder.query(boolQueryBuilder);
} else if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_product_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")
|| request.getIndex().equalsIgnoreCase("knowledge_page_search_index")
|| request.getIndex().equalsIgnoreCase("raw_cost_analysis_report_data_index")
|| request.getIndex().equalsIgnoreCase("aggregated_cost_analysis_report_data_index")) {
} else if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexMapping(DOMAIN).getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(DATA_PRODUCT)
.getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexMapping(QUERY).getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexOrAliasName("knowledge_page_search_index"))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(RAW_COST_ANALYSIS_REPORT_DATA)
.getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(AGGREGATED_COST_ANALYSIS_REPORT_DATA)
.getIndexName(clusterAlias))) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(
Expand All @@ -379,7 +413,12 @@ public Response search(SearchRequest request) throws IOException {
request.getSortFieldParam(), SortOrder.fromString(request.getSortOrder()));
}

if (request.getIndex().equalsIgnoreCase("glossary_term_search_index")) {
if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(GLOSSARY_TERM)
.getIndexName(clusterAlias))) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));

if (request.isGetHierarchy()) {
Expand Down Expand Up @@ -495,7 +534,12 @@ public Response getDocByID(String indexName, String entityId) throws IOException

public List<?> buildSearchHierarchy(SearchRequest request, SearchResponse searchResponse) {
List<?> response = new ArrayList<>();
if (request.getIndex().equalsIgnoreCase("glossary_term_search_index")) {
if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(GLOSSARY_TERM)
.getIndexName(clusterAlias))) {
response = buildGlossaryTermSearchHierarchy(searchResponse);
}
return response;
Expand Down Expand Up @@ -639,7 +683,8 @@ public Map<String, Object> searchLineageInternal(
fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, responseMap);
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
Expand Down Expand Up @@ -687,7 +732,8 @@ private void getLineage(
return;
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery(direction, fqn)));
Expand Down Expand Up @@ -750,7 +796,8 @@ private Map<String, Object> searchPipelineLineage(
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery()
Expand Down Expand Up @@ -811,7 +858,8 @@ private Map<String, Object> searchPipelineLineage(
// TODO: Fix this , this is hack
if (edges.isEmpty()) {
es.org.elasticsearch.action.search.SearchRequest searchRequestForEntity =
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
Expand Down Expand Up @@ -1775,7 +1823,8 @@ private static es.org.elasticsearch.action.search.SearchRequest buildSearchReque
}

es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(dataReportIndex);
new es.org.elasticsearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(dataReportIndex));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}
Expand Down Expand Up @@ -2088,7 +2137,7 @@ public RestHighLevelClient createElasticSearchClient(ElasticSearchConfiguration

private static SearchSourceBuilder getSearchSourceBuilder(
String index, String q, int from, int size) {
return switch (index) {
return switch (Entity.getSearchRepository().getIndexNameWithoutAlias(index)) {
case "topic_search_index", "topic" -> buildTopicSearchBuilder(q, from, size);
case "dashboard_search_index", "dashboard" -> buildDashboardSearchBuilder(q, from, size);
case "pipeline_search_index", "pipeline" -> buildPipelineSearchBuilder(q, from, size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public interface SearchIndex {
Set<String> DEFAULT_EXCLUDED_FIELDS =
Set.of("changeDescription", "lineage.pipeline.changeDescription");
Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection");

default Map<String, Object> buildSearchIndexDoc() {
// Build Index Doc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.DATA_PRODUCT;
import static org.openmetadata.service.Entity.DOMAIN;
import static org.openmetadata.service.Entity.FIELD_DESCRIPTION;
import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_NAME;
import static org.openmetadata.service.Entity.GLOSSARY_TERM;
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.COLUMNS_NAME_KEYWORD;
import static org.openmetadata.service.search.EntityBuilderConstant.DATA_MODEL_COLUMNS_NAME_KEYWORD;
Expand Down Expand Up @@ -345,8 +351,12 @@ public Response search(SearchRequest request) throws IOException {
}

/* For backward-compatibility we continue supporting the deleted argument, this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("all")
|| request.getIndex().equalsIgnoreCase("dataAsset")) {
if (request
.getIndex()
.equalsIgnoreCase(Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS))
|| request
.getIndex()
.equalsIgnoreCase(Entity.getSearchRepository().getIndexOrAliasName("dataAsset"))) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.should(
QueryBuilders.boolQuery()
Expand All @@ -358,12 +368,36 @@ public Response search(SearchRequest request) throws IOException {
.must(searchSourceBuilder.query())
.mustNot(QueryBuilders.existsQuery("deleted")));
searchSourceBuilder.query(boolQueryBuilder);
} else if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_product_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")
|| request.getIndex().equalsIgnoreCase("knowledge_page_search_index")
|| request.getIndex().equalsIgnoreCase("raw_cost_analysis_report_data_index")
|| request.getIndex().equalsIgnoreCase("aggregated_cost_analysis_report_data_index")) {
} else if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexMapping(DOMAIN).getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(DATA_PRODUCT)
.getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexMapping(QUERY).getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository().getIndexOrAliasName("knowledge_page_search_index"))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(RAW_COST_ANALYSIS_REPORT_DATA)
.getIndexName(clusterAlias))
|| request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(AGGREGATED_COST_ANALYSIS_REPORT_DATA)
.getIndexName(clusterAlias))) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(
Expand All @@ -377,7 +411,12 @@ public Response search(SearchRequest request) throws IOException {
request.getSortFieldParam(), SortOrder.fromString(request.getSortOrder()));
}

if (request.getIndex().equalsIgnoreCase("glossary_term_search_index")) {
if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(GLOSSARY_TERM)
.getIndexName(clusterAlias))) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));

if (request.isGetHierarchy()) {
Expand Down Expand Up @@ -486,7 +525,12 @@ public Response getDocByID(String indexName, String entityId) throws IOException

public List<?> buildSearchHierarchy(SearchRequest request, SearchResponse searchResponse) {
List<?> response = new ArrayList<>();
if (request.getIndex().equalsIgnoreCase("glossary_term_search_index")) {
if (request
.getIndex()
.equalsIgnoreCase(
Entity.getSearchRepository()
.getIndexMapping(GLOSSARY_TERM)
.getIndexName(clusterAlias))) {
response = buildGlossaryTermSearchHierarchy(searchResponse);
}
return response;
Expand Down Expand Up @@ -640,7 +684,8 @@ public Map<String, Object> searchLineageInternal(
Set<Map<String, Object>> edges = new HashSet<>();
Set<Map<String, Object>> nodes = new HashSet<>();
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
Expand Down Expand Up @@ -688,7 +733,8 @@ private void getLineage(
return;
}
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery(direction, fqn)));
Expand Down Expand Up @@ -749,7 +795,8 @@ private Map<String, Object> searchPipelineLineage(
Set<Map<String, Object>> nodes = new HashSet<>();
responseMap.put("entity", null);
os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery()
Expand Down Expand Up @@ -809,7 +856,8 @@ private Map<String, Object> searchPipelineLineage(
}
if (edges.isEmpty()) {
os.org.opensearch.action.search.SearchRequest searchRequestForEntity =
new os.org.opensearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS));
SearchSourceBuilder searchSourceBuilderForEntity = new SearchSourceBuilder();
searchSourceBuilderForEntity.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
Expand Down Expand Up @@ -1762,7 +1810,8 @@ private static os.org.opensearch.action.search.SearchRequest buildSearchRequest(
}

os.org.opensearch.action.search.SearchRequest searchRequest =
new os.org.opensearch.action.search.SearchRequest(dataReportIndex);
new os.org.opensearch.action.search.SearchRequest(
Entity.getSearchRepository().getIndexOrAliasName(dataReportIndex));
searchRequest.source(searchSourceBuilder);
return searchRequest;
}
Expand Down Expand Up @@ -2065,7 +2114,7 @@ public RestHighLevelClient createOpenSearchClient(ElasticSearchConfiguration esC

private static SearchSourceBuilder getSearchSourceBuilder(
String index, String q, int from, int size) {
return switch (index) {
return switch (Entity.getSearchRepository().getIndexNameWithoutAlias(index)) {
case "topic_search_index", "topic" -> buildTopicSearchBuilder(q, from, size);
case "dashboard_search_index", "dashboard" -> buildDashboardSearchBuilder(q, from, size);
case "pipeline_search_index", "pipeline" -> buildPipelineSearchBuilder(q, from, size);
Expand Down

0 comments on commit 8723b8c

Please sign in to comment.