Skip to content

Commit

Permalink
fix(elasticsearch): Update graph update function for ES
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Oct 25, 2022
1 parent 2006d55 commit 32b81d1
Show file tree
Hide file tree
Showing 41 changed files with 659 additions and 44 deletions.
3 changes: 1 addition & 2 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
# For testing and small deployments, reduce flush period to decrease latency at the expense of throughput performance
ES_BULK_FLUSH_PERIOD=1
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
JAVA_OPTS=-Xms1g -Xmx1g
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker.cassandra.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
# For testing and small deployments, reduce flush period to decrease latency at the expense of throughput performance
ES_BULK_FLUSH_PERIOD=1
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
Expand Down
3 changes: 1 addition & 2 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
# For testing and small deployments, reduce flush period to decrease latency at the expense of throughput performance
ES_BULK_FLUSH_PERIOD=1
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=datahub
JAVA_OPTS=-Xms1g -Xmx1g
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
ENTITY_SERVICE_ENABLE_RETENTION=true
Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker.mariadb.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
# For testing and small deployments, reduce flush period to decrease latency at the expense of throughput performance
ES_BULK_FLUSH_PERIOD=1
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker.postgres.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
# For testing and small deployments, reduce flush period to decrease latency at the expense of throughput performance
ES_BULK_FLUSH_PERIOD=1
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_FLUSH_PERIOD=1
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_FLUSH_PERIOD=1
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_FLUSH_PERIOD=1
- GRAPH_SERVICE_IMPL=elasticsearch
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
hostname: datahub-mae-consumer
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose.consumers.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_FLUSH_PERIOD=1
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_FLUSH_PERIOD=1
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
Expand All @@ -32,7 +33,7 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
}

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(RecordTemplate record, List<T> fieldSpecs) {
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(@Nonnull RecordTemplate record, List<T> fieldSpecs) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public interface GraphService {
*/
void addEdge(final Edge edge);

/**
 * Remove a single edge from the graph.
 *
 * <p>Not every backend supports this operation: implementations that cannot delete an
 * individual edge throw {@link UnsupportedOperationException}.
 *
 * @param edge the edge to delete
 */
void removeEdge(final Edge edge);

/**
* Find related entities (nodes) connected to a source entity via edges of given relationship types. Related entities
* can be filtered by source and destination type (use `null` for any type), by source and destination entity filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ protected static String getQueryForRelatedEntities(@Nullable List<String> source
relationships);
}

/**
 * Single-edge removal is not implemented for the DGraph backend.
 *
 * @param edge ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void removeEdge(final Edge edge) {
    // Fail loudly rather than silently dropping the request.
    final String message = "Remove edge not supported by DgraphGraphService at this time.";
    throw new UnsupportedOperationException(message);
}

@Nonnull
@Override
public RelatedEntitiesResult findRelatedEntities(@Nullable List<String> sourceTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ESGraphQueryDAO {
private static final String SOURCE = "source";
private static final String DESTINATION = "destination";
private static final String RELATIONSHIP_TYPE = "relationshipType";
private static final String SEARCH_EXECUTIONS_METRIC = "num_elasticSearch_reads";

@Nonnull
public static void addFilterToQueryBuilder(@Nonnull Filter filter, String node, BoolQueryBuilder rootQuery) {
Expand Down Expand Up @@ -101,6 +102,7 @@ private SearchResponse executeSearchQuery(@Nonnull final QueryBuilder query, fin
searchRequest.indices(indexConvention.getIndexName(INDEX_NAME));

try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "esQuery").time()) {
MetricUtils.counter(this.getClass(), SEARCH_EXECUTIONS_METRIC).inc();
return client.search(searchRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.error("Search query failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -27,6 +28,8 @@ public class ESGraphWriteDAO {
private final ESBulkProcessor bulkProcessor;
private final int numRetries;

private static final String ES_WRITES_METRIC = "num_elasticSearch_writes";

/**
* Updates or inserts the given search document.
*
Expand All @@ -38,10 +41,23 @@ public void upsertDocument(@Nonnull String docId, @Nonnull String document) {
new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON)
.detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest);
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
bulkProcessor.add(updateRequest);
}

/**
 * Deletes the graph document with the given ID via the shared bulk processor.
 *
 * <p>The delete is queued on the bulk processor, so it is applied asynchronously
 * according to the processor's flush policy.
 *
 * @param docId identifier of the document to remove
 */
public void deleteDocument(@Nonnull String docId) {
    final String indexName = indexConvention.getIndexName(INDEX_NAME);
    bulkProcessor.add(new DeleteRequest(indexName).id(docId));
}

public BulkByScrollResponse deleteByQuery(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
@Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
@Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,19 @@ public LineageRegistry getLineageRegistry() {
return _lineageRegistry;
}

/**
 * Adds (or overwrites) the given edge as a document in the graph index.
 *
 * @param edge the edge to persist
 */
@Override
public void addEdge(@Nonnull final Edge edge) {
    // The document id is derived from the edge itself, so re-adding the same edge
    // upserts the same document rather than creating a duplicate.
    _graphWriteDAO.upsertDocument(toDocId(edge), toDocument(edge));
}

/**
 * Removes the given edge's document from the graph index.
 *
 * @param edge the edge to delete
 */
@Override
public void removeEdge(@Nonnull final Edge edge) {
    // The same id derivation used by addEdge locates the document to delete.
    _graphWriteDAO.deleteDocument(toDocId(edge));
}

@Nonnull
public RelatedEntitiesResult findRelatedEntities(
@Nullable final List<String> sourceTypes,
Expand Down Expand Up @@ -250,7 +257,7 @@ public void configure() {
@VisibleForTesting
@Override
public void clear() {
_esBulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), _indexConvention.getIndexName(INDEX_NAME));
_esBulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public LineageRegistry getLineageRegistry() {
return _lineageRegistry;
}

@Override
public void addEdge(@Nonnull final Edge edge) {

log.debug(String.format("Adding Edge source: %s, destination: %s, type: %s",
Expand Down Expand Up @@ -94,6 +95,11 @@ public void addEdge(@Nonnull final Edge edge) {
executeStatements(statements);
}

/**
 * Single-edge removal is not implemented for the Neo4j backend.
 *
 * @param edge ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void removeEdge(final Edge edge) {
    // Surface the gap explicitly instead of silently no-op'ing.
    final String message = "Remove edge not supported by Neo4JGraphService at this time.";
    throw new UnsupportedOperationException(message);
}

@Nonnull
public RelatedEntitiesResult findRelatedEntities(
@Nullable final List<String> sourceTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
@Slf4j
@Builder(builderMethodName = "hiddenBuilder")
public class ESBulkProcessor implements Closeable {
private static final String ES_WRITES_METRIC = "num_elasticSearch_writes";
private static final String ES_DELETE_EXCEPTION_METRIC = "delete_by_query";

public static ESBulkProcessor.ESBulkProcessorBuilder builder(RestHighLevelClient searchClient) {
return hiddenBuilder().searchClient(searchClient);
Expand Down Expand Up @@ -67,12 +69,17 @@ private ESBulkProcessor(@NonNull RestHighLevelClient searchClient, @NonNull Bool
}

/**
 * Queues a single write request on the underlying bulk processor, counting it in the
 * write metric before enqueueing.
 *
 * @param request the Elasticsearch write request to enqueue
 * @return this processor, for call chaining
 */
public ESBulkProcessor add(DocWriteRequest<?> request) {
    MetricUtils.counter(getClass(), ES_WRITES_METRIC).inc();
    bulkProcessor.add(request);
    return this;
}

/**
 * Deletes all documents matching the query from the given indices, without requesting an
 * index refresh afterwards (callers that need immediate visibility should use the
 * {@code refresh} overload).
 *
 * @param queryBuilder query selecting the documents to delete
 * @param indices the indices to run the delete against
 * @return the delete-by-query response, or empty if the delete failed
 */
public Optional<BulkByScrollResponse> deleteByQuery(QueryBuilder queryBuilder, String... indices) {
    // NOTE: the original span contained two consecutive return statements (stale diff
    // residue passing `true`); the second was unreachable. The updated default is `false`.
    return deleteByQuery(queryBuilder, false, bulkRequestsLimit, defaultTimeout, indices);
}

/**
 * Deletes all documents matching the query from the given indices.
 *
 * @param queryBuilder query selecting the documents to delete
 * @param refresh whether to request an index refresh after the delete (passed through to
 *        the underlying delete-by-query call)
 * @param indices the indices to run the delete against
 * @return the delete-by-query response, or empty if the delete failed
 */
public Optional<BulkByScrollResponse> deleteByQuery(QueryBuilder queryBuilder, boolean refresh, String... indices) {
    return deleteByQuery(queryBuilder, refresh, bulkRequestsLimit, defaultTimeout, indices);
}

public Optional<BulkByScrollResponse> deleteByQuery(QueryBuilder queryBuilder, boolean refresh,
Expand All @@ -91,10 +98,11 @@ public Optional<BulkByScrollResponse> deleteByQuery(QueryBuilder queryBuilder, b
bulkProcessor.flush();
// perform delete after local flush
final BulkByScrollResponse deleteResponse = searchClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc(deleteResponse.getTotal());
return Optional.of(deleteResponse);
} catch (Exception e) {
log.error("ERROR: Failed to delete by query. See stacktrace for a more detailed error:", e);
MetricUtils.exceptionCounter(ESBulkProcessor.class, "delete_by_query", e);
MetricUtils.exceptionCounter(ESBulkProcessor.class, ES_DELETE_EXCEPTION_METRIC, e);
}

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String document,
final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexName, docId).doc(document, XContentType.JSON)
.detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest);
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
bulkProcessor.add(updateRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public void upsertDocument(@Nonnull String docId, @Nonnull String document) {
new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON)
.detectNoop(false).retryOnConflict(numRetries).upsert(indexRequest);
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
bulkProcessor.add(updateRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,6 @@ public void configure() {
@VisibleForTesting
@Override
public void clear() {
_esBulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), _indexConvention.getIndexName(INDEX_NAME));
_esBulkProcessor.deleteByQuery(QueryBuilders.matchAllQuery(), true, _indexConvention.getIndexName(INDEX_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public void upsertDocument(@Nonnull String entityName, @Nonnull String aspectNam
final IndexRequest indexRequest =
new IndexRequest(indexName).id(docId).source(document.toString(), XContentType.JSON);
final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document.toString(), XContentType.JSON)
.detectNoop(false).retryOnConflict(_numRetries).upsert(indexRequest);
.detectNoop(false)
.retryOnConflict(_numRetries)
.upsert(indexRequest);
_bulkProcessor.add(updateRequest);
}

Expand Down Expand Up @@ -203,7 +205,7 @@ public DeleteAspectValuesResult deleteAspectValues(@Nonnull String entityName, @
final BoolQueryBuilder filterQueryBuilder = ESUtils.buildFilterQuery(filter);

final Optional<DeleteAspectValuesResult> result = _bulkProcessor
.deleteByQuery(filterQueryBuilder, true, DEFAULT_LIMIT, TimeValue.timeValueMinutes(10), indexName)
.deleteByQuery(filterQueryBuilder, false, DEFAULT_LIMIT, TimeValue.timeValueMinutes(10), indexName)
.map(response -> new DeleteAspectValuesResult().setNumDocsDeleted(response.getDeleted()));

if (result.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static CassandraContainer setupContainer() {
.asCompatibleSubstituteFor("cassandra");

CassandraContainer container = new CassandraContainer(imageName);
container.withEnv("JVM_OPTS", "-Xms64M -Xmx64M")
container.withEnv("JVM_OPTS", "-Xms64M -Xmx96M")
.withStartupTimeout(Duration.ofMinutes(5)) // usually < 1min
.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

@TestConfiguration
public class ElasticSearchTestConfiguration {
private static final int REFRESH_INTERVAL_SECONDS = 1;
private static final String ELASTIC_VERSION = "7.9.3";
private static final String ELASTIC_IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch";
private static final String ENV_ELASTIC_IMAGE_FULL_NAME = System.getenv("ELASTIC_IMAGE_FULL_NAME");
Expand All @@ -35,6 +36,10 @@ public class ElasticSearchTestConfiguration {

private static final int HTTP_PORT = 9200;

/**
 * Blocks long enough for Elasticsearch to refresh its index after a write, so a
 * subsequent search sees the written documents.
 *
 * @throws InterruptedException if the calling thread is interrupted while sleeping
 */
public static void syncAfterWrite() throws InterruptedException {
    // BUG FIX: Thread.sleep takes milliseconds; passing the seconds-valued constant
    // directly slept for only 1 ms, not 1 s, defeating the refresh wait.
    Thread.sleep(REFRESH_INTERVAL_SECONDS * 1000L);
}

@Primary
@Scope("singleton")
@Bean(name = "elasticSearchRestHighLevelClient")
Expand All @@ -55,13 +60,13 @@ public RestHighLevelClient getElasticsearchClient() {
@Bean(name = "elasticSearchBulkProcessor")
@Nonnull
public ESBulkProcessor getBulkProcessor(@Qualifier("elasticSearchRestHighLevelClient") RestHighLevelClient searchClient) {
// Using the default sync processor
return ESBulkProcessor.builder(searchClient)
// For testing, immediately return write from search query
.async(true)
// For testing, make writes immediately visible to search queries (avoids the need for syncAfterWrite workarounds)
.writeRequestRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.bulkRequestsLimit(1)
.bulkFlushPeriod(-1)
.retryInterval(10L)
.bulkFlushPeriod(1)
.retryInterval(1L)
.numRetries(1)
.build();
}
Expand Down
Loading

0 comments on commit 32b81d1

Please sign in to comment.