Skip to content

Commit

Permalink
feat(elasticsearch): Updates to elasticsearch configuration, dao, and…
Browse files Browse the repository at this point in the history
… tests
  • Loading branch information
david-leifker committed Nov 15, 2022
1 parent fc26cf3 commit bec9354
Show file tree
Hide file tree
Showing 84 changed files with 1,768 additions and 669 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ project.ext.externalDependency = [
'springJdbc': "org.springframework:spring-jdbc:$springVersion",
'springWeb': "org.springframework:spring-web:$springVersion",
'springWebMVC': "org.springframework:spring-webmvc:$springVersion",
'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion",
'springBoot': "org.springframework.boot:spring-boot:$springBootVersion",
'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion",
'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion",
Expand Down
2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
JAVA_OPTS=-Xms1g -Xmx1g
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
3 changes: 3 additions & 0 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=datahub
JAVA_OPTS=-Xms1g -Xmx1g
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
ENTITY_SERVICE_ENABLE_RETENTION=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose-without-neo4j.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
- JAVA_OPTS=-Xms1g -Xmx1g
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
Expand All @@ -32,7 +33,7 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
}

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(RecordTemplate record, List<T> fieldSpecs) {
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(@Nonnull RecordTemplate record, List<T> fieldSpecs) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class MergedEntityRegistry implements EntityRegistry {
private final Map<String, AspectSpec> _aspectNameToSpec;

public MergedEntityRegistry(EntityRegistry baseEntityRegistry) {
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? baseEntityRegistry.getEntitySpecs() : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? baseEntityRegistry.getEventSpecs() : new HashMap<>();
// baseEntityRegistry.get*Specs() can return immutable Collections.emptyMap() which fails
// when this class attempts .put* operations on it.
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? new HashMap<>(baseEntityRegistry.getEntitySpecs()) : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? new HashMap<>(baseEntityRegistry.getEventSpecs()) : new HashMap<>();
baseEntityRegistry.getAspectTemplateEngine();
_aspectTemplateEngine = baseEntityRegistry.getAspectTemplateEngine();
_aspectNameToSpec = baseEntityRegistry.getAspectSpecs();
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def get_long_description():

rest_common = {
"requests",
"requests_file"
}

kafka_common = {
Expand Down
38 changes: 23 additions & 15 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pydantic
import requests
from expandvars import expandvars
from requests_file import FileAdapter

from datahub.cli.cli_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.docker_check import (
Expand Down Expand Up @@ -47,16 +48,19 @@

BOOTSTRAP_MCES_FILE = "metadata-ingestion/examples/mce_files/bootstrap_mce.json"

GITHUB_BASE_URL = "https://raw.githubusercontent.com/datahub-project/datahub/master"
DOCKER_COMPOSE_BASE = os.getenv(
"DOCKER_COMPOSE_BASE",
"https://raw.githubusercontent.com/datahub-project/datahub/master",
)

GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_M1_QUICKSTART_COMPOSE_URL = f"{GITHUB_BASE_URL}/{M1_QUICKSTART_COMPOSE_FILE}"
GITHUB_BOOTSTRAP_MCES_URL = f"{GITHUB_BASE_URL}/{BOOTSTRAP_MCES_FILE}"
M1_QUICKSTART_COMPOSE_URL = f"{DOCKER_COMPOSE_BASE}/{M1_QUICKSTART_COMPOSE_FILE}"
BOOTSTRAP_MCES_URL = f"{DOCKER_COMPOSE_BASE}/{BOOTSTRAP_MCES_FILE}"


class Architectures(Enum):
Expand Down Expand Up @@ -630,13 +634,17 @@ def quickstart(
fg="red",
)
github_file = (
GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
if should_use_neo4j and not is_arch_m1(quickstart_arch)
else GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL
else ELASTIC_QUICKSTART_COMPOSE_URL
if not is_arch_m1(quickstart_arch)
else GITHUB_M1_QUICKSTART_COMPOSE_URL
else M1_QUICKSTART_COMPOSE_URL
)

# also allow local files
request_session = requests.Session()
request_session.mount("file://", FileAdapter())

with open(
default_quickstart_compose_file, "wb"
) if default_quickstart_compose_file else tempfile.NamedTemporaryFile(
Expand All @@ -646,16 +654,16 @@ def quickstart(
quickstart_compose_file.append(path)
click.echo(f"Fetching docker-compose file {github_file} from GitHub")
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(github_file)
quickstart_download_response = request_session.get(github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")

if standalone_consumers:
consumer_github_file = (
f"{GITHUB_BASE_URL}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
f"{DOCKER_COMPOSE_BASE}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
if should_use_neo4j
else f"{GITHUB_BASE_URL}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
else f"{DOCKER_COMPOSE_BASE}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
)

default_consumer_compose_file = (
Expand All @@ -672,7 +680,7 @@ def quickstart(
f"Fetching consumer docker-compose file {consumer_github_file} from GitHub"
)
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(consumer_github_file)
quickstart_download_response = request_session.get(consumer_github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
Expand Down Expand Up @@ -839,7 +847,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
path = str(pathlib.Path(tmp_file.name))

# Download the bootstrap MCE file from GitHub.
mce_json_download_response = requests.get(GITHUB_BOOTSTRAP_MCES_URL)
mce_json_download_response = requests.get(BOOTSTRAP_MCES_URL)
mce_json_download_response.raise_for_status()
tmp_file.write(mce_json_download_response.content)
click.echo(f"Downloaded to {path}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def create_from_checkpoint_aspect(
except Exception as e:
# Failure to load config is probably okay...config structure has changed.
logger.warning(
"Failed to construct checkpoint's config from checkpoint aspect.", e
"Failed to construct checkpoint's config from checkpoint aspect. %s", e
)
else:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
Expand Down Expand Up @@ -106,7 +105,7 @@ private static String createTopics(Stream<String> bootstraps) {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,42 +200,6 @@
"jobId": "QueryExecId_11",
"flow": "urn:li:dataFlow:(spark,JavaHiveInHiveOut,spark_spark-master_7077)"
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at HiveInHiveOut.java:44",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "11",
"appName": "JavaHiveInHiveOut",
"description": "insertInto at HiveInHiveOut.java:44",
"queryPlan": "InsertIntoHiveTable `javahiveinhiveout`.`hivetab`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b, c, d]\n+- HiveTableRelation `javahiveinhiveout`.`foo5`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#90, b#91, c#92, d#93]\n"
}
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.foo5,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.hivetab,PROD)"
]
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,42 +200,6 @@
"jobId": "QueryExecId_11",
"flow": "urn:li:dataFlow:(spark,PythonHiveInHiveOut,spark_spark-master_7077)"
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,pythonhiveinhiveout.foo5,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,pythonhiveinhiveout.hivetab,PROD)"
]
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at NativeMethodAccessorImpl.java:0",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "11",
"appName": "PythonHiveInHiveOut",
"description": "insertInto at NativeMethodAccessorImpl.java:0",
"queryPlan": "InsertIntoHiveTable `pythonhiveinhiveout`.`hivetab`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b, c, d]\n+- HiveTableRelation `pythonhiveinhiveout`.`foo5`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#90, b#91, c#92, d#93]\n"
}
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
import json
import time
# import urllib
# from typing import Any, Dict, Optional, cast

import pytest
import requests
import os
from jsoncomparison import Compare, NO_DIFF

# from datahub.ingestion.run.pipeline import Pipeline
# from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
# from datahub.ingestion.source.sql.sql_common import BaseSQLAlchemyCheckpointState
# from datahub.ingestion.source.state.checkpoint import Checkpoint
# from tests.utils import ingest_file_via_rest

GMS_ENDPOINT = "http://localhost:8080"
GOLDEN_FILES_PATH = "./spark-smoke-test/golden_json/"
golden_files = os.listdir(GOLDEN_FILES_PATH)
Expand All @@ -23,7 +14,6 @@
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
}
kafka_post_ingestion_wait_sec = 60

JSONDIFF_CONFIG = {
'output': {
Expand Down
1 change: 1 addition & 0 deletions metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
testCompile externalDependency.testContainersCassandra
testCompile externalDependency.lombok
testCompile project(':test-models')
testImplementation externalDependency.springBootTest

testAnnotationProcessor externalDependency.lombok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public interface GraphService {
*/
void addEdge(final Edge edge);

/**
* Remove an edge from the graph.
* @param edge the edge to delete
*/
void removeEdge(final Edge edge);

/**
* Find related entities (nodes) connected to a source entity via edges of given relationship types. Related entities
* can be filtered by source and destination type (use `null` for any type), by source and destination entity filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ protected static String getQueryForRelatedEntities(@Nullable List<String> source
relationships);
}

@Override
public void removeEdge(final Edge edge) {
throw new UnsupportedOperationException("Remove edge not supported by DgraphGraphService at this time.");
}

@Nonnull
@Override
public RelatedEntitiesResult findRelatedEntities(@Nullable List<String> sourceTypes,
Expand Down
Loading

0 comments on commit bec9354

Please sign in to comment.