Skip to content

Commit

Permalink
fix(elastic): fix race condition between gms and mae for index creation
Browse files Browse the repository at this point in the history
  • Loading branch information
leifker committed Dec 19, 2022
1 parent 3152645 commit 9629d12
Show file tree
Hide file tree
Showing 15 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
INDEX_CREATE=true
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.cassandra.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
INDEX_CREATE=true
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
INDEX_CREATE=true
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.mariadb.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
INDEX_CREATE=true
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
Expand Down
1 change: 1 addition & 0 deletions docker/datahub-gms/env/docker.postgres.env
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
INDEX_CREATE=true
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- INDEX_CREATE=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- INDEX_CREATE=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- INDEX_CREATE=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_IMPL=elasticsearch
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- INDEX_CREATE=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
Expand Down
1 change: 1 addition & 0 deletions docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- INDEX_CREATE=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public class ESIndexBuilder {
@Getter
private final Map<String, Map<String, String>> indexSettingOverrides;

@Getter
private final boolean enabled;

@Getter
private final boolean enableIndexSettingsReindex;

Expand All @@ -77,6 +80,12 @@ public class ESIndexBuilder {

public void buildIndex(String indexName, Map<String, Object> mappings, Map<String, Object> settings)
throws IOException {

if (!enabled) {
log.info("Index building is not enabled for this component.");
return;
}

// Check if index exists
boolean exists = searchClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ public class EntityIndexBuilder {
private final EntitySpec entitySpec;
private final SettingsBuilder settingsBuilder;
private final String indexName;
private final boolean enabled;

public void buildIndex() throws IOException {
log.info("Setting up index: {}", indexName);
Map<String, Object> mappings = MappingsBuilder.getMappings(entitySpec);
Map<String, Object> settings = settingsBuilder.getSettings();
if (enabled) {
Map<String, Object> mappings = MappingsBuilder.getMappings(entitySpec);
Map<String, Object> settings = settingsBuilder.getSettings();

indexBuilder.buildIndex(indexName, mappings, settings);
indexBuilder.buildIndex(indexName, mappings, settings);
} else {
log.info("Index building is not enabled for this component.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@RequiredArgsConstructor
public class EntityIndexBuilders {
private final ESIndexBuilder indexBuilder;
private final EntityRegistry entityRegistry;
private final IndexConvention indexConvention;
private final SettingsBuilder settingsBuilder;
private boolean enabled;

public void buildAll() {
for (EntitySpec entitySpec : entityRegistry.getEntitySpecs().values()) {
try {
new EntityIndexBuilder(indexBuilder, entitySpec, settingsBuilder,
indexConvention.getIndexName(entitySpec)).buildIndex();
indexConvention.getIndexName(entitySpec), enabled).buildIndex();
} catch (IOException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class ElasticSearchIndexBuilderFactory {
@Value("${elasticsearch.index.entitySettingsOverrides}")
private String entityIndexSettingOverrides;

@Value("${index.create}")
private boolean indexCreate;

@Value("#{new Boolean('${elasticsearch.index.enableSettingsReindex}')}")
private boolean enableSettingsReindex;

Expand All @@ -75,7 +78,7 @@ protected Map<String, Map<String, String>> getIndexSettingsOverrides(
protected ESIndexBuilder getInstance(
@Qualifier("elasticSearchIndexSettingsOverrides") Map<String, Map<String, String>> overrides) {
return new ESIndexBuilder(searchClient, numShards, numReplicas, numRetries, refreshIntervalSeconds, overrides,
enableSettingsReindex);
indexCreate, enableSettingsReindex);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ elasticsearch:
refreshPolicy: ${ES_BULK_REFRESH_POLICY:NONE}
index:
prefix: ${INDEX_PREFIX:}
create: ${INDEX_CREATE:false} # mae/gms there should only be 1 index creator
numShards: ${ELASTICSEARCH_NUM_SHARDS_PER_INDEX:1}
numReplicas: ${ELASTICSEARCH_NUM_REPLICAS_PER_INDEX:1}
numRetries: ${ELASTICSEARCH_INDEX_BUILDER_NUM_RETRIES:3}
Expand Down

0 comments on commit 9629d12

Please sign in to comment.