feat(elasticsearch): Updates to elasticsearch configuration, dao, tests #6269

Merged
1 change: 1 addition & 0 deletions build.gradle
@@ -149,6 +149,7 @@ project.ext.externalDependency = [
'springJdbc': "org.springframework:spring-jdbc:$springVersion",
'springWeb': "org.springframework:spring-web:$springVersion",
'springWebMVC': "org.springframework:spring-webmvc:$springVersion",
'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion",
'springBoot': "org.springframework.boot:spring-boot:$springBootVersion",
'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion",
'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion",
2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
@@ -8,6 +8,8 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
JAVA_OPTS=-Xms1g -Xmx1g
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
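For context on the new variable: `ES_BULK_REFRESH_POLICY` appears across these env files and plausibly maps onto the Elasticsearch client's `WriteRequest.RefreshPolicy` enum (`NONE`, `IMMEDIATE`, `WAIT_UNTIL`). A minimal Java sketch of that mapping; the env lookup and default below are illustrative assumptions, not the PR's actual wiring:

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.support.WriteRequest;

public class BulkRefreshPolicySketch {
    public static BulkRequest newBulkRequest() {
        // Illustrative: default to NONE when the variable is unset.
        String policy = System.getenv().getOrDefault("ES_BULK_REFRESH_POLICY", "NONE");

        BulkRequest bulk = new BulkRequest();
        // WAIT_UNTIL makes the bulk call block until a refresh has made the
        // writes searchable, trading write latency for read-after-write
        // consistency (useful for deterministic tests).
        bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.valueOf(policy));
        return bulk;
    }
}
```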
3 changes: 3 additions & 0 deletions docker/datahub-gms/env/docker.env
@@ -8,11 +8,14 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=datahub
JAVA_OPTS=-Xms1g -Xmx1g
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
ENTITY_SERVICE_ENABLE_RETENTION=true
@@ -73,6 +73,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -75,6 +75,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -77,11 +77,13 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
- JAVA_OPTS=-Xms1g -Xmx1g
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
@@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
@@ -32,7 +33,7 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
}

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(RecordTemplate record, List<T> fieldSpecs) {
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(@Nonnull RecordTemplate record, List<T> fieldSpecs) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
@@ -31,8 +31,10 @@ public class MergedEntityRegistry implements EntityRegistry {
private final Map<String, AspectSpec> _aspectNameToSpec;

public MergedEntityRegistry(EntityRegistry baseEntityRegistry) {
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? baseEntityRegistry.getEntitySpecs() : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? baseEntityRegistry.getEventSpecs() : new HashMap<>();
// baseEntityRegistry.get*Specs() can return immutable Collections.emptyMap() which fails
// when this class attempts .put* operations on it.
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? new HashMap<>(baseEntityRegistry.getEntitySpecs()) : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? new HashMap<>(baseEntityRegistry.getEventSpecs()) : new HashMap<>();
baseEntityRegistry.getAspectTemplateEngine();
_aspectTemplateEngine = baseEntityRegistry.getAspectTemplateEngine();
_aspectNameToSpec = baseEntityRegistry.getAspectSpecs();
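The added defensive copies above matter because `Collections.emptyMap()` (like other unmodifiable views) throws on any mutator call. A self-contained illustration of the failure mode and the fix:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableMapPitfall {
    public static void main(String[] args) {
        Map<String, String> specs = Collections.emptyMap();
        try {
            specs.put("dataset", "spec"); // throws UnsupportedOperationException
        } catch (UnsupportedOperationException expected) {
            System.out.println("put() is unsupported on Collections.emptyMap()");
        }

        // Wrapping in a new HashMap, as MergedEntityRegistry now does,
        // produces a mutable copy that later put() calls can safely extend.
        Map<String, String> copy = new HashMap<>(specs);
        copy.put("dataset", "spec");
    }
}
```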
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -57,6 +57,7 @@ def get_long_description():

rest_common = {
"requests",
"requests_file"
}

kafka_common = {
38 changes: 23 additions & 15 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -17,6 +17,7 @@
import pydantic
import requests
from expandvars import expandvars
from requests_file import FileAdapter
Collaborator (author) comment: The intent here is to exercise the same logic that fetches compose files from GitHub, so that `file://` URLs can be used in the same way. The logic in this module is generally useful for running the smoke test locally.
from datahub.cli.cli_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.docker_check import (
@@ -47,16 +48,19 @@

BOOTSTRAP_MCES_FILE = "metadata-ingestion/examples/mce_files/bootstrap_mce.json"

GITHUB_BASE_URL = "https://raw.githubusercontent.com/datahub-project/datahub/master"
DOCKER_COMPOSE_BASE = os.getenv(
"DOCKER_COMPOSE_BASE",
"https://raw.githubusercontent.com/datahub-project/datahub/master",
)

GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_M1_QUICKSTART_COMPOSE_URL = f"{GITHUB_BASE_URL}/{M1_QUICKSTART_COMPOSE_FILE}"
GITHUB_BOOTSTRAP_MCES_URL = f"{GITHUB_BASE_URL}/{BOOTSTRAP_MCES_FILE}"
M1_QUICKSTART_COMPOSE_URL = f"{DOCKER_COMPOSE_BASE}/{M1_QUICKSTART_COMPOSE_FILE}"
BOOTSTRAP_MCES_URL = f"{DOCKER_COMPOSE_BASE}/{BOOTSTRAP_MCES_FILE}"


class Architectures(Enum):
@@ -630,13 +634,17 @@ def quickstart(
fg="red",
)
github_file = (
GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
if should_use_neo4j and not is_arch_m1(quickstart_arch)
else GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL
else ELASTIC_QUICKSTART_COMPOSE_URL
if not is_arch_m1(quickstart_arch)
else GITHUB_M1_QUICKSTART_COMPOSE_URL
else M1_QUICKSTART_COMPOSE_URL
)

# also allow local files
request_session = requests.Session()
request_session.mount("file://", FileAdapter())

with open(
default_quickstart_compose_file, "wb"
) if default_quickstart_compose_file else tempfile.NamedTemporaryFile(
@@ -646,16 +654,16 @@
quickstart_compose_file.append(path)
click.echo(f"Fetching docker-compose file {github_file} from GitHub")
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(github_file)
quickstart_download_response = request_session.get(github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")

if standalone_consumers:
consumer_github_file = (
f"{GITHUB_BASE_URL}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
f"{DOCKER_COMPOSE_BASE}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
if should_use_neo4j
else f"{GITHUB_BASE_URL}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
else f"{DOCKER_COMPOSE_BASE}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
)

default_consumer_compose_file = (
@@ -672,7 +680,7 @@
f"Fetching consumer docker-compose file {consumer_github_file} from GitHub"
)
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(consumer_github_file)
quickstart_download_response = request_session.get(consumer_github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
@@ -839,7 +847,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
path = str(pathlib.Path(tmp_file.name))

# Download the bootstrap MCE file from GitHub.
mce_json_download_response = requests.get(GITHUB_BOOTSTRAP_MCES_URL)
mce_json_download_response = requests.get(BOOTSTRAP_MCES_URL)
mce_json_download_response.raise_for_status()
tmp_file.write(mce_json_download_response.content)
click.echo(f"Downloaded to {path}")
@@ -118,7 +118,7 @@ def create_from_checkpoint_aspect(
except Exception as e:
# Failure to load config is probably okay...config structure has changed.
logger.warning(
"Failed to construct checkpoint's config from checkpoint aspect.", e
"Failed to construct checkpoint's config from checkpoint aspect. %s", e
)
else:
try:
@@ -15,7 +15,6 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
@@ -106,7 +105,7 @@ private static String createTopics(Stream<String> bootstraps) {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
@@ -116,11 +116,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
]
}
},
@@ -200,42 +200,6 @@
"jobId": "QueryExecId_11",
"flow": "urn:li:dataFlow:(spark,JavaHiveInHiveOut,spark_spark-master_7077)"
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at HiveInHiveOut.java:44",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "11",
"appName": "JavaHiveInHiveOut",
"description": "insertInto at HiveInHiveOut.java:44",
"queryPlan": "InsertIntoHiveTable `javahiveinhiveout`.`hivetab`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b, c, d]\n+- HiveTableRelation `javahiveinhiveout`.`foo5`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#90, b#91, c#92, d#93]\n"
}
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.foo5,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.hivetab,PROD)"
]
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
}
]
}
@@ -50,11 +50,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
]
}
},
@@ -200,42 +200,6 @@
"jobId": "QueryExecId_11",
"flow": "urn:li:dataFlow:(spark,PythonHiveInHiveOut,spark_spark-master_7077)"
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,pythonhiveinhiveout.foo5,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,pythonhiveinhiveout.hivetab,PROD)"
]
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at NativeMethodAccessorImpl.java:0",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "11",
"appName": "PythonHiveInHiveOut",
"description": "insertInto at NativeMethodAccessorImpl.java:0",
"queryPlan": "InsertIntoHiveTable `pythonhiveinhiveout`.`hivetab`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b, c, d]\n+- HiveTableRelation `pythonhiveinhiveout`.`foo5`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#90, b#91, c#92, d#93]\n"
}
}
}
]
}
@@ -1,19 +1,10 @@
import json
import time
# import urllib
# from typing import Any, Dict, Optional, cast

import pytest
import requests
import os
from jsoncomparison import Compare, NO_DIFF

# from datahub.ingestion.run.pipeline import Pipeline
# from datahub.ingestion.source.sql.mysql import MySQLConfig, MySQLSource
# from datahub.ingestion.source.sql.sql_common import BaseSQLAlchemyCheckpointState
# from datahub.ingestion.source.state.checkpoint import Checkpoint
# from tests.utils import ingest_file_via_rest

GMS_ENDPOINT = "http://localhost:8080"
GOLDEN_FILES_PATH = "./spark-smoke-test/golden_json/"
golden_files = os.listdir(GOLDEN_FILES_PATH)
@@ -23,7 +14,6 @@
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
}
kafka_post_ingestion_wait_sec = 60

JSONDIFF_CONFIG = {
'output': {
1 change: 1 addition & 0 deletions metadata-io/build.gradle
@@ -55,6 +55,7 @@ dependencies {
testCompile externalDependency.testContainersCassandra
testCompile externalDependency.lombok
testCompile project(':test-models')
testImplementation externalDependency.springBootTest

testAnnotationProcessor externalDependency.lombok

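The new `springBootTest` dependency brings in `spring-boot-starter-test` (the Spring test context plus JUnit, AssertJ, and Mockito). A hypothetical example of the kind of test it enables; the class and its contents are illustrative, not code from this PR:

```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;

import static org.junit.jupiter.api.Assertions.assertNotNull;

// Hypothetical smoke test: verifies that a Spring application context
// can be created with the configuration under test.
@SpringBootTest
class ContextLoadsTest {

    @Autowired
    private ApplicationContext context;

    @Test
    void contextLoads() {
        assertNotNull(context);
    }
}
```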
@@ -32,6 +32,12 @@ public interface GraphService {
*/
void addEdge(final Edge edge);

/**
* Remove an edge from the graph.
* @param edge the edge to delete
*/
void removeEdge(final Edge edge);

/**
* Find related entities (nodes) connected to a source entity via edges of given relationship types. Related entities
* can be filtered by source and destination type (use `null` for any type), by source and destination entity filter
Expand Down
@@ -385,6 +385,11 @@ protected static String getQueryForRelatedEntities(@Nullable List<String> source
relationships);
}

@Override
public void removeEdge(final Edge edge) {
throw new UnsupportedOperationException("Remove edge not supported by DgraphGraphService at this time.");
}

@Nonnull
@Override
public RelatedEntitiesResult findRelatedEntities(@Nullable List<String> sourceTypes,
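The new `removeEdge` method completes the `GraphService` write API alongside `addEdge`; backends that cannot delete edges, like `DgraphGraphService` above, throw `UnsupportedOperationException`. A minimal in-memory sketch of the intended contract (the edge encoding here is a stand-in, not DataHub's actual `Edge` class):

```java
import java.util.HashSet;
import java.util.Set;

class InMemoryGraph {
    // Each edge encoded as "source|relationshipType|destination" for brevity.
    private final Set<String> edges = new HashSet<>();

    void addEdge(String source, String relationshipType, String destination) {
        edges.add(source + "|" + relationshipType + "|" + destination);
    }

    // The inverse of addEdge: removing an absent edge is a harmless no-op,
    // which lets callers clean up relationships without checking the graph first.
    void removeEdge(String source, String relationshipType, String destination) {
        edges.remove(source + "|" + relationshipType + "|" + destination);
    }
}
```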