Skip to content

Commit

Permalink
fix(ingest): remove dbt disable_dbt_node_creation and load_schema
Browse files Browse the repository at this point in the history
… options (#5877)
  • Loading branch information
hsheth2 authored Sep 9, 2022
1 parent 5799604 commit 954397a
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 15,341 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- Browse Paths have been upgraded to a new format to align more closely with the intention of the feature.
Learn more about the changes, including steps on upgrading, here: https://datahubproject.io/docs/advanced/browse-paths-upgrade
- The dbt ingestion source's `disable_dbt_node_creation` and `load_schema` options have been removed. They were no longer necessary due to the recently added sibling entities functionality.

### Potential Downtime

Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/dbt/dbt_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ source:
# To use this as-is, set the environment variable DBT_PROJECT_ROOT to the root folder of your dbt project
manifest_path: "${DBT_PROJECT_ROOT}/target/manifest_file.json"
catalog_path: "${DBT_PROJECT_ROOT}/target/catalog_file.json"
sources_path: "${DBT_PROJECT_ROOT}/target/sources_file.json" # optional for freshness
sources_path: "${DBT_PROJECT_ROOT}/target/sources_file.json" # optional for freshness
test_results_path: "${DBT_PROJECT_ROOT}/target/run_results.json" # optional for recording dbt test results after running dbt test

# Options
target_platform: "my_target_platform_id" # e.g. bigquery/postgres/etc.
load_schemas: False # note: enable this only if you are not ingesting metadata from your warehouse

# sink configs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ source:
catalog_path: "./tests/integration/dbt/dbt_catalog.json"
sources_path: "./tests/integration/dbt/dbt_sources.json"
target_platform: "dbt"
load_schemas: True # or False
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
type: "datahub-rest"
Expand Down
134 changes: 36 additions & 98 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,6 @@ class DBTConfig(StatefulIngestionConfigBase):
default=None,
description="The platform instance for the platform that dbt is operating on. Use this if you have multiple instances of the same platform (e.g. redshift) and need to distinguish between them.",
)
load_schemas: bool = Field(
default=True,
description="This flag is only consulted when disable_dbt_node_creation is set to True. Load schemas for target_platform entities from dbt catalog file, not necessary when you are already ingesting this metadata from the data platform directly. If set to False, table schema details (e.g. columns) will not be ingested.",
)
use_identifiers: bool = Field(
default=False,
description="Use model identifier instead of model name if defined (if not, default to model name).",
Expand All @@ -269,10 +265,6 @@ class DBTConfig(StatefulIngestionConfigBase):
default=AllowDenyPattern.allow_all(),
description="regex patterns for dbt model names to filter in ingestion.",
)
disable_dbt_node_creation = Field(
default=False,
description="Whether to suppress dbt dataset metadata creation. When set to True, this flag applies the dbt metadata to the target_platform entities (e.g. populating schema and column descriptions from dbt into the postgres / bigquery table metadata in DataHub) and generates lineage between the platform entities.",
)
meta_mapping: Dict = Field(
default={},
description="mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings.",
Expand Down Expand Up @@ -474,7 +466,6 @@ def extract_dbt_entities(
all_catalog_entities: Dict[str, Dict[str, Any]],
sources_results: List[Dict[str, Any]],
manifest_adapter: str,
load_schemas: bool,
use_identifiers: bool,
tag_prefix: str,
node_type_pattern: AllowDenyPattern,
Expand Down Expand Up @@ -650,7 +641,6 @@ def get_upstreams(
target_platform: str,
target_platform_instance: Optional[str],
environment: str,
disable_dbt_node_creation: bool,
platform_instance: Optional[str],
legacy_skip_source_lineage: Optional[bool],
) -> List[str]:
Expand All @@ -672,28 +662,18 @@ def get_upstreams(

upstream_manifest_node = all_nodes[upstream]

# This function is called to create lineages among platform nodes or dbt nodes. When we are creating lineages
# for platform nodes, implies that dbt node creation is turned off (because otherwise platform nodes only
# have one lineage edge to their corresponding dbt node). So, when disable_dbt_node_creation is true we only
# create lineages for platform nodes otherwise, for dbt node, we connect it to another dbt node or a platform
# node.
# This logic creates lineages among dbt nodes.
platform_value = DBT_PLATFORM
platform_instance_value = platform_instance

if disable_dbt_node_creation:
# we generate all urns in the target platform universe
materialized = upstream_manifest_node.get("config", {}).get("materialized")
resource_type = upstream_manifest_node["resource_type"]
if materialized in {"view", "table", "incremental"} or (
resource_type == "source" and legacy_skip_source_lineage
):
# upstream urns point to the target platform
platform_value = target_platform
platform_instance_value = target_platform_instance
else:
materialized = upstream_manifest_node.get("config", {}).get("materialized")
resource_type = upstream_manifest_node["resource_type"]

if materialized in {"view", "table", "incremental"} or (
resource_type == "source" and legacy_skip_source_lineage
):
# upstream urns point to the target platform
platform_value = target_platform
platform_instance_value = target_platform_instance

upstream_urns.append(
get_urn_from_dbtNode(
Expand Down Expand Up @@ -989,7 +969,6 @@ def load_test_results(
config.target_platform,
config.target_platform_instance,
config.env,
config.disable_dbt_node_creation,
config.platform_instance,
config.backcompat_skip_source_on_lineage_edge,
)
Expand Down Expand Up @@ -1074,8 +1053,7 @@ class DBTSource(StatefulIngestionSourceBase):
Note:
1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View).
2. The previous version of this source (`acryl_datahub<=0.8.16.2`) did not generate `dbt` entities and lineage between `dbt` entities and platform entities. For backwards compatibility with the previous version of this source, there is a config flag `disable_dbt_node_creation` that falls back to the old behavior.
3. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta.
2. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta.
The artifacts used by this source are:
- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
Expand Down Expand Up @@ -1214,7 +1192,6 @@ def loadManifestAndCatalog(
manifest_path: str,
catalog_path: str,
sources_path: Optional[str],
load_schemas: bool,
use_identifiers: bool,
tag_prefix: str,
node_type_pattern: AllowDenyPattern,
Expand Down Expand Up @@ -1263,7 +1240,6 @@ def loadManifestAndCatalog(
all_catalog_entities,
sources_results,
manifest_adapter,
load_schemas,
use_identifiers,
tag_prefix,
node_type_pattern,
Expand Down Expand Up @@ -1328,7 +1304,6 @@ def string_map(input_map: Dict[str, Any]) -> Dict[str, str]:
target_platform=self.config.target_platform,
target_platform_instance=self.config.target_platform_instance,
environment=self.config.env,
disable_dbt_node_creation=self.config.disable_dbt_node_creation,
platform_instance=None,
legacy_skip_source_lineage=self.config.backcompat_skip_source_on_lineage_edge,
)
Expand Down Expand Up @@ -1456,7 +1431,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.config.manifest_path,
self.config.catalog_path,
self.config.sources_path,
self.config.load_schemas,
self.config.use_identifiers,
self.config.tag_prefix,
self.config.node_type_pattern,
Expand All @@ -1483,14 +1457,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
]
test_nodes = [test_node for test_node in nodes if test_node.node_type == "test"]

if not self.config.disable_dbt_node_creation:
yield from self.create_platform_mces(
non_test_nodes,
additional_custom_props_filtered,
manifest_nodes_raw,
DBT_PLATFORM,
self.config.platform_instance,
)
yield from self.create_platform_mces(
non_test_nodes,
additional_custom_props_filtered,
manifest_nodes_raw,
DBT_PLATFORM,
self.config.platform_instance,
)

yield from self.create_platform_mces(
non_test_nodes,
Expand Down Expand Up @@ -1552,11 +1525,7 @@ def create_platform_mces(
This function creates mce based out of dbt nodes. Since dbt ingestion creates "dbt" nodes
and nodes for underlying platform the function gets called twice based on the mce_platform
parameter. Further, this function takes specific actions based on the mce_platform passed in.
If disable_dbt_node_creation = True,
Create empty entities of the underlying platform with only lineage/key aspect.
Create dbt entities with all metadata information.
If disable_dbt_node_creation = False
Create platform entities with all metadata information.
It creates platform entities with all metadata information.
"""
action_processor = OperationProcessor(
self.config.meta_mapping,
Expand Down Expand Up @@ -1619,38 +1588,25 @@ def create_platform_mces(
self.report.report_workunit(sub_type_wu)

else:
if not self.config.disable_dbt_node_creation:
# if dbt node creation is enabled we are creating empty node for platform and only add
# lineage/keyaspect.
aspects = []
if node.materialization == "ephemeral" or node.node_type == "test":
continue

# This code block is run when we are generating entities of platform type.
# We will not link the platform node to the dbt node for type "source" because
# in this case the platform table existed first.
if node.node_type != "source":
upstream_dbt_urn = get_urn_from_dbtNode(
node.database,
node.schema,
node.name,
DBT_PLATFORM,
self.config.env,
self.config.platform_instance,
)
upstreams_lineage_class = get_upstream_lineage(
[upstream_dbt_urn]
)
aspects.append(upstreams_lineage_class)
else:
# add upstream lineage
platform_upstream_aspect = (
self._create_lineage_aspect_for_platform_node(
node, manifest_nodes_raw
)
# We are creating empty node for platform and only add lineage/keyaspect.
aspects = []
if node.materialization == "ephemeral" or node.node_type == "test":
continue

# This code block is run when we are generating entities of platform type.
# We will not link the platform node to the dbt node for type "source" because
# in this case the platform table existed first.
if node.node_type != "source":
upstream_dbt_urn = get_urn_from_dbtNode(
node.database,
node.schema,
node.name,
DBT_PLATFORM,
self.config.env,
self.config.platform_instance,
)
if platform_upstream_aspect:
aspects.append(platform_upstream_aspect)
upstreams_lineage_class = get_upstream_lineage([upstream_dbt_urn])
aspects.append(upstreams_lineage_class)

if len(aspects) == 0:
continue
Expand Down Expand Up @@ -1733,16 +1689,7 @@ def get_patched_mce(self, mce):
def _create_dataset_properties_aspect(
self, node: DBTNode, additional_custom_props_filtered: Dict[str, str]
) -> DatasetPropertiesClass:
description = None
if self.config.disable_dbt_node_creation:
if node.comment and node.description and node.comment != node.description:
description = f"{self.config.target_platform} comment: {node.comment}\n\ndbt model description: {node.description}"
elif node.comment:
description = node.comment
elif node.description:
description = node.description
else:
description = node.description
description = node.description

custom_props = {
**get_custom_properties(node),
Expand Down Expand Up @@ -1823,14 +1770,7 @@ def _generate_base_aspects(

# add schema metadata aspect
schema_metadata = get_schema_metadata(self.report, node, mce_platform)
# When generating these aspects for a dbt node, we will always include schema information. When generating
# these aspects for a platform node (which only happens when disable_dbt_node_creation is set to true) we
# honor the flag.
if mce_platform == DBT_PLATFORM:
aspects.append(schema_metadata)
else:
if self.config.load_schemas:
aspects.append(schema_metadata)
aspects.append(schema_metadata)
return aspects

def _aggregate_owners(
Expand Down Expand Up @@ -1915,7 +1855,6 @@ def _create_lineage_aspect_for_dbt_node(
self.config.target_platform,
self.config.target_platform_instance,
self.config.env,
self.config.disable_dbt_node_creation,
self.config.platform_instance,
self.config.backcompat_skip_source_on_lineage_edge,
)
Expand Down Expand Up @@ -1951,7 +1890,6 @@ def _create_lineage_aspect_for_platform_node(
self.config.target_platform,
self.config.target_platform_instance,
self.config.env,
self.config.disable_dbt_node_creation,
self.config.platform_instance,
self.config.backcompat_skip_source_on_lineage_edge,
)
Expand Down
Loading

0 comments on commit 954397a

Please sign in to comment.