Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/dbt): lowercase external urns + cleanup config #6289

Merged
merged 2 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next
- LookML source will only emit views that are reachable from explores while scanning your git repo. Previous behavior can be achieved by setting `emit_reachable_views_only` to False.
- LookML source will always lowercase urns for lineage edges from views to upstream tables. There is no fallback provided to previous behavior because it was inconsistent in application of lower-casing earlier.
- dbt config `node_type_pattern` which was previously deprecated has been removed. Use `entities_enabled` instead to control whether to emit metadata for sources, models, seeds, tests, etc.

### Breaking Changes
- Java version 11 or greater is required.
Expand Down
Empty file.
73 changes: 27 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import dateutil.parser
import pydantic
import requests
from cached_property import cached_property
from pydantic import BaseModel, root_validator, validator
from pydantic.fields import Field

from datahub.configuration.common import (
AllowDenyPattern,
ConfigEnum,
ConfigModel,
ConfigurationError,
)
from datahub.configuration.github import GitHubReference
Expand Down Expand Up @@ -152,13 +152,9 @@ class EmitDirective(ConfigEnum):
ONLY = auto() # Only emit metadata for this type and no others


class DBTEntitiesEnabled(BaseModel):
class DBTEntitiesEnabled(ConfigModel):
"""Controls which dbt entities are going to be emitted by this source"""

class Config:
# Needed to allow cached_property to work. See https://github.com/samuelcolvin/pydantic/issues/1241 for more info.
keep_untouched = (cached_property,)

models: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for dbt models when set to Yes or Only",
Expand All @@ -175,54 +171,47 @@ class Config:
EmitDirective.YES,
description="Emit metadata for test definitions when enabled when set to Yes or Only",
)

test_results: EmitDirective = Field(
EmitDirective.YES,
description="Emit metadata for test results when set to Yes or Only",
)

@root_validator
def only_one_can_be_set_to_only(cls, values):
def process_only_directive(cls, values):
# Checks that at most one is set to ONLY, and then sets the others to NO.

only_values = [k for k in values if values.get(k) == EmitDirective.ONLY]
if len(only_values) > 1:
raise ValueError(
f"Cannot have more than 1 type of entity emission set to ONLY. Found {only_values}"
)

if len(only_values) == 1:
for k in values:
values[k] = EmitDirective.NO
values[only_values[0]] = EmitDirective.YES

return values

def _any_other_only_set(self, attribute: str) -> bool:
"""Return true if any attribute other than the one passed in is set to ONLY"""
other_onlies = [
k
for k, v in self.__dict__.items()
if k != attribute and v == EmitDirective.ONLY
]
return len(other_onlies) != 0

@cached_property # type: ignore
def node_type_emit_decision_cache(self) -> Dict[str, bool]:
node_type_for_field_map = {
"models": "model",
"sources": "source",
"seeds": "seed",
"test_definitions": "test",
}
return {
node_type_for_field_map[k]: False
if self._any_other_only_set(k)
or self.__getattribute__(k) == EmitDirective.NO
else True
for k in ["models", "sources", "seeds", "test_definitions"]
def can_emit_node_type(self, node_type: str) -> bool:
# Node type comes from dbt's node types.

field_to_node_type_map = {
"model": "models",
"source": "sources",
"seed": "seeds",
"test": "test_definitions",
}
field = field_to_node_type_map.get(node_type)
if not field:
return False

def can_emit_node_type(self, node_type: str) -> bool:
return self.node_type_emit_decision_cache.get(node_type, False)
return self.__getattribute__(field) == EmitDirective.YES

@property
def can_emit_test_results(self) -> bool:
return (
not self._any_other_only_set("test_results")
and self.test_results != EmitDirective.NO
)
return self.test_results == EmitDirective.YES


class DBTConfig(StatefulIngestionConfigBase):
Expand Down Expand Up @@ -255,10 +244,6 @@ class DBTConfig(StatefulIngestionConfigBase):
default=False,
description="Use model identifier instead of model name if defined (if not, default to model name).",
)
node_type_pattern: AllowDenyPattern = Field(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this requires updating the updating.md doc as this is a backwards incompatible change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update in a follow up PR

default=AllowDenyPattern.allow_all(),
description="Deprecated: use entities_enabled instead. Regex patterns for dbt nodes to filter in ingestion.",
)
entities_enabled: DBTEntitiesEnabled = Field(
DBTEntitiesEnabled(),
description="Controls for enabling / disabling metadata emission for different dbt entities (models, test definitions, test results, etc.)",
Expand Down Expand Up @@ -458,6 +443,8 @@ def get_urn(
data_platform_instance: Optional[str],
) -> str:
db_fqn = self.get_db_fqn()
if target_platform != DBT_PLATFORM:
db_fqn = db_fqn.lower()
return mce_builder.make_dataset_urn_with_platform_instance(
platform=target_platform,
name=db_fqn,
Expand Down Expand Up @@ -1364,12 +1351,6 @@ def filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
for node in all_nodes:
key = node.dbt_name

if not self.config.node_type_pattern.allowed(node.node_type):
logger.debug(
f"Not extracting dbt entity {key} since node type {node.node_type} is disabled"
)
continue

if not self.config.node_name_pattern.allowed(key):
continue

Expand Down