Skip to content

Commit

Permalink
feat(ingest): patch based updates to upstream edge to dbt nodes from …
Browse files Browse the repository at this point in the history
…external platforms
  • Loading branch information
mayurinehate committed Nov 10, 2022
1 parent ae2ea52 commit 75c3d45
Show file tree
Hide file tree
Showing 11 changed files with 4,127 additions and 7,951 deletions.
24 changes: 22 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ConfigEnum,
ConfigModel,
ConfigurationError,
LineageConfig,
)
from datahub.configuration.github import GitHubReference
from datahub.emitter import mce_builder
Expand Down Expand Up @@ -122,6 +123,7 @@
UpstreamLineageClass,
ViewPropertiesClass,
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis

Expand Down Expand Up @@ -214,7 +216,7 @@ def can_emit_test_results(self) -> bool:
return self.test_results == EmitDirective.YES


class DBTConfig(StatefulIngestionConfigBase):
class DBTConfig(StatefulIngestionConfigBase, LineageConfig):
manifest_path: str = Field(
description="Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI."
)
Expand Down Expand Up @@ -1447,7 +1449,25 @@ def create_platform_mces(
self.config.platform_instance,
)
upstreams_lineage_class = get_upstream_lineage([upstream_dbt_urn])
aspects.append(upstreams_lineage_class)
if self.config.incremental_lineage:
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
urn=node_datahub_urn
)
for upstream in upstreams_lineage_class.upstreams:
patch_builder.add_upstream_lineage(upstream)

lineage_workunits = [
MetadataWorkUnit(
id=f"upstreamLineage-for-{node_datahub_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)
else:
aspects.append(upstreams_lineage_class)

if len(aspects) == 0:
continue
Expand Down
Loading

0 comments on commit 75c3d45

Please sign in to comment.