diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py
index 2747e118078152..66f6e302dce6c5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py
@@ -3,7 +3,7 @@
 import logging
 import os
 from functools import lru_cache
-from typing import Dict, List, MutableMapping, Optional, Sequence, Union, cast
+from typing import Dict, List, MutableMapping, Optional, Sequence, Set, Union, cast
 
 import looker_sdk
 from looker_sdk.error import SDKError
@@ -77,7 +77,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
         # try authenticating current user to check connectivity
         # (since it's possible to initialize an invalid client without any complaints)
         try:
-            self.client.me(
+            self.me = self.client.me(
                 transport_options=self.transport_options
                 if config.transport_options is not None
                 else None
@@ -94,6 +94,28 @@ def __fields_mapper(fields: Union[str, List[str]]) -> str:
         """Helper method to turn single string or list of fields into Looker API compatible fields param"""
         return fields if isinstance(fields, str) else ",".join(fields)
 
+    def get_available_permissions(self) -> Set[str]:
+        """Return the set of permissions granted by the current user's roles.
+
+        Raises:
+            ValueError: If the user fetched at client initialization has no id.
+        """
+        user_id = self.me.id
+        if not user_id:
+            # Raise with a message instead of a bare assert: callers that report
+            # str(exception) would otherwise surface an empty failure reason.
+            raise ValueError("Looker API did not return an id for the current user")
+
+        roles = self.client.user_roles(user_id)
+
+        # Skip roles whose permission set or permission list is missing or empty.
+        permissions: Set[str] = set()
+        for role in roles:
+            if role.permission_set and role.permission_set.permissions:
+                permissions.update(role.permission_set.permissions)
+
+        return permissions
+
     @lru_cache(maxsize=2000)
     def get_user(self, id_: int, user_fields: str) -> Optional[User]:
         self.client_stats.user_calls += 1
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
index 58a8e8302dc99f..a36fab38c6db69 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py
@@ -12,6 +12,7 @@
     MutableMapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     Union,
     cast,
@@ -27,11 +28,18 @@
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
+    capability,
     config_class,
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.api.source import (
+    CapabilityReport,
+    SourceCapability,
+    SourceReport,
+    TestableSource,
+    TestConnectionReport,
+)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.looker import looker_usage
 from datahub.ingestion.source.looker.looker_common import (
@@ -138,7 +146,15 @@ def platform_instance_not_supported(cls, v: str) -> str:
 @platform_name("Looker")
 @support_status(SupportStatus.CERTIFIED)
 @config_class(LookerDashboardSourceConfig)
-class LookerDashboardSource(Source):
+@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
+@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
+@capability(
+    SourceCapability.OWNERSHIP, "Enabled by default, configured using `extract_owners`"
+)
+@capability(
+    SourceCapability.USAGE_STATS, "Can be enabled using `extract_usage_history`"
+)
+class LookerDashboardSource(TestableSource):
     """
     This plugin extracts the following:
     - Looker dashboards, dashboard elements (charts) and explores
@@ -190,6 +206,101 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
             config=stat_generator_config,
         )
 
+    @staticmethod
+    def test_connection(config_dict: dict) -> TestConnectionReport:
+        """Check connectivity and per-capability permissions for a Looker config.
+
+        Builds the source from `config_dict`, then compares the permissions
+        available to the connecting user against the sets each capability needs.
+        Any exception is logged and recorded on the report instead of propagating.
+        """
+        test_report = TestConnectionReport()
+        try:
+            self = cast(
+                LookerDashboardSource,
+                LookerDashboardSource.create(
+                    config_dict, PipelineContext("looker-test-connection")
+                ),
+            )
+            test_report.basic_connectivity = CapabilityReport(capable=True)
+            test_report.capability_report = {}
+
+            permissions = self.looker_api.get_available_permissions()
+
+            BASIC_INGEST_REQUIRED_PERMISSIONS = {
+                # TODO: Make this a bit more granular.
+                "access_data",
+                "explore",
+                "manage_models",
+                "see_datagroups",
+                "see_lookml",
+                "see_lookml_dashboards",
+                "see_looks",
+                "see_pdts",
+                "see_queries",
+                "see_schedules",
+                "see_sql",
+                "see_user_dashboards",
+                "see_users",
+            }
+
+            USAGE_INGEST_REQUIRED_PERMISSIONS = {
+                "see_system_activity",
+            }
+
+            LookerDashboardSource._set_test_connection_capability(
+                test_report,
+                permissions,
+                SourceCapability.DESCRIPTIONS,
+                BASIC_INGEST_REQUIRED_PERMISSIONS,
+            )
+            LookerDashboardSource._set_test_connection_capability(
+                test_report,
+                permissions,
+                SourceCapability.OWNERSHIP,
+                BASIC_INGEST_REQUIRED_PERMISSIONS,
+            )
+            LookerDashboardSource._set_test_connection_capability(
+                test_report,
+                permissions,
+                SourceCapability.USAGE_STATS,
+                USAGE_INGEST_REQUIRED_PERMISSIONS,
+            )
+        except Exception as e:
+            logger.exception(f"Failed to test connection due to {e}")
+            test_report.internal_failure = True
+            test_report.internal_failure_reason = f"{e}"
+
+            if test_report.basic_connectivity is None:
+                test_report.basic_connectivity = CapabilityReport(
+                    capable=False, failure_reason=f"{e}"
+                )
+
+        return test_report
+
+    @staticmethod
+    def _set_test_connection_capability(
+        test_report: TestConnectionReport,
+        permissions: Set[str],
+        perm: SourceCapability,
+        required: Set[str],
+    ) -> None:
+        """Record whether `permissions` covers everything `perm` requires.
+
+        On failure the report lists the missing permissions in sorted order.
+        """
+        assert test_report.capability_report is not None
+
+        if required.issubset(permissions):
+            test_report.capability_report[perm] = CapabilityReport(capable=True)
+        else:
+            # Sort so the message is deterministic despite set ordering.
+            missing = sorted(required - permissions)
+            test_report.capability_report[perm] = CapabilityReport(
+                capable=False,
+                failure_reason=f"Missing required permissions: {', '.join(missing)}",
+            )
+
     @staticmethod
     def _extract_view_from_field(field: str) -> str:
         assert (