Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/dbt): support emitting only model performance #10714

Merged
1 commit merged on Jun 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 29 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
@@ -1251,9 +1251,6 @@ def create_dbt_platform_mces(
) -> Iterable[MetadataWorkUnit]:
"""Create MCEs and MCPs for the dbt platform."""

mce_platform = DBT_PLATFORM
mce_platform_instance = self.config.platform_instance

action_processor = OperationProcessor(
self.config.meta_mapping,
self.config.tag_prefix,
@@ -1269,15 +1266,10 @@ def create_dbt_platform_mces(
)
for node in sorted(dbt_nodes, key=lambda n: n.dbt_name):
node_datahub_urn = node.get_urn(
mce_platform,
DBT_PLATFORM,
self.config.env,
mce_platform_instance,
self.config.platform_instance,
)
if not self.config.entities_enabled.can_emit_node_type(node.node_type):
logger.debug(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)
continue

meta_aspects: Dict[str, Any] = {}
if self.config.enable_meta_mapping and node.meta:
@@ -1289,7 +1281,7 @@ def create_dbt_platform_mces(
) # mutates meta_aspects

aspects = self._generate_base_dbt_aspects(
node, additional_custom_props_filtered, mce_platform, meta_aspects
node, additional_custom_props_filtered, DBT_PLATFORM, meta_aspects
)

# Upstream lineage.
@@ -1304,29 +1296,36 @@ def create_dbt_platform_mces(
if view_prop_aspect:
aspects.append(view_prop_aspect)

# Subtype.
sub_type_wu = self._create_subType_wu(node, node_datahub_urn)
if sub_type_wu:
yield sub_type_wu
# Generate main MCE.
if self.config.entities_enabled.can_emit_node_type(node.node_type):
# Subtype.
sub_type_wu = self._create_subType_wu(node, node_datahub_urn)
if sub_type_wu:
yield sub_type_wu

# DataPlatformInstance aspect.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()
# DataPlatformInstance aspect.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

if len(aspects) == 0:
continue
dataset_snapshot = DatasetSnapshot(urn=node_datahub_urn, aspects=aspects)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
else:
logger.debug(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)

# Model performance.
yield from auto_workunit(
self._create_dataprocess_instance_mcps(node, upstream_lineage_class)
)
if self.config.entities_enabled.can_emit_model_performance:
yield from auto_workunit(
self._create_dataprocess_instance_mcps(node, upstream_lineage_class)
)

def _create_dataprocess_instance_mcps(
self,
@@ -1335,8 +1334,6 @@ def _create_dataprocess_instance_mcps(
) -> Iterable[MetadataChangeProposalWrapper]:
if not node.model_performances:
return
if not self.config.entities_enabled.can_emit_model_performance:
return

node_datahub_urn = node.get_urn(
DBT_PLATFORM,
Expand Down
Loading