fix(ingest): types for dbt #2716

Merged: 16 commits, Jun 22, 2021
@@ -77,6 +77,10 @@ type Props = {

export default function TypeIcon({ type, nativeDataType }: Props) {
const { icon: Icon, size, text } = DATA_TYPE_ICON_MAP[type];

// if unable to match type to DataHub, display native type info by default
const nativeFallback = type === SchemaFieldDataType.Null;

// eslint-disable-next-line react/prop-types
const NativeDataTypeTooltip = ({ children }) =>
nativeDataType ? (
@@ -92,7 +96,7 @@ export default function TypeIcon({ type, nativeDataType }: Props) {
<TypeIconContainer data-testid={`icon-${type}`}>
{Icon && <Icon style={{ fontSize: size }} />}
<TypeSubtitle type="secondary" hasicon={Icon ? 'yes' : undefined}>
{text}
{nativeFallback ? nativeDataType : text}
</TypeSubtitle>
</TypeIconContainer>
</NativeDataTypeTooltip>
10 changes: 5 additions & 5 deletions metadata-ingestion/examples/recipes/dbt_to_datahub.yml
@@ -2,11 +2,11 @@
source:
type: "dbt"
config:
manifest_path: "./your/dbt/manifest.json"
catalog_path: "./your/dbt/catalog.json"
target_platform: "[target data platform]"
load_schemas: True # or false
manifest_path: "./tests/integration/dbt/dbt_manifest.json"
catalog_path: "./tests/integration/dbt/dbt_catalog.json"
target_platform: "dbt"
load_schemas: True # or False
Collaborator:

should the example recipe be modified?

Contributor Author:

Not totally necessary, but just made testing things out a bit easier. I've simplified the paths so people won't get too confused though.

sink:
type: "datahub-rest"
config:
server: 'http://localhost:8080'
server: "http://localhost:8080"
11 changes: 6 additions & 5 deletions metadata-ingestion/scripts/update_golden_files.sh
@@ -8,11 +8,12 @@ pytest --basetemp=tmp || true
# Update the golden files.
cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mce_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mce_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mce_golden.json
cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mce_golden.json
cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mce_golden.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json
cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json
cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json
cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json
cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json
cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json

57 changes: 28 additions & 29 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -1,13 +1,17 @@
import json
import logging
import re
import time
from typing import Any, Dict, Iterable, List

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.dbt_types import (
POSTGRES_TYPES_MAP,
SNOWFLAKE_TYPES_MAP,
resolve_postgres_modified_type,
)
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
@@ -27,6 +31,7 @@
SchemaFieldDataType,
SchemaMetadata,
StringTypeClass,
TimeTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
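The new `dbt_types` helper module referenced by these imports is not included in this excerpt. A hypothetical sketch of the shape of its exports — the specific keys and classes below are illustrative assumptions, not the actual module contents (a sketch of `resolve_postgres_modified_type` follows the `get_column_type` hunk further down):

```python
# Hypothetical sketch of dbt_types.py; the real module is not shown in this diff.
# It maps warehouse-native type names to DataHub type classes so the dbt source
# can resolve column types per target platform. Entries are illustrative only.
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    BooleanTypeClass,
    DateTypeClass,
    NumberTypeClass,
    StringTypeClass,
    TimeTypeClass,
)

POSTGRES_TYPES_MAP = {
    "boolean": BooleanTypeClass,
    "date": DateTypeClass,
    "time": TimeTypeClass,
    "numeric": NumberTypeClass,
    "integer": NumberTypeClass,
    "text": StringTypeClass,
    "timestamp with time zone": DateTypeClass,
    "timestamp without time zone": DateTypeClass,
}

SNOWFLAKE_TYPES_MAP = {
    "NUMBER": NumberTypeClass,
    "FLOAT": NumberTypeClass,
    "VARCHAR": StringTypeClass,
    "BOOLEAN": BooleanTypeClass,
    "DATE": DateTypeClass,
    "TIMESTAMP_NTZ": DateTypeClass,
}
```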

@@ -96,8 +101,7 @@ def extract_dbt_entities(
node_type_pattern: AllowDenyPattern,
) -> List[DBTNode]:
dbt_entities = []
for key in nodes:
node = nodes[key]
for key, node in nodes.items():
dbtNode = DBTNode()

# check if node pattern allowed based on config file
@@ -174,7 +178,7 @@ def loadManifestAndCatalog(

all_catalog_entities = {**catalog_nodes, **catalog_sources}

nodes = extract_dbt_entities(
return extract_dbt_entities(
all_manifest_entities,
all_catalog_entities,
load_catalog,
@@ -183,8 +187,6 @@
node_type_pattern,
)

return nodes


def get_urn_from_dbtNode(
database: str, schema: str, name: str, target_platform: str, env: str
@@ -195,11 +197,11 @@ def get_urn_from_dbtNode(


def get_custom_properties(node: DBTNode) -> Dict[str, str]:
properties = {}
properties["dbt_node_type"] = node.node_type
properties["materialization"] = node.materialization
properties["dbt_file_path"] = node.dbt_file_path
return properties
return {
"dbt_node_type": node.node_type,
"materialization": node.materialization,
"dbt_file_path": node.dbt_file_path,
}


def get_upstreams(
@@ -216,7 +218,7 @@ def get_upstreams(

dbtNode_upstream.database = all_nodes[upstream]["database"]
dbtNode_upstream.schema = all_nodes[upstream]["schema"]
if "identifier" in all_nodes[upstream] and load_catalog is False:
if "identifier" in all_nodes[upstream] and not load_catalog:
dbtNode_upstream.name = all_nodes[upstream]["identifier"]
else:
dbtNode_upstream.name = all_nodes[upstream]["name"]
@@ -247,20 +249,22 @@ def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage:
)
ucl.append(uc)

ulc = UpstreamLineage(upstreams=ucl)

return ulc
return UpstreamLineage(upstreams=ucl)


# This is from a fairly narrow data source that is postgres specific; we would expect this to expand over
# time or be replaced with a more thorough mechanism
# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py
_field_type_mapping = {
"boolean": BooleanTypeClass,
"date": DateTypeClass,
"time": TimeTypeClass,
"numeric": NumberTypeClass,
"text": StringTypeClass,
"timestamp with time zone": DateTypeClass,
"timestamp without time zone": DateTypeClass,
"integer": NumberTypeClass,
"float8": NumberTypeClass,
**POSTGRES_TYPES_MAP,
**SNOWFLAKE_TYPES_MAP,
}


@@ -270,20 +274,16 @@ def get_column_type(
"""
Maps known DBT types to datahub types
"""
column_type_stripped = ""
TypeClass: Any = _field_type_mapping.get(column_type)

pattern = re.compile(r"[\w ]+") # drop all non alphanumerics
match = pattern.match(column_type)
if match is not None:
column_type_stripped = match.group()
if TypeClass is None:

TypeClass: Any = None
for key in _field_type_mapping.keys():
if key == column_type_stripped:
TypeClass = _field_type_mapping[column_type_stripped]
break
# attempt Postgres modified type
TypeClass = resolve_postgres_modified_type(column_type)

# if still not found, report the warning
if TypeClass is None:

report.report_warning(
dataset_name, f"unable to map type {column_type} to metadata schema"
)
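With this change, a lookup miss on the exact type name (e.g. a parameterized type like `character varying(16)`) falls through to the Postgres-specific resolver before a warning is reported. The resolver's body is not part of this diff; a hypothetical sketch of the kind of normalization it might perform:

```python
import re
from typing import Any, Optional

# Hypothetical sketch only -- the real resolve_postgres_modified_type lives in
# dbt_types.py and is not shown in this diff. The idea: strip type modifiers
# such as "(16)" or a trailing "[]" and retry the lookup against the Postgres
# map (POSTGRES_TYPES_MAP as sketched earlier in this PR).
def resolve_postgres_modified_type(column_type: str) -> Optional[Any]:
    base_type = re.sub(r"\(.*?\)|\[\]", "", column_type).strip()
    return POSTGRES_TYPES_MAP.get(base_type)
```

Under that assumption, `character varying(16)` would normalize to `character varying` and resolve to `StringTypeClass` if that entry exists in the map; anything still unresolved triggers the warning below.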
@@ -309,7 +309,7 @@ def get_schema_metadata(
canonical_schema.append(field)

actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time()) * 1000
schema_metadata = SchemaMetadata(
return SchemaMetadata(
schemaName=node.dbt_name,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
@@ -319,7 +319,6 @@
lastModified=AuditStamp(time=sys_time, actor=actor),
fields=canonical_schema,
)
return schema_metadata


class DBTSource(Source):