Skip to content

Commit

Permalink
feat(elasticsearch): Updates to elasticsearch configuration, dao, and…
Browse files Browse the repository at this point in the history
… tests
  • Loading branch information
david-leifker committed Nov 3, 2022
1 parent 0ca3383 commit b8ef061
Show file tree
Hide file tree
Showing 91 changed files with 1,818 additions and 772 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ project.ext.externalDependency = [
'springJdbc': "org.springframework:spring-jdbc:$springVersion",
'springWeb': "org.springframework:spring-web:$springVersion",
'springWebMVC': "org.springframework:spring-webmvc:$springVersion",
'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion",
'springBoot': "org.springframework.boot:spring-boot:$springBootVersion",
'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion",
'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion",
Expand Down
2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
JAVA_OPTS=-Xms1g -Xmx1g
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
3 changes: 3 additions & 0 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=datahub
JAVA_OPTS=-Xms1g -Xmx1g
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
ENTITY_SERVICE_ENABLE_RETENTION=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose-without-neo4j.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
Expand Down
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
- JAVA_OPTS=-Xms1g -Xmx1g
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
Expand All @@ -32,7 +33,7 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
}

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(RecordTemplate record, List<T> fieldSpecs) {
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(@Nonnull RecordTemplate record, List<T> fieldSpecs) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class MergedEntityRegistry implements EntityRegistry {
private final Map<String, AspectSpec> _aspectNameToSpec;

public MergedEntityRegistry(EntityRegistry baseEntityRegistry) {
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? baseEntityRegistry.getEntitySpecs() : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? baseEntityRegistry.getEventSpecs() : new HashMap<>();
// baseEntityRegistry.get*Specs() can return immutable Collections.emptyMap() which fails
// when this class attempts .put* operations on it.
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? new HashMap<>(baseEntityRegistry.getEntitySpecs()) : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? new HashMap<>(baseEntityRegistry.getEventSpecs()) : new HashMap<>();
baseEntityRegistry.getAspectTemplateEngine();
_aspectTemplateEngine = baseEntityRegistry.getAspectTemplateEngine();
_aspectNameToSpec = baseEntityRegistry.getAspectSpecs();
Expand Down
5 changes: 4 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ def get_long_description():
plugins: Dict[str, Set[str]] = {
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests"},
"datahub-rest": {
"requests",
"requests_file"
},
# Integrations.
"airflow": {
"apache-airflow >= 2.0.2",
Expand Down
38 changes: 23 additions & 15 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pydantic
import requests
from expandvars import expandvars
from requests_file import FileAdapter

from datahub.cli.cli_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.docker_check import (
Expand Down Expand Up @@ -47,16 +48,19 @@

BOOTSTRAP_MCES_FILE = "metadata-ingestion/examples/mce_files/bootstrap_mce.json"

GITHUB_BASE_URL = "https://raw.githubusercontent.com/datahub-project/datahub/master"
DOCKER_COMPOSE_BASE = os.getenv(
"DOCKER_COMPOSE_BASE",
"https://raw.githubusercontent.com/datahub-project/datahub/master",
)

GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_M1_QUICKSTART_COMPOSE_URL = f"{GITHUB_BASE_URL}/{M1_QUICKSTART_COMPOSE_FILE}"
GITHUB_BOOTSTRAP_MCES_URL = f"{GITHUB_BASE_URL}/{BOOTSTRAP_MCES_FILE}"
M1_QUICKSTART_COMPOSE_URL = f"{DOCKER_COMPOSE_BASE}/{M1_QUICKSTART_COMPOSE_FILE}"
BOOTSTRAP_MCES_URL = f"{DOCKER_COMPOSE_BASE}/{BOOTSTRAP_MCES_FILE}"


class Architectures(Enum):
Expand Down Expand Up @@ -630,13 +634,17 @@ def quickstart(
fg="red",
)
github_file = (
GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
if should_use_neo4j and not is_arch_m1(quickstart_arch)
else GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL
else ELASTIC_QUICKSTART_COMPOSE_URL
if not is_arch_m1(quickstart_arch)
else GITHUB_M1_QUICKSTART_COMPOSE_URL
else M1_QUICKSTART_COMPOSE_URL
)

# also allow local files
request_session = requests.Session()
request_session.mount("file://", FileAdapter())

with open(
default_quickstart_compose_file, "wb"
) if default_quickstart_compose_file else tempfile.NamedTemporaryFile(
Expand All @@ -646,16 +654,16 @@ def quickstart(
quickstart_compose_file.append(path)
click.echo(f"Fetching docker-compose file {github_file} from GitHub")
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(github_file)
quickstart_download_response = request_session.get(github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")

if standalone_consumers:
consumer_github_file = (
f"{GITHUB_BASE_URL}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
f"{DOCKER_COMPOSE_BASE}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
if should_use_neo4j
else f"{GITHUB_BASE_URL}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
else f"{DOCKER_COMPOSE_BASE}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
)

default_consumer_compose_file = (
Expand All @@ -672,7 +680,7 @@ def quickstart(
f"Fetching consumer docker-compose file {consumer_github_file} from GitHub"
)
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(consumer_github_file)
quickstart_download_response = request_session.get(consumer_github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
Expand Down Expand Up @@ -839,7 +847,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
path = str(pathlib.Path(tmp_file.name))

# Download the bootstrap MCE file from GitHub.
mce_json_download_response = requests.get(GITHUB_BOOTSTRAP_MCES_URL)
mce_json_download_response = requests.get(BOOTSTRAP_MCES_URL)
mce_json_download_response.raise_for_status()
tmp_file.write(mce_json_download_response.content)
click.echo(f"Downloaded to {path}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def create_from_checkpoint_aspect(
except Exception as e:
# Failure to load config is probably okay...config structure has changed.
logger.warning(
"Failed to construct checkpoint's config from checkpoint aspect.", e
"Failed to construct checkpoint's config from checkpoint aspect. %s", e
)
else:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
Expand Down Expand Up @@ -106,7 +105,7 @@ private static String createTopics(Stream<String> bootstraps) {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut1/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut1/out.csv,PROD)"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
]
}
},
Expand Down Expand Up @@ -116,11 +116,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,JavaHdfsIn2HiveCreateInsertTable.foo4,PROD)"
Expand Down Expand Up @@ -114,8 +114,8 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahdfsin2hivecreateinserttable.foo4,PROD)"
Expand Down Expand Up @@ -150,42 +150,6 @@
"jobId": "QueryExecId_7",
"flow": "urn:li:dataFlow:(spark,JavaHdfsIn2HiveCreateInsertTable,spark_spark-master_7077)"
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at HdfsIn2HiveCreateInsertTable.java:30",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "7",
"appName": "JavaHdfsIn2HiveCreateInsertTable",
"description": "insertInto at HdfsIn2HiveCreateInsertTable.java:30"
}
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahdfsin2hivecreateinserttable.foo4,PROD)"
]
}
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,JavaHdfsIn2HiveCreateTable.foo3,PROD)"
Expand Down
Loading

0 comments on commit b8ef061

Please sign in to comment.