From 88651b183e9937f0ac6d83d35cdf7802b39e4784 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 26 Oct 2022 09:48:28 -0700 Subject: [PATCH 01/18] move dbt.py into folder --- .../src/datahub/ingestion/source/{ => dbt}/dbt.py | 0 metadata-ingestion/tests/integration/dbt/test_dbt.py | 2 +- metadata-ingestion/tests/unit/test_dbt_source.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename metadata-ingestion/src/datahub/ingestion/source/{ => dbt}/dbt.py (100%) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt.py similarity index 100% rename from metadata-ingestion/src/datahub/ingestion/source/dbt.py rename to metadata-ingestion/src/datahub/ingestion/source/dbt/dbt.py diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py index 3ab5d063a60203..fb1661546192ce 100644 --- a/metadata-ingestion/tests/integration/dbt/test_dbt.py +++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py @@ -12,7 +12,7 @@ from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig -from datahub.ingestion.source.dbt import ( +from datahub.ingestion.source.dbt.dbt import ( DBTConfig, DBTEntitiesEnabled, DBTSource, diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index e6b3aa4fa0f066..ecf99c10fc6c63 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -5,7 +5,7 @@ from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.source.dbt import DBTConfig, DBTSource +from datahub.ingestion.source.dbt.dbt import DBTConfig, DBTSource from datahub.metadata.schema_classes import ( OwnerClass, OwnershipSourceClass, From acb037504f16286ba9f6560e8d91c88c32f90c28 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 26 Oct 2022 11:59:09 -0700 Subject: [PATCH 02/18] split dbt into dbt_common + dbt_core --- metadata-ingestion/setup.py | 2 +- .../datahub/ingestion/source/dbt/__init__.py | 0 .../source/dbt/{dbt.py => dbt_common.py} | 329 ++--------------- .../datahub/ingestion/source/dbt/dbt_core.py | 339 ++++++++++++++++++ .../tests/integration/dbt/test_dbt.py | 15 +- .../tests/unit/test_dbt_source.py | 18 +- 6 files changed, 380 insertions(+), 323 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/dbt/__init__.py rename metadata-ingestion/src/datahub/ingestion/source/dbt/{dbt.py => dbt_common.py} (83%) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index ddf07ca46d79c3..034abdd17fdc53 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -491,7 +491,7 @@ def get_long_description(): "clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource", "delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource", "s3 = datahub.ingestion.source.s3:S3Source", - "dbt = datahub.ingestion.source.dbt:DBTSource", + "dbt = datahub.ingestion.source.dbt.dbt_core:DBTCoreSource", "druid = datahub.ingestion.source.sql.druid:DruidSource", "elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource", "feast-legacy = 
datahub.ingestion.source.feast_legacy:FeastSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py similarity index 83% rename from metadata-ingestion/src/datahub/ingestion/source/dbt/dbt.py rename to metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index af4d3545399e16..b5bdf83089fc7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -1,6 +1,7 @@ import json import logging import re +from abc import abstractmethod from dataclasses import dataclass, field from datetime import datetime from enum import auto @@ -18,7 +19,6 @@ ) from urllib.parse import urlparse -import dateutil.parser import pydantic import requests from pydantic import BaseModel, root_validator, validator @@ -214,21 +214,7 @@ def can_emit_test_results(self) -> bool: return self.test_results == EmitDirective.YES -class DBTConfig(StatefulIngestionConfigBase): - manifest_path: str = Field( - description="Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI." - ) - catalog_path: str = Field( - description="Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json Note this can be a local file or a URI." - ) - sources_path: Optional[str] = Field( - default=None, - description="Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. Note this can be a local file or a URI.", - ) - test_results_path: Optional[str] = Field( - default=None, - description="Path to output of dbt test run as run_results file in JSON format. See https://docs.getdbt.com/reference/artifacts/run-results-json. 
If not specified, test execution results will not be populated in DataHub.", - ) +class DBTCommonConfig(StatefulIngestionConfigBase): env: str = Field( default=mce_builder.DEFAULT_ENV, description="Environment to use in namespace when constructing URNs.", @@ -453,150 +439,6 @@ def get_urn( ) -def get_columns( - catalog_node: dict, manifest_node: dict, tag_prefix: str -) -> List[DBTColumn]: - columns = [] - - catalog_columns = catalog_node["columns"] - manifest_columns = manifest_node.get("columns", {}) - - for key, catalog_column in catalog_columns.items(): - manifest_column = manifest_columns.get(key.lower(), {}) - - meta = manifest_column.get("meta", {}) - - tags = manifest_column.get("tags", []) - tags = [tag_prefix + tag for tag in tags] - - dbtCol = DBTColumn( - name=catalog_column["name"].lower(), - comment=catalog_column.get("comment", ""), - description=manifest_column.get("description", ""), - data_type=catalog_column["type"], - index=catalog_column["index"], - meta=meta, - tags=tags, - ) - columns.append(dbtCol) - return columns - - -def extract_dbt_entities( - all_manifest_entities: Dict[str, Dict[str, Any]], - all_catalog_entities: Dict[str, Dict[str, Any]], - sources_results: List[Dict[str, Any]], - manifest_adapter: str, - use_identifiers: bool, - tag_prefix: str, - report: DBTSourceReport, -) -> List[DBTNode]: - sources_by_id = {x["unique_id"]: x for x in sources_results} - - dbt_entities = [] - for key, manifest_node in all_manifest_entities.items(): - name = manifest_node["name"] - - if "identifier" in manifest_node and use_identifiers: - name = manifest_node["identifier"] - - if ( - manifest_node.get("alias") is not None - and manifest_node.get("resource_type") - != "test" # tests have non-human-friendly aliases, so we don't want to use it for tests - ): - name = manifest_node["alias"] - - # initialize comment to "" for consistency with descriptions - # (since dbt null/undefined descriptions as "") - comment = "" - - if key in all_catalog_entities and all_catalog_entities[key]["metadata"].get( - "comment" - ): - comment = all_catalog_entities[key]["metadata"]["comment"] - - materialization = None - upstream_nodes = [] - - if "materialized" in manifest_node.get("config", {}): - # It's a model - materialization = manifest_node["config"]["materialized"] - upstream_nodes = manifest_node["depends_on"]["nodes"] - - # It's a source - catalog_node = all_catalog_entities.get(key) - catalog_type = None - - if catalog_node is None: - if materialization != "test": - report.report_warning( - key, - f"Entity {key} ({name}) is in manifest but missing from catalog", - ) - else: - catalog_type = all_catalog_entities[key]["metadata"]["type"] - - query_tag_props = manifest_node.get("query_tag", {}) - - meta = manifest_node.get("meta", {}) - - owner = meta.get("owner") - if owner is None: - owner = manifest_node.get("config", {}).get("meta", {}).get("owner") - - tags = manifest_node.get("tags", []) - tags = [tag_prefix + tag for tag in tags] - if not meta: - meta = manifest_node.get("config", {}).get("meta", {}) - - max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at") - max_loaded_at = None - if max_loaded_at_str: - max_loaded_at = dateutil.parser.parse(max_loaded_at_str) - - dbtNode = DBTNode( - dbt_name=key, - dbt_adapter=manifest_adapter, - database=manifest_node["database"], - schema=manifest_node["schema"], - name=name, - alias=manifest_node.get("alias"), - dbt_file_path=manifest_node["original_file_path"], - node_type=manifest_node["resource_type"], - 
max_loaded_at=max_loaded_at, - comment=comment, - description=manifest_node.get("description", ""), - raw_sql=manifest_node.get("raw_sql"), - upstream_nodes=upstream_nodes, - materialization=materialization, - catalog_type=catalog_type, - meta=meta, - query_tag=query_tag_props, - tags=tags, - owner=owner, - compiled_sql=manifest_node.get("compiled_sql"), - manifest_raw=manifest_node, - ) - - # Load columns from catalog, and override some properties from manifest. - if dbtNode.materialization not in [ - "ephemeral", - "test", - ]: - logger.debug(f"Loading schema info for {dbtNode.dbt_name}") - if catalog_node is not None: - # We already have done the reporting for catalog_node being None above. - dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix) - - else: - dbtNode.columns = [] - - dbt_entities.append(dbtNode) - - return dbt_entities - - def get_custom_properties(node: DBTNode) -> Dict[str, str]: # initialize custom properties to node's meta props # (dbt-native node properties) @@ -855,7 +697,7 @@ class DBTTest: @staticmethod def load_test_results( - config: DBTConfig, + config: DBTCommonConfig, test_results_json: Dict[str, Any], test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode], @@ -950,48 +792,15 @@ def load_test_results( @platform_name("dbt") -@config_class(DBTConfig) +@config_class(DBTCommonConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") @capability(SourceCapability.USAGE_STATS, "", supported=False) -class DBTSource(StatefulIngestionSourceBase): - """ - This plugin pulls metadata from dbt's artifact files and generates: - - dbt Tables: for nodes in the dbt manifest file that are models materialized as tables - - dbt Views: for nodes in the dbt manifest file that are models materialized as views - - dbt Ephemeral: for nodes in the dbt manifest file that are ephemeral models - - dbt Sources: for nodes that are sources on top of the underlying platform tables - - dbt Seed: for seed entities - - dbt Tests as Assertions: for dbt test entities (starting with version 0.8.38.1) - - Note: - 1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View). - 2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta. - - The artifacts used by this source are: - - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json) - - This file contains model, source, tests and lineage data. - - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json) - - This file contains schema data. - - dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models - - [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json) - - This file contains metadata for sources with freshness checks. - - We transfer dbt's freshness checks to DataHub's last-modified fields. - - Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified. 
- - [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json) - - This file contains metadata from the result of a dbt run, e.g. dbt test - - When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset - """ - - @classmethod - def create(cls, config_dict, ctx): - config = DBTConfig.parse_obj(config_dict) - return cls(config, ctx, "dbt") - - def __init__(self, config: DBTConfig, ctx: PipelineContext, platform: str): +class DBTSourceBase(StatefulIngestionSourceBase): + def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str): super().__init__(config, ctx) - self.config: DBTConfig = config + self.config = config self.platform: str = platform self.report: DBTSourceReport = DBTSourceReport() self.compiled_owner_extraction_pattern: Optional[Any] = None @@ -1015,13 +824,13 @@ def get_last_checkpoint( is_conversion_required: bool = False try: # Best-case that last checkpoint state is DbtCheckpointState - last_checkpoint = super(DBTSource, self).get_last_checkpoint( + last_checkpoint = super(DBTSourceBase, self).get_last_checkpoint( job_id, checkpoint_state_class ) except Exception as e: # Backward compatibility for old dbt ingestion source which was saving dbt-nodes in # BaseSQLAlchemyCheckpointState - last_checkpoint = super(DBTSource, self).get_last_checkpoint( + last_checkpoint = super(DBTSourceBase, self).get_last_checkpoint( job_id, BaseSQLAlchemyCheckpointState # type: ignore ) logger.debug( @@ -1054,67 +863,6 @@ def load_file_as_json(self, uri: str) -> Any: with open(uri, "r") as f: return json.load(f) - def loadManifestAndCatalog( - self, - manifest_path: str, - catalog_path: str, - sources_path: Optional[str], - use_identifiers: bool, - tag_prefix: str, - ) -> Tuple[ - List[DBTNode], - Optional[str], - Optional[str], - Optional[str], - Optional[str], - Optional[str], - ]: - dbt_manifest_json = self.load_file_as_json(manifest_path) - - dbt_catalog_json = self.load_file_as_json(catalog_path) - - if sources_path is not None: - dbt_sources_json = self.load_file_as_json(sources_path) - sources_results = dbt_sources_json["results"] - else: - sources_results = {} - - manifest_schema = dbt_manifest_json["metadata"].get("dbt_schema_version") - manifest_version = dbt_manifest_json["metadata"].get("dbt_version") - manifest_adapter = dbt_manifest_json["metadata"].get("adapter_type") - - catalog_schema = dbt_catalog_json.get("metadata", {}).get("dbt_schema_version") - catalog_version = dbt_catalog_json.get("metadata", {}).get("dbt_version") - - manifest_nodes = dbt_manifest_json["nodes"] - manifest_sources = dbt_manifest_json["sources"] - - all_manifest_entities = {**manifest_nodes, **manifest_sources} - - catalog_nodes = dbt_catalog_json["nodes"] - catalog_sources = dbt_catalog_json["sources"] - - all_catalog_entities = {**catalog_nodes, **catalog_sources} - - nodes = extract_dbt_entities( - all_manifest_entities, - all_catalog_entities, - sources_results, - manifest_adapter, - use_identifiers, - tag_prefix, - self.report, - ) - - return ( - nodes, - manifest_schema, - manifest_version, - manifest_adapter, - catalog_schema, - catalog_version, - ) - def create_test_entity_mcps( self, test_nodes: List[DBTNode], @@ -1269,6 +1017,19 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: self.report.report_workunit(wu) yield wu + @abstractmethod + def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: + # return dbt nodes + global custom properties + raise 
NotImplementedError() + + @abstractmethod + def load_tests( + self, + test_nodes: List[DBTNode], + all_nodes_map: Dict[str, DBTNode], + ) -> Iterable[MetadataWorkUnit]: + raise NotImplementedError() + # create workunits from dbt nodes def get_workunits(self) -> Iterable[MetadataWorkUnit]: if self.config.write_semantics == "PATCH" and not self.ctx.graph: @@ -1277,32 +1038,11 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: "Consider using the datahub-rest sink or provide a datahub_api: configuration on your ingestion recipe." ) - ( - all_nodes, - manifest_schema, - manifest_version, - manifest_adapter, - catalog_schema, - catalog_version, - ) = self.loadManifestAndCatalog( - self.config.manifest_path, - self.config.catalog_path, - self.config.sources_path, - self.config.use_identifiers, - self.config.tag_prefix, - ) + all_nodes, additional_custom_props = self.load_nodes() all_nodes_map = {node.dbt_name: node for node in all_nodes} nodes = self.filter_nodes(all_nodes) - additional_custom_props = { - "manifest_schema": manifest_schema, - "manifest_version": manifest_version, - "manifest_adapter": manifest_adapter, - "catalog_schema": catalog_schema, - "catalog_version": catalog_version, - } - additional_custom_props_filtered = { key: value for key, value in additional_custom_props.items() @@ -1336,13 +1076,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: all_nodes_map, ) - if self.config.test_results_path: - yield from DBTTest.load_test_results( - self.config, - self.load_file_as_json(self.config.test_results_path), - test_nodes, - all_nodes_map, - ) + yield from self.load_tests(test_nodes, all_nodes_map) yield from self.stale_entity_removal_handler.gen_removed_entity_workunits() @@ -1851,20 +1585,5 @@ def get_transformed_terms( def get_report(self): return self.report - def get_platform_instance_id(self) -> str: - """ - DBT project identifier is used as platform instance. - """ - - project_id = ( - self.load_file_as_json(self.config.manifest_path) - .get("metadata", {}) - .get("project_id") - ) - if project_id is None: - raise ValueError("DBT project identifier is not found in manifest") - - return f"{self.platform}_{project_id}" - def close(self): self.prepare_for_commit() diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py new file mode 100644 index 00000000000000..ea4350458a50fa --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -0,0 +1,339 @@ +import logging +from typing import Any, Dict, Iterable, List, Optional, Tuple + +import dateutil.parser +from pydantic import Field + +from datahub.ingestion.api.decorators import ( + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import SourceCapability +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.dbt.dbt_common import ( + DBTColumn, + DBTCommonConfig, + DBTNode, + DBTSourceBase, + DBTSourceReport, + DBTTest, +) + +logger = logging.getLogger(__name__) + + +class DBTCoreConfig(DBTCommonConfig): + manifest_path: str = Field( + description="Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI." + ) + catalog_path: str = Field( + description="Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json Note this can be a local file or a URI." 
+ ) + sources_path: Optional[str] = Field( + default=None, + description="Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. Note this can be a local file or a URI.", + ) + test_results_path: Optional[str] = Field( + default=None, + description="Path to output of dbt test run as run_results file in JSON format. See https://docs.getdbt.com/reference/artifacts/run-results-json. If not specified, test execution results will not be populated in DataHub.", + ) + + +def get_columns( + catalog_node: dict, manifest_node: dict, tag_prefix: str +) -> List[DBTColumn]: + columns = [] + + catalog_columns = catalog_node["columns"] + manifest_columns = manifest_node.get("columns", {}) + + for key, catalog_column in catalog_columns.items(): + manifest_column = manifest_columns.get(key.lower(), {}) + + meta = manifest_column.get("meta", {}) + + tags = manifest_column.get("tags", []) + tags = [tag_prefix + tag for tag in tags] + + dbtCol = DBTColumn( + name=catalog_column["name"].lower(), + comment=catalog_column.get("comment", ""), + description=manifest_column.get("description", ""), + data_type=catalog_column["type"], + index=catalog_column["index"], + meta=meta, + tags=tags, + ) + columns.append(dbtCol) + return columns + + +def extract_dbt_entities( + all_manifest_entities: Dict[str, Dict[str, Any]], + all_catalog_entities: Dict[str, Dict[str, Any]], + sources_results: List[Dict[str, Any]], + manifest_adapter: str, + use_identifiers: bool, + tag_prefix: str, + report: DBTSourceReport, +) -> List[DBTNode]: + sources_by_id = {x["unique_id"]: x for x in sources_results} + + dbt_entities = [] + for key, manifest_node in all_manifest_entities.items(): + name = manifest_node["name"] + + if "identifier" in manifest_node and use_identifiers: + name = manifest_node["identifier"] + + if ( + manifest_node.get("alias") is not None + and manifest_node.get("resource_type") + != "test" # tests have non-human-friendly aliases, so we don't want to use it for tests + ): + name = manifest_node["alias"] + + # initialize comment to "" for consistency with descriptions + # (since dbt null/undefined descriptions as "") + comment = "" + + if key in all_catalog_entities and all_catalog_entities[key]["metadata"].get( + "comment" + ): + comment = all_catalog_entities[key]["metadata"]["comment"] + + materialization = None + upstream_nodes = [] + + if "materialized" in manifest_node.get("config", {}): + # It's a model + materialization = manifest_node["config"]["materialized"] + upstream_nodes = manifest_node["depends_on"]["nodes"] + + # It's a source + catalog_node = all_catalog_entities.get(key) + catalog_type = None + + if catalog_node is None: + if materialization != "test": + report.report_warning( + key, + f"Entity {key} ({name}) is in manifest but missing from catalog", + ) + else: + catalog_type = all_catalog_entities[key]["metadata"]["type"] + + query_tag_props = manifest_node.get("query_tag", {}) + + meta = manifest_node.get("meta", {}) + + owner = meta.get("owner") + if owner is None: + owner = manifest_node.get("config", {}).get("meta", {}).get("owner") + + tags = manifest_node.get("tags", []) + tags = [tag_prefix + tag for tag in tags] + if not meta: + meta = manifest_node.get("config", {}).get("meta", {}) + + max_loaded_at_str = sources_by_id.get(key, {}).get("max_loaded_at") + max_loaded_at = None + if max_loaded_at_str: + max_loaded_at = dateutil.parser.parse(max_loaded_at_str) + + dbtNode = DBTNode( + dbt_name=key, + 
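+            # NOTE: `key` is the manifest unique_id (e.g. "model.<project>.<name>"),
+            # which downstream code treats as the node's stable dbt identifier.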
dbt_adapter=manifest_adapter, + database=manifest_node["database"], + schema=manifest_node["schema"], + name=name, + alias=manifest_node.get("alias"), + dbt_file_path=manifest_node["original_file_path"], + node_type=manifest_node["resource_type"], + max_loaded_at=max_loaded_at, + comment=comment, + description=manifest_node.get("description", ""), + raw_sql=manifest_node.get("raw_sql"), + upstream_nodes=upstream_nodes, + materialization=materialization, + catalog_type=catalog_type, + meta=meta, + query_tag=query_tag_props, + tags=tags, + owner=owner, + compiled_sql=manifest_node.get("compiled_sql"), + manifest_raw=manifest_node, + ) + + # Load columns from catalog, and override some properties from manifest. + if dbtNode.materialization not in [ + "ephemeral", + "test", + ]: + logger.debug(f"Loading schema info for {dbtNode.dbt_name}") + if catalog_node is not None: + # We already have done the reporting for catalog_node being None above. + dbtNode.columns = get_columns(catalog_node, manifest_node, tag_prefix) + + else: + dbtNode.columns = [] + + dbt_entities.append(dbtNode) + + return dbt_entities + + +@platform_name("dbt") +@config_class(DBTCommonConfig) +@support_status(SupportStatus.CERTIFIED) +@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") +@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") +@capability(SourceCapability.USAGE_STATS, "", supported=False) +class DBTCoreSource(DBTSourceBase): + """ + This plugin pulls metadata from dbt's artifact files and generates: + - dbt Tables: for nodes in the dbt manifest file that are models materialized as tables + - dbt Views: for nodes in the dbt manifest file that are models materialized as views + - dbt Ephemeral: for nodes in the dbt manifest file that are ephemeral models + - dbt Sources: for nodes that are sources on top of the underlying platform tables + - dbt Seed: for seed entities + - dbt Tests as Assertions: for dbt test entities (starting with version 0.8.38.1) + + Note: + 1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View). + 2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta. + + The artifacts used by this source are: + - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json) + - This file contains model, source, tests and lineage data. + - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json) + - This file contains schema data. + - dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models + - [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json) + - This file contains metadata for sources with freshness checks. + - We transfer dbt's freshness checks to DataHub's last-modified fields. + - Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified. + - [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json) + - This file contains metadata from the result of a dbt run, e.g. 
dbt test + - When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset + """ + + config: DBTCoreConfig + + @classmethod + def create(cls, config_dict, ctx): + config = DBTCoreConfig.parse_obj(config_dict) + return cls(config, ctx, "dbt") + + def loadManifestAndCatalog( + self, + manifest_path: str, + catalog_path: str, + sources_path: Optional[str], + use_identifiers: bool, + tag_prefix: str, + ) -> Tuple[ + List[DBTNode], + Optional[str], + Optional[str], + Optional[str], + Optional[str], + Optional[str], + ]: + dbt_manifest_json = self.load_file_as_json(manifest_path) + + dbt_catalog_json = self.load_file_as_json(catalog_path) + + if sources_path is not None: + dbt_sources_json = self.load_file_as_json(sources_path) + sources_results = dbt_sources_json["results"] + else: + sources_results = {} + + manifest_schema = dbt_manifest_json["metadata"].get("dbt_schema_version") + manifest_version = dbt_manifest_json["metadata"].get("dbt_version") + manifest_adapter = dbt_manifest_json["metadata"].get("adapter_type") + + catalog_schema = dbt_catalog_json.get("metadata", {}).get("dbt_schema_version") + catalog_version = dbt_catalog_json.get("metadata", {}).get("dbt_version") + + manifest_nodes = dbt_manifest_json["nodes"] + manifest_sources = dbt_manifest_json["sources"] + + all_manifest_entities = {**manifest_nodes, **manifest_sources} + + catalog_nodes = dbt_catalog_json["nodes"] + catalog_sources = dbt_catalog_json["sources"] + + all_catalog_entities = {**catalog_nodes, **catalog_sources} + + nodes = extract_dbt_entities( + all_manifest_entities, + all_catalog_entities, + sources_results, + manifest_adapter, + use_identifiers, + tag_prefix, + self.report, + ) + + return ( + nodes, + manifest_schema, + manifest_version, + manifest_adapter, + catalog_schema, + catalog_version, + ) + + def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: + ( + all_nodes, + manifest_schema, + manifest_version, + manifest_adapter, + catalog_schema, + catalog_version, + ) = self.loadManifestAndCatalog( + self.config.manifest_path, + self.config.catalog_path, + self.config.sources_path, + self.config.use_identifiers, + self.config.tag_prefix, + ) + additional_custom_props = { + "manifest_schema": manifest_schema, + "manifest_version": manifest_version, + "manifest_adapter": manifest_adapter, + "catalog_schema": catalog_schema, + "catalog_version": catalog_version, + } + + return all_nodes, additional_custom_props + + def load_tests( + self, test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode] + ) -> Iterable[MetadataWorkUnit]: + if self.config.test_results_path: + yield from DBTTest.load_test_results( + self.config, + self.load_file_as_json(self.config.test_results_path), + test_nodes, + all_nodes_map, + ) + + def get_platform_instance_id(self) -> str: + """The DBT project identifier is used as platform instance.""" + + project_id = ( + self.load_file_as_json(self.config.manifest_path) + .get("metadata", {}) + .get("project_id") + ) + if project_id is None: + raise ValueError("DBT project identifier is not found in manifest") + + return f"{self.platform}_{project_id}" diff --git a/metadata-ingestion/tests/integration/dbt/test_dbt.py b/metadata-ingestion/tests/integration/dbt/test_dbt.py index fb1661546192ce..dc5d52f76d5e3b 100644 --- a/metadata-ingestion/tests/integration/dbt/test_dbt.py +++ b/metadata-ingestion/tests/integration/dbt/test_dbt.py @@ -12,13 +12,12 @@ from 
datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.run.pipeline_config import PipelineConfig, SourceConfig -from datahub.ingestion.source.dbt.dbt import ( - DBTConfig, +from datahub.ingestion.source.dbt.dbt_common import ( DBTEntitiesEnabled, - DBTSource, EmitDirective, StatefulIngestionSourceBase, ) +from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource from datahub.ingestion.source.sql.sql_types import ( TRINO_SQL_TYPES_MAP, resolve_trino_modified_type, @@ -240,7 +239,7 @@ def test_dbt_ingest(dbt_test_config, pytestconfig, tmp_path, mock_time, **kwargs def get_current_checkpoint_from_pipeline( pipeline: Pipeline, ) -> Optional[Checkpoint]: - dbt_source = cast(DBTSource, pipeline.source) + dbt_source = cast(DBTCoreSource, pipeline.source) return dbt_source.get_current_checkpoint( dbt_source.stale_entity_removal_handler.job_id ) @@ -456,7 +455,7 @@ def get_fake_base_sql_alchemy_checkpoint_state( ) as mock_source_base_get_last_checkpoint: mock_checkpoint.return_value = mock_datahub_graph pipeline = Pipeline.create(pipeline_config_dict) - dbt_source = cast(DBTSource, pipeline.source) + dbt_source = cast(DBTCoreSource, pipeline.source) last_checkpoint = dbt_source.get_last_checkpoint( dbt_source.stale_entity_removal_handler.job_id, DbtCheckpointState @@ -484,7 +483,7 @@ def test_dbt_tests(pytestconfig, tmp_path, mock_time, **kwargs): config=PipelineConfig( source=SourceConfig( type="dbt", - config=DBTConfig( + config=DBTCoreConfig( manifest_path=str( (test_resources_dir / "jaffle_shop_manifest.json").resolve() ), @@ -626,7 +625,7 @@ def test_dbt_tests_only_assertions(pytestconfig, tmp_path, mock_time, **kwargs): config=PipelineConfig( source=SourceConfig( type="dbt", - config=DBTConfig( + config=DBTCoreConfig( manifest_path=str( (test_resources_dir / "jaffle_shop_manifest.json").resolve() ), @@ -705,7 +704,7 @@ def test_dbt_only_test_definitions_and_results( config=PipelineConfig( source=SourceConfig( type="dbt", - config=DBTConfig( + config=DBTCoreConfig( manifest_path=str( (test_resources_dir / "jaffle_shop_manifest.json").resolve() ), diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index ecf99c10fc6c63..ca822795c6e007 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -5,7 +5,7 @@ from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.source.dbt.dbt import DBTConfig, DBTSource +from datahub.ingestion.source.dbt.dbt_core import DBTCoreConfig, DBTCoreSource from datahub.metadata.schema_classes import ( OwnerClass, OwnershipSourceClass, @@ -31,7 +31,7 @@ def create_owners_list_from_urn_list( return owners_list -def create_mocked_dbt_source() -> DBTSource: +def create_mocked_dbt_source() -> DBTCoreSource: ctx = PipelineContext("test-run-id") graph = mock.MagicMock() graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list( @@ -46,7 +46,7 @@ def create_mocked_dbt_source() -> DBTSource: ["non_dbt_existing", "dbt:existing"] ) ctx.graph = graph - return DBTSource(DBTConfig(**create_base_dbt_config()), ctx, "dbt") + return DBTCoreSource(DBTCoreConfig(**create_base_dbt_config()), ctx, "dbt") def create_base_dbt_config() -> Dict: @@ -180,7 +180,7 @@ def test_dbt_entity_emission_configuration(): "entities_enabled": {"models": "Only", "seeds": "Only"}, } 
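    # Both models and seeds are marked "Only" here, so the parse below is expected
    # to fail with a single ValidationError ("Only" applies to at most one entity type).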
try: - DBTConfig.parse_obj(config_dict) + DBTCoreConfig.parse_obj(config_dict) except ValidationError as ve: assert len(ve.errors()) == 1 assert ( @@ -194,7 +194,7 @@ def test_dbt_entity_emission_configuration(): "target_platform": "dummy_platform", "entities_enabled": {"models": "Yes", "seeds": "Only"}, } - DBTConfig.parse_obj(config_dict) + DBTCoreConfig.parse_obj(config_dict) def test_dbt_entity_emission_configuration_helpers(): @@ -206,7 +206,7 @@ def test_dbt_entity_emission_configuration_helpers(): "models": "Only", }, } - config = DBTConfig.parse_obj(config_dict) + config = DBTCoreConfig.parse_obj(config_dict) assert config.entities_enabled.can_emit_node_type("model") assert not config.entities_enabled.can_emit_node_type("source") assert not config.entities_enabled.can_emit_node_type("test") @@ -217,7 +217,7 @@ def test_dbt_entity_emission_configuration_helpers(): "catalog_path": "dummy_path", "target_platform": "dummy_platform", } - config = DBTConfig.parse_obj(config_dict) + config = DBTCoreConfig.parse_obj(config_dict) assert config.entities_enabled.can_emit_node_type("model") assert config.entities_enabled.can_emit_node_type("source") assert config.entities_enabled.can_emit_node_type("test") @@ -231,7 +231,7 @@ def test_dbt_entity_emission_configuration_helpers(): "test_results": "Only", }, } - config = DBTConfig.parse_obj(config_dict) + config = DBTCoreConfig.parse_obj(config_dict) assert not config.entities_enabled.can_emit_node_type("model") assert not config.entities_enabled.can_emit_node_type("source") assert not config.entities_enabled.can_emit_node_type("test") @@ -248,7 +248,7 @@ def test_dbt_entity_emission_configuration_helpers(): "sources": "No", }, } - config = DBTConfig.parse_obj(config_dict) + config = DBTCoreConfig.parse_obj(config_dict) assert not config.entities_enabled.can_emit_node_type("model") assert not config.entities_enabled.can_emit_node_type("source") assert config.entities_enabled.can_emit_node_type("test") From 5668213bfec98038a0d3653465bdfc9dd8a555c0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 26 Oct 2022 21:26:34 -0700 Subject: [PATCH 03/18] begin dbt cloud integration --- metadata-ingestion/setup.py | 1 + metadata-ingestion/src/datahub/entrypoints.py | 6 +- .../datahub/ingestion/source/dbt/dbt_cloud.py | 340 ++++++++++++++++++ .../ingestion/source/dbt/dbt_common.py | 10 +- 4 files changed, 351 insertions(+), 6 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 034abdd17fdc53..f2be7600feca73 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -492,6 +492,7 @@ def get_long_description(): "delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource", "s3 = datahub.ingestion.source.s3:S3Source", "dbt = datahub.ingestion.source.dbt.dbt_core:DBTCoreSource", + "dbt-cloud = datahub.ingestion.source.dbt.dbt_cloud:DBTCloudSource", "druid = datahub.ingestion.source.sql.druid:DruidSource", "elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource", "feast-legacy = datahub.ingestion.source.feast_legacy:FeastSource", diff --git a/metadata-ingestion/src/datahub/entrypoints.py b/metadata-ingestion/src/datahub/entrypoints.py index 8e917d3f394f44..2f1850904821a4 100644 --- a/metadata-ingestion/src/datahub/entrypoints.py +++ b/metadata-ingestion/src/datahub/entrypoints.py @@ -206,11 +206,13 @@ def main(**kwargs): def _get_pretty_chained_message(exc: Exception) -> str: - pretty_msg = 
f"{exc}" + pretty_msg = f"{exc.__class__.__name__} {exc}" tmp_exc = exc.__cause__ indent = "\n\t\t" while tmp_exc: - pretty_msg = f"{pretty_msg} due to {indent}'{tmp_exc}'" + pretty_msg = ( + f"{pretty_msg} due to {indent}{tmp_exc.__class__.__name__}{tmp_exc}" + ) tmp_exc = tmp_exc.__cause__ indent += "\t" return pretty_msg diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py new file mode 100644 index 00000000000000..bfd96b3cd4d998 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -0,0 +1,340 @@ +import logging +from json import JSONDecodeError +from typing import Dict, Iterable, List, Optional, Tuple + +import dateutil.parser +import requests +from pydantic import Field + +from datahub.ingestion.api.decorators import ( + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import SourceCapability +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.dbt.dbt_common import ( + DBTColumn, + DBTCommonConfig, + DBTNode, + DBTSourceBase, +) + +logger = logging.getLogger(__name__) + +DBT_METADATA_API_ENDPOINT = "https://metadata.cloud.getdbt.com/graphql" + + +class DBTCloudConfig(DBTCommonConfig): + token: str = Field( + description="The API token to use to authenticate with DBT Cloud.", + ) + + # In the URL https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094, + # the job ID would be 148094. + account_id: int = Field( + description="The DBT Cloud account ID to use.", + ) + project_id: int = Field( + description="The dbt Cloud project ID to use.", + ) + job_id: int = Field( + description="The ID of the job to ingest metadata from.", + ) + run_id: Optional[int] = Field( + description="The ID of the run to ingest metadata from. If not specified, we'll default to the latest run.", + ) + + # TODO account id? + # TODO project id? + + +_DBT_NODE_GRAPHQL_COMMON_FIELDS = """ + runId + accountId + projectId + environmentId + jobId + resourceType + uniqueId + name + description + meta + dbtVersion + tags + database + schema + type + owner + comment + + columns { + name + index + type + comment + description + tags + meta + } + + # TODO: We currently don't support this field. 
+ stats { + id + label + description + include + value + } +""" + +_DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS = """ + alias + error + status + skip + rawSql + rawCode + compiledSql + compiledCode +""" + +_DBT_NODE_GRAPHQL_TEST_FIELDS = """ + tests { + runId + accountId + projectId + environmentId + jobId + resourceType + uniqueId + name + description + meta + dbtVersion + tags + state + columnName + status + error + dependsOn + fail + warn + skip + rawSql + rawCode + compiledSql + compiledCode + } +""" + +_DBT_GRAPHQL_QUERY = f""" +query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ + models(jobId: $jobId, runId: $runId) {{ + { _DBT_NODE_GRAPHQL_COMMON_FIELDS } + { _DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS } + { _DBT_NODE_GRAPHQL_TEST_FIELDS } + dependsOn + materializedType + }} + + seeds(jobId: $jobId, runId: $runId) {{ + { _DBT_NODE_GRAPHQL_COMMON_FIELDS } + { _DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS } + }} + + sources(jobId: $jobId, runId: $runId) {{ + { _DBT_NODE_GRAPHQL_COMMON_FIELDS } + { _DBT_NODE_GRAPHQL_TEST_FIELDS } + identifier + sourceName + sourceDescription + maxLoadedAt + snapshottedAt + state + freshnessChecked + loader + }} + + # TODO: tests + + # TODO: Currently unsupported dbt node types: + # - metrics + # - snapshots + # - exposures +}} +""" + + +@platform_name("dbt") +@config_class(DBTCommonConfig) +@support_status(SupportStatus.CERTIFIED) +@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") +@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") +@capability(SourceCapability.USAGE_STATS, "", supported=False) +class DBTCloudSource(DBTSourceBase): + """ + TODO docs + """ + + config: DBTCloudConfig + + @classmethod + def create(cls, config_dict, ctx): + config = DBTCloudConfig.parse_obj(config_dict) + return cls(config, ctx, "dbt") + + def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: + response = requests.post( + DBT_METADATA_API_ENDPOINT, + json={ + "query": _DBT_GRAPHQL_QUERY, + "variables": { + "jobId": self.config.job_id, + "runId": self.config.run_id, + }, + }, + headers={ + "Authorization": f"Bearer {self.config.token}", + }, + ) + + try: + res = response.json() + if "errors" in res: + raise ValueError( + f'Unable to fetch metadata from dbt Cloud: {res["errors"]}' + ) + data = res["data"] + except JSONDecodeError as e: + response.raise_for_status() + raise e + + raw_nodes = [ + *data["models"], + *data["seeds"], + *data["sources"], + ] + + nodes = [self._parse_into_dbt_node(node) for node in raw_nodes] + + return nodes, {} + + def _parse_into_dbt_node(self, node: Dict) -> DBTNode: + key = node["uniqueId"] + + name = node["name"] + if self.config.use_identifiers and node.get("identifier"): + name = node["identifier"] + if node["resourceType"] != "test" and node.get("alias"): + name = node["alias"] + # TODO check sourceName for alternative source schema + + comment = node.get("comment", "") + description = node["description"] + if node.get("sourceDescription"): + description = node["sourceDescription"] + + if node["resourceType"] == "model": + materialization = node["materializedType"] + upstream_nodes = node["dependsOn"] + else: + materialization = None + upstream_nodes = [] + + catalog_type = node["type"] + + # TODO query_tag_props + + meta = node["meta"] + + # The dbt owner field is set to the db user, and not the meta property. 
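+        # In practice node["owner"] is usually a database/service user, so the
+        # meta-defined owner below is preferred when it is present.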
+ # owner = node.get("owner") + owner = meta.get("owner") + + tags = node["tags"] + tags = [self.config.tag_prefix + tag for tag in tags] + + if node["resourceType"] in {"model", "seed"}: + status = node["status"] + if status is None and materialization != "ephemeral": + self.report.report_warning( + key, "node is missing a status, metadata will be incomplete" + ) + + # The code fields are new in dbt 1.3, and replace the sql ones. + raw_sql = node["rawCode"] or node["rawSql"] + compiled_sql = node["compiledCode"] or node["compiledSql"] + else: + raw_sql = None + compiled_sql = None + + max_loaded_at = None + if node["resourceType"] == "source": + max_loaded_at_str = node["maxLoadedAt"] + if max_loaded_at_str: + max_loaded_at = dateutil.parser.parse(max_loaded_at_str) + + # For missing data, dbt returns 0001-01-01T00:00:00.000Z. + if max_loaded_at.year <= 1: + max_loaded_at = None + + columns = [] + if "columns" in node: + # columns will be empty for ephemeral models + columns = [ + self._parse_into_dbt_column(column) + for column in sorted(node["columns"], key=lambda c: c["index"]) + ] + + # TODO add project id, env, etc to custom metadata + + return DBTNode( + dbt_name=key, + # TODO get the dbt adapter natively + dbt_adapter=self.config.target_platform, + database=node["database"], + schema=node["schema"], + name=name, + alias=node.get("alias"), + dbt_file_path="TODO", + node_type=node["resourceType"], + max_loaded_at=max_loaded_at, + comment=comment, + description=description, + upstream_nodes=upstream_nodes, + materialization=materialization, + catalog_type=catalog_type, + meta=meta, + query_tag={}, + tags=tags, + owner=owner, + raw_sql=raw_sql, + compiled_sql=compiled_sql, + manifest_raw=node, + columns=columns, + ) + + def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: + return DBTColumn( + name=column["name"], + comment=column.get("comment", ""), + description=column["description"], + index=column["index"], + data_type=column["type"], + meta=column["meta"], + tags=column["tags"], + ) + + def load_tests( + self, test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode] + ) -> Iterable[MetadataWorkUnit]: + # TODO + return [] + + def get_platform_instance_id(self) -> str: + """The DBT project identifier is used as platform instance.""" + + return f"{self.platform}_{self.config.project_id}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index b5bdf83089fc7a..fab17319966762 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -919,6 +919,8 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: f"Failed to find test node {node.dbt_name} in the manifest" ) continue + + # TODO update this raw_node = raw_node_obj.manifest_raw test_metadata = raw_node.get("test_metadata", {}) @@ -1142,11 +1144,11 @@ def create_platform_mces( if self.config.enable_query_tag_mapping and node.query_tag: self.extract_query_tag_aspects(action_processor_tag, meta_aspects, node) - aspects = self._generate_base_aspects( - node, additional_custom_props_filtered, mce_platform, meta_aspects - ) - if mce_platform == DBT_PLATFORM: + aspects = self._generate_base_aspects( + node, additional_custom_props_filtered, mce_platform, meta_aspects + ) + # add upstream lineage upstream_lineage_class = self._create_lineage_aspect_for_dbt_node( node, all_nodes_map From 9d797dbd473a66252813431af1b5f6ba1f84ef30 Mon 
Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 27 Oct 2022 09:48:25 -0700 Subject: [PATCH 04/18] make dbt_file_path optional --- docs/how/updating-datahub.md | 1 + .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 8 +++++++- .../src/datahub/ingestion/source/dbt/dbt_common.py | 4 ++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index a060ec6ea86d1b..3ced229e855586 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -6,6 +6,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - LookML source will only emit views that are reachable from explores while scanning your git repo. Previous behavior can be achieved by setting `emit_reachable_views_only` to False. - LookML source will always lowercase urns for lineage edges from views to upstream tables. There is no fallback provided to previous behavior because it was inconsistent in application of lower-casing earlier. - dbt config `node_type_pattern` which was previously deprecated has been removed. Use `entities_enabled` instead to control whether to emit metadata for sources, models, seeds, tests, etc. +- The dbt source will always lowercase urns for lineage edges to the underlying data platform. ### Breaking Changes - Java version 11 or greater is required. diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index bfd96b3cd4d998..58c448ad2c44ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -186,7 +186,13 @@ def create(cls, config_dict, ctx): config = DBTCloudConfig.parse_obj(config_dict) return cls(config, ctx, "dbt") + # TODO: Add support for test_connection. + def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: + # TODO: model dbt cloud runs as datahub DataProcesses or DataJobs + # TODO: figure out how to deal with jobs that only run part of the job + # TODO capture model creation failures? + response = requests.post( DBT_METADATA_API_ENDPOINT, json={ @@ -299,7 +305,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: schema=node["schema"], name=name, alias=node.get("alias"), - dbt_file_path="TODO", + dbt_file_path=None, # TODO: Get this from the dbt API. 
node_type=node["resourceType"], max_loaded_at=max_loaded_at, comment=comment, diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index fab17319966762..e1135fcb82b38d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -394,7 +394,7 @@ class DBTNode: dbt_adapter: str dbt_name: str - dbt_file_path: str + dbt_file_path: Optional[str] node_type: str # source, model max_loaded_at: Optional[datetime] @@ -1262,7 +1262,7 @@ def _create_dataset_properties_aspect( tags=node.tags, name=node.name, ) - if self.config.github_info is not None: + if self.config.github_info and node.dbt_file_path: github_file_url = self.config.github_info.get_url_for_file_path( node.dbt_file_path ) From dca747098d657d439b0a9acdc329e9facd231552 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 27 Oct 2022 12:59:47 -0700 Subject: [PATCH 05/18] start working on tests --- .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 58c448ad2c44ee..045057ba8d760b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -297,6 +297,10 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: # TODO add project id, env, etc to custom metadata + if "tests" in node: + tests = node["tests"] + breakpoint() + return DBTNode( dbt_name=key, # TODO get the dbt adapter natively From 06a48f4d135591b7437d77fb0fc76e8be09ded6a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 28 Oct 2022 14:15:02 -0700 Subject: [PATCH 06/18] refactor aws creds into dbt-core --- .../ingestion/source/dbt/dbt_common.py | 42 --------------- .../datahub/ingestion/source/dbt/dbt_core.py | 52 ++++++++++++++++++- 2 files changed, 51 insertions(+), 43 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index e1135fcb82b38d..36b1ea51e90ae8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -17,10 +17,8 @@ Union, cast, ) -from urllib.parse import urlparse import pydantic -import requests from pydantic import BaseModel, root_validator, validator from pydantic.fields import Field @@ -44,7 +42,6 @@ ) from datahub.ingestion.api.ingestion_job_state_provider import JobId from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.sql.sql_types import ( BIGQUERY_TYPES_MAP, POSTGRES_TYPES_MAP, @@ -278,10 +275,6 @@ class DBTCommonConfig(StatefulIngestionConfigBase): default=None, description='Regex string to extract owner from the dbt node using the `(?P...) syntax` of the [match object](https://docs.python.org/3/library/re.html#match-objects), where the group name must be `owner`. 
Examples: (1)`r"(?P(.*)): (\\w+) (\\w+)"` will extract `jdoe` as the owner from `"jdoe: John Doe"` (2) `r"@(?P(.*))"` will extract `alice` as the owner from `"@alice"`.', ) - aws_connection: Optional[AwsConnectionConfig] = Field( - default=None, - description="When fetching manifest files from s3, configuration for aws connection details", - ) backcompat_skip_source_on_lineage_edge: bool = Field( False, description="Prior to version 0.8.41, lineage edges to sources were directed to the target platform node rather than the dbt source node. This contradicted the established pattern for other lineage edges to point to upstream dbt nodes. To revert lineage logic to this legacy approach, set this flag to true.", @@ -295,11 +288,6 @@ class DBTCommonConfig(StatefulIngestionConfigBase): default=None, description="DBT Stateful Ingestion Config." ) - @property - def s3_client(self): - assert self.aws_connection - return self.aws_connection.get_s3_client() - @validator("target_platform") def validate_target_platform_value(cls, target_platform: str) -> str: if target_platform.lower() == DBT_PLATFORM: @@ -319,23 +307,6 @@ def validate_write_semantics(cls, write_semantics: str) -> str: ) return write_semantics - @validator("aws_connection") - def aws_connection_needed_if_s3_uris_present( - cls, aws_connection: Optional[AwsConnectionConfig], values: Dict, **kwargs: Any - ) -> Optional[AwsConnectionConfig]: - # first check if there are fields that contain s3 uris - uri_containing_fields = [ - f - for f in ["manifest_path", "catalog_path", "sources_path"] - if (values.get(f) or "").startswith("s3://") - ] - - if uri_containing_fields and not aws_connection: - raise ValueError( - f"Please provide aws_connection configuration, since s3 uris have been provided in fields {uri_containing_fields}" - ) - return aws_connection - @validator("meta_mapping") def meta_mapping_validator( cls, meta_mapping: Dict[str, Any], values: Dict, **kwargs: Any @@ -850,19 +821,6 @@ def get_last_checkpoint( return last_checkpoint - def load_file_as_json(self, uri: str) -> Any: - if re.match("^https?://", uri): - return json.loads(requests.get(uri).text) - elif re.match("^s3://", uri): - u = urlparse(uri) - response = self.config.s3_client.get_object( - Bucket=u.netloc, Key=u.path.lstrip("/") - ) - return json.loads(response["Body"].read().decode("utf-8")) - else: - with open(uri, "r") as f: - return json.load(f) - def create_test_entity_mcps( self, test_nodes: List[DBTNode], diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index ea4350458a50fa..e937cb8a3921ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -1,8 +1,12 @@ +import json import logging +import re from typing import Any, Dict, Iterable, List, Optional, Tuple +from urllib.parse import urlparse import dateutil.parser -from pydantic import Field +import requests +from pydantic import Field, validator from datahub.ingestion.api.decorators import ( SupportStatus, @@ -13,6 +17,7 @@ ) from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.dbt.dbt_common import ( DBTColumn, DBTCommonConfig, @@ -41,6 +46,38 @@ class DBTCoreConfig(DBTCommonConfig): description="Path to output of dbt test run as run_results file in JSON 
format. See https://docs.getdbt.com/reference/artifacts/run-results-json. If not specified, test execution results will not be populated in DataHub.", ) + aws_connection: Optional[AwsConnectionConfig] = Field( + default=None, + description="When fetching manifest files from s3, configuration for aws connection details", + ) + + @property + def s3_client(self): + assert self.aws_connection + return self.aws_connection.get_s3_client() + + @validator("aws_connection") + def aws_connection_needed_if_s3_uris_present( + cls, aws_connection: Optional[AwsConnectionConfig], values: Dict, **kwargs: Any + ) -> Optional[AwsConnectionConfig]: + # first check if there are fields that contain s3 uris + uri_containing_fields = [ + f + for f in [ + "manifest_path", + "catalog_path", + "sources_path", + "test_results_path", + ] + if (values.get(f) or "").startswith("s3://") + ] + + if uri_containing_fields and not aws_connection: + raise ValueError( + f"Please provide aws_connection configuration, since s3 uris have been provided in fields {uri_containing_fields}" + ) + return aws_connection + def get_columns( catalog_node: dict, manifest_node: dict, tag_prefix: str @@ -228,6 +265,19 @@ def create(cls, config_dict, ctx): config = DBTCoreConfig.parse_obj(config_dict) return cls(config, ctx, "dbt") + def load_file_as_json(self, uri: str) -> Any: + if re.match("^https?://", uri): + return json.loads(requests.get(uri).text) + elif re.match("^s3://", uri): + u = urlparse(uri) + response = self.config.s3_client.get_object( + Bucket=u.netloc, Key=u.path.lstrip("/") + ) + return json.loads(response["Body"].read().decode("utf-8")) + else: + with open(uri, "r") as f: + return json.load(f) + def loadManifestAndCatalog( self, manifest_path: str, From 65b6bb24d59980ab99d903ad5119e012072c61d9 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 15:31:50 -0700 Subject: [PATCH 07/18] start refactoring dbt core test / assertions --- metadata-ingestion/dbt_cloud.dhub.yml | 15 ++ .../datahub/ingestion/source/dbt/dbt_cloud.py | 80 +++--- .../ingestion/source/dbt/dbt_common.py | 227 +++++++++--------- .../datahub/ingestion/source/dbt/dbt_core.py | 23 +- .../src/datahub/ingestion/source/sql/mssql.py | 2 +- 5 files changed, 179 insertions(+), 168 deletions(-) create mode 100644 metadata-ingestion/dbt_cloud.dhub.yml diff --git a/metadata-ingestion/dbt_cloud.dhub.yml b/metadata-ingestion/dbt_cloud.dhub.yml new file mode 100644 index 00000000000000..4abad608b5c75f --- /dev/null +++ b/metadata-ingestion/dbt_cloud.dhub.yml @@ -0,0 +1,15 @@ +source: + type: dbt-cloud + config: + token: ${DBT_CLOUD_TOKEN} + account_id: 107298 + project_id: 175705 + job_id: 148094 + run_id: 92263293 + + target_platform: postgres + write_semantics: OVERRIDE +# sink: +# type: file +# config: +# filename: /tmp/dbt_cloud.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 045057ba8d760b..c7879542f58bbc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -51,7 +51,7 @@ class DBTCloudConfig(DBTCommonConfig): # TODO project id? 
-_DBT_NODE_GRAPHQL_COMMON_FIELDS = """ +_DBT_GRAPHQL_COMMON_FIELDS = """ runId accountId projectId @@ -64,6 +64,9 @@ class DBTCloudConfig(DBTCommonConfig): meta dbtVersion tags +""" + +_DBT_GRAPHQL_NODE_COMMON_FIELDS = """ database schema type @@ -90,7 +93,7 @@ class DBTCloudConfig(DBTCommonConfig): } """ -_DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS = """ +_DBT_GRAPHQL_MODEL_SEED_FIELDS = """ alias error status @@ -101,53 +104,25 @@ class DBTCloudConfig(DBTCommonConfig): compiledCode """ -_DBT_NODE_GRAPHQL_TEST_FIELDS = """ - tests { - runId - accountId - projectId - environmentId - jobId - resourceType - uniqueId - name - description - meta - dbtVersion - tags - state - columnName - status - error - dependsOn - fail - warn - skip - rawSql - rawCode - compiledSql - compiledCode - } -""" - _DBT_GRAPHQL_QUERY = f""" query DatahubMetadataQuery($jobId: Int!, $runId: Int) {{ models(jobId: $jobId, runId: $runId) {{ - { _DBT_NODE_GRAPHQL_COMMON_FIELDS } - { _DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS } - { _DBT_NODE_GRAPHQL_TEST_FIELDS } + { _DBT_GRAPHQL_COMMON_FIELDS } + { _DBT_GRAPHQL_NODE_COMMON_FIELDS } + { _DBT_GRAPHQL_MODEL_SEED_FIELDS } dependsOn materializedType }} seeds(jobId: $jobId, runId: $runId) {{ - { _DBT_NODE_GRAPHQL_COMMON_FIELDS } - { _DBT_NODE_GRAPHQL_MODEL_SEED_FIELDS } + { _DBT_GRAPHQL_COMMON_FIELDS } + { _DBT_GRAPHQL_NODE_COMMON_FIELDS } + { _DBT_GRAPHQL_MODEL_SEED_FIELDS } }} sources(jobId: $jobId, runId: $runId) {{ - { _DBT_NODE_GRAPHQL_COMMON_FIELDS } - { _DBT_NODE_GRAPHQL_TEST_FIELDS } + { _DBT_GRAPHQL_COMMON_FIELDS } + { _DBT_GRAPHQL_NODE_COMMON_FIELDS } identifier sourceName sourceDescription @@ -158,7 +133,21 @@ class DBTCloudConfig(DBTCommonConfig): loader }} - # TODO: tests + tests(jobId: $jobId, runId: $runId) {{ + { _DBT_GRAPHQL_COMMON_FIELDS } + state + columnName + status + error + dependsOn + fail + warn + skip + rawSql + rawCode + compiledSql + compiledCode + }} # TODO: Currently unsupported dbt node types: # - metrics @@ -193,6 +182,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: # TODO: figure out how to deal with jobs that only run part of the job # TODO capture model creation failures? + logger.debug("Sending graphql request to the dbt Cloud metadata API") response = requests.post( DBT_METADATA_API_ENDPOINT, json={ @@ -222,6 +212,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: *data["models"], *data["seeds"], *data["sources"], + *data["tests"], ] nodes = [self._parse_into_dbt_node(node) for node in raw_nodes] @@ -250,9 +241,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: materialization = None upstream_nodes = [] - catalog_type = node["type"] - - # TODO query_tag_props + catalog_type = node.get("type") meta = node["meta"] @@ -297,13 +286,12 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: # TODO add project id, env, etc to custom metadata - if "tests" in node: - tests = node["tests"] + if node["resourceType"] == "test": breakpoint() return DBTNode( dbt_name=key, - # TODO get the dbt adapter natively + # TODO: Get the dbt adapter natively. dbt_adapter=self.config.target_platform, database=node["database"], schema=node["schema"], @@ -318,7 +306,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: materialization=materialization, catalog_type=catalog_type, meta=meta, - query_tag={}, + query_tag={}, # TODO: Get this from the dbt API. 
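# Note (illustrative, not part of the patch): after this refactor the shared field
# fragments above are interpolated into a single GraphQL document, so models, seeds,
# sources, and tests are fetched in one request. Roughly, the rendered query looks like:
#
#   query DatahubMetadataQuery($jobId: Int!, $runId: Int) {
#     models(jobId: $jobId, runId: $runId)  { <common + node + model/seed fields> ... }
#     seeds(jobId: $jobId, runId: $runId)   { <common + node + model/seed fields> }
#     sources(jobId: $jobId, runId: $runId) { <common + node fields> ... }
#     tests(jobId: $jobId, runId: $runId)   { <common fields + test result fields> }
#   }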
tags=tags, owner=owner, raw_sql=raw_sql, diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 36b1ea51e90ae8..a4325bc97dee1e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -8,6 +8,7 @@ from typing import ( Any, Callable, + ClassVar, Dict, Iterable, List, @@ -373,8 +374,6 @@ class DBTNode: # see https://docs.getdbt.com/reference/artifacts/manifest-json catalog_type: Optional[str] - manifest_raw: Dict - owner: Optional[str] columns: List[DBTColumn] = field(default_factory=list) @@ -386,6 +385,8 @@ class DBTNode: tags: List[str] = field(default_factory=list) compiled_sql: Optional[str] = None + test_info: Optional["DBTTest"] = None # only populated if node_type == 'test' + def get_db_fqn(self) -> str: if self.database: fqn = f"{self.database}.{self.schema}.{self.name}" @@ -591,9 +592,13 @@ class DBTRunMetadata(BaseModel): invocation_id: str +@dataclass class DBTTest: + qualified_test_name: str + column_name: Optional[str] + kw_args: dict - test_name_to_assertion_map = { + TEST_NAME_TO_ASSERTION_MAP: ClassVar[Dict[str, AssertionParams]] = { "not_null": AssertionParams( scope=DatasetAssertionScopeClass.DATASET_COLUMN, operator=AssertionStdOperatorClass.NOT_NULL, @@ -762,6 +767,10 @@ def load_test_results( logger.debug(f"Failed to process test result {result} due to {e}") +def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: + return {k: str(v) for k, v in input_map.items()} + + @platform_name("dbt") @config_class(DBTCommonConfig) @support_status(SupportStatus.CERTIFIED) @@ -827,14 +836,11 @@ def create_test_entity_mcps( custom_props: Dict[str, str], all_nodes_map: Dict[str, DBTNode], ) -> Iterable[MetadataWorkUnit]: - def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: - return {k: str(v) for k, v in input_map.items()} - if not self.config.entities_enabled.can_emit_node_type("test"): return [] for node in test_nodes: - node_datahub_urn = mce_builder.make_assertion_urn( + assertion_urn = mce_builder.make_assertion_urn( mce_builder.datahub_guid( { "platform": DBT_PLATFORM, @@ -845,14 +851,11 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: ) self.stale_entity_removal_handler.add_entity_to_state( type="assertion", - urn=node_datahub_urn, + urn=assertion_urn, ) wu = MetadataChangeProposalWrapper( - entityType="assertion", - entityUrn=node_datahub_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="dataPlatformInstance", + entityUrn=assertion_urn, aspect=DataPlatformInstanceClass( platform=mce_builder.make_data_platform_urn(DBT_PLATFORM) ), @@ -871,112 +874,103 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge, ) - raw_node_obj = all_nodes_map.get(node.dbt_name) - if raw_node_obj is None: - logger.warning( - f"Failed to find test node {node.dbt_name} in the manifest" - ) - continue - - # TODO update this - raw_node = raw_node_obj.manifest_raw - - test_metadata = raw_node.get("test_metadata", {}) - kw_args = test_metadata.get("kwargs", {}) for upstream_urn in upstream_urns: - qualified_test_name = ( - (test_metadata.get("namespace") or "") - + "." 
- + (test_metadata.get("name") or "") - ) - qualified_test_name = ( - qualified_test_name[1:] - if qualified_test_name.startswith(".") - else qualified_test_name - ) - - if qualified_test_name in DBTTest.test_name_to_assertion_map: - assertion_params: AssertionParams = ( - DBTTest.test_name_to_assertion_map[qualified_test_name] - ) - assertion_info = AssertionInfoClass( - type=AssertionTypeClass.DATASET, - customProperties=custom_props, - datasetAssertion=DatasetAssertionInfoClass( - dataset=upstream_urn, - scope=assertion_params.scope, - operator=assertion_params.operator, - fields=[ - mce_builder.make_schema_field_urn( - upstream_urn, kw_args.get("column_name") - ) - ] - if assertion_params.scope - == DatasetAssertionScopeClass.DATASET_COLUMN - else [], - nativeType=node.name, - aggregation=assertion_params.aggregation, - parameters=assertion_params.parameters(kw_args) - if assertion_params.parameters - else None, - logic=assertion_params.logic_fn(kw_args) - if assertion_params.logic_fn - else None, - nativeParameters=string_map(kw_args), - ), - ) - elif kw_args.get("column_name"): - # no match with known test types, column-level test - assertion_info = AssertionInfoClass( - type=AssertionTypeClass.DATASET, - customProperties=custom_props, - datasetAssertion=DatasetAssertionInfoClass( - dataset=upstream_urn, - scope=DatasetAssertionScopeClass.DATASET_COLUMN, - operator=AssertionStdOperatorClass._NATIVE_, - fields=[ - mce_builder.make_schema_field_urn( - upstream_urn, kw_args.get("column_name") - ) - ], - nativeType=node.name, - logic=node.compiled_sql - if node.compiled_sql - else node.raw_sql, - aggregation=AssertionStdAggregationClass._NATIVE_, - nativeParameters=string_map(kw_args), - ), - ) - else: - # no match with known test types, default to row-level test - assertion_info = AssertionInfoClass( - type=AssertionTypeClass.DATASET, - customProperties=custom_props, - datasetAssertion=DatasetAssertionInfoClass( - dataset=upstream_urn, - scope=DatasetAssertionScopeClass.DATASET_ROWS, - operator=AssertionStdOperatorClass._NATIVE_, - logic=node.compiled_sql - if node.compiled_sql - else node.raw_sql, - nativeType=node.name, - aggregation=AssertionStdAggregationClass._NATIVE_, - nativeParameters=string_map(kw_args), - ), - ) - wu = MetadataWorkUnit( - id=f"{node_datahub_urn}-assertioninfo", - mcp=MetadataChangeProposalWrapper( - entityType="assertion", - entityUrn=node_datahub_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="assertionInfo", - aspect=assertion_info, - ), + wu = self._make_assertion_from_test( + custom_props, + node, + assertion_urn, + upstream_urn, ) self.report.report_workunit(wu) yield wu + def _make_assertion_from_test( + self, + extra_custom_props: Dict[str, str], + node: DBTNode, + assertion_urn: str, + upstream_urn: str, + ) -> MetadataWorkUnit: + assert node.test_info + qualified_test_name = node.test_info.qualified_test_name + column_name = node.test_info.column_name + kw_args = node.test_info.kw_args + + if qualified_test_name in DBTTest.TEST_NAME_TO_ASSERTION_MAP: + assertion_params = DBTTest.TEST_NAME_TO_ASSERTION_MAP[qualified_test_name] + assertion_info = AssertionInfoClass( + type=AssertionTypeClass.DATASET, + customProperties=extra_custom_props, + datasetAssertion=DatasetAssertionInfoClass( + dataset=upstream_urn, + scope=assertion_params.scope, + operator=assertion_params.operator, + fields=[ + mce_builder.make_schema_field_urn(upstream_urn, column_name) + ] + if ( + assertion_params.scope + == DatasetAssertionScopeClass.DATASET_COLUMN + and 
column_name + ) + else [], + nativeType=node.name, + aggregation=assertion_params.aggregation, + parameters=assertion_params.parameters(kw_args) + if assertion_params.parameters + else None, + logic=assertion_params.logic_fn(kw_args) + if assertion_params.logic_fn + else None, + nativeParameters=string_map(kw_args), + ), + ) + elif column_name: + # no match with known test types, column-level test + assertion_info = AssertionInfoClass( + type=AssertionTypeClass.DATASET, + customProperties=extra_custom_props, + datasetAssertion=DatasetAssertionInfoClass( + dataset=upstream_urn, + scope=DatasetAssertionScopeClass.DATASET_COLUMN, + operator=AssertionStdOperatorClass._NATIVE_, + fields=[ + mce_builder.make_schema_field_urn(upstream_urn, column_name) + ], + nativeType=node.name, + logic=node.compiled_sql if node.compiled_sql else node.raw_sql, + aggregation=AssertionStdAggregationClass._NATIVE_, + nativeParameters=string_map(kw_args), + ), + ) + else: + # no match with known test types, default to row-level test + assertion_info = AssertionInfoClass( + type=AssertionTypeClass.DATASET, + customProperties=extra_custom_props, + datasetAssertion=DatasetAssertionInfoClass( + dataset=upstream_urn, + scope=DatasetAssertionScopeClass.DATASET_ROWS, + operator=AssertionStdOperatorClass._NATIVE_, + logic=node.compiled_sql if node.compiled_sql else node.raw_sql, + nativeType=node.name, + aggregation=AssertionStdAggregationClass._NATIVE_, + nativeParameters=string_map(kw_args), + ), + ) + wu = MetadataWorkUnit( + id=f"{assertion_urn}-assertioninfo", + mcp=MetadataChangeProposalWrapper( + entityType="assertion", + entityUrn=assertion_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="assertionInfo", + aspect=assertion_info, + ), + ) + + return wu + @abstractmethod def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: # return dbt nodes + global custom properties @@ -1502,13 +1496,6 @@ def get_transformed_owners_by_source_type( def owner_sort_key(self, owner_class: OwnerClass) -> str: return str(owner_class) - # TODO: Remove. keeping this till PR review - # assert owner_class is not None - # owner = owner_class.owner - # type = str(owner_class.type) - # source_type = "None" if not owner_class.source else str(owner_class.source.type) - # source_url = "None" if not owner_class.source else str(owner_class.source.url) - # return f"{owner}-{type}-{source_type}-{source_url}" # This method attempts to read-modify and return the tags of a dataset. # From the existing tags it will remove the tags that have a prefix tags_prefix_filter and diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index e937cb8a3921ea..44e7a48aaf73f2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -181,6 +181,27 @@ def extract_dbt_entities( if max_loaded_at_str: max_loaded_at = dateutil.parser.parse(max_loaded_at_str) + test_info = None + if manifest_node.get("resource_type") == "test": + test_metadata = manifest_node.get("test_metadata", {}) + kw_args = test_metadata.get("kwargs", {}) + + qualified_test_name = ( + (test_metadata.get("namespace") or "") + + "." 
+ + (test_metadata.get("name") or "") + ) + qualified_test_name = ( + qualified_test_name[1:] + if qualified_test_name.startswith(".") + else qualified_test_name + ) + test_info = DBTTest( + qualified_test_name=qualified_test_name, + column_name=kw_args.get("column_name"), + kw_args=kw_args, + ) + dbtNode = DBTNode( dbt_name=key, dbt_adapter=manifest_adapter, @@ -202,7 +223,7 @@ def extract_dbt_entities( tags=tags, owner=owner, compiled_sql=manifest_node.get("compiled_sql"), - manifest_raw=manifest_node, + test_info=test_info, ) # Load columns from catalog, and override some properties from manifest. diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py index c490bb4738b188..3af35cf0932604 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql.py @@ -41,7 +41,7 @@ class SQLServerConfig(BasicSQLAlchemyConfig): ) uri_args: Dict[str, str] = Field( default={}, - desscription="Arguments to URL-encode when connecting. See https://docs.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver15.", + description="Arguments to URL-encode when connecting. See https://docs.microsoft.com/en-us/sql/connect/odbc/dsn-connection-string-attribute?view=sql-server-ver15.", ) database_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), From 59626f61f9c017913555224fbd70cd38ceb05c03 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 15:43:18 -0700 Subject: [PATCH 08/18] more refactoring --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 8 +- .../ingestion/source/dbt/dbt_common.py | 129 +----------------- .../datahub/ingestion/source/dbt/dbt_core.py | 129 +++++++++++++++++- 3 files changed, 135 insertions(+), 131 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index c7879542f58bbc..1186938fab33dd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -286,15 +286,17 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: # TODO add project id, env, etc to custom metadata + test_info = None if node["resourceType"] == "test": breakpoint() + test_info = TODO return DBTNode( dbt_name=key, # TODO: Get the dbt adapter natively. dbt_adapter=self.config.target_platform, - database=node["database"], - schema=node["schema"], + database=node.get("database"), + schema=node.get("schema"), name=name, alias=node.get("alias"), dbt_file_path=None, # TODO: Get this from the dbt API. 
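# Note (illustrative, not part of the patch): the qualified_test_name computed in the
# dbt_core test_metadata handling above is the namespace-prefixed test name, with the
# leading dot stripped when there is no namespace. For example:
#
#   {"namespace": None,        "name": "unique"}          -> "unique"
#   {"namespace": "dbt_utils", "name": "equal_rowcount"}  -> "dbt_utils.equal_rowcount"
#
# This is the key later used to look up DBTTest.TEST_NAME_TO_ASSERTION_MAP in dbt_common.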
@@ -311,8 +313,8 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: owner=owner, raw_sql=raw_sql, compiled_sql=compiled_sql, - manifest_raw=node, columns=columns, + test_info=test_info, ) def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index a4325bc97dee1e..2673946e16c3df 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -20,7 +20,7 @@ ) import pydantic -from pydantic import BaseModel, root_validator, validator +from pydantic import root_validator, validator from pydantic.fields import Field from datahub.configuration.common import ( @@ -93,10 +93,6 @@ ) from datahub.metadata.schema_classes import ( AssertionInfoClass, - AssertionResultClass, - AssertionResultTypeClass, - AssertionRunEventClass, - AssertionRunStatusClass, AssertionStdAggregationClass, AssertionStdOperatorClass, AssertionStdParameterClass, @@ -352,12 +348,12 @@ class DBTColumn: class DBTNode: """ The DBTNode is generated by joining data from the manifest and catalog files. - It can contain source and model nodes, and models can have a variety of + It can contain source/model/seed/test nodes, and models can have a variety of materialization types. """ database: Optional[str] - schema: str + schema: Optional[str] name: str # name, identifier alias: Optional[str] # alias if present comment: str @@ -568,30 +564,6 @@ def _get_name_for_relationship_test(kw_args: Dict[str, str]) -> Optional[str]: return f"{source_table}.{column_name} referential integrity to {destination_table}.{dest_field_name}" -class DBTTestStep(BaseModel): - name: Optional[str] = None - started_at: Optional[str] = None - completed_at: Optional[str] = None - - -class DBTTestResult(BaseModel): - class Config: - extra = "allow" - - status: str - timing: List[DBTTestStep] = [] - unique_id: str - failures: Optional[int] = None - message: Optional[str] = None - - -class DBTRunMetadata(BaseModel): - dbt_schema_version: str - dbt_version: str - generated_at: str - invocation_id: str - - @dataclass class DBTTest: qualified_test_name: str @@ -671,101 +643,6 @@ class DBTTest: ), } - @staticmethod - def load_test_results( - config: DBTCommonConfig, - test_results_json: Dict[str, Any], - test_nodes: List[DBTNode], - all_nodes_map: Dict[str, DBTNode], - ) -> Iterable[MetadataWorkUnit]: - if not config.entities_enabled.can_emit_test_results: - logger.debug("Skipping test result emission since it is turned off.") - return [] - - args = test_results_json.get("args", {}) - dbt_metadata = DBTRunMetadata.parse_obj(test_results_json.get("metadata", {})) - test_nodes_map: Dict[str, DBTNode] = {x.dbt_name: x for x in test_nodes} - if "test" in args.get("which", "") or "test" in args.get("rpc_method", ""): - # this was a test run - results = test_results_json.get("results", []) - for result in results: - try: - test_result = DBTTestResult.parse_obj(result) - id = test_result.unique_id - test_node = test_nodes_map.get(id) - assert test_node, f"Failed to find test_node {id} in the catalog" - upstream_urns = get_upstreams( - test_node.upstream_nodes, - all_nodes_map, - config.use_identifiers, - config.target_platform, - config.target_platform_instance, - config.env, - config.platform_instance, - config.backcompat_skip_source_on_lineage_edge, - ) - assertion_urn = mce_builder.make_assertion_urn( - 
mce_builder.datahub_guid( - { - "platform": DBT_PLATFORM, - "name": test_result.unique_id, - "instance": config.platform_instance, - } - ) - ) - - if test_result.status != "pass": - native_results = {"message": test_result.message or ""} - if test_result.failures: - native_results.update( - {"failures": str(test_result.failures)} - ) - else: - native_results = {} - - stage_timings = {x.name: x.started_at for x in test_result.timing} - # look for execution start time, fall back to compile start time and finally generation time - execution_timestamp = ( - stage_timings.get("execute") - or stage_timings.get("compile") - or dbt_metadata.generated_at - ) - - execution_timestamp_parsed = datetime.strptime( - execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" - ) - - for upstream in upstream_urns: - assertionResult = AssertionRunEventClass( - timestampMillis=int( - execution_timestamp_parsed.timestamp() * 1000.0 - ), - assertionUrn=assertion_urn, - asserteeUrn=upstream, - runId=dbt_metadata.invocation_id, - result=AssertionResultClass( - type=AssertionResultTypeClass.SUCCESS - if test_result.status == "pass" - else AssertionResultTypeClass.FAILURE, - nativeResults=native_results, - ), - status=AssertionRunStatusClass.COMPLETE, - ) - - event = MetadataChangeProposalWrapper( - entityType="assertion", - entityUrn=assertion_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="assertionRunEvent", - aspect=assertionResult, - ) - yield MetadataWorkUnit( - id=f"{assertion_urn}-assertionRunEvent-{upstream}", - mcp=event, - ) - except Exception as e: - logger.debug(f"Failed to process test result {result} due to {e}") - def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: return {k: str(v) for k, v in input_map.items()} diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 44e7a48aaf73f2..9ff968d3baa6c2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -1,13 +1,16 @@ import json import logging import re +from datetime import datetime from typing import Any, Dict, Iterable, List, Optional, Tuple from urllib.parse import urlparse import dateutil.parser import requests -from pydantic import Field, validator +from pydantic import BaseModel, Field, validator +from datahub.emitter import mce_builder +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.decorators import ( SupportStatus, capability, @@ -19,12 +22,20 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.dbt.dbt_common import ( + DBT_PLATFORM, DBTColumn, DBTCommonConfig, DBTNode, DBTSourceBase, DBTSourceReport, DBTTest, + get_upstreams, +) +from datahub.metadata.schema_classes import ( + AssertionResultClass, + AssertionResultTypeClass, + AssertionRunEventClass, + AssertionRunStatusClass, ) logger = logging.getLogger(__name__) @@ -244,6 +255,120 @@ def extract_dbt_entities( return dbt_entities +class DBTTestStep(BaseModel): + name: Optional[str] = None + started_at: Optional[str] = None + completed_at: Optional[str] = None + + +class DBTTestResult(BaseModel): + class Config: + extra = "allow" + + status: str + timing: List[DBTTestStep] = [] + unique_id: str + failures: Optional[int] = None + message: Optional[str] = None + + +class DBTRunMetadata(BaseModel): + dbt_schema_version: str + dbt_version: str + 
generated_at: str + invocation_id: str + + +def load_test_results( + config: DBTCommonConfig, + test_results_json: Dict[str, Any], + test_nodes: List[DBTNode], + all_nodes_map: Dict[str, DBTNode], +) -> Iterable[MetadataWorkUnit]: + if not config.entities_enabled.can_emit_test_results: + logger.debug("Skipping test result emission since it is turned off.") + return [] + + args = test_results_json.get("args", {}) + dbt_metadata = DBTRunMetadata.parse_obj(test_results_json.get("metadata", {})) + test_nodes_map: Dict[str, DBTNode] = {x.dbt_name: x for x in test_nodes} + if "test" in args.get("which", "") or "test" in args.get("rpc_method", ""): + # this was a test run + results = test_results_json.get("results", []) + for result in results: + try: + test_result = DBTTestResult.parse_obj(result) + id = test_result.unique_id + test_node = test_nodes_map.get(id) + assert test_node, f"Failed to find test_node {id} in the catalog" + upstream_urns = get_upstreams( + test_node.upstream_nodes, + all_nodes_map, + config.use_identifiers, + config.target_platform, + config.target_platform_instance, + config.env, + config.platform_instance, + config.backcompat_skip_source_on_lineage_edge, + ) + assertion_urn = mce_builder.make_assertion_urn( + mce_builder.datahub_guid( + { + "platform": DBT_PLATFORM, + "name": test_result.unique_id, + "instance": config.platform_instance, + } + ) + ) + + if test_result.status != "pass": + native_results = {"message": test_result.message or ""} + if test_result.failures: + native_results.update({"failures": str(test_result.failures)}) + else: + native_results = {} + + stage_timings = {x.name: x.started_at for x in test_result.timing} + # look for execution start time, fall back to compile start time and finally generation time + execution_timestamp = ( + stage_timings.get("execute") + or stage_timings.get("compile") + or dbt_metadata.generated_at + ) + + execution_timestamp_parsed = datetime.strptime( + execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" + ) + + for upstream in upstream_urns: + assertionResult = AssertionRunEventClass( + timestampMillis=int( + execution_timestamp_parsed.timestamp() * 1000.0 + ), + assertionUrn=assertion_urn, + asserteeUrn=upstream, + runId=dbt_metadata.invocation_id, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS + if test_result.status == "pass" + else AssertionResultTypeClass.FAILURE, + nativeResults=native_results, + ), + status=AssertionRunStatusClass.COMPLETE, + ) + + event = MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=assertionResult, + ) + yield MetadataWorkUnit( + id=f"{assertion_urn}-assertionRunEvent-{upstream}", + mcp=event, + ) + except Exception as e: + logger.debug(f"Failed to process test result {result} due to {e}") + + @platform_name("dbt") @config_class(DBTCommonConfig) @support_status(SupportStatus.CERTIFIED) @@ -389,7 +514,7 @@ def load_tests( self, test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode] ) -> Iterable[MetadataWorkUnit]: if self.config.test_results_path: - yield from DBTTest.load_test_results( + yield from load_test_results( self.config, self.load_file_as_json(self.config.test_results_path), test_nodes, From 4f7ce646d4b1a2b57cb491137d0b5731923d1ca1 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 16:45:02 -0700 Subject: [PATCH 09/18] move test loading into main node loading routine --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 12 +- .../ingestion/source/dbt/dbt_common.py | 118 +++++++++----- 
.../datahub/ingestion/source/dbt/dbt_core.py | 151 ++++++------------ 3 files changed, 137 insertions(+), 144 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 1186938fab33dd..31b7a78865e634 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -1,6 +1,6 @@ import logging from json import JSONDecodeError -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple import dateutil.parser import requests @@ -14,7 +14,6 @@ support_status, ) from datahub.ingestion.api.source import SourceCapability -from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.dbt.dbt_common import ( DBTColumn, DBTCommonConfig, @@ -287,9 +286,11 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: # TODO add project id, env, etc to custom metadata test_info = None + test_result = None if node["resourceType"] == "test": breakpoint() test_info = TODO + test_result = TODO return DBTNode( dbt_name=key, @@ -315,6 +316,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: compiled_sql=compiled_sql, columns=columns, test_info=test_info, + test_result=test_result, ) def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: @@ -328,12 +330,6 @@ def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: tags=column["tags"], ) - def load_tests( - self, test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode] - ) -> Iterable[MetadataWorkUnit]: - # TODO - return [] - def get_platform_instance_id(self) -> str: """The DBT project identifier is used as platform instance.""" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 2673946e16c3df..4e14fdd373c3de 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -93,6 +93,10 @@ ) from datahub.metadata.schema_classes import ( AssertionInfoClass, + AssertionResultClass, + AssertionResultTypeClass, + AssertionRunEventClass, + AssertionRunStatusClass, AssertionStdAggregationClass, AssertionStdOperatorClass, AssertionStdParameterClass, @@ -382,6 +386,7 @@ class DBTNode: compiled_sql: Optional[str] = None test_info: Optional["DBTTest"] = None # only populated if node_type == 'test' + test_result: Optional["DBTTestResult"] = None def get_db_fqn(self) -> str: if self.database: @@ -644,6 +649,16 @@ class DBTTest: } +@dataclass +class DBTTestResult: + invocation_id: str + + status: str + execution_time: datetime + + native_results: Dict[str, str] + + def string_map(input_map: Dict[str, Any]) -> Dict[str, str]: return {k: str(v) for k, v in input_map.items()} @@ -713,9 +728,6 @@ def create_test_entity_mcps( custom_props: Dict[str, str], all_nodes_map: Dict[str, DBTNode], ) -> Iterable[MetadataWorkUnit]: - if not self.config.entities_enabled.can_emit_node_type("test"): - return [] - for node in test_nodes: assertion_urn = mce_builder.make_assertion_urn( mce_builder.datahub_guid( @@ -731,14 +743,15 @@ def create_test_entity_mcps( urn=assertion_urn, ) - wu = MetadataChangeProposalWrapper( - entityUrn=assertion_urn, - aspect=DataPlatformInstanceClass( - platform=mce_builder.make_data_platform_urn(DBT_PLATFORM) - ), - ).as_workunit() - self.report.report_workunit(wu) - yield wu + if 
self.config.entities_enabled.can_emit_node_type("test"): + wu = MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=DataPlatformInstanceClass( + platform=mce_builder.make_data_platform_urn(DBT_PLATFORM) + ), + ).as_workunit() + self.report.report_workunit(wu) + yield wu upstream_urns = get_upstreams( upstreams=node.upstream_nodes, @@ -752,14 +765,27 @@ def create_test_entity_mcps( ) for upstream_urn in upstream_urns: - wu = self._make_assertion_from_test( - custom_props, - node, - assertion_urn, - upstream_urn, - ) - self.report.report_workunit(wu) - yield wu + if self.config.entities_enabled.can_emit_node_type("test"): + wu = self._make_assertion_from_test( + custom_props, + node, + assertion_urn, + upstream_urn, + ) + self.report.report_workunit(wu) + yield wu + + if node.test_result: + if self.config.entities_enabled.can_emit_test_results: + wu = self._make_assertion_result_from_test( + node, assertion_urn, upstream_urn + ) + self.report.report_workunit(wu) + yield wu + else: + logger.debug( + f"Skipping test result {node.name} emission since it is turned off." + ) def _make_assertion_from_test( self, @@ -835,17 +861,45 @@ def _make_assertion_from_test( nativeParameters=string_map(kw_args), ), ) - wu = MetadataWorkUnit( - id=f"{assertion_urn}-assertioninfo", - mcp=MetadataChangeProposalWrapper( - entityType="assertion", - entityUrn=assertion_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="assertionInfo", - aspect=assertion_info, + + wu = MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=assertion_info, + ).as_workunit() + + return wu + + def _make_assertion_result_from_test( + self, + node: DBTNode, + assertion_urn: str, + upstream_urn: str, + ) -> MetadataWorkUnit: + assert node.test_result + test_result = node.test_result + + assertionResult = AssertionRunEventClass( + timestampMillis=int(test_result.execution_time.timestamp() * 1000.0), + assertionUrn=assertion_urn, + asserteeUrn=upstream_urn, + runId=test_result.invocation_id, + result=AssertionResultClass( + type=AssertionResultTypeClass.SUCCESS + if test_result.status == "pass" + else AssertionResultTypeClass.FAILURE, + nativeResults=test_result.native_results, ), + status=AssertionRunStatusClass.COMPLETE, ) + event = MetadataChangeProposalWrapper( + entityUrn=assertion_urn, + aspect=assertionResult, + ) + wu = MetadataWorkUnit( + id=f"{assertion_urn}-assertionRunEvent-{upstream_urn}", + mcp=event, + ) return wu @abstractmethod @@ -853,14 +907,6 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: # return dbt nodes + global custom properties raise NotImplementedError() - @abstractmethod - def load_tests( - self, - test_nodes: List[DBTNode], - all_nodes_map: Dict[str, DBTNode], - ) -> Iterable[MetadataWorkUnit]: - raise NotImplementedError() - # create workunits from dbt nodes def get_workunits(self) -> Iterable[MetadataWorkUnit]: if self.config.write_semantics == "PATCH" and not self.ctx.graph: @@ -907,8 +953,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: all_nodes_map, ) - yield from self.load_tests(test_nodes, all_nodes_map) - yield from self.stale_entity_removal_handler.gen_removed_entity_workunits() def filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 9ff968d3baa6c2..74074893499c95 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ 
b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -2,15 +2,13 @@ import logging import re from datetime import datetime -from typing import Any, Dict, Iterable, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse import dateutil.parser import requests from pydantic import BaseModel, Field, validator -from datahub.emitter import mce_builder -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.decorators import ( SupportStatus, capability, @@ -19,23 +17,15 @@ support_status, ) from datahub.ingestion.api.source import SourceCapability -from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig from datahub.ingestion.source.dbt.dbt_common import ( - DBT_PLATFORM, DBTColumn, DBTCommonConfig, DBTNode, DBTSourceBase, DBTSourceReport, DBTTest, - get_upstreams, -) -from datahub.metadata.schema_classes import ( - AssertionResultClass, - AssertionResultTypeClass, - AssertionRunEventClass, - AssertionRunStatusClass, + DBTTestResult, ) logger = logging.getLogger(__name__) @@ -255,18 +245,18 @@ def extract_dbt_entities( return dbt_entities -class DBTTestStep(BaseModel): +class DBTRunTiming(BaseModel): name: Optional[str] = None started_at: Optional[str] = None completed_at: Optional[str] = None -class DBTTestResult(BaseModel): +class DBTRunResult(BaseModel): class Config: extra = "allow" status: str - timing: List[DBTTestStep] = [] + timing: List[DBTRunTiming] = [] unique_id: str failures: Optional[int] = None message: Optional[str] = None @@ -282,91 +272,57 @@ class DBTRunMetadata(BaseModel): def load_test_results( config: DBTCommonConfig, test_results_json: Dict[str, Any], - test_nodes: List[DBTNode], - all_nodes_map: Dict[str, DBTNode], -) -> Iterable[MetadataWorkUnit]: - if not config.entities_enabled.can_emit_test_results: - logger.debug("Skipping test result emission since it is turned off.") - return [] - + all_nodes: List[DBTNode], +) -> List[DBTNode]: args = test_results_json.get("args", {}) dbt_metadata = DBTRunMetadata.parse_obj(test_results_json.get("metadata", {})) - test_nodes_map: Dict[str, DBTNode] = {x.dbt_name: x for x in test_nodes} + + test_nodes_map: Dict[str, DBTNode] = { + x.dbt_name: x for x in all_nodes if x.node_type == "test" + } + if "test" in args.get("which", "") or "test" in args.get("rpc_method", ""): # this was a test run results = test_results_json.get("results", []) for result in results: - try: - test_result = DBTTestResult.parse_obj(result) - id = test_result.unique_id - test_node = test_nodes_map.get(id) - assert test_node, f"Failed to find test_node {id} in the catalog" - upstream_urns = get_upstreams( - test_node.upstream_nodes, - all_nodes_map, - config.use_identifiers, - config.target_platform, - config.target_platform_instance, - config.env, - config.platform_instance, - config.backcompat_skip_source_on_lineage_edge, - ) - assertion_urn = mce_builder.make_assertion_urn( - mce_builder.datahub_guid( - { - "platform": DBT_PLATFORM, - "name": test_result.unique_id, - "instance": config.platform_instance, - } - ) - ) + run_result = DBTRunResult.parse_obj(result) + id = run_result.unique_id + test_node = test_nodes_map.get(id) + if not test_node: + logger.debug(f"Failed to find test node {id} in the catalog") + continue + + if run_result.status != "pass": + native_results = {"message": run_result.message or ""} + if run_result.failures: + native_results.update({"failures": 
str(run_result.failures)}) + else: + native_results = {} + + stage_timings = {x.name: x.started_at for x in run_result.timing} + # look for execution start time, fall back to compile start time and finally generation time + execution_timestamp = ( + stage_timings.get("execute") + or stage_timings.get("compile") + or dbt_metadata.generated_at + ) - if test_result.status != "pass": - native_results = {"message": test_result.message or ""} - if test_result.failures: - native_results.update({"failures": str(test_result.failures)}) - else: - native_results = {} - - stage_timings = {x.name: x.started_at for x in test_result.timing} - # look for execution start time, fall back to compile start time and finally generation time - execution_timestamp = ( - stage_timings.get("execute") - or stage_timings.get("compile") - or dbt_metadata.generated_at - ) + execution_timestamp_parsed = datetime.strptime( + execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" + ) - execution_timestamp_parsed = datetime.strptime( - execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" - ) + test_result = DBTTestResult( + invocation_id=dbt_metadata.invocation_id, + status=run_result.status, + native_results=native_results, + execution_time=execution_timestamp_parsed, + ) + + assert test_node.test_info is not None + assert test_node.test_result is None + test_node.test_result = test_result - for upstream in upstream_urns: - assertionResult = AssertionRunEventClass( - timestampMillis=int( - execution_timestamp_parsed.timestamp() * 1000.0 - ), - assertionUrn=assertion_urn, - asserteeUrn=upstream, - runId=dbt_metadata.invocation_id, - result=AssertionResultClass( - type=AssertionResultTypeClass.SUCCESS - if test_result.status == "pass" - else AssertionResultTypeClass.FAILURE, - nativeResults=native_results, - ), - status=AssertionRunStatusClass.COMPLETE, - ) - - event = MetadataChangeProposalWrapper( - entityUrn=assertion_urn, - aspect=assertionResult, - ) - yield MetadataWorkUnit( - id=f"{assertion_urn}-assertionRunEvent-{upstream}", - mcp=event, - ) - except Exception as e: - logger.debug(f"Failed to process test result {result} due to {e}") + return all_nodes @platform_name("dbt") @@ -508,19 +464,16 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: "catalog_version": catalog_version, } - return all_nodes, additional_custom_props - - def load_tests( - self, test_nodes: List[DBTNode], all_nodes_map: Dict[str, DBTNode] - ) -> Iterable[MetadataWorkUnit]: if self.config.test_results_path: - yield from load_test_results( + # This will populate the test_results field on each test node. 
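# Note (illustrative, not part of the patch): load_test_results consumes the standard
# dbt run_results.json artifact. The fields read above correspond to a structure like
# the following (values hypothetical):
#
#   {
#     "metadata": {"dbt_schema_version": "...", "dbt_version": "1.3.0",
#                  "generated_at": "2022-10-31T00:00:00.000000Z", "invocation_id": "..."},
#     "results": [
#       {"unique_id": "test.my_project.unique_orders_order_id",
#        "status": "pass",
#        "timing": [{"name": "compile", "started_at": "..."},
#                   {"name": "execute", "started_at": "..."}],
#        "failures": 0,
#        "message": null}
#     ]
#   }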
+ all_nodes = load_test_results( self.config, self.load_file_as_json(self.config.test_results_path), - test_nodes, - all_nodes_map, + all_nodes, ) + return all_nodes, additional_custom_props + def get_platform_instance_id(self) -> str: """The DBT project identifier is used as platform instance.""" From 67b25f7a1a938a59636b1f44d5a3fac55cecf5af Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 16:54:45 -0700 Subject: [PATCH 10/18] improve support for dbt build in core --- .../datahub/ingestion/source/dbt/dbt_core.py | 82 ++++++++++--------- 1 file changed, 44 insertions(+), 38 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 74074893499c95..6724492cfed41e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -274,53 +274,59 @@ def load_test_results( test_results_json: Dict[str, Any], all_nodes: List[DBTNode], ) -> List[DBTNode]: - args = test_results_json.get("args", {}) dbt_metadata = DBTRunMetadata.parse_obj(test_results_json.get("metadata", {})) test_nodes_map: Dict[str, DBTNode] = { x.dbt_name: x for x in all_nodes if x.node_type == "test" } - if "test" in args.get("which", "") or "test" in args.get("rpc_method", ""): - # this was a test run - results = test_results_json.get("results", []) - for result in results: - run_result = DBTRunResult.parse_obj(result) - id = run_result.unique_id - test_node = test_nodes_map.get(id) - if not test_node: - logger.debug(f"Failed to find test node {id} in the catalog") - continue - - if run_result.status != "pass": - native_results = {"message": run_result.message or ""} - if run_result.failures: - native_results.update({"failures": str(run_result.failures)}) - else: - native_results = {} - - stage_timings = {x.name: x.started_at for x in run_result.timing} - # look for execution start time, fall back to compile start time and finally generation time - execution_timestamp = ( - stage_timings.get("execute") - or stage_timings.get("compile") - or dbt_metadata.generated_at - ) + results = test_results_json.get("results", []) + for result in results: + run_result = DBTRunResult.parse_obj(result) + id = run_result.unique_id - execution_timestamp_parsed = datetime.strptime( - execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" - ) + if not id.startswith("test."): + continue - test_result = DBTTestResult( - invocation_id=dbt_metadata.invocation_id, - status=run_result.status, - native_results=native_results, - execution_time=execution_timestamp_parsed, - ) + test_node = test_nodes_map.get(id) + if not test_node: + logger.debug(f"Failed to find test node {id} in the catalog") + continue + + if run_result.status == "success": + # This was probably a docs generate run result, so this isn't actually + # a test result. 
+ continue + + if run_result.status != "pass": + native_results = {"message": run_result.message or ""} + if run_result.failures: + native_results.update({"failures": str(run_result.failures)}) + else: + native_results = {} + + stage_timings = {x.name: x.started_at for x in run_result.timing} + # look for execution start time, fall back to compile start time and finally generation time + execution_timestamp = ( + stage_timings.get("execute") + or stage_timings.get("compile") + or dbt_metadata.generated_at + ) + + execution_timestamp_parsed = datetime.strptime( + execution_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" + ) + + test_result = DBTTestResult( + invocation_id=dbt_metadata.invocation_id, + status=run_result.status, + native_results=native_results, + execution_time=execution_timestamp_parsed, + ) - assert test_node.test_info is not None - assert test_node.test_result is None - test_node.test_result = test_result + assert test_node.test_info is not None + assert test_node.test_result is None + test_node.test_result = test_result return all_nodes From b6f6a41ac9d0c21665136168320791e03e5c7814 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 17:12:48 -0700 Subject: [PATCH 11/18] support dbt cloud tests --- .../datahub/ingestion/source/dbt/dbt_cloud.py | 49 +++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 31b7a78865e634..74fe891d809c05 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -1,4 +1,5 @@ import logging +from datetime import datetime from json import JSONDecodeError from typing import Dict, List, Optional, Tuple @@ -19,6 +20,8 @@ DBTCommonConfig, DBTNode, DBTSourceBase, + DBTTest, + DBTTestResult, ) logger = logging.getLogger(__name__) @@ -288,9 +291,49 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: test_info = None test_result = None if node["resourceType"] == "test": - breakpoint() - test_info = TODO - test_result = TODO + qualified_test_name = name + + # The qualified test name should be the test name from the dbt project. + # It can be simple (e.g. 'unique') or prefixed (e.g. 'dbt_expectations.expect_column_values_to_not_be_null'). + # We attempt to guess the test name based on the macros used. + for dependency in node["dependsOn"]: + # An example dependsOn list could be: + # ['model.sample_dbt.monthly_billing_with_cust', 'macro.dbt.test_not_null', 'macro.dbt.get_where_subquery'] + # In that case, the test should be `not_null`. + + if dependency.startswith("macro."): + _, macro = dependency.split(".", 1) + if macro.startswith("dbt."): + if not macro.startswith("dbt.test_"): + continue + macro = macro[len("dbt.test_") :] + + qualified_test_name = macro + break + + test_info = DBTTest( + qualified_test_name=qualified_test_name, + column_name=node["columnName"], + kw_args={}, # TODO: dbt Cloud doesn't expose the args. + ) + if not node["skip"]: + test_result = DBTTestResult( + invocation_id=f"job{node['jobId']}-run{node['runId']}", + execution_time=datetime.now(), # TODO: dbt Cloud doesn't expose this. 
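# Note (illustrative, not part of the patch): the dependsOn-based inference above maps
# macro dependencies to a qualified test name, e.g. (model names are examples):
#
#   ['model.sample_dbt.monthly_billing_with_cust',
#    'macro.dbt.test_not_null', 'macro.dbt.get_where_subquery']     -> 'not_null'
#   ['model.sample_dbt.orders',
#    'macro.dbt_expectations.expect_column_values_to_not_be_null']  -> 'dbt_expectations.expect_column_values_to_not_be_null'
#
# Builtin dbt macros only count when they start with "dbt.test_" (the prefix is stripped);
# macros from other packages are used as-is.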
+ status=node["status"], + native_results={ + key: node[key] + for key in { + "columnName", + "error", + "fail", + "warn", + "skip", + "state", + "status", + } + }, + ) return DBTNode( dbt_name=key, From 538941a6fdea6ae704a0ad2e9731a75ec9ebdc9e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 20:29:40 -0700 Subject: [PATCH 12/18] switch config --- metadata-ingestion/dbt_cloud.dhub.yml | 15 --------------- .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 4 ++-- .../src/datahub/ingestion/source/dbt/dbt_core.py | 2 +- 3 files changed, 3 insertions(+), 18 deletions(-) delete mode 100644 metadata-ingestion/dbt_cloud.dhub.yml diff --git a/metadata-ingestion/dbt_cloud.dhub.yml b/metadata-ingestion/dbt_cloud.dhub.yml deleted file mode 100644 index 4abad608b5c75f..00000000000000 --- a/metadata-ingestion/dbt_cloud.dhub.yml +++ /dev/null @@ -1,15 +0,0 @@ -source: - type: dbt-cloud - config: - token: ${DBT_CLOUD_TOKEN} - account_id: 107298 - project_id: 175705 - job_id: 148094 - run_id: 92263293 - - target_platform: postgres - write_semantics: OVERRIDE -# sink: -# type: file -# config: -# filename: /tmp/dbt_cloud.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 74fe891d809c05..bdaa9e831e6f19 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -151,7 +151,7 @@ class DBTCloudConfig(DBTCommonConfig): compiledCode }} - # TODO: Currently unsupported dbt node types: + # Currently unsupported dbt node types: # - metrics # - snapshots # - exposures @@ -160,7 +160,7 @@ class DBTCloudConfig(DBTCommonConfig): @platform_name("dbt") -@config_class(DBTCommonConfig) +@config_class(DBTCloudConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 6724492cfed41e..517e1fe4c2e7ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -332,7 +332,7 @@ def load_test_results( @platform_name("dbt") -@config_class(DBTCommonConfig) +@config_class(DBTCoreConfig) @support_status(SupportStatus.CERTIFIED) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") From 81001c3982540d63d3040cc9b7d75bbd61569fe0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 22:03:25 -0700 Subject: [PATCH 13/18] cleanup todos --- metadata-ingestion/docs/sources/dbt/README.md | 20 ++++++ .../docs/sources/dbt/dbt-cloud_recipe.yml | 19 ++++++ .../datahub/ingestion/source/dbt/dbt_cloud.py | 63 +++++++++++-------- .../datahub/ingestion/source/dbt/dbt_core.py | 12 ---- 4 files changed, 75 insertions(+), 39 deletions(-) create mode 100644 metadata-ingestion/docs/sources/dbt/README.md create mode 100644 metadata-ingestion/docs/sources/dbt/dbt-cloud_recipe.yml diff --git a/metadata-ingestion/docs/sources/dbt/README.md b/metadata-ingestion/docs/sources/dbt/README.md new file mode 100644 index 00000000000000..b7e5ebba4b048e --- /dev/null +++ b/metadata-ingestion/docs/sources/dbt/README.md @@ -0,0 +1,20 @@ +Ingesting metadata from dbt requires either 
using the **dbt** module or the **dbt-cloud** module. + +### Concept Mapping + +| Source Concept | DataHub Concept | Notes | +| ------------------------ | ------------------------------------------------------------- | --------------------- | +| `"dbt"` | [Data Platform](../../metamodel/entities/dataPlatform.md) | | +| dbt Source | [Dataset](../../metamodel/entities/dataset.md) | Subtype `source` | +| dbt Seed | [Dataset](../../metamodel/entities/dataset.md) | Subtype `seed` | +| dbt Model - materialized | [Dataset](../../metamodel/entities/dataset.md) | Subtype `table` | +| dbt Model - view | [Dataset](../../metamodel/entities/dataset.md) | Subtype `view` | +| dbt Model - incremental | [Dataset](../../metamodel/entities/dataset.md) | Subtype `incremental` | +| dbt Model - ephemeral | [Dataset](../../metamodel/entities/dataset.md) | Subtype `ephemeral` | +| dbt Test | [Assertion](../../metamodel/entities/assertion.md) | | +| dbt Test Result | [Assertion Run Result](../../metamodel/entities/assertion.md) | | + +Note: + +1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View). +2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta. diff --git a/metadata-ingestion/docs/sources/dbt/dbt-cloud_recipe.yml b/metadata-ingestion/docs/sources/dbt/dbt-cloud_recipe.yml new file mode 100644 index 00000000000000..113303cfc1ad40 --- /dev/null +++ b/metadata-ingestion/docs/sources/dbt/dbt-cloud_recipe.yml @@ -0,0 +1,19 @@ +source: + type: "dbt-cloud" + config: + token: ${DBT_CLOUD_TOKEN} + + # In the URL https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094, + # 107298 is the account_id, 175705 is the project_id, and 148094 is the job_id + + account_id: # set to your dbt cloud account id + project_id: # set to your dbt cloud project id + job_id: # set to your dbt cloud job id + run_id: # set to your dbt cloud run id. This is optional, and defaults to the latest run + + target_platform: postgres + + # Options + target_platform: "my_target_platform_id" # e.g. bigquery/postgres/etc. + +# sink configs diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index bdaa9e831e6f19..282dcc3541db5c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -26,22 +26,26 @@ logger = logging.getLogger(__name__) -DBT_METADATA_API_ENDPOINT = "https://metadata.cloud.getdbt.com/graphql" - class DBTCloudConfig(DBTCommonConfig): + metadata_endpoint: str = Field( + default="https://metadata.cloud.getdbt.com/graphql", + description="The dbt Cloud metadata API endpoint.", + ) token: str = Field( description="The API token to use to authenticate with DBT Cloud.", ) - # In the URL https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094, - # the job ID would be 148094. + # We don't actually need the account ID or project ID right now, but will + # need it once we start using the dbt Cloud admin API to list jobs/runs within a project. + # As such, I've preemptively added these fields to the config. 
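# Note (illustrative, not part of the patch): the account/project/job IDs referenced in
# the recipe above can be read straight off a dbt Cloud job URL. A rough sketch
# (parse_job_url is a hypothetical helper, not part of this source):
#
#   import re
#
#   def parse_job_url(url: str) -> dict:
#       # e.g. https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094
#       m = re.search(r"/deploy/(\d+)/projects/(\d+)/jobs/(\d+)", url)
#       if not m:
#           raise ValueError(f"unrecognized dbt Cloud job URL: {url}")
#       return {
#           "account_id": int(m.group(1)),
#           "project_id": int(m.group(2)),
#           "job_id": int(m.group(3)),
#       }
#
#   parse_job_url("https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094")
#   # -> {'account_id': 107298, 'project_id': 175705, 'job_id': 148094}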
account_id: int = Field( description="The DBT Cloud account ID to use.", ) project_id: int = Field( description="The dbt Cloud project ID to use.", ) + job_id: int = Field( description="The ID of the job to ingest metadata from.", ) @@ -49,9 +53,6 @@ class DBTCloudConfig(DBTCommonConfig): description="The ID of the run to ingest metadata from. If not specified, we'll default to the latest run.", ) - # TODO account id? - # TODO project id? - _DBT_GRAPHQL_COMMON_FIELDS = """ runId @@ -85,14 +86,14 @@ class DBTCloudConfig(DBTCommonConfig): meta } - # TODO: We currently don't support this field. - stats { - id - label - description - include - value - } + # We don't currently support this field, but should in the future. + #stats { + # id + # label + # description + # include + # value + #} """ _DBT_GRAPHQL_MODEL_SEED_FIELDS = """ @@ -167,7 +168,9 @@ class DBTCloudConfig(DBTCommonConfig): @capability(SourceCapability.USAGE_STATS, "", supported=False) class DBTCloudSource(DBTSourceBase): """ - TODO docs + This source pulls dbt metadata directly from the dbt Cloud APIs. + + You'll need to have a dbt Cloud job set up to run your dbt project, and "Generate docs on run" should be enabled. """ config: DBTCloudConfig @@ -177,16 +180,19 @@ def create(cls, config_dict, ctx): config = DBTCloudConfig.parse_obj(config_dict) return cls(config, ctx, "dbt") - # TODO: Add support for test_connection. - def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: - # TODO: model dbt cloud runs as datahub DataProcesses or DataJobs - # TODO: figure out how to deal with jobs that only run part of the job - # TODO capture model creation failures? + # TODO: In dbt Cloud, commands are scheduled as part of jobs, where + # each job can have multiple runs. We currently only fully support + # jobs that do a full / mostly full build of the project, and will + # print out warnings if data is missing. In the future, it'd be nice + # to automatically detect all jobs that are part of a project and combine + # their metadata together. + # Additionally, we'd like to model dbt Cloud jobs/runs in DataHub + # as DataProcesses or DataJobs. logger.debug("Sending graphql request to the dbt Cloud metadata API") response = requests.post( - DBT_METADATA_API_ENDPOINT, + self.config.metadata_endpoint, json={ "query": _DBT_GRAPHQL_QUERY, "variables": { @@ -219,7 +225,13 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: nodes = [self._parse_into_dbt_node(node) for node in raw_nodes] - return nodes, {} + additional_metadata: Dict[str, Optional[str]] = { + "project_id": str(self.config.project_id), + "account_id": str(self.config.account_id), + "job_id": str(self.config.job_id), + } + + return nodes, additional_metadata def _parse_into_dbt_node(self, node: Dict) -> DBTNode: key = node["uniqueId"] @@ -229,7 +241,6 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: name = node["identifier"] if node["resourceType"] != "test" and node.get("alias"): name = node["alias"] - # TODO check sourceName for alternative source schema comment = node.get("comment", "") description = node["description"] @@ -258,7 +269,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: status = node["status"] if status is None and materialization != "ephemeral": self.report.report_warning( - key, "node is missing a status, metadata will be incomplete" + key, "node is missing a status, schema metadata will be incomplete" ) # The code fields are new in dbt 1.3, and replace the sql ones. 
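# Note (illustrative, not part of the patch): per the comment above, dbt >= 1.3 exposes
# rawCode/compiledCode in place of rawSql/compiledSql, which is why the GraphQL query
# requests all four fields. A sketch of the fallback this implies (field names as returned
# by the dbt Cloud metadata API, assumed here):
#
#   raw_sql = node.get("rawCode") or node.get("rawSql")
#   compiled_sql = node.get("compiledCode") or node.get("compiledSql")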
@@ -286,8 +297,6 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: for column in sorted(node["columns"], key=lambda c: c["index"]) ] - # TODO add project id, env, etc to custom metadata - test_info = None test_result = None if node["resourceType"] == "test": diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index 517e1fe4c2e7ef..dbde97d7eadd85 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -339,18 +339,6 @@ def load_test_results( @capability(SourceCapability.USAGE_STATS, "", supported=False) class DBTCoreSource(DBTSourceBase): """ - This plugin pulls metadata from dbt's artifact files and generates: - - dbt Tables: for nodes in the dbt manifest file that are models materialized as tables - - dbt Views: for nodes in the dbt manifest file that are models materialized as views - - dbt Ephemeral: for nodes in the dbt manifest file that are ephemeral models - - dbt Sources: for nodes that are sources on top of the underlying platform tables - - dbt Seed: for seed entities - - dbt Tests as Assertions: for dbt test entities (starting with version 0.8.38.1) - - Note: - 1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View). - 2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta. - The artifacts used by this source are: - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json) - This file contains model, source, tests and lineage data. From 90d74c97877e4c89c7669756a5a6590cc674c237 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 22:23:55 -0700 Subject: [PATCH 14/18] fix dbt cloud tests bug --- .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 6 +++--- .../src/datahub/ingestion/source/dbt/dbt_core.py | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 282dcc3541db5c..a04700f649e63e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -249,10 +249,10 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: if node["resourceType"] == "model": materialization = node["materializedType"] - upstream_nodes = node["dependsOn"] else: materialization = None - upstream_nodes = [] + + upstream_nodes = node.get("dependsOn", []) catalog_type = node.get("type") @@ -331,7 +331,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode: execution_time=datetime.now(), # TODO: dbt Cloud doesn't expose this. 
status=node["status"], native_results={ - key: node[key] + key: str(node[key]) for key in { "columnName", "error", diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index dbde97d7eadd85..f45f73bf5beb51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -144,11 +144,12 @@ def extract_dbt_entities( comment = all_catalog_entities[key]["metadata"]["comment"] materialization = None - upstream_nodes = [] - if "materialized" in manifest_node.get("config", {}): # It's a model materialization = manifest_node["config"]["materialized"] + + upstream_nodes = [] + if "depends_on" in manifest_node: upstream_nodes = manifest_node["depends_on"]["nodes"] # It's a source From 2f7053a7d7737119a98c45630925c2c0e18390fa Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 31 Oct 2022 22:25:37 -0700 Subject: [PATCH 15/18] change dbt cloud to incubating --- .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index a04700f649e63e..2ce1de7e04b350 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -162,7 +162,7 @@ class DBTCloudConfig(DBTCommonConfig): @platform_name("dbt") @config_class(DBTCloudConfig) -@support_status(SupportStatus.CERTIFIED) +@support_status(SupportStatus.INCUBATING) @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion") @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default") @capability(SourceCapability.USAGE_STATS, "", supported=False) From 9f07d4562a67ca2e1dbb36f9a71a03cfe34ed2cf Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 14 Nov 2022 16:02:06 -0500 Subject: [PATCH 16/18] set external url to the dbt cloud ide --- .../src/datahub/ingestion/source/dbt/dbt_cloud.py | 15 ++++++++++++--- .../datahub/ingestion/source/dbt/dbt_common.py | 15 +++++---------- .../src/datahub/ingestion/source/dbt/dbt_core.py | 11 +++++++++++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 2ce1de7e04b350..cec5f067ebbf79 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -36,9 +36,6 @@ class DBTCloudConfig(DBTCommonConfig): description="The API token to use to authenticate with DBT Cloud.", ) - # We don't actually need the account ID or project ID right now, but will - # need it once we start using the dbt Cloud admin API to list jobs/runs within a project. - # As such, I've preemptively added these fields to the config. account_id: int = Field( description="The DBT Cloud account ID to use.", ) @@ -171,8 +168,16 @@ class DBTCloudSource(DBTSourceBase): This source pulls dbt metadata directly from the dbt Cloud APIs. You'll need to have a dbt Cloud job set up to run your dbt project, and "Generate docs on run" should be enabled. + + The token should have the "read metadata" permission. + + To get the required IDs, go to the job details page (this is the one with the "Run History" table), and look at the URL. 
+ It should look something like this: https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094. + In this example, the account ID is 107298, the project ID is 175705, and the job ID is 148094. """ + # TODO: add some screenshots to the docs + config: DBTCloudConfig @classmethod @@ -382,6 +387,10 @@ def _parse_into_dbt_column(self, column: Dict) -> DBTColumn: tags=column["tags"], ) + def get_external_url(self, node: DBTNode) -> Optional[str]: + # TODO: Once dbt Cloud supports deep linking to specific files, we can use that. + return f"https://cloud.getdbt.com/next/accounts/{self.config.account_id}/projects/{self.config.project_id}/develop" + def get_platform_instance_id(self) -> str: """The DBT project identifier is used as platform instance.""" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index d2581e9b7cb7fd..8901927005663c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -30,7 +30,6 @@ ConfigurationError, LineageConfig, ) -from datahub.configuration.github import GitHubReference from datahub.emitter import mce_builder from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext @@ -282,10 +281,6 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig): False, description="Prior to version 0.8.41, lineage edges to sources were directed to the target platform node rather than the dbt source node. This contradicted the established pattern for other lineage edges to point to upstream dbt nodes. To revert lineage logic to this legacy approach, set this flag to true.", ) - github_info: Optional[GitHubReference] = Field( - None, - description="Reference to your github location to enable easy navigation from DataHub to your dbt files.", - ) stateful_ingestion: Optional[DBTStatefulIngestionConfig] = pydantic.Field( default=None, description="DBT Stateful Ingestion Config." @@ -1156,14 +1151,14 @@ def _create_dataset_properties_aspect( tags=node.tags, name=node.name, ) - if self.config.github_info and node.dbt_file_path: - github_file_url = self.config.github_info.get_url_for_file_path( - node.dbt_file_path - ) - dbt_properties.externalUrl = github_file_url + dbt_properties.externalUrl = self.get_external_url(node) return dbt_properties + @abstractmethod + def get_external_url(self, node: DBTNode) -> Optional[str]: + pass + def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass: materialized = node.materialization in {"table", "incremental"} # this function is only called when raw sql is present. 
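As a companion to the docstring and get_external_url above, here is a hedged sketch of pulling the account, project, and job IDs out of a job-details URL and rebuilding the "Develop" link. Both URL shapes are taken from the examples in this patch and should be treated as assumptions if dbt Cloud changes its URL scheme.

import re
from typing import Dict

# Pattern follows the job-details URL example given in the docstring above.
_JOB_URL_PATTERN = re.compile(
    r"https://cloud\.getdbt\.com/next/deploy/(?P<account_id>\d+)"
    r"/projects/(?P<project_id>\d+)/jobs/(?P<job_id>\d+)"
)

def parse_job_url(url: str) -> Dict[str, int]:
    match = _JOB_URL_PATTERN.match(url)
    if not match:
        raise ValueError(f"Unrecognized dbt Cloud job URL: {url}")
    return {name: int(value) for name, value in match.groupdict().items()}

ids = parse_job_url("https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094")
assert ids == {"account_id": 107298, "project_id": 175705, "job_id": 148094}

# The same account and project IDs feed the external URL built above, which
# points at the project's "Develop" page rather than a specific file.
external_url = (
    f"https://cloud.getdbt.com/next/accounts/{ids['account_id']}"
    f"/projects/{ids['project_id']}/develop"
)

Linking to the Develop page is a deliberate fallback until dbt Cloud supports deep links to individual files, as the TODO above notes.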
assert is added to satisfy lint checks diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index f45f73bf5beb51..0236dc7e26b79a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -9,6 +9,7 @@ import requests from pydantic import BaseModel, Field, validator +from datahub.configuration.github import GitHubReference from datahub.ingestion.api.decorators import ( SupportStatus, capability, @@ -52,6 +53,11 @@ class DBTCoreConfig(DBTCommonConfig): description="When fetching manifest files from s3, configuration for aws connection details", ) + github_info: Optional[GitHubReference] = Field( + None, + description="Reference to your github location to enable easy navigation from DataHub to your dbt files.", + ) + @property def s3_client(self): assert self.aws_connection @@ -469,6 +475,11 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]: return all_nodes, additional_custom_props + def get_external_url(self, node: DBTNode) -> Optional[str]: + if self.config.github_info and node.dbt_file_path: + return self.config.github_info.get_url_for_file_path(node.dbt_file_path) + return None + def get_platform_instance_id(self) -> str: """The DBT project identifier is used as platform instance.""" From f470d988567d3bac108e5c64a81bd361ce27ee96 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 17 Nov 2022 13:44:08 -0500 Subject: [PATCH 17/18] add dbt cloud deps --- metadata-ingestion/setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index d2a251ec3e4146..c2da589081ecc1 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -258,6 +258,7 @@ def get_long_description(): "datahub-business-glossary": set(), "delta-lake": {*data_lake_profiling, *delta_lake}, "dbt": {"requests"} | aws_common, + "dbt-cloud": {"requests"}, "druid": sql_common | {"pydruid>=0.6.2"}, # Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws # UnsupportedProductError From 05dad4e9c110f7896b81abf43096b883865a8f13 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 18 Nov 2022 00:11:03 -0500 Subject: [PATCH 18/18] fix merge --- .../datahub/ingestion/source/dbt/dbt_common.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index e2bfb3aa4a0e15..a31e7a3a780294 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -1463,18 +1463,3 @@ def get_transformed_terms( def get_report(self): return self.report - - def get_platform_instance_id(self) -> str: - """ - DBT project identifier is used as platform instance. - """ - - project_id = ( - self.load_file_as_json(self.config.manifest_path) - .get("metadata", {}) - .get("project_id") - ) - if project_id is None: - raise ValueError("DBT project identifier is not found in manifest") - - return f"{self.platform}_{project_id}"
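Taken together with the new dbt-cloud extra above, the source can be exercised end to end. The sketch below is one possible wiring, not a documented recipe: it assumes the source is registered under the dbt-cloud type, that target_platform is still required by the common config, and that a local datahub-rest sink is available; the account, project, and job IDs reuse the example values from the docstring.

import os

from datahub.ingestion.run.pipeline import Pipeline

# Field names mirror DBTCloudConfig; the "dbt-cloud" type name, target_platform,
# and the sink block are assumptions for illustration.
recipe = {
    "source": {
        "type": "dbt-cloud",
        "config": {
            "token": os.environ["DBT_CLOUD_TOKEN"],
            "account_id": 107298,
            "project_id": 175705,
            "job_id": 148094,
            "target_platform": "snowflake",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},
    },
}

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()

An equivalent YAML recipe run through the datahub CLI should behave the same way.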