Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): support snapshots in dbt and dbt-cloud #7062

Merged
merged 7 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/docs/sources/dbt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Ingesting metadata from dbt requires either using the **dbt** module or the **db
| dbt Model - view | [Dataset](../../metamodel/entities/dataset.md) | Subtype `view` |
| dbt Model - incremental | [Dataset](../../metamodel/entities/dataset.md) | Subtype `incremental` |
| dbt Model - ephemeral | [Dataset](../../metamodel/entities/dataset.md) | Subtype `ephemeral` |
| dbt Snapshot | [Dataset](../../metamodel/entities/dataset.md) | Subtype `snapshot` |
| dbt Test | [Assertion](../../metamodel/entities/assertion.md) | |
| dbt Test Result | [Assertion Run Result](../../metamodel/entities/assertion.md) | |

Expand Down
7 changes: 4 additions & 3 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,10 @@ def get_urns_by_filter(
entities_yielded += 1
log.debug(f"yielding {x['entity']}")
yield x["entity"]
log.warning(
f"Discrepancy in entities yielded {entities_yielded} and num entities {num_entities}. This means all entities may not have been deleted."
)
if entities_yielded != num_entities:
log.warning(
f"Discrepancy in entities yielded {entities_yielded} and num entities {num_entities}. This means all entities may not have been deleted."
)
else:
log.error(f"Failed to execute search query with {str(response.content)}")
response.raise_for_status()
Expand Down
34 changes: 29 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DBTCloudConfig(DBTCommonConfig):
#}
"""

_DBT_GRAPHQL_MODEL_SEED_FIELDS = """
_DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS = """
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we already supported the snapshot fields on dbt Cloud before this change? If not, why was this constant renamed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a set of common fields that we use for models, seeds, and snapshots, so the name now reflects all three.

alias
error
status
Expand All @@ -109,15 +109,15 @@ class DBTCloudConfig(DBTCommonConfig):
models(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
dependsOn
materializedType
}}

seeds(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
}}

sources(jobId: $jobId, runId: $runId) {{
Expand All @@ -133,6 +133,18 @@ class DBTCloudConfig(DBTCommonConfig):
loader
}}

snapshots(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
{ _DBT_GRAPHQL_NODE_COMMON_FIELDS }
{ _DBT_GRAPHQL_MODEL_SEED_SNAPSHOT_FIELDS }
parentsSources {{
uniqueId
}}
parentsModels {{
uniqueId
}}
}}

tests(jobId: $jobId, runId: $runId) {{
{ _DBT_GRAPHQL_COMMON_FIELDS }
state
Expand Down Expand Up @@ -224,6 +236,7 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
*data["models"],
*data["seeds"],
*data["sources"],
*data["snapshots"],
*data["tests"],
]

Expand Down Expand Up @@ -253,10 +266,21 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:

if node["resourceType"] == "model":
materialization = node["materializedType"]
elif node["resourceType"] == "snapshot":
materialization = "snapshot"
else:
materialization = None

upstream_nodes = node.get("dependsOn", [])
if node["resourceType"] == "snapshot":
upstream_nodes = [
obj["uniqueId"]
for obj in [
*node.get("parentsModels", []),
*node.get("parentsSources", []),
]
]
else:
upstream_nodes = node.get("dependsOn", [])

catalog_type = node.get("type")

Expand All @@ -269,7 +293,7 @@ def _parse_into_dbt_node(self, node: Dict) -> DBTNode:
tags = node["tags"]
tags = [self.config.tag_prefix + tag for tag in tags]

if node["resourceType"] in {"model", "seed"}:
if node["resourceType"] in {"model", "seed", "snapshot"}:
status = node["status"]
if status is None and materialization != "ephemeral":
self.report.report_warning(
Expand Down
40 changes: 24 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ConfigurationError,
LineageConfig,
)
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
Expand Down Expand Up @@ -137,6 +138,10 @@ class DBTEntitiesEnabled(ConfigModel):
EmitDirective.YES,
description="Emit metadata for dbt seeds when set to Yes or Only",
)
snapshots: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for dbt snapshots when set to Yes or Only",
)
test_definitions: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for test definitions when enabled when set to Yes or Only",
Expand Down Expand Up @@ -167,17 +172,18 @@ def process_only_directive(cls, values):
def can_emit_node_type(self, node_type: str) -> bool:
# Node type comes from dbt's node types.

field_to_node_type_map = {
"model": "models",
"source": "sources",
"seed": "seeds",
"test": "test_definitions",
node_type_allow_map = {
"model": self.models,
"source": self.sources,
"seed": self.seeds,
"snapshot": self.snapshots,
"test": self.test_definitions,
}
field = field_to_node_type_map.get(node_type)
if not field:
allowed = node_type_allow_map.get(node_type)
if allowed is None:
return False

return self.__getattribute__(field) == EmitDirective.YES
return allowed == EmitDirective.YES

@property
def can_emit_test_results(self) -> bool:
Expand All @@ -200,6 +206,8 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig):
default=False,
description="Use model identifier instead of model name if defined (if not, default to model name).",
)
_deprecate_use_identifiers = pydantic_field_deprecated("use_identifiers")

entities_enabled: DBTEntitiesEnabled = Field(
DBTEntitiesEnabled(),
description="Controls for enabling / disabling metadata emission for different dbt entities (models, test definitions, test results, etc.)",
Expand Down Expand Up @@ -252,6 +260,9 @@ class DBTCommonConfig(StatefulIngestionConfigBase, LineageConfig):
False,
description="[deprecated] Prior to version 0.8.41, lineage edges to sources were directed to the target platform node rather than the dbt source node. This contradicted the established pattern for other lineage edges to point to upstream dbt nodes. To revert lineage logic to this legacy approach, set this flag to true.",
)
_deprecate_skip_source_on_lineage_edge = pydantic_field_deprecated(
"backcompat_skip_source_on_lineage_edge"
)

incremental_lineage: bool = Field(
# Copied from LineageConfig, and changed the default.
Expand Down Expand Up @@ -349,7 +360,7 @@ class DBTNode:

node_type: str # source, model
max_loaded_at: Optional[datetime]
materialization: Optional[str] # table, view, ephemeral, incremental
materialization: Optional[str] # table, view, ephemeral, incremental, snapshot
# see https://docs.getdbt.com/reference/artifacts/manifest-json
catalog_type: Optional[str]

Expand Down Expand Up @@ -419,7 +430,6 @@ def get_custom_properties(node: DBTNode) -> Dict[str, str]:
def get_upstreams(
upstreams: List[str],
all_nodes: Dict[str, DBTNode],
use_identifiers: bool,
target_platform: str,
target_platform_instance: Optional[str],
environment: str,
Expand All @@ -428,7 +438,7 @@ def get_upstreams(
) -> List[str]:
upstream_urns = []

for upstream in upstreams:
for upstream in sorted(upstreams):
if upstream not in all_nodes:
logger.debug(
f"Upstream node - {upstream} not found in all manifest entities."
Expand All @@ -444,7 +454,7 @@ def get_upstreams(
materialized = upstream_manifest_node.materialization

resource_type = upstream_manifest_node.node_type
if materialized in {"view", "table", "incremental"} or (
if materialized in {"view", "table", "incremental", "snapshot"} or (
resource_type == "source" and legacy_skip_source_lineage
):
# upstream urns point to the target platform
Expand Down Expand Up @@ -718,7 +728,6 @@ def create_test_entity_mcps(
upstream_urns = get_upstreams(
upstreams=node.upstream_nodes,
all_nodes=all_nodes_map,
use_identifiers=self.config.use_identifiers,
target_platform=self.config.target_platform,
target_platform_instance=self.config.target_platform_instance,
environment=self.config.env,
Expand Down Expand Up @@ -1124,7 +1133,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
pass

def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass:
materialized = node.materialization in {"table", "incremental"}
materialized = node.materialization in {"table", "incremental", "snapshot"}
# this function is only called when raw sql is present. assert is added to satisfy lint checks
assert node.raw_code is not None
view_properties = ViewPropertiesClass(
Expand Down Expand Up @@ -1319,7 +1328,7 @@ def _create_subType_wu(
if not node.node_type:
return None
subtypes: Optional[List[str]]
if node.node_type == "model":
if node.node_type in {"model", "snapshot"}:
if node.materialization:
subtypes = [node.materialization, "view"]
else:
Expand All @@ -1343,7 +1352,6 @@ def _create_lineage_aspect_for_dbt_node(
upstream_urns = get_upstreams(
node.upstream_nodes,
all_nodes_map,
self.config.use_identifiers,
self.config.target_platform,
self.config.target_platform_instance,
self.config.env,
Expand Down
28 changes: 9 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def extract_dbt_entities(
for key, manifest_node in all_manifest_entities.items():
name = manifest_node["name"]

if "identifier" in manifest_node and use_identifiers:
if use_identifiers and manifest_node.get("identifier"):
name = manifest_node["identifier"]

if (
Expand Down Expand Up @@ -393,11 +393,6 @@ def load_file_as_json(self, uri: str) -> Any:

def loadManifestAndCatalog(
self,
manifest_path: str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

catalog_path: str,
sources_path: Optional[str],
use_identifiers: bool,
tag_prefix: str,
) -> Tuple[
List[DBTNode],
Optional[str],
Expand All @@ -406,12 +401,12 @@ def loadManifestAndCatalog(
Optional[str],
Optional[str],
]:
dbt_manifest_json = self.load_file_as_json(manifest_path)
dbt_manifest_json = self.load_file_as_json(self.config.manifest_path)

dbt_catalog_json = self.load_file_as_json(catalog_path)
dbt_catalog_json = self.load_file_as_json(self.config.catalog_path)

if sources_path is not None:
dbt_sources_json = self.load_file_as_json(sources_path)
if self.config.sources_path is not None:
dbt_sources_json = self.load_file_as_json(self.config.sources_path)
sources_results = dbt_sources_json["results"]
else:
sources_results = {}
Expand All @@ -438,8 +433,8 @@ def loadManifestAndCatalog(
all_catalog_entities,
sources_results,
manifest_adapter,
use_identifiers,
tag_prefix,
self.config.use_identifiers,
self.config.tag_prefix,
self.report,
)

Expand All @@ -460,13 +455,8 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
manifest_adapter,
catalog_schema,
catalog_version,
) = self.loadManifestAndCatalog(
self.config.manifest_path,
self.config.catalog_path,
self.config.sources_path,
self.config.use_identifiers,
self.config.tag_prefix,
)
) = self.loadManifestAndCatalog()

additional_custom_props = {
"manifest_schema": manifest_schema,
"manifest_version": manifest_version,
Expand Down
16 changes: 16 additions & 0 deletions metadata-ingestion/tests/integration/dbt/copy-from-sample-dbt.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

set -euxo pipefail

# Usage: copy-from-sample-dbt.sh <path-to-sample-dbt-checkout>
#
# Copies the processed dbt artifacts from a sample-dbt checkout into the
# current working directory, renaming them with a `sample_` prefix.
#
# The argument should point at a local copy of https://github.com/hsheth2/sample-dbt,
# after the generation script has been run.
# `${1:?...}` fails with a usage message when the argument is missing or empty,
# instead of the generic "unbound variable" error that `set -u` would produce.
sample_dbt=${1:?usage: $0 <path-to-sample-dbt-checkout>}

# Quote the expansions so checkout paths containing spaces or glob characters
# are passed to cp as a single argument.
cp "$sample_dbt/target_processed/dbt_catalog.json" sample_dbt_catalog.json
cp "$sample_dbt/target_processed/dbt_manifest.json" sample_dbt_manifest.json
cp "$sample_dbt/target_processed/dbt_sources.json" sample_dbt_sources.json

# We don't currently test run_results from sample-dbt.
# cp "$sample_dbt/target_processed/dbt_run_results.json" sample_dbt_run_results.json


Loading