
Merge branch 'master' into mssql-urn-case
hsheth2 authored Nov 16, 2022
2 parents 9853353 + 1ffd241 commit 4e4beeb
Showing 88 changed files with 1,781 additions and 710 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -149,6 +149,7 @@ project.ext.externalDependency = [
'springJdbc': "org.springframework:spring-jdbc:$springVersion",
'springWeb': "org.springframework:spring-web:$springVersion",
'springWebMVC': "org.springframework:spring-webmvc:$springVersion",
'springBootTest': "org.springframework.boot:spring-boot-starter-test:$springBootVersion",
'springBoot': "org.springframework.boot:spring-boot:$springBootVersion",
'springBootAutoconfigure': "org.springframework.boot:spring-boot-autoconfigure:$springBootVersion",
'springBootStarterWeb': "org.springframework.boot:spring-boot-starter-web:$springBootVersion",
39 changes: 2 additions & 37 deletions datahub-web-react/src/graphql/dataset.graphql
@@ -1,38 +1,3 @@
fragment simplifiedGlossaryTerms on GlossaryTerms {
terms {
term {
urn
name
type
hierarchicalName
properties {
name
description
definition
termSource
customProperties {
key
value
}
}
ownership {
...ownershipFields
}
parentNodes {
count
nodes {
urn
type
properties {
name
}
}
}
}
associatedUrn
}
}

query getDataProfiles($urn: String!, $limit: Int, $startTime: Long, $endTime: Long) {
dataset(urn: $urn) {
urn
@@ -84,7 +49,7 @@ fragment nonSiblingDatasetFields on Dataset {
...globalTagsFields
}
glossaryTerms {
...simplifiedGlossaryTerms
...glossaryTerms
}
}
}
@@ -98,7 +63,7 @@ fragment nonSiblingDatasetFields on Dataset {
...globalTagsFields
}
glossaryTerms {
...simplifiedGlossaryTerms
...glossaryTerms
}
subTypes {
typeNames
6 changes: 5 additions & 1 deletion datahub-web-react/src/graphql/fragments.graphql
@@ -73,7 +73,11 @@ fragment parentContainersFields on ParentContainersResult {
fragment parentNodesFields on ParentNodesResult {
count
nodes {
...glossaryNode
urn
type
properties {
name
}
}
}

2 changes: 2 additions & 0 deletions docker/datahub-gms/env/docker-without-neo4j.env
@@ -8,6 +8,8 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=elasticsearch
JAVA_OPTS=-Xms1g -Xmx1g
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
3 changes: 3 additions & 0 deletions docker/datahub-gms/env/docker.env
@@ -8,11 +8,14 @@ KAFKA_BOOTSTRAP_SERVER=broker:29092
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
ELASTICSEARCH_HOST=elasticsearch
ELASTICSEARCH_PORT=9200
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
NEO4J_HOST=http://neo4j:7474
NEO4J_URI=bolt://neo4j
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=datahub
JAVA_OPTS=-Xms1g -Xmx1g
ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_DIFF_MODE_ENABLED=true
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
ENTITY_SERVICE_ENABLE_RETENTION=true
@@ -73,6 +73,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose-without-neo4j.quickstart.yml
@@ -75,6 +75,8 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=elasticsearch
- JAVA_OPTS=-Xms1g -Xmx1g
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
2 changes: 2 additions & 0 deletions docker/quickstart/docker-compose.quickstart.yml
@@ -77,11 +77,13 @@ services:
- KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- NEO4J_HOST=http://neo4j:7474
- NEO4J_URI=bolt://neo4j
- NEO4J_USERNAME=neo4j
- NEO4J_PASSWORD=datahub
- JAVA_OPTS=-Xms1g -Xmx1g
- GRAPH_SERVICE_DIFF_MODE_ENABLED=true
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
@@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;


/**
@@ -32,7 +33,7 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
}

// Extract the value of each field in the field specs from the input record
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(RecordTemplate record, List<T> fieldSpecs) {
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(@Nonnull RecordTemplate record, List<T> fieldSpecs) {
final Map<T, List<Object>> extractedFields = new HashMap<>();
for (T fieldSpec : fieldSpecs) {
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
@@ -31,8 +31,10 @@ public class MergedEntityRegistry implements EntityRegistry {
private final Map<String, AspectSpec> _aspectNameToSpec;

public MergedEntityRegistry(EntityRegistry baseEntityRegistry) {
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? baseEntityRegistry.getEntitySpecs() : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? baseEntityRegistry.getEventSpecs() : new HashMap<>();
// baseEntityRegistry.get*Specs() can return immutable Collections.emptyMap() which fails
// when this class attempts .put* operations on it.
entityNameToSpec = baseEntityRegistry.getEntitySpecs() != null ? new HashMap<>(baseEntityRegistry.getEntitySpecs()) : new HashMap<>();
eventNameToSpec = baseEntityRegistry.getEventSpecs() != null ? new HashMap<>(baseEntityRegistry.getEventSpecs()) : new HashMap<>();
baseEntityRegistry.getAspectTemplateEngine();
_aspectTemplateEngine = baseEntityRegistry.getAspectTemplateEngine();
_aspectNameToSpec = baseEntityRegistry.getAspectSpecs();
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -57,6 +57,7 @@ def get_long_description():

rest_common = {
"requests",
"requests_file"
}

kafka_common = {
38 changes: 23 additions & 15 deletions metadata-ingestion/src/datahub/cli/docker_cli.py
@@ -17,6 +17,7 @@
import pydantic
import requests
from expandvars import expandvars
from requests_file import FileAdapter

from datahub.cli.cli_utils import DATAHUB_ROOT_FOLDER
from datahub.cli.docker_check import (
@@ -47,16 +48,19 @@

BOOTSTRAP_MCES_FILE = "metadata-ingestion/examples/mce_files/bootstrap_mce.json"

GITHUB_BASE_URL = "https://raw.githubusercontent.com/datahub-project/datahub/master"
DOCKER_COMPOSE_BASE = os.getenv(
"DOCKER_COMPOSE_BASE",
"https://raw.githubusercontent.com/datahub-project/datahub/master",
)

GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{GITHUB_BASE_URL}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
ELASTIC_QUICKSTART_COMPOSE_URL = (
f"{DOCKER_COMPOSE_BASE}/{ELASTIC_QUICKSTART_COMPOSE_FILE}"
)
GITHUB_M1_QUICKSTART_COMPOSE_URL = f"{GITHUB_BASE_URL}/{M1_QUICKSTART_COMPOSE_FILE}"
GITHUB_BOOTSTRAP_MCES_URL = f"{GITHUB_BASE_URL}/{BOOTSTRAP_MCES_FILE}"
M1_QUICKSTART_COMPOSE_URL = f"{DOCKER_COMPOSE_BASE}/{M1_QUICKSTART_COMPOSE_FILE}"
BOOTSTRAP_MCES_URL = f"{DOCKER_COMPOSE_BASE}/{BOOTSTRAP_MCES_FILE}"


class Architectures(Enum):
@@ -630,13 +634,17 @@ def quickstart(
fg="red",
)
github_file = (
GITHUB_NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
NEO4J_AND_ELASTIC_QUICKSTART_COMPOSE_URL
if should_use_neo4j and not is_arch_m1(quickstart_arch)
else GITHUB_ELASTIC_QUICKSTART_COMPOSE_URL
else ELASTIC_QUICKSTART_COMPOSE_URL
if not is_arch_m1(quickstart_arch)
else GITHUB_M1_QUICKSTART_COMPOSE_URL
else M1_QUICKSTART_COMPOSE_URL
)

# also allow local files
request_session = requests.Session()
request_session.mount("file://", FileAdapter())

with open(
default_quickstart_compose_file, "wb"
) if default_quickstart_compose_file else tempfile.NamedTemporaryFile(
@@ -646,16 +654,16 @@
quickstart_compose_file.append(path)
click.echo(f"Fetching docker-compose file {github_file} from GitHub")
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(github_file)
quickstart_download_response = request_session.get(github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")

if standalone_consumers:
consumer_github_file = (
f"{GITHUB_BASE_URL}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
f"{DOCKER_COMPOSE_BASE}/{CONSUMERS_QUICKSTART_COMPOSE_FILE}"
if should_use_neo4j
else f"{GITHUB_BASE_URL}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
else f"{DOCKER_COMPOSE_BASE}/{ELASTIC_CONSUMERS_QUICKSTART_COMPOSE_FILE}"
)

default_consumer_compose_file = (
@@ -672,7 +680,7 @@
f"Fetching consumer docker-compose file {consumer_github_file} from GitHub"
)
# Download the quickstart docker-compose file from GitHub.
quickstart_download_response = requests.get(consumer_github_file)
quickstart_download_response = request_session.get(consumer_github_file)
quickstart_download_response.raise_for_status()
tmp_file.write(quickstart_download_response.content)
logger.debug(f"Copied to {path}")
@@ -839,7 +847,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
path = str(pathlib.Path(tmp_file.name))

# Download the bootstrap MCE file from GitHub.
mce_json_download_response = requests.get(GITHUB_BOOTSTRAP_MCES_URL)
mce_json_download_response = requests.get(BOOTSTRAP_MCES_URL)
mce_json_download_response.raise_for_status()
tmp_file.write(mce_json_download_response.content)
click.echo(f"Downloaded to {path}")
@@ -194,7 +194,7 @@ def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]:
[
trim_query(format_sql_query(query), budget_per_query)
if self.config.format_sql_queries
else query
else trim_query(query, budget_per_query)
for query in top_sql_queries
]
)
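This change applies the per-query character budget in both branches; previously only formatted queries were trimmed, so with format_sql_queries disabled queries were stored untrimmed. A rough sketch of the intent, using a simplified stand-in for the real trim_query helper:

```python
def trim_query(query: str, budget_per_query: int) -> str:
    # Simplified stand-in for the real trim_query helper: cap the stored text.
    if len(query) <= budget_per_query:
        return query
    return query[: budget_per_query - 3] + "..."

budget_per_query = 40
top_sql_queries = [
    "SELECT id FROM small_table",
    "SELECT a, b, c, d FROM big_table JOIN other_table USING (id) WHERE a > 10",
]

# After the fix, every query is trimmed to the budget, formatted or not.
print([trim_query(q, budget_per_query) for q in top_sql_queries])
```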
7 changes: 5 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py
@@ -136,13 +136,16 @@ def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
@staticmethod
def _add_output_converters(conn: Connection) -> None:
def handle_sql_variant_as_string(value):
return value.decode('utf-16le')
return value.decode("utf-16le")

# see https://stackoverflow.com/questions/45677374/pandas-pyodbc-odbc-sql-type-150-is-not-yet-supported
# and https://stackoverflow.com/questions/11671170/adding-output-converter-to-pyodbc-connection-in-sqlalchemy
try:
conn.connection.add_output_converter(-150, handle_sql_variant_as_string)
except AttributeError as e:
logger.debug(f"Failed to mount output converter for MSSQL data type -150 due to {e}")
logger.debug(
f"Failed to mount output converter for MSSQL data type -150 due to {e}"
)

def _populate_table_descriptions(self, conn: Connection, db_name: str) -> None:
# see https://stackoverflow.com/questions/5953330/how-do-i-map-the-id-in-sys-extended-properties-to-an-object-name
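For context, pyodbc's add_output_converter registers a callable for a given ODBC type code; -150 is SQL Server's sql_variant type, which pyodbc otherwise rejects as unsupported. A minimal sketch outside of SQLAlchemy, with a placeholder connection string:

```python
import pyodbc

def handle_sql_variant_as_string(value: bytes) -> str:
    # sql_variant values come back as raw bytes encoded as UTF-16LE
    return value.decode("utf-16le")

# Placeholder DSN; adjust driver, server, and credentials for a real instance.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;UID=sa;PWD=example"
)
conn.add_output_converter(-150, handle_sql_variant_as_string)

row = conn.cursor().execute("SELECT CAST('hello' AS sql_variant)").fetchone()
print(row[0])  # decoded string instead of an "ODBC SQL type -150 is not yet supported" error
```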
@@ -118,7 +118,7 @@ def create_from_checkpoint_aspect(
except Exception as e:
# Failure to load config is probably okay...config structure has changed.
logger.warning(
"Failed to construct checkpoint's config from checkpoint aspect.", e
"Failed to construct checkpoint's config from checkpoint aspect. %s", e
)
else:
try:
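The original call passed the exception as an extra positional argument with no %s placeholder in the message, so the logging machinery had nothing to interpolate it into and tripped an internal formatting error instead of recording the exception text; the fix uses logging's lazy %-style interpolation. A small illustration:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    raise ValueError("config structure has changed")
except Exception as e:
    # Buggy form (no placeholder): the extra argument cannot be formatted into the
    # message, so the handler reports a logging error instead of the exception.
    # logger.warning("Failed to construct checkpoint's config from checkpoint aspect.", e)

    # Fixed form: %s is interpolated lazily when the record is emitted.
    logger.warning(
        "Failed to construct checkpoint's config from checkpoint aspect. %s", e
    )
```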
@@ -15,7 +15,6 @@
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
@@ -106,7 +105,7 @@ private static String createTopics(Stream<String> bootstraps) {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
} catch (RuntimeException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
@@ -116,11 +116,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/JavaHdfsIn2HdfsOut2/out.csv,PROD)"
]
}
},
@@ -200,42 +200,6 @@
"jobId": "QueryExecId_11",
"flow": "urn:li:dataFlow:(spark,JavaHiveInHiveOut,spark_spark-master_7077)"
}
},
{
"com.linkedin.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:spark"
}
},
{
"com.linkedin.datajob.DataJobInfo": {
"name": "insertInto at HiveInHiveOut.java:44",
"type": {
"string": "sparkJob"
},
"customProperties": {
"SQLQueryId": "11",
"appName": "JavaHiveInHiveOut",
"description": "insertInto at HiveInHiveOut.java:44",
"queryPlan": "InsertIntoHiveTable `javahiveinhiveout`.`hivetab`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [a, b, c, d]\n+- HiveTableRelation `javahiveinhiveout`.`foo5`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [a#90, b#91, c#92, d#93]\n"
}
}
},
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.foo5,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hive,javahiveinhiveout.hivetab,PROD)"
]
}
},
{
"com.linkedin.common.BrowsePaths": {
"paths": [
"/spark/spark_spark-master_7077"
]
}
}
]
}
@@ -50,11 +50,11 @@
{
"com.linkedin.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/in2.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in1.csv,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/in2.csv,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:hdfs,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:file,file:/opt/workspace/resources/data/PythonHdfsIn2HdfsOut2/out1.csv,PROD)"
]
}
},
