Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block taking jinja2.runtime.Undefined into DatabricksAdapter #98

Merged
merged 4 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## dbt-databricks 1.2.0 (Release TBD)

## dbt-databricks 1.1.1 (Release TBD)

### Fixes
- Block taking jinja2.runtime.Undefined into DatabricksAdapter ([#98](https://github.com/databricks/dbt-databricks/pull/98))

## dbt-databricks 1.1.0 (May 11, 2022)

### Features
Expand Down
2 changes: 0 additions & 2 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ def _execute_cursor(
return self.get_result_from_cursor(cursor)

def list_schemas(self, database: Optional[str], schema: Optional[str] = None) -> Table:
database = database if isinstance(database, str) else None
schema = schema if isinstance(schema, str) else None
return self._execute_cursor(
f"GetSchemas(database={database}, schema={schema})",
lambda cursor: cursor.schemas(catalog_name=database, schema_name=schema),
Expand Down
10 changes: 6 additions & 4 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from dbt.contracts.connection import AdapterResponse
from dbt.adapters.base import AdapterConfig
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.databricks import DatabricksConnectionManager
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.column import DatabricksColumn

from dbt.adapters.spark.impl import SparkAdapter

from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.connections import DatabricksConnectionManager
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.utils import undefined_proof


@dataclass
class DatabricksConfig(AdapterConfig):
Expand All @@ -25,6 +26,7 @@ class DatabricksConfig(AdapterConfig):
tblproperties: Optional[Dict[str, str]] = None


@undefined_proof
class DatabricksAdapter(SparkAdapter):

Relation = DatabricksRelation
Expand Down
12 changes: 12 additions & 0 deletions dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from dataclasses import dataclass
from typing import Any, Dict

from dbt.adapters.base.relation import Policy
from dbt.adapters.spark.relation import SparkRelation

from dbt.adapters.databricks.utils import remove_undefined


@dataclass
class DatabricksIncludePolicy(Policy):
Expand All @@ -15,6 +18,15 @@ class DatabricksIncludePolicy(Policy):
class DatabricksRelation(SparkRelation):
include_policy: DatabricksIncludePolicy = DatabricksIncludePolicy()

@classmethod
def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
data = super().__pre_deserialize__(data)
if "database" not in data["path"]:
data["path"]["database"] = None
else:
data["path"]["database"] = remove_undefined(data["path"]["database"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious when will "database" here be Undefined? Is there a specific configuration that can trigger this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While parsing profiles, models, etc. and building metadata that happen before setting up DatabricksRelation, it could contain Undefined.

Copy link
Collaborator Author

@ueshin ueshin May 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also macros for snapshot seems to cause the issue after we change DatabricksRelation.include_policy.database=True.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it might be helpful to add a test case here (if we set DatabricksRelation.include_policy.database=True)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what kind of test in your mind?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, construct a test with the database field being jinja Undefined.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test. Thanks!

return data

def __post_init__(self) -> None:
return

Expand Down
51 changes: 51 additions & 0 deletions dbt/adapters/databricks/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import functools
import inspect
from typing import Any, Callable, Type, TypeVar

from dbt.adapters.base import BaseAdapter
from jinja2.runtime import Undefined


A = TypeVar("A", bound=BaseAdapter)


def remove_undefined(v: Any) -> Any:
return None if isinstance(v, Undefined) else v


def undefined_proof(cls: Type[A]) -> Type[A]:
for name in cls._available_:
func = getattr(cls, name)
if not callable(func):
continue
try:
static_attr = inspect.getattr_static(cls, name)
isstatic = isinstance(static_attr, staticmethod)
isclass = isinstance(static_attr, classmethod)
except AttributeError:
isstatic = False
isclass = False
wrapped_function = _wrap_function(func.__func__ if isclass else func)
setattr(
cls,
name,
(
staticmethod(wrapped_function)
if isstatic
else classmethod(wrapped_function)
if isclass
else wrapped_function
),
)

return cls


def _wrap_function(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
new_args = [remove_undefined(arg) for arg in args]
new_kwargs = {key: remove_undefined(value) for key, value in kwargs.items()}
return func(*new_args, **new_kwargs)

return wrapper
4 changes: 2 additions & 2 deletions tests/unit/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_target_databricks_sql_connector_catalog(self, project):
)

def test_two_catalog_settings(self):
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
dbt.exceptions.DbtProfileError,
"Got duplicate keys: \\(`databricks.catalog` in session_properties\\)"
' all map to "database"',
Expand All @@ -92,7 +92,7 @@ def test_two_catalog_settings(self):
)

def test_database_and_catalog_settings(self):
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
dbt.exceptions.DbtProfileError,
'Got duplicate keys: \\(catalog\\) all map to "database"',
):
Expand Down
67 changes: 67 additions & 0 deletions tests/unit/test_relation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import unittest

from jinja2.runtime import Undefined

from dbt.adapters.databricks.relation import DatabricksRelation


class TestDatabricksRelation(unittest.TestCase):
def test_pre_deserialize(self):
data = {
"quote_policy": {"database": False, "schema": False, "identifier": False},
"path": {
"database": "some_database",
"schema": "some_schema",
"identifier": "some_table",
},
"type": None,
}

relation = DatabricksRelation.from_dict(data)
self.assertEqual(relation.database, "some_database")
self.assertEqual(relation.schema, "some_schema")
self.assertEqual(relation.identifier, "some_table")

data = {
"quote_policy": {"database": False, "schema": False, "identifier": False},
"path": {
"database": None,
"schema": "some_schema",
"identifier": "some_table",
},
"type": None,
}

relation = DatabricksRelation.from_dict(data)
self.assertIsNone(relation.database)
self.assertEqual(relation.schema, "some_schema")
self.assertEqual(relation.identifier, "some_table")

data = {
"quote_policy": {"database": False, "schema": False, "identifier": False},
"path": {
"schema": "some_schema",
"identifier": "some_table",
},
"type": None,
}

relation = DatabricksRelation.from_dict(data)
self.assertIsNone(relation.database)
self.assertEqual(relation.schema, "some_schema")
self.assertEqual(relation.identifier, "some_table")

data = {
"quote_policy": {"database": False, "schema": False, "identifier": False},
"path": {
"database": Undefined(),
"schema": "some_schema",
"identifier": "some_table",
},
"type": None,
}

relation = DatabricksRelation.from_dict(data)
self.assertIsNone(relation.database)
self.assertEqual(relation.schema, "some_schema")
self.assertEqual(relation.identifier, "some_table")