Merge branch 'master' into master
mmmeeedddsss authored Nov 28, 2022
2 parents 63671d4 + 6eb63c2 commit 4155fda
Showing 13 changed files with 540 additions and 55 deletions.
@@ -20,6 +20,7 @@ const LoadingSubheader = styled.div`
display: flex;
justify-content: center;
font-size: 12px;
color: ${ANTD_GRAY[7]};
`;

const LoadingHeader = styled(Typography.Title)`
@@ -53,12 +54,11 @@ const ModalHeader = styled.div`
display: flex;
padding: 10px 10px 0 10px;
padding: 5px;
font-size: 20px;
font-size: 18px;
`;

const SourceIcon = styled.img`
height: 22px;
width: 22px;
margin-right: 10px;
`;

@@ -115,7 +115,7 @@ function TestConnectionModal({
{isLoading && (
<ResultsWrapper>
<LoadingHeader level={4}>Testing your connection...</LoadingHeader>
<LoadingSubheader>This could take a few minutes</LoadingSubheader>
<LoadingSubheader>This could take a few minutes.</LoadingSubheader>
<LoadingWrapper>
<LoadingSvg height={100} width={100} />
</LoadingWrapper>
@@ -256,4 +256,4 @@ export const RECIPE_FIELDS: RecipeFields = {

export const CONNECTORS_WITH_FORM = new Set(Object.keys(RECIPE_FIELDS));

export const CONNECTORS_WITH_TEST_CONNECTION = new Set([SNOWFLAKE, LOOKER, BIGQUERY_BETA]);
export const CONNECTORS_WITH_TEST_CONNECTION = new Set([SNOWFLAKE, LOOKER, BIGQUERY_BETA, BIGQUERY]);
4 changes: 2 additions & 2 deletions datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -177,8 +177,8 @@
{
"urn": "urn:li:dataPlatform:custom",
"name": "custom",
"displayName": "Custom",
"displayName": "Other",
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: <source-type>\n config:\n # Source-type specifics config\n <source-configs>"
}
]
]
2 changes: 1 addition & 1 deletion datahub-web-react/src/app/ingest/source/conf/sources.tsx
@@ -49,7 +49,7 @@ export const SOURCE_TEMPLATE_CONFIGS: Array<SourceConfig> = [
{
type: 'custom',
placeholderRecipe: DEFAULT_PLACEHOLDER_RECIPE,
displayName: 'Custom',
displayName: 'Other',
docsUrl: 'https://datahubproject.io/docs/metadata-ingestion/',
logoComponent: <FormOutlined style={{ color: ANTD_GRAY[8], fontSize: 28 }} />,
},
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -10,6 +10,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Deprecations

- In the PowerBI source, `workspace_id_pattern` is now recommended over the deprecated `workspace_id` (see the short filtering sketch below).

### Other notable Changes

## 0.9.2
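For illustration, here is a minimal, self-contained sketch of how the new regex-based workspace filter behaves. AllowDenyPattern and its allowed() method are the ones used by the PowerBI source later in this commit; the workspace IDs below are placeholders, and the anchored ^...$ regex mirrors what the backward-compatibility validator generates from a legacy workspace_id.

from datahub.configuration.common import AllowDenyPattern

# Allow exactly one workspace; more regexes (or a deny list) could be added.
pattern = AllowDenyPattern(allow=["^4bd10256-e999-45dd-8e56-571c77153a5f$"])

for workspace_id in [
    "4bd10256-e999-45dd-8e56-571c77153a5f",  # matches the allow regex
    "00000000-0000-0000-0000-000000000000",  # placeholder id, filtered out
]:
    print(workspace_id, pattern.allowed(workspace_id))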
5 changes: 4 additions & 1 deletion metadata-ingestion/docs/sources/powerbi/powerbi_recipe.yml
@@ -4,7 +4,10 @@ source:
# Your Power BI tenant identifier
tenant_id: a949d688-67c0-4bf1-a344-e939411c6c0a
# Ingest elements of below PowerBi Workspace into Datahub
workspace_id: 4bd10256-e999-45dd-8e56-571c77153a5f
workspace_id_pattern:
allow:
- 4bd10256-e999-45dd-8e56-571c77153a5f
deny:
# Workspaces dataset environments (PROD, DEV, QA, STAGE)
env: DEV
# Azure AD Application identifier
@@ -492,12 +492,19 @@ def get_workunits(self) -> Iterable[WorkUnit]:
conn: bigquery.Client = self.get_bigquery_client()
self.add_config_to_report()

projects: List[BigqueryProject] = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
projects: List[BigqueryProject]
if self.config.project_id:
project = BigqueryProject(
id=self.config.project_id, name=self.config.project_id
)
return
projects = [project]
else:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
return

for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
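As a reading aid, a self-contained sketch of the project-resolution logic added above, using stand-in types rather than the DataHub classes: a configured project_id short-circuits project listing, so single-project ingestion does not need the resourcemanager.projects.list permission.

from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class Project:
    id: str
    name: str

def resolve_projects(
    project_id: Optional[str], list_projects: Callable[[], List[Project]]
) -> List[Project]:
    if project_id:
        # Single-project mode: build the list directly and never call the lister.
        return [Project(id=project_id, name=project_id)]
    projects = list_projects()
    if not projects:
        print("warning: project listing returned nothing; check permissions")
    return projects

# Hypothetical usage: no listing call happens when project_id is given.
print(resolve_projects("my-project", lambda: []))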
@@ -64,7 +64,7 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
default=None,
description="[deprecated] Use project_id_pattern instead.",
description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
)
storage_project_id: None = Field(default=None, hidden_from_schema=True)

Expand Down Expand Up @@ -97,14 +97,13 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:

if project_id_pattern == AllowDenyPattern.allow_all() and project_id:
logging.warning(
"project_id_pattern is not set but project_id is set, setting project_id as project_id_pattern. project_id will be deprecated, please use project_id_pattern instead."
"project_id_pattern is not set but project_id is set, source will only ingest the project_id project. project_id will be deprecated, please use project_id_pattern instead."
)
values["project_id_pattern"] = AllowDenyPattern(allow=[f"^{project_id}$"])
elif project_id_pattern != AllowDenyPattern.allow_all() and project_id:
logging.warning(
"project_id will be ignored in favour of project_id_pattern. project_id will be deprecated, please use project_id only."
"use project_id_pattern whenever possible. project_id will be deprecated, please use project_id_pattern only if possible."
)
values.pop("project_id")

dataset_pattern = values.get("dataset_pattern")
schema_pattern = values.get("schema_pattern")
@@ -186,6 +186,13 @@ def get_workunits(

request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If the table is partitioned we profile only one partition (the latest one unless
# a partition is configured), but at the table level we can use rows_count from the
# table metadata. This way, even though the column statistics reflect only a single
# partition's data, the row count still shows the proper total for the table.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count

dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
@@ -250,7 +257,10 @@ def get_bigquery_profile_request(
profile_request = BigqueryProfilerRequest(
pretty_name=dataset_name,
batch_kwargs=dict(
schema=project, table=f"{dataset}.{table.name}", custom_sql=custom_sql
schema=project,
table=f"{dataset}.{table.name}",
custom_sql=custom_sql,
partition=partition,
),
table=table,
profile_table_level_only=profile_table_level_only,
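A small, self-contained sketch of the row-count override added above, again with stand-in types rather than the DataHub profiler classes: when a specific partition is profiled, the table-level row count is taken from the table metadata so it reflects the whole table instead of the single profiled partition.

from dataclasses import dataclass
from typing import Optional

@dataclass
class Profile:
    rowCount: Optional[int] = None
    partition: Optional[str] = None  # identifier of the profiled partition, if any

def apply_table_row_count(profile: Profile, table_rows_count: Optional[int]) -> Profile:
    if profile.partition and table_rows_count is not None:
        # Column stats come from one partition; the row count should not.
        profile.rowCount = table_rows_count
    return profile

# Hypothetical partition id and table row count:
print(apply_table_row_count(Profile(partition="20221128"), 12345).rowCount)  # 12345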
129 changes: 97 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi.py
@@ -14,9 +14,10 @@
import msal
import pydantic
import requests
from pydantic import root_validator

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigurationError
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import EnvBasedSourceConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
@@ -111,7 +112,15 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase):
# Organisation Identifier
tenant_id: str = pydantic.Field(description="PowerBI tenant identifier")
# PowerBi workspace identifier
workspace_id: str = pydantic.Field(description="PowerBI workspace identifier")
workspace_id_pattern: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns to filter PowerBI workspaces in ingestion",
)
workspace_id: str = pydantic.Field(
description="[deprecated] Use workspace_id_pattern instead",
default=None,
)

# Dataset type mapping
dataset_type_mapping: Dict[str, str] = pydantic.Field(
description="Mapping of PowerBI datasource type to DataHub supported data-sources. See Quickstart Recipe for mapping"
@@ -133,6 +142,25 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase):
default=True, description="Whether reports should be ingested"
)

@root_validator(pre=False)
def workspace_id_backward_compatibility(cls, values: Dict) -> Dict:
workspace_id = values.get("workspace_id")
workspace_id_pattern = values.get("workspace_id_pattern")

if workspace_id_pattern == AllowDenyPattern.allow_all() and workspace_id:
logging.warning(
"workspace_id_pattern is not set but workspace_id is set, setting workspace_id as workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern instead."
)
values["workspace_id_pattern"] = AllowDenyPattern(
allow=[f"^{workspace_id}$"]
)
elif workspace_id_pattern != AllowDenyPattern.allow_all() and workspace_id:
logging.warning(
"workspace_id will be ignored in favour of workspace_id_pattern. workspace_id will be deprecated, please use workspace_id_pattern only."
)
values.pop("workspace_id")
return values


class PowerBiDashboardSourceConfig(PowerBiAPIConfig):
platform_name: str = "powerbi"
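The backward-compatibility handling above is a common pydantic pattern. Below is a minimal, self-contained sketch of the same idea (assuming pydantic v1, with a plain list of regexes standing in for AllowDenyPattern): a root_validator migrates the deprecated scalar field onto its replacement pattern field.

from typing import Dict, List, Optional
from pydantic import BaseModel, root_validator

class ExampleConfig(BaseModel):
    workspace_id: Optional[str] = None      # deprecated field
    workspace_allow: List[str] = ["^.*$"]   # stand-in for AllowDenyPattern.allow

    @root_validator(pre=False)
    def migrate_deprecated_field(cls, values: Dict) -> Dict:
        workspace_id = values.get("workspace_id")
        if workspace_id and values.get("workspace_allow") == ["^.*$"]:
            # Anchor the id so the generated regex matches it exactly.
            values["workspace_allow"] = [f"^{workspace_id}$"]
        return values

print(ExampleConfig(workspace_id="abc").workspace_allow)  # ['^abc$']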
@@ -576,6 +604,33 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
datasource=None,
)

def get_groups(self):
# Replace place holders
dataset_query_endpoint = PowerBiAPI.BASE_URL
# Hit PowerBi
LOGGER.info(f"Request to get groups endpoint URL={dataset_query_endpoint}")
response = requests.get(
dataset_query_endpoint,
headers={Constant.Authorization: self.get_access_token()},
)
response.raise_for_status()
return response.json()

def get_workspaces(self):
groups = self.get_groups()
workspaces = [
PowerBiAPI.Workspace(
id=workspace.get("id"),
name=workspace.get("name"),
state="",
datasets={},
dashboards=[],
)
for workspace in groups.get("value", [])
if workspace.get("type", None) == "Workspace"
]
return workspaces
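To show what the new get_workspaces() helper does with the groups payload, here is a self-contained sketch with a made-up response: only entries whose type is "Workspace" become workspace entries, matching the filter in the code above.

# Hypothetical groups response; field names follow the code above.
sample_groups = {
    "value": [
        {"id": "ws-1", "name": "Sales", "type": "Workspace"},
        {"id": "grp-1", "name": "Legacy group", "type": "Group"},
    ]
}

workspaces = [
    {"id": g.get("id"), "name": g.get("name")}
    for g in sample_groups.get("value", [])
    if g.get("type", None) == "Workspace"
]
print(workspaces)  # [{'id': 'ws-1', 'name': 'Sales'}]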

def get_data_source(self, dataset: Dataset) -> Any:
"""
Fetch the data source from PowerBi for the given dataset
@@ -1698,43 +1753,53 @@ def create(cls, config_dict, ctx):
config = PowerBiDashboardSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workspace_ids(self) -> Iterable[str]:
all_workspaces = self.powerbi_client.get_workspaces()
return [
workspace.id
for workspace in all_workspaces
if self.source_config.workspace_id_pattern.allowed(workspace.id)
]

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""
Datahub Ingestion framework invoke this method
"""
LOGGER.info("PowerBi plugin execution is started")

# Fetch PowerBi workspace for given workspace identifier
workspace = self.powerbi_client.get_workspace(self.source_config.workspace_id)

for dashboard in workspace.dashboards:

try:
# Fetch PowerBi users for dashboards
dashboard.users = self.powerbi_client.get_dashboard_users(dashboard)
# Increase dashboard and tiles count in report
self.reporter.report_dashboards_scanned()
self.reporter.report_charts_scanned(count=len(dashboard.tiles))
except Exception as e:
message = f"Error ({e}) occurred while loading dashboard {dashboard.displayName}(id={dashboard.id}) tiles."

LOGGER.exception(message, e)
self.reporter.report_warning(dashboard.id, message)
# Convert PowerBi Dashboard and child entities to Datahub work unit to ingest into Datahub
workunits = self.mapper.to_datahub_work_units(dashboard)
for workunit in workunits:
# Add workunit to report
self.reporter.report_workunit(workunit)
# Return workunit to Datahub Ingestion framework
yield workunit

if self.source_config.extract_reports:
for report in self.powerbi_client.get_reports(workspace=workspace):
for work_unit in self.mapper.report_to_datahub_work_units(
report, workspace
):
self.reporter.report_workunit(work_unit)
yield work_unit
for workspace_id in self.get_workspace_ids():
LOGGER.info(f"Scanning workspace id: {workspace_id}")
workspace = self.powerbi_client.get_workspace(workspace_id)

for dashboard in workspace.dashboards:

try:
# Fetch PowerBi users for dashboards
dashboard.users = self.powerbi_client.get_dashboard_users(dashboard)
# Increase dashboard and tiles count in report
self.reporter.report_dashboards_scanned()
self.reporter.report_charts_scanned(count=len(dashboard.tiles))
except Exception as e:
message = f"Error ({e}) occurred while loading dashboard {dashboard.displayName}(id={dashboard.id}) tiles."

LOGGER.exception(message, e)
self.reporter.report_warning(dashboard.id, message)
# Convert PowerBi Dashboard and child entities to Datahub work unit to ingest into Datahub
workunits = self.mapper.to_datahub_work_units(dashboard)
for workunit in workunits:
# Add workunit to report
self.reporter.report_workunit(workunit)
# Return workunit to Datahub Ingestion framework
yield workunit

if self.source_config.extract_reports:
for report in self.powerbi_client.get_reports(workspace=workspace):
for work_unit in self.mapper.report_to_datahub_work_units(
report, workspace
):
self.reporter.report_workunit(work_unit)
yield work_unit

def get_report(self) -> SourceReport:
return self.reporter