feat(ingest): looker - add support for simple column level lineage
shirshanka committed Sep 29, 2022
1 parent 21a8718 commit ece833c
Showing 9 changed files with 1,124 additions and 2,817 deletions.
@@ -30,6 +30,8 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamClass,
UpstreamLineage,
)
@@ -54,6 +56,7 @@
ChangeTypeClass,
DatasetPropertiesClass,
EnumTypeClass,
FineGrainedLineageClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
@@ -169,6 +172,10 @@ class LookerCommonConfig(
None,
description="Reference to your github location. If present, supplies handy links to your lookml on the dataset entity page.",
)
extract_column_level_lineage: bool = Field(
True,
description="When enabled, extracts column-level lineage from Views and Explores",
)


@dataclass
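The new flag defaults to True, so column-level lineage is emitted unless it is explicitly turned off. A minimal sketch of how the option might be toggled in an ingestion recipe, written here as a Python dict; only extract_column_level_lineage comes from this commit, the other keys are assumptions based on a typical lookml recipe.

# Hypothetical recipe fragment (not part of the diff); only
# `extract_column_level_lineage` is introduced by this commit.
lookml_source = {
    "type": "lookml",
    "config": {
        "base_folder": "/path/to/lookml/project",  # assumed existing option
        "extract_column_level_lineage": False,  # disable the new column-level lineage
    },
}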
@@ -237,6 +244,7 @@ class ViewField:
description: str
field_type: ViewFieldType
is_primary_key: bool = False
upstream_field: Optional[str] = None


class LookerUtil:
@@ -622,6 +630,7 @@ def from_api( # noqa: C901
is_primary_key=dim_field.primary_key
if dim_field.primary_key
else False,
upstream_field=dim_field.name,
)
)
if explore.fields.measures is not None:
@@ -643,6 +652,7 @@
is_primary_key=measure_field.primary_key
if measure_field.primary_key
else False,
upstream_field=measure_field.name,
)
)

@@ -746,20 +756,52 @@ def _to_metadata_events( # noqa: C901
dataset_props.externalUrl = self._get_url(base_url)

dataset_snapshot.aspects.append(dataset_props)
view_name_to_urn_map = {}
if self.upstream_views is not None:
assert self.project_name is not None
upstreams = [
UpstreamClass(
dataset=LookerViewId(
project_name=self.project_name,
model_name=self.model_name,
view_name=view_name,
).get_urn(config),
type=DatasetLineageTypeClass.VIEW,
upstreams = []
fine_grained_lineages = []
for view_name in sorted(self.upstream_views):
view_urn = LookerViewId(
project_name=self.project_name,
model_name=self.model_name,
view_name=view_name,
).get_urn(config)

upstreams.append(
UpstreamClass(
dataset=view_urn,
type=DatasetLineageTypeClass.VIEW,
)
)
for view_name in sorted(self.upstream_views)
]
upstream_lineage = UpstreamLineage(upstreams=upstreams)
view_name_to_urn_map[view_name] = view_urn
if config.extract_column_level_lineage:
for field in self.fields or []:
if (
field.upstream_field
and len(field.upstream_field.split(".")) >= 2
):
(view_name, field_path) = field.upstream_field.split(".")[
0
], ".".join(field.upstream_field.split(".")[1:])
assert view_name
view_urn = view_name_to_urn_map.get(view_name, "")
if view_urn:
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
downstreamType=FineGrainedLineageDownstreamType.FIELD,
upstreams=[
builder.make_schema_field_urn(
view_urn, field_path
)
],
)
)

upstream_lineage = UpstreamLineage(
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages or None
)
dataset_snapshot.aspects.append(upstream_lineage)
if self.fields is not None:
schema_metadata = LookerUtil._get_schema(
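For explore fields, upstream_field carries the fully qualified Looker name (view_name.field_name), so the code above splits on the first dot, resolves the view URN recorded in view_name_to_urn_map, and points the fine-grained upstream at that view's column. A rough sketch of the split and the schema-field URN it yields; the sample field and URN values are assumptions, not taken from the diff.

# Illustrative only: a hypothetical explore field backed by the "orders" view.
from datahub.emitter import mce_builder as builder

upstream_field = "orders.order_total"
view_name = upstream_field.split(".")[0]
field_path = ".".join(upstream_field.split(".")[1:])

# Assume this URN was produced by LookerViewId(...).get_urn(config) in the loop above.
view_urn = "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.orders,PROD)"
field_urn = builder.make_schema_field_urn(view_urn, field_path)
# field_urn is a schemaField URN pointing at the "order_total" column of that view.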
@@ -20,6 +20,7 @@
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.github import GitHubInfo
from datahub.configuration.source_common import EnvBasedSourceConfigBase
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -48,6 +49,7 @@
from datahub.metadata.com.linkedin.pegasus2avro.common import BrowsePaths, Status
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
FineGrainedLineageDownstreamType,
UpstreamClass,
UpstreamLineage,
ViewProperties,
@@ -57,6 +59,8 @@
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DatasetPropertiesClass,
FineGrainedLineageClass,
FineGrainedLineageUpstreamTypeClass,
SubTypesClass,
)
from datahub.utilities.sql_parser import SQLParser
@@ -577,7 +581,10 @@ def _get_sql_info(cls, sql: str, sql_parser_path: str) -> SQLInfo:

@classmethod
def _get_fields(
cls, field_list: List[Dict], type_cls: ViewFieldType
cls,
field_list: List[Dict],
type_cls: ViewFieldType,
extract_column_level_lineage: bool,
) -> List[ViewField]:
fields = []
for field_dict in field_list:
@@ -586,6 +593,19 @@
native_type = field_dict.get("type", "string")
description = field_dict.get("description", "")
label = field_dict.get("label", "")
upstream_field = None
if type_cls == ViewFieldType.DIMENSION and extract_column_level_lineage:
if field_dict.get("sql") is not None:
upstream_field_match = re.match(
r"^.*\${TABLE}\.(.*)$", field_dict["sql"]
)
if upstream_field_match:
matched_field = upstream_field_match.group(1)
# Remove quotes from field names
matched_field = (
matched_field.replace('"', "").replace("`", "").lower()
)
upstream_field = matched_field

field = ViewField(
name=name,
@@ -594,6 +614,7 @@
description=description,
is_primary_key=is_primary_key,
field_type=type_cls,
upstream_field=upstream_field,
)
fields.append(field)
return fields
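The regex only captures what follows a ${TABLE}. reference in the dimension's sql, then strips quoting and lower-cases the result. A quick sketch of what a typical dimension yields; the sample sql value is an assumption.

import re

# Hypothetical dimension definition: sql: ${TABLE}."USER_ID" ;;
sql = '${TABLE}."USER_ID"'
upstream_field_match = re.match(r"^.*\${TABLE}\.(.*)$", sql)
if upstream_field_match:
    # Quotes/backticks are stripped and the name lower-cased -> "user_id"
    upstream_field = (
        upstream_field_match.group(1).replace('"', "").replace("`", "").lower()
    )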
@@ -611,6 +632,7 @@ def from_looker_dict(
max_file_snippet_length: int,
parse_table_names_from_sql: bool = False,
sql_parser_path: str = "datahub.utilities.sql_parser.DefaultSQLParser",
extract_col_level_lineage: bool = False,
) -> Optional["LookerView"]:
view_name = looker_view["name"]
logger.debug(f"Handling view {view_name} in model {model_name}")
@@ -635,13 +657,19 @@
derived_table = looker_view.get("derived_table")

dimensions = cls._get_fields(
looker_view.get("dimensions", []), ViewFieldType.DIMENSION
looker_view.get("dimensions", []),
ViewFieldType.DIMENSION,
extract_col_level_lineage,
)
dimension_groups = cls._get_fields(
looker_view.get("dimension_groups", []), ViewFieldType.DIMENSION_GROUP
looker_view.get("dimension_groups", []),
ViewFieldType.DIMENSION_GROUP,
extract_col_level_lineage,
)
measures = cls._get_fields(
looker_view.get("measures", []), ViewFieldType.MEASURE
looker_view.get("measures", []),
ViewFieldType.MEASURE,
extract_col_level_lineage,
)
fields: List[ViewField] = dimensions + dimension_groups + measures

@@ -993,15 +1021,40 @@ def _get_upstream_lineage(
for sql_table_name in looker_view.sql_table_names:

sql_table_name = sql_table_name.replace('"', "").replace("`", "")
upstream_dataset_urn: str = self._construct_datalineage_urn(
sql_table_name, looker_view
)
fine_grained_lineages: List[FineGrainedLineageClass] = []
if self.source_config.extract_column_level_lineage:
for field in looker_view.fields:
if field.upstream_field is not None:
fine_grained_lineage = FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
upstreams=[
make_schema_field_urn(
upstream_dataset_urn, field.upstream_field
)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(
looker_view.id.get_urn(self.source_config),
field.name,
)
],
)
fine_grained_lineages.append(fine_grained_lineage)

upstream = UpstreamClass(
dataset=self._construct_datalineage_urn(sql_table_name, looker_view),
dataset=upstream_dataset_urn,
type=DatasetLineageTypeClass.VIEW,
)
upstreams.append(upstream)

if upstreams != []:
return UpstreamLineage(upstreams=upstreams)
return UpstreamLineage(
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages or None
)
else:
return None
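Each view field with a resolved upstream_field therefore contributes one FIELD_SET -> FIELD edge from the upstream table's column to the Looker view's column. A rough sketch of a single edge; all dataset URNs and column names are illustrative assumptions.

# Illustrative only; URNs and column names are made up.
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    FineGrainedLineageDownstreamType,
)
from datahub.metadata.schema_classes import (
    FineGrainedLineageClass,
    FineGrainedLineageUpstreamTypeClass,
)

upstream_table_urn = "urn:li:dataset:(urn:li:dataPlatform:snowflake,demo.public.users,PROD)"
looker_view_urn = "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.users,PROD)"

edge = FineGrainedLineageClass(
    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
    upstreams=[make_schema_field_urn(upstream_table_urn, "user_id")],
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[make_schema_field_urn(looker_view_urn, "id")],
)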

@@ -1224,6 +1277,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
self.source_config.max_file_snippet_length,
self.source_config.parse_table_names_from_sql,
self.source_config.sql_parser,
self.source_config.extract_column_level_lineage,
)
except Exception as e:
self.reporter.report_warning(
