Skip to content

Commit

Permalink
feat(ingest): looker - add test connection (#5768)
Browse files Browse the repository at this point in the history
Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
hsheth2 and shirshanka authored Sep 6, 2022
1 parent bee27e8 commit 7e16ce0
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
from functools import lru_cache
from typing import Dict, List, MutableMapping, Optional, Sequence, Union, cast
from typing import Dict, List, MutableMapping, Optional, Sequence, Set, Union, cast

import looker_sdk
from looker_sdk.error import SDKError
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
# try authenticating current user to check connectivity
# (since it's possible to initialize an invalid client without any complaints)
try:
self.client.me(
self.me = self.client.me(
transport_options=self.transport_options
if config.transport_options is not None
else None
Expand All @@ -94,6 +94,19 @@ def __fields_mapper(fields: Union[str, List[str]]) -> str:
"""Helper method to turn single string or list of fields into Looker API compatible fields param"""
return fields if isinstance(fields, str) else ",".join(fields)

def get_available_permissions(self) -> Set[str]:
user_id = self.me.id
assert user_id

roles = self.client.user_roles(user_id)

permissions: Set[str] = set()
for role in roles:
if role.permission_set and role.permission_set.permissions:
permissions.update(role.permission_set.permissions)

return permissions

@lru_cache(maxsize=2000)
def get_user(self, id_: int, user_fields: str) -> Optional[User]:
self.client_stats.user_calls += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand All @@ -27,11 +28,18 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import (
CapabilityReport,
SourceCapability,
SourceReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.looker import looker_usage
from datahub.ingestion.source.looker.looker_common import (
Expand Down Expand Up @@ -138,7 +146,15 @@ def platform_instance_not_supported(cls, v: str) -> str:
@platform_name("Looker")
@support_status(SupportStatus.CERTIFIED)
@config_class(LookerDashboardSourceConfig)
class LookerDashboardSource(Source):
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(
SourceCapability.OWNERSHIP, "Enabled by default, configured using `extract_owners`"
)
@capability(
SourceCapability.USAGE_STATS, "Can be enabled using `extract_usage_history`"
)
class LookerDashboardSource(TestableSource):
"""
This plugin extracts the following:
- Looker dashboards, dashboard elements (charts) and explores
Expand Down Expand Up @@ -190,6 +206,90 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
config=stat_generator_config,
)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
try:
self = cast(
LookerDashboardSource,
LookerDashboardSource.create(
config_dict, PipelineContext("looker-test-connection")
),
)
test_report.basic_connectivity = CapabilityReport(capable=True)
test_report.capability_report = {}

permissions = self.looker_api.get_available_permissions()

BASIC_INGEST_REQUIRED_PERMISSIONS = {
# TODO: Make this a bit more granular.
"access_data",
"explore",
"manage_models",
"see_datagroups",
"see_lookml",
"see_lookml_dashboards",
"see_looks",
"see_pdts",
"see_queries",
"see_schedules",
"see_sql",
"see_user_dashboards",
"see_users",
}

USAGE_INGEST_REQUIRED_PERMISSIONS = {
"see_system_activity",
}

LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.DESCRIPTIONS,
BASIC_INGEST_REQUIRED_PERMISSIONS,
)
LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.OWNERSHIP,
BASIC_INGEST_REQUIRED_PERMISSIONS,
)
LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.USAGE_STATS,
USAGE_INGEST_REQUIRED_PERMISSIONS,
)
except Exception as e:
logger.exception(f"Failed to test connection due to {e}")
test_report.internal_failure = True
test_report.internal_failure_reason = f"{e}"

if test_report.basic_connectivity is None:
test_report.basic_connectivity = CapabilityReport(
capable=False, failure_reason=f"{e}"
)

return test_report

@staticmethod
def _set_test_connection_capability(
test_report: TestConnectionReport,
permissions: Set[str],
perm: SourceCapability,
required: Set[str],
) -> None:
assert test_report.capability_report is not None

if required.issubset(permissions):
test_report.capability_report[perm] = CapabilityReport(capable=True)
else:
missing = required - permissions
test_report.capability_report[perm] = CapabilityReport(
capable=False,
error_message=f"Missing required permissions: {', '.join(missing)}",
)

@staticmethod
def _extract_view_from_field(field: str) -> str:
assert (
Expand Down

0 comments on commit 7e16ce0

Please sign in to comment.