Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): looker test connection #5768

Merged
merged 5 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import os
from functools import lru_cache
from typing import Dict, List, MutableMapping, Optional, Sequence, Union, cast
from typing import Dict, List, MutableMapping, Optional, Sequence, Set, Union, cast

import looker_sdk
from looker_sdk.error import SDKError
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
# try authenticating current user to check connectivity
# (since it's possible to initialize an invalid client without any complaints)
try:
self.client.me(
self.me = self.client.me(
transport_options=self.transport_options
if config.transport_options is not None
else None
Expand All @@ -94,6 +94,19 @@ def __fields_mapper(fields: Union[str, List[str]]) -> str:
"""Helper method to turn single string or list of fields into Looker API compatible fields param"""
return fields if isinstance(fields, str) else ",".join(fields)

def get_available_permissions(self) -> Set[str]:
user_id = self.me.id
assert user_id

roles = self.client.user_roles(user_id)

permissions: Set[str] = set()
for role in roles:
if role.permission_set and role.permission_set.permissions:
permissions.update(role.permission_set.permissions)

return permissions

@lru_cache(maxsize=2000)
def get_user(self, id_: int, user_fields: str) -> Optional[User]:
self.client_stats.user_calls += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand All @@ -27,11 +28,18 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import (
CapabilityReport,
SourceCapability,
SourceReport,
TestableSource,
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.looker import looker_usage
from datahub.ingestion.source.looker.looker_common import (
Expand Down Expand Up @@ -138,7 +146,15 @@ def platform_instance_not_supported(cls, v: str) -> str:
@platform_name("Looker")
@support_status(SupportStatus.CERTIFIED)
@config_class(LookerDashboardSourceConfig)
class LookerDashboardSource(Source):
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(
SourceCapability.OWNERSHIP, "Enabled by default, configured using `extract_owners`"
)
@capability(
SourceCapability.USAGE_STATS, "Can be enabled using `extract_usage_history`"
)
class LookerDashboardSource(TestableSource):
"""
This plugin extracts the following:
- Looker dashboards, dashboard elements (charts) and explores
Expand Down Expand Up @@ -190,6 +206,90 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
config=stat_generator_config,
)

@staticmethod
def test_connection(config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
try:
self = cast(
LookerDashboardSource,
LookerDashboardSource.create(
config_dict, PipelineContext("looker-test-connection")
),
)
test_report.basic_connectivity = CapabilityReport(capable=True)
test_report.capability_report = {}

permissions = self.looker_api.get_available_permissions()

BASIC_INGEST_REQUIRED_PERMISSIONS = {
# TODO: Make this a bit more granular.
"access_data",
"explore",
"manage_models",
"see_datagroups",
"see_lookml",
"see_lookml_dashboards",
"see_looks",
"see_pdts",
"see_queries",
"see_schedules",
"see_sql",
"see_user_dashboards",
"see_users",
}

USAGE_INGEST_REQUIRED_PERMISSIONS = {
"see_system_activity",
}

LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.DESCRIPTIONS,
BASIC_INGEST_REQUIRED_PERMISSIONS,
)
LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.OWNERSHIP,
BASIC_INGEST_REQUIRED_PERMISSIONS,
)
LookerDashboardSource._set_test_connection_capability(
test_report,
permissions,
SourceCapability.USAGE_STATS,
USAGE_INGEST_REQUIRED_PERMISSIONS,
)
except Exception as e:
logger.exception(f"Failed to test connection due to {e}")
test_report.internal_failure = True
test_report.internal_failure_reason = f"{e}"

if test_report.basic_connectivity is None:
test_report.basic_connectivity = CapabilityReport(
capable=False, failure_reason=f"{e}"
)

return test_report

@staticmethod
def _set_test_connection_capability(
test_report: TestConnectionReport,
permissions: Set[str],
perm: SourceCapability,
required: Set[str],
) -> None:
assert test_report.capability_report is not None

if required.issubset(permissions):
test_report.capability_report[perm] = CapabilityReport(capable=True)
else:
missing = required - permissions
test_report.capability_report[perm] = CapabilityReport(
capable=False,
error_message=f"Missing required permissions: {', '.join(missing)}",
)

@staticmethod
def _extract_view_from_field(field: str) -> str:
assert (
Expand Down