Skip to content

Commit

Permalink
refactor(ingest): use pydantic utilities for NamingPattern (datahub-p…
Browse files Browse the repository at this point in the history
…roject#6013)

* refactor(ingest): use pydantic utilities for NamingPattern

* clean up replacement logic

* flatten config hierarchy
  • Loading branch information
hsheth2 authored and david-leifker committed Oct 6, 2022
1 parent cc7152b commit 53033e3
Showing 1 changed file with 95 additions and 119 deletions.
214 changes: 95 additions & 119 deletions metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from __future__ import print_function

import dataclasses
import datetime
import logging
import re
from dataclasses import dataclass, field as dataclasses_field
from enum import Enum
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import ClassVar, Dict, Iterable, List, Optional, Tuple, Union

import pydantic
from looker_sdk.error import SDKError
from looker_sdk.sdk.api31.models import User, WriteQuery
from pydantic import BaseModel, Field
from pydantic import Field
from pydantic.class_validators import validator

import datahub.emitter.mce_builder as builder
Expand Down Expand Up @@ -73,94 +74,92 @@
logger = logging.getLogger(__name__)


# @dataclass
class NamingPattern(BaseModel):
allowed_vars: List[str]
class NamingPattern(ConfigModel):
ALLOWED_VARS: ClassVar[List[str]] = []
REQUIRE_AT_LEAST_ONE_VAR: ClassVar[bool] = True

pattern: str
variables: Optional[List[str]] = None

@classmethod
def __get_validators__(cls):
yield cls.pydantic_accept_raw_pattern
yield cls.validate
yield cls.pydantic_validate_pattern

@classmethod
def pydantic_accept_raw_pattern(cls, v):
if isinstance(v, (NamingPattern, dict)):
return v
assert isinstance(v, str), "pattern must be a string"
return {"pattern": v}

@classmethod
def pydantic_validate_pattern(cls, v):
assert isinstance(v, NamingPattern)
assert v.validate_pattern(cls.REQUIRE_AT_LEAST_ONE_VAR)
return v

@classmethod
def allowed_docstring(cls) -> str:
return f"Allowed variables are {cls.ALLOWED_VARS}"

def validate_pattern(self, at_least_one: bool) -> bool:
variables = re.findall("({[^}{]+})", self.pattern)
self.variables = [v[1:-1] for v in variables]

variables = [v[1:-1] for v in variables] # remove the {}

for v in variables:
if v[1:-1] not in self.allowed_vars:
if v not in self.ALLOWED_VARS:
raise ConfigurationError(
f"Failed to find {v} in allowed_variables {self.allowed_vars}"
f"Failed to find {v} in allowed_variables {self.ALLOWED_VARS}"
)
if at_least_one and len(variables) == 0:
raise ConfigurationError(
f"Failed to find any variable assigned to pattern {self.pattern}. Must have at least one. Allowed variables are {self.allowed_vars}"
f"Failed to find any variable assigned to pattern {self.pattern}. Must have at least one. {self.allowed_docstring()}"
)
return True


naming_pattern_variables: List[str] = [
"platform",
"env",
"project",
"model",
"name",
]
def replace_variables(self, values: Union[Dict[str, Optional[str]], object]) -> str:
if not isinstance(values, dict):
assert dataclasses.is_dataclass(values)
values = dataclasses.asdict(values)
values = {k: v for k, v in values.items() if v is not None}
return self.pattern.format(**values)


class LookerExploreNamingConfig(ConfigModel):
explore_naming_pattern: NamingPattern = pydantic.Field(
description="Pattern for providing dataset names to explores. Allowed variables are {project}, {model}, {name}. Default is `{model}.explore.{name}`",
default=NamingPattern(
allowed_vars=naming_pattern_variables, pattern="{model}.explore.{name}"
),
)
explore_browse_pattern: NamingPattern = NamingPattern(
allowed_vars=naming_pattern_variables,
pattern="/{env}/{platform}/{project}/explores",
)
@dataclass
class NamingPatternMapping:
platform: str
env: str
project: str
model: str
name: str

@validator("explore_naming_pattern", "explore_browse_pattern", pre=True)
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
assert isinstance(v, str), "pattern must be a string"
return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)

@validator("explore_naming_pattern", "explore_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
assert isinstance(v, NamingPattern)
v.validate_pattern(at_least_one=True)
return v
class LookerNamingPattern(NamingPattern):
ALLOWED_VARS = [field.name for field in dataclasses.fields(NamingPatternMapping)]


class LookerViewNamingConfig(ConfigModel):
view_naming_pattern: NamingPattern = Field(
NamingPattern(
allowed_vars=naming_pattern_variables, pattern="{project}.view.{name}"
),
description="Pattern for providing dataset names to views. Allowed variables are `{project}`, `{model}`, `{name}`",
)
view_browse_pattern: NamingPattern = Field(
NamingPattern(
allowed_vars=naming_pattern_variables,
pattern="/{env}/{platform}/{project}/views",
),
description="Pattern for providing browse paths to views. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}`",
class LookerCommonConfig(DatasetSourceConfigBase):
explore_naming_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing dataset names to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="{model}.explore.{name}"),
)

@validator("view_naming_pattern", "view_browse_pattern", pre=True)
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
assert isinstance(v, str), "pattern must be a string"
return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)

@validator("view_naming_pattern", "view_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
assert isinstance(v, NamingPattern)
v.validate_pattern(at_least_one=True)
return v
explore_browse_pattern: LookerNamingPattern = pydantic.Field(
description=f"Pattern for providing browse paths to explores. {LookerNamingPattern.allowed_docstring()}",
default=LookerNamingPattern(pattern="/{env}/{platform}/{project}/explores"),
)

view_naming_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="{project}.view.{name}"),
description=f"Pattern for providing dataset names to views. {LookerNamingPattern.allowed_docstring()}",
)
view_browse_pattern: LookerNamingPattern = Field(
LookerNamingPattern(pattern="/{env}/{platform}/{project}/views"),
description=f"Pattern for providing browse paths to views. {LookerNamingPattern.allowed_docstring()}",
)

class LookerCommonConfig(
LookerViewNamingConfig, LookerExploreNamingConfig, DatasetSourceConfigBase
):
tag_measures_and_dimensions: bool = Field(
True,
description="When enabled, attaches tags to measures, dimensions and dimension groups to make them more discoverable. When disabled, adds this information to the description of the column.",
Expand All @@ -184,19 +183,14 @@ class LookerViewId:
model_name: str
view_name: str

def get_mapping(self, variable: str, config: LookerCommonConfig) -> str:
assert variable in naming_pattern_variables
if variable == "project":
return self.project_name
if variable == "model":
return self.model_name
if variable == "name":
return self.view_name
if variable == "env":
return config.env.lower()
if variable == "platform":
return config.platform_name
assert False, "Unreachable code"
def get_mapping(self, config: LookerCommonConfig) -> NamingPatternMapping:
return NamingPatternMapping(
platform=config.platform_name,
env=config.env.lower(),
project=self.project_name,
model=self.model_name,
name=self.view_name,
)

@validator("view_name")
def remove_quotes(cls, v):
Expand All @@ -205,12 +199,9 @@ def remove_quotes(cls, v):
return v

def get_urn(self, config: LookerCommonConfig) -> str:
dataset_name = config.view_naming_pattern.pattern
assert config.view_naming_pattern.variables is not None
for v in config.view_naming_pattern.variables:
dataset_name = dataset_name.replace(
"{" + v + "}", self.get_mapping(v, config)
)
dataset_name = config.view_naming_pattern.replace_variables(
self.get_mapping(config)
)

return builder.make_dataset_urn_with_platform_instance(
platform=config.platform_name,
Expand All @@ -220,12 +211,9 @@ def get_urn(self, config: LookerCommonConfig) -> str:
)

def get_browse_path(self, config: LookerCommonConfig) -> str:
browse_path = config.view_browse_pattern.pattern
assert config.view_browse_pattern.variables is not None
for v in config.view_browse_pattern.variables:
browse_path = browse_path.replace(
"{" + v + "}", self.get_mapping(v, config)
)
browse_path = config.view_browse_pattern.replace_variables(
self.get_mapping(config)
)
return browse_path


Expand Down Expand Up @@ -683,28 +671,19 @@ def from_api( # noqa: C901
)
return None

def get_mapping(self, variable: str, config: LookerCommonConfig) -> str:
assert variable in naming_pattern_variables
if variable == "project":
assert self.project_name is not None
return self.project_name
if variable == "model":
return self.model_name
if variable == "name":
return self.name
if variable == "env":
return config.env.lower()
if variable == "platform":
return config.platform_name
assert False, "Unreachable code"
def get_mapping(self, config: LookerCommonConfig) -> NamingPatternMapping:
return NamingPatternMapping(
platform=config.platform_name,
project=self.project_name, # type: ignore
model=self.model_name,
name=self.name,
env=config.env.lower(),
)

def get_explore_urn(self, config: LookerCommonConfig) -> str:
dataset_name = config.explore_naming_pattern.pattern
assert config.explore_naming_pattern.variables is not None
for v in config.explore_naming_pattern.variables:
dataset_name = dataset_name.replace(
"{" + v + "}", self.get_mapping(v, config)
)
dataset_name = config.explore_naming_pattern.replace_variables(
self.get_mapping(config)
)

return builder.make_dataset_urn_with_platform_instance(
platform=config.platform_name,
Expand All @@ -714,12 +693,9 @@ def get_explore_urn(self, config: LookerCommonConfig) -> str:
)

def get_explore_browse_path(self, config: LookerCommonConfig) -> str:
browse_path = config.explore_browse_pattern.pattern
assert config.explore_browse_pattern.variables is not None
for v in config.explore_browse_pattern.variables:
browse_path = browse_path.replace(
"{" + v + "}", self.get_mapping(v, config)
)
browse_path = config.explore_browse_pattern.replace_variables(
self.get_mapping(config)
)
return browse_path

def _get_url(self, base_url):
Expand Down

0 comments on commit 53033e3

Please sign in to comment.