Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest): use pydantic utilities for NamingPattern #6013

Merged
merged 3 commits into from
Oct 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 95 additions & 119 deletions metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from __future__ import print_function

import dataclasses
import datetime
import logging
import re
from dataclasses import dataclass, field as dataclasses_field
from enum import Enum
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import ClassVar, Dict, Iterable, List, Optional, Tuple, Union

import pydantic
from looker_sdk.error import SDKError
from looker_sdk.sdk.api31.models import User, WriteQuery
from pydantic import BaseModel, Field
from pydantic import Field
from pydantic.class_validators import validator

import datahub.emitter.mce_builder as builder
Expand Down Expand Up @@ -70,94 +71,92 @@
logger = logging.getLogger(__name__)


# @dataclass
class NamingPattern(BaseModel):
allowed_vars: List[str]
class NamingPattern(ConfigModel):
ALLOWED_VARS: ClassVar[List[str]] = []
REQUIRE_AT_LEAST_ONE_VAR: ClassVar[bool] = True

pattern: str
variables: Optional[List[str]] = None

@classmethod
def __get_validators__(cls):
yield cls.pydantic_accept_raw_pattern
yield cls.validate
yield cls.pydantic_validate_pattern

@classmethod
def pydantic_accept_raw_pattern(cls, v):
if isinstance(v, (NamingPattern, dict)):
return v
assert isinstance(v, str), "pattern must be a string"
return {"pattern": v}

@classmethod
def pydantic_validate_pattern(cls, v):
assert isinstance(v, NamingPattern)
assert v.validate_pattern(cls.REQUIRE_AT_LEAST_ONE_VAR)
return v

@classmethod
def allowed_docstring(cls) -> str:
return f"Allowed variables are {cls.ALLOWED_VARS}"

def validate_pattern(self, at_least_one: bool) -> bool:
variables = re.findall("({[^}{]+})", self.pattern)
self.variables = [v[1:-1] for v in variables]

variables = [v[1:-1] for v in variables] # remove the {}

for v in variables:
if v[1:-1] not in self.allowed_vars:
if v not in self.ALLOWED_VARS:
raise ConfigurationError(
f"Failed to find {v} in allowed_variables {self.allowed_vars}"
f"Failed to find {v} in allowed_variables {self.ALLOWED_VARS}"
)
if at_least_one and len(variables) == 0:
raise ConfigurationError(
f"Failed to find any variable assigned to pattern {self.pattern}. Must have at least one. Allowed variables are {self.allowed_vars}"
f"Failed to find any variable assigned to pattern {self.pattern}. Must have at least one. {self.allowed_docstring()}"
)
return True


naming_pattern_variables: List[str] = [
"platform",
"env",
"project",
"model",
"name",
]
def replace_variables(self, values: Union[Dict[str, Optional[str]], object]) -> str:
if not isinstance(values, dict):
assert dataclasses.is_dataclass(values)
values = dataclasses.asdict(values)
values = {k: v for k, v in values.items() if v is not None}
return self.pattern.format(**values)


class LookerExploreNamingConfig(ConfigModel):
explore_naming_pattern: NamingPattern = pydantic.Field(
description="Pattern for providing dataset names to explores. Allowed variables are {project}, {model}, {name}. Default is `{model}.explore.{name}`",
default=NamingPattern(
allowed_vars=naming_pattern_variables, pattern="{model}.explore.{name}"
),
)
explore_browse_pattern: NamingPattern = NamingPattern(
allowed_vars=naming_pattern_variables,
pattern="/{env}/{platform}/{project}/explores",
)
@dataclass
class NamingPatternMapping:
platform: str
env: str
project: str
model: str
name: str

@validator("explore_naming_pattern", "explore_browse_pattern", pre=True)
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
assert isinstance(v, str), "pattern must be a string"
return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)

@validator("explore_naming_pattern", "explore_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
assert isinstance(v, NamingPattern)
v.validate_pattern(at_least_one=True)
return v
class LookerNamingPattern(NamingPattern):
ALLOWED_VARS = [field.name for field in dataclasses.fields(NamingPatternMapping)]


class LookerViewNamingConfig(ConfigModel):
view_naming_pattern: NamingPattern = Field(
NamingPattern(
allowed_vars=naming_pattern_variables, pattern="{project}.view.{name}"
),
description="Pattern for providing dataset names to views. Allowed variables are `{project}`, `{model}`, `{name}`",
)
view_browse_pattern: NamingPattern = Field(
NamingPattern(
allowed_vars=naming_pattern_variables,
pattern="/{env}/{platform}/{project}/views",
),
description="Pattern for providing browse paths to views. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}`",
class LookerCommonConfig(DatasetSourceConfigBase):
explore_naming_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing dataset names to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="{model}.explore.{name}"),
)

@validator("view_naming_pattern", "view_browse_pattern", pre=True)
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
assert isinstance(v, str), "pattern must be a string"
return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)

@validator("view_naming_pattern", "view_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
assert isinstance(v, NamingPattern)
v.validate_pattern(at_least_one=True)
return v
explore_browse_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing browse paths to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="/{env}/{platform}/{project}/explores"),
)

view_naming_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="{project}.view.{name}"),
description=f"Pattern for providing dataset names to views. {LookerNamingPattern.allowed_docstring()}",
)
view_browse_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="/{env}/{platform}/{project}/views"),
description=f"Pattern for providing browse paths to views. {LookerNamingPattern.allowed_docstring()}",
)

class LookerCommonConfig(
LookerViewNamingConfig, LookerExploreNamingConfig, DatasetSourceConfigBase
):
tag_measures_and_dimensions: bool = Field(
True,
description="When enabled, attaches tags to measures, dimensions and dimension groups to make them more discoverable. When disabled, adds this information to the description of the column.",
Expand All @@ -177,19 +176,14 @@ class LookerViewId:
model_name: str
view_name: str

def get_mapping(self, variable: str, config: LookerCommonConfig) -> str:
assert variable in naming_pattern_variables
if variable == "project":
return self.project_name
if variable == "model":
return self.model_name
if variable == "name":
return self.view_name
if variable == "env":
return config.env.lower()
if variable == "platform":
return config.platform_name
assert False, "Unreachable code"
def get_mapping(self, config: LookerCommonConfig) -> NamingPatternMapping:
return NamingPatternMapping(
platform=config.platform_name,
env=config.env.lower(),
project=self.project_name,
model=self.model_name,
name=self.view_name,
)

@validator("view_name")
def remove_quotes(cls, v):
Expand All @@ -198,12 +192,9 @@ def remove_quotes(cls, v):
return v

def get_urn(self, config: LookerCommonConfig) -> str:
dataset_name = config.view_naming_pattern.pattern
assert config.view_naming_pattern.variables is not None
for v in config.view_naming_pattern.variables:
dataset_name = dataset_name.replace(
"{" + v + "}", self.get_mapping(v, config)
)
dataset_name = config.view_naming_pattern.replace_variables(
self.get_mapping(config)
)

return builder.make_dataset_urn_with_platform_instance(
platform=config.platform_name,
Expand All @@ -213,12 +204,9 @@ def get_urn(self, config: LookerCommonConfig) -> str:
)

def get_browse_path(self, config: LookerCommonConfig) -> str:
browse_path = config.view_browse_pattern.pattern
assert config.view_browse_pattern.variables is not None
for v in config.view_browse_pattern.variables:
browse_path = browse_path.replace(
"{" + v + "}", self.get_mapping(v, config)
)
browse_path = config.view_browse_pattern.replace_variables(
self.get_mapping(config)
)
return browse_path


Expand Down Expand Up @@ -673,28 +661,19 @@ def from_api( # noqa: C901
)
return None

def get_mapping(self, variable: str, config: LookerCommonConfig) -> str:
assert variable in naming_pattern_variables
if variable == "project":
assert self.project_name is not None
return self.project_name
if variable == "model":
return self.model_name
if variable == "name":
return self.name
if variable == "env":
return config.env.lower()
if variable == "platform":
return config.platform_name
assert False, "Unreachable code"
def get_mapping(self, config: LookerCommonConfig) -> NamingPatternMapping:
return NamingPatternMapping(
platform=config.platform_name,
project=self.project_name, # type: ignore
model=self.model_name,
name=self.name,
env=config.env.lower(),
)

def get_explore_urn(self, config: LookerCommonConfig) -> str:
dataset_name = config.explore_naming_pattern.pattern
assert config.explore_naming_pattern.variables is not None
for v in config.explore_naming_pattern.variables:
dataset_name = dataset_name.replace(
"{" + v + "}", self.get_mapping(v, config)
)
dataset_name = config.explore_naming_pattern.replace_variables(
self.get_mapping(config)
)

return builder.make_dataset_urn_with_platform_instance(
platform=config.platform_name,
Expand All @@ -704,12 +683,9 @@ def get_explore_urn(self, config: LookerCommonConfig) -> str:
)

def get_explore_browse_path(self, config: LookerCommonConfig) -> str:
browse_path = config.explore_browse_pattern.pattern
assert config.explore_browse_pattern.variables is not None
for v in config.explore_browse_pattern.variables:
browse_path = browse_path.replace(
"{" + v + "}", self.get_mapping(v, config)
)
browse_path = config.explore_browse_pattern.replace_variables(
self.get_mapping(config)
)
return browse_path

def _get_url(self, base_url):
Expand Down