Skip to content

Commit

Permalink
feat(ingest): add platform instance to tableau (datahub-project#5978)
Browse files Browse the repository at this point in the history
Co-authored-by: Arne Laponin <[email protected]>
Co-authored-by: Amanda Hernando <[email protected]>
Co-authored-by: amanda-her <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
5 people authored and cccs-tom committed Nov 18, 2022
1 parent 0f5d1a7 commit f32ecd8
Show file tree
Hide file tree
Showing 7 changed files with 33,098 additions and 54 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/docs/sources/tableau/tableau_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ source:
# Coordinates
connect_uri: https://prod-ca-a.online.tableau.com
site: acryl
platform_instance: acryl_instance
projects: ["default", "Project 2"]

# Credentials
Expand Down
18 changes: 14 additions & 4 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,14 @@ def make_data_job_urn(
)


def make_dashboard_urn(platform: str, name: str) -> str:
def make_dashboard_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: dashboards don't currently include data platform urn prefixes.
return f"urn:li:dashboard:({platform},{name})"
if platform_instance:
return f"urn:li:dashboard:({platform},{platform_instance}.{name})"
else:
return f"urn:li:dashboard:({platform},{name})"


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
Expand All @@ -224,9 +229,14 @@ def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
return None


def make_chart_urn(platform: str, name: str) -> str:
def make_chart_urn(
platform: str, name: str, platform_instance: Optional[str] = None
) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
return f"urn:li:chart:({platform},{name})"
if platform_instance:
return f"urn:li:chart:({platform},{platform_instance}.{name})"
else:
return f"urn:li:chart:({platform},{name})"


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
Expand Down
74 changes: 52 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
ChartUsageStatisticsClass,
DashboardInfoClass,
DashboardUsageStatisticsClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
OwnerClass,
OwnershipClass,
Expand Down Expand Up @@ -143,6 +144,10 @@ class TableauConnectionConfig(ConfigModel):
default="",
description="Tableau Site. Always required for Tableau Online. Use emptystring to connect with Default site on Tableau Server.",
)
platform_instance: Optional[str] = Field(
default=None,
description="Unique relationship between the Tableau Server and site",
)

ssl_verify: Union[bool, str] = Field(
default=True,
Expand Down Expand Up @@ -249,11 +254,7 @@ class UsageStat:
@platform_name("Tableau")
@config_class(TableauConfig)
@support_status(SupportStatus.INCUBATING)
@capability(
SourceCapability.PLATFORM_INSTANCE,
"Not applicable to source",
supported=False,
)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Requires transformer", supported=False)
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(
Expand Down Expand Up @@ -333,6 +334,16 @@ def _authenticate(self):
reason=str(e),
)

def get_data_platform_instance(self) -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(self.platform),
instance=builder.make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
)
if self.config.platform_instance
else None,
)

def get_connection_object_page(
self,
query: str,
Expand Down Expand Up @@ -444,8 +455,11 @@ def _create_upstream_table_lineage(
if ds["id"] not in self.datasource_ids_being_used:
self.datasource_ids_being_used.append(ds["id"])

datasource_urn = builder.make_dataset_urn(
self.platform, ds["id"], self.config.env
datasource_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=ds["id"],
platform_instance=self.config.platform_instance,
env=self.config.env,
)
upstream_table = UpstreamClass(
dataset=datasource_urn,
Expand Down Expand Up @@ -550,10 +564,15 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:

for csql in unique_custom_sql:
csql_id: str = csql["id"]
csql_urn = builder.make_dataset_urn(self.platform, csql_id, self.config.env)
csql_urn = builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=csql_id,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
dataset_snapshot = DatasetSnapshot(
urn=csql_urn,
aspects=[],
aspects=[self.get_data_platform_instance()],
)

datasource_name = None
Expand Down Expand Up @@ -666,8 +685,11 @@ def _create_lineage_from_csql_datasource(
self, csql_urn: str, csql_datasource: List[dict]
) -> Iterable[MetadataWorkUnit]:
for datasource in csql_datasource:
datasource_urn = builder.make_dataset_urn(
self.platform, datasource.get("id", ""), self.config.env
datasource_urn = builder.make_dataset_urn_with_platform_instance(
self.platform,
datasource.get("id", ""),
self.config.platform_instance,
self.config.env,
)
upstream_csql = UpstreamClass(
dataset=csql_urn,
Expand Down Expand Up @@ -804,15 +826,15 @@ def emit_datasource(
else ""
)
datasource_id = datasource["id"]
datasource_urn = builder.make_dataset_urn(
self.platform, datasource_id, self.config.env
datasource_urn = builder.make_dataset_urn_with_platform_instance(
self.platform, datasource_id, self.config.platform_instance, self.config.env
)
if datasource_id not in self.datasource_ids_being_used:
self.datasource_ids_being_used.append(datasource_id)

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=[],
aspects=[self.get_data_platform_instance()],
)

datasource_name = datasource.get("name") or datasource_id
Expand Down Expand Up @@ -1012,10 +1034,12 @@ def _get_chart_stat_wu(

def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
for sheet in workbook.get("sheets", []):
sheet_urn: str = builder.make_chart_urn(self.platform, sheet.get("id"))
sheet_urn: str = builder.make_chart_urn(
self.platform, sheet.get("id"), self.config.platform_instance
)
chart_snapshot = ChartSnapshot(
urn=sheet_urn,
aspects=[],
aspects=[self.get_data_platform_instance()],
)

creator: Optional[str] = workbook["owner"].get("username")
Expand Down Expand Up @@ -1051,7 +1075,9 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
data_sources = self.get_sheetwise_upstream_datasources(sheet)

for ds_id in data_sources:
ds_urn = builder.make_dataset_urn(self.platform, ds_id, self.config.env)
ds_urn = builder.make_dataset_urn_with_platform_instance(
self.platform, ds_id, self.config.platform_instance, self.config.env
)
datasource_urn.append(ds_urn)
if ds_id not in self.datasource_ids_being_used:
self.datasource_ids_being_used.append(ds_id)
Expand Down Expand Up @@ -1156,7 +1182,9 @@ def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUni

def gen_workbook_key(self, workbook):
return WorkbookKey(
platform=self.platform, instance=None, workbook_id=workbook["id"]
platform=self.platform,
instance=self.config.platform_instance,
workbook_id=workbook["id"],
)

@staticmethod
Expand Down Expand Up @@ -1208,11 +1236,11 @@ def _get_dashboard_stat_wu(
def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
for dashboard in workbook.get("dashboards", []):
dashboard_urn: str = builder.make_dashboard_urn(
self.platform, dashboard["id"]
self.platform, dashboard["id"], self.config.platform_instance
)
dashboard_snapshot = DashboardSnapshot(
urn=dashboard_urn,
aspects=[],
aspects=[self.get_data_platform_instance()],
)

creator = workbook.get("owner", {}).get("username", "")
Expand All @@ -1228,7 +1256,9 @@ def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
else ""
)
chart_urns = [
builder.make_chart_urn(self.platform, sheet.get("id"))
builder.make_chart_urn(
self.platform, sheet.get("id"), self.config.platform_instance
)
for sheet in dashboard.get("sheets", [])
]
dashboard_info_class = DashboardInfoClass(
Expand Down Expand Up @@ -1392,4 +1422,4 @@ def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> str:
return self.platform
return self.config.platform_instance or self.platform
Loading

0 comments on commit f32ecd8

Please sign in to comment.