
Commit

feat(ingest): extract powerbi endorsements to tags (#6638)
looppi authored Jan 18, 2023
1 parent cb12910 commit 87b3a5d
Showing 7 changed files with 899 additions and 2 deletions.
6 changes: 6 additions & 0 deletions metadata-ingestion/docs/sources/powerbi/powerbi_pre.md
@@ -95,3 +95,9 @@ combine_result
`Pattern-2` is *not* supported for upstream table lineage extraction, as it passes a nested item selector, i.e. {Source{[Schema="public",Item="book"]}[Data], Source{[Schema="public",Item="issue_history"]}[Data]}, as an argument to the M-Query table function Table.Combine.
`Pattern-1` is supported, as it first assigns the table from the schema to a variable and then uses that variable in the M-Query table function Table.Combine.
## Extract endorsements to tags
By default, extracting endorsement information to tags is disabled. The feature may be useful if your organization uses [endorsements](https://learn.microsoft.com/en-us/power-bi/collaborate-share/service-endorse-content) to identify content quality.
Please note that the default implementation overwrites tags for the ingested entities. If you need to preserve existing tags, consider using a [transformer](../../../../metadata-ingestion/docs/transformer/dataset_transformer.md#simple-add-dataset-globaltags) with `semantics: PATCH` instead of `OVERWRITE`.
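For illustration, a recipe that enables endorsement extraction while keeping existing tags might look like the sketch below; the `simple_add_dataset_tags` transformer with `semantics: PATCH` follows the linked transformer docs, and the tag URN shown is only a hypothetical example.

source:
  type: powerbi
  config:
    # ... connection settings as in powerbi_recipe.yml ...
    extract_endorsements_to_tags: true
transformers:
  - type: "simple_add_dataset_tags"
    config:
      semantics: PATCH  # merge with tags already in DataHub instead of overwriting them
      tag_urns:
        - "urn:li:tag:Reviewed"  # hypothetical extra tag, kept alongside endorsement tags

With the transformer's default `OVERWRITE` semantics, the emitted globalTags aspect simply replaces whatever tags the entities already carry.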
3 changes: 3 additions & 0 deletions metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
@@ -16,6 +16,9 @@ source:
client_secret: bar
# Enable / Disable ingestion of user information for dashboards
extract_ownership: true
# Enable / Disable ingestion of endorsements.
# Please note that this may overwrite any existing tags defined on ingested entities!
extract_endorsements_to_tags: false
# dataset_type_mapping is a fixed mapping of Power BI datasource types to the equivalent DataHub "data platform" dataset
dataset_type_mapping:
PostgreSql: postgres
@@ -45,6 +45,7 @@ class Constant:
CORP_USER_INFO = "corpUserInfo"
CORP_USER_KEY = "corpUserKey"
CHART_INFO = "chartInfo"
GLOBAL_TAGS = "globalTags"
STATUS = "status"
CHART_ID = "powerbi.linkedin.com/charts/{}"
CHART_KEY = "chartKey"
@@ -139,6 +140,12 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase):
extract_lineage: bool = pydantic.Field(
default=True, description="Whether lineage should be ingested"
)
# Enable/Disable extracting endorsements to tags. Please note this may overwrite
# any existing tags defined on those entities
extract_endorsements_to_tags: bool = pydantic.Field(
default=False,
description="Whether to extract endorsements to tags, note that this may overwrite existing tags",
)
# Enable/Disable extracting lineage information from PowerBI Native query
native_query_parsing: bool = pydantic.Field(
default=True,
48 changes: 48 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -40,11 +40,13 @@
DashboardKeyClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
StatusClass,
SubTypesClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
)
@@ -238,8 +240,24 @@ def to_datahub_dataset(
if self.__config.extract_lineage is True:
dataset_mcps.extend(self.extract_lineage(table, ds_urn))

self.append_tag_mcp(
dataset_mcps,
ds_urn,
Constant.DATASET,
dataset.tags,
)

return dataset_mcps

@staticmethod
def transform_tags(tags: List[str]) -> GlobalTagsClass:
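# Convert plain tag names into a GlobalTags aspect holding the corresponding tag URNs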
return GlobalTagsClass(
tags=[
TagAssociationClass(builder.make_tag_urn(tag_to_add))
for tag_to_add in tags
]
)

def to_datahub_chart_mcp(
self, tile: PowerBiAPI.Tile, ds_mcps: List[MetadataChangeProposalWrapper]
) -> List[MetadataChangeProposalWrapper]:
@@ -421,8 +439,31 @@ def chart_custom_properties(dashboard: PowerBiAPI.Dashboard) -> dict:
if owner_mcp is not None:
list_of_mcps.append(owner_mcp)

self.append_tag_mcp(
list_of_mcps,
dashboard_urn,
Constant.DASHBOARD,
dashboard.tags,
)

return list_of_mcps

def append_tag_mcp(
self,
list_of_mcps: List[MetadataChangeProposalWrapper],
entity_urn: str,
entity_type: str,
tags: List[str],
) -> None:
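# Emit a globalTags aspect only when endorsement extraction is enabled and the entity
# has tags; the emitted aspect overwrites any tags already present on the entity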
if self.__config.extract_endorsements_to_tags and tags:
tags_mcp = self.new_mcp(
entity_type=entity_type,
entity_urn=entity_urn,
aspect_name=Constant.GLOBAL_TAGS,
aspect=self.transform_tags(tags),
)
list_of_mcps.append(tags_mcp)

def to_datahub_user(
self, user: PowerBiAPI.User
) -> List[MetadataChangeProposalWrapper]:
@@ -678,6 +719,13 @@ def report_to_dashboard(
if owner_mcp is not None:
list_of_mcps.append(owner_mcp)

self.append_tag_mcp(
list_of_mcps,
dashboard_urn,
Constant.DASHBOARD,
report.tags,
)

return list_of_mcps

def report_to_datahub_work_units(
79 changes: 77 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/proxy.py
@@ -51,6 +51,8 @@ class Workspace:
state: str
dashboards: List[Any]
datasets: Dict[str, "PowerBiAPI.PowerBIDataset"]
report_endorsements: Dict[str, List[str]]
dashboard_endorsements: Dict[str, List[str]]

@dataclass
class DataSource:
@@ -89,6 +91,7 @@ class PowerBIDataset:
workspace_id: str
# Table in datasets
tables: List["PowerBiAPI.Table"]
tags: List[str]

def get_urn_part(self):
return f"datasets.{self.id}"
@@ -148,6 +151,7 @@ class Report:
dataset: Optional["PowerBiAPI.PowerBIDataset"]
pages: List["PowerBiAPI.Page"]
users: List["PowerBiAPI.User"]
tags: List[str]

def get_urn_part(self):
return f"reports.{self.id}"
@@ -181,6 +185,7 @@ class Dashboard:
workspace_name: str
tiles: List["PowerBiAPI.Tile"]
users: List["PowerBiAPI.User"]
tags: List[str]

def get_urn_part(self):
return f"dashboards.{self.id}"
@@ -313,6 +318,7 @@ def _get_report(
description=response_dict.get("description"),
users=[],
pages=[],
tags=[],
dataset=self.get_dataset(
workspace_id=workspace_id, dataset_id=response_dict.get("datasetId")
),
@@ -356,7 +362,6 @@ def get_dashboard_users(self, dashboard: Dashboard) -> List[User]:
def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
"""
Get the list of dashboards from PowerBI for the given workspace identifier
TODO: Pagination. As per REST API doc (https://docs.microsoft.com/en-us/rest/api/power-bi/dashboards/get
-dashboards), there is no information available on pagination
"""
@@ -394,14 +399,48 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
workspace_name=workspace.name,
tiles=[],
users=[],
tags=workspace.dashboard_endorsements.get(instance.get("id", None), []),
)
for instance in dashboards_dict
if instance is not None
]

return dashboards

def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
def get_dashboard_endorsements(self, scan_result: dict) -> Dict[str, List[str]]:
"""
Store the scanned dashboard endorsements in a dict keyed by dashboard id, with
the endorsements (tags) as a list of strings
"""
results = {}

for scanned_dashboard in scan_result["dashboards"]:
# Map each dashboard id in the scan result to its endorsement tags
dashboard_id = scanned_dashboard.get("id")
tags = self.parse_endorsement(
scanned_dashboard.get("endorsementDetails", None)
)
results[dashboard_id] = tags

return results

@staticmethod
def parse_endorsement(endorsements: Optional[dict]) -> List[str]:
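# An endorsementDetails object (e.g. {"endorsement": "Promoted"}) yields a one-element
# tag list; a missing or empty endorsement yields []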
if not endorsements:
return []

endorsement = endorsements.get("endorsement", None)
if not endorsement:
return []

return [endorsement]

def get_dataset(
self,
workspace_id: str,
dataset_id: str,
endorsements: Optional[dict] = None,
) -> Any:
"""
Fetch the dataset from PowerBi for the given dataset identifier
"""
@@ -437,6 +476,10 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
logger.debug("datasets = {}".format(response_dict))
# PowerBI always returns the webUrl; if it is None, set the complete webUrl to None instead of
# "None/details"
tags = []
if self.__config.extract_endorsements_to_tags:
tags = self.parse_endorsement(endorsements)

return PowerBiAPI.PowerBIDataset(
id=response_dict.get("id"),
name=response_dict.get("name"),
@@ -445,6 +488,7 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
else None,
workspace_id=workspace_id,
tables=[],
tags=tags,
)

def get_data_sources(
@@ -678,6 +722,9 @@ def get_reports(
workspace_id=workspace.id, entity="reports", _id=raw_instance["id"]
),
dataset=workspace.datasets.get(raw_instance.get("datasetId")),
tags=workspace.report_endorsements.get(
raw_instance.get("id", None), []
),
)
for raw_instance in response_dict["value"]
]
@@ -704,6 +751,8 @@ def get_workspaces(self):
state="",
datasets={},
dashboards=[],
report_endorsements={},
dashboard_endorsements={},
)
for workspace in groups.get("value", [])
if workspace.get("type", None) == "Workspace"
@@ -843,6 +892,7 @@ def json_to_dataset_map(scan_result: dict) -> dict:
dataset_instance: PowerBiAPI.PowerBIDataset = self.get_dataset(
workspace_id=scan_result["id"],
dataset_id=dataset_dict["id"],
endorsements=dataset_dict.get("endorsementDetails", None),
)
dataset_map[dataset_instance.id] = dataset_instance
# set dataset-name
@@ -877,6 +927,20 @@ def init_dashboard_tiles(workspace: PowerBiAPI.Workspace) -> None:

return None

def scan_result_to_report_endorsements(
scan_result: dict,
) -> Dict[str, List[str]]:
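# Counterpart of get_dashboard_endorsements for reports: map each report id in the
# scan result to its endorsement tags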
results = {}
reports: List[dict] = scan_result.get("reports", [])

for report in reports:
report_id = report.get("id", "")
endorsements = self.parse_endorsement(
report.get("endorsementDetails", None)
)
results[report_id] = endorsements
return results

logger.info("Creating scan job for workspace")
logger.info("{}={}".format(Constant.WorkspaceId, workspace_id))
logger.debug("Hitting URL={}".format(scan_create_endpoint))
@@ -902,7 +966,18 @@ def init_dashboard_tiles(workspace: PowerBiAPI.Workspace) -> None:
state=scan_result["state"],
datasets={},
dashboards=[],
report_endorsements={},
dashboard_endorsements={},
)

if self.__config.extract_endorsements_to_tags:
workspace.dashboard_endorsements = self.get_dashboard_endorsements(
scan_result
)
workspace.report_endorsements = scan_result_to_report_endorsements(
scan_result
)

# Get workspace dashboards
workspace.dashboards = self.get_dashboards(workspace)
