Allow session isolation for some of the models (#496)
* Added the ability to spawn a new isolated session when the "meta" parameter is present

* Refactoring: Removed useless override logic since credentials were already overridden

* Update documentation

* Added functional test for custom meta

* Updated meta param documentation

* Added functional test for custom meta

* Added functional test for custom meta

* Added functional test for custom meta

---------

Co-authored-by: YMokliak <[email protected]>
yaroslav-ost and YMokliak authored Feb 7, 2025
1 parent 394cf94 commit 2af24d9
Showing 5 changed files with 44 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## Future Release
+- Allow spawning new isolated sessions for the models that require different session configuration
+
 ## v1.9.0
 - Allow to load big seed files
 - Migrates the PySpark code for the Iceberg file format at a macro level, making the impl.py file more readable.
17 changes: 9 additions & 8 deletions README.md
@@ -251,14 +251,15 @@ The table below describes all the options.
 
 When materializing a model as `table`, you may include several optional configs that are specific to the dbt-spark plugin, in addition to the standard model configs.
 
-| Option | Description | Required? | Example |
-|---------|----------------------------------------------------|-------------------------|--------------------------|
-| file_format | The file format to use when creating tables (`parquet`, `csv`, `json`, `text`, `jdbc` or `orc`). | Optional | `parquet`|
-| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `date_day` |
-| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `country_code` |
-| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
-| custom_location | By default, the adapter will store your data in the following path: `location path`/`schema`/`table`. If you don't want to follow that default behaviour, you can use this parameter to set your own custom location on S3 | No | `s3://mycustombucket/mycustompath` |
-| hudi_options | When using file_format `hudi`, gives the ability to overwrite any of the default configuration options. | Optional | `{'hoodie.schema.on.read.enable': 'true'}` |
+| Option | Description | Required? | Example |
+|---------|----------------------------------------------------|-------------------------|--------------------------|
+| file_format | The file format to use when creating tables (`parquet`, `csv`, `json`, `text`, `jdbc` or `orc`). | Optional | `parquet` |
+| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `date_day` |
+| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `country_code` |
+| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
+| custom_location | By default, the adapter will store your data in the following path: `location path`/`schema`/`table`. If you don't want to follow that default behaviour, you can use this parameter to set your own custom location on S3 | No | `s3://mycustombucket/mycustompath` |
+| hudi_options | When using file_format `hudi`, gives the ability to overwrite any of the default configuration options. | Optional | `{'hoodie.schema.on.read.enable': 'true'}` |
+| meta | Spawns an isolated Glue session with a different session configuration. Use case: when specific models require configurations that differ from the default session settings, for example more Glue workers or a larger worker type. | Optional | `meta = { "workers": 50, "worker_type": "G.1X" }` |
 ## Incremental models
 
 dbt seeks to offer useful and intuitive modeling abstractions by means of its built-in configurations and materializations.
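For context, the new `meta` option is set from an ordinary dbt model config block. Below is a minimal sketch in the same style as the functional-test fixtures further down in this commit; the variable name and the override values are illustrative, the keys follow the README example above and, per the connections.py change below, are matched against the profile's connection settings.

```python
# Sketch: a model that requests an isolated Glue session with its own settings.
# The override values are illustrative; keys mirror profile/connection settings.
model_with_isolated_session = """
{{ config(
    materialized="table",
    meta={"workers": 50, "worker_type": "G.1X"}
) }}

select * from {{ source('raw', 'seed') }}
"""
```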
13 changes: 7 additions & 6 deletions dbt/adapters/glue/connections.py
@@ -11,6 +11,7 @@
 from dbt.adapters.events.logging import AdapterLogger
 from dbt_common.events.contextvars import get_node_info
 from dbt_common.clients.agate_helper import table_from_data_flat
+from copy import deepcopy
 
 logger = AdapterLogger("Glue")
 
@@ -44,21 +45,21 @@ def open(cls, connection):
             logger.debug("Connection is already open, skipping open.")
             return connection
 
-        credentials: GlueCredentials = connection.credentials
+        credentials: GlueCredentials = deepcopy(connection.credentials)
         try:
+            node_meta = get_node_info().get("meta", {})
+            credentials.enable_session_per_model = credentials.enable_session_per_model or node_meta
 
             connection_args = {
                 "credentials": credentials
             }
 
             if credentials.enable_session_per_model:
                 key = get_node_info().get("unique_id", "no-node")
                 connection_args['session_id_suffix'] = key
 
-                session_config_overrides = {}
                 for session_config in credentials._connection_keys():
-                    if get_node_info().get("meta", {}).get(session_config):
-                        session_config_overrides[session_config] = get_node_info().get("meta", {}).get(session_config)
-                connection_args['session_config_overrides'] = session_config_overrides
+                    if node_meta.get(session_config):
+                        setattr(credentials,session_config,node_meta.get(session_config))
 
             else:
                 key = cls.get_thread_identifier()
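In short, the change above deep-copies the connection credentials per node, folds any per-model `meta` settings into that copy, and suffixes the isolated session id with the node's `unique_id`. A simplified, self-contained sketch of that flow follows; `PlainCredentials`, `session_settings_for_node`, and the sample values are hypothetical stand-ins, while the real adapter reads the node's meta via `get_node_info()` as shown in the diff.

```python
from copy import deepcopy
from dataclasses import dataclass


@dataclass
class PlainCredentials:
    # Hypothetical stand-in for GlueCredentials with a couple of session settings.
    workers: int = 2
    worker_type: str = "G.025X"
    enable_session_per_model: bool = False

    def _connection_keys(self):
        return ("workers", "worker_type")


def session_settings_for_node(base: PlainCredentials, node_meta: dict, unique_id: str):
    """Mirror the approach in the diff: deep-copy the shared credentials, apply any
    meta overrides to the copy, and key the isolated session by the node's unique_id."""
    credentials = deepcopy(base)  # the shared credentials object stays untouched
    if node_meta:
        for key in credentials._connection_keys():
            if node_meta.get(key):
                setattr(credentials, key, node_meta[key])
        return credentials, unique_id  # isolated session suffix for this node
    return credentials, None  # no meta: fall back to the default per-thread session


# A model whose meta asks for more capacity gets its own session configuration.
creds, suffix = session_settings_for_node(
    PlainCredentials(),
    {"workers": 50, "worker_type": "G.1X"},
    "model.my_project.table_model",
)
print(creds.workers, creds.worker_type, suffix)
# -> 50 G.1X model.my_project.table_model
```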
5 changes: 2 additions & 3 deletions dbt/adapters/glue/gluedbapi/connection.py
@@ -30,10 +30,9 @@ class GlueConnection:
     _boto3_client_lock = threading.Lock()
     _connect_lock = threading.Lock()
 
-    def __init__(self, credentials: GlueCredentials, session_id_suffix: str = None, session_config_overrides = {}):
+    def __init__(self, credentials: GlueCredentials, session_id_suffix: str = None):
         self.credentials = credentials
         self._session_id_suffix = session_id_suffix
-        self._session_config_overrides = session_config_overrides
 
         self._client = None
         self._session_waiter = None
@@ -42,7 +41,7 @@ def __init__(self, credentials: GlueCredentials, session_id_suffix: str = None,
         self._create_session_config = {}
 
         for key in self.credentials._connection_keys():
-            self._create_session_config[key] = self._session_config_overrides.get(key) or getattr(self.credentials, key)
+            self._create_session_config[key] = getattr(self.credentials, key)
 
 
     def _build_session_id(
24 changes: 23 additions & 1 deletion tests/functional/adapter/test_basic.py
@@ -37,11 +37,15 @@
 config_incremental_strategy = """
 {{ config(incremental_strategy='insert_overwrite') }}
 """
+config_materialized_with_custom_meta = """
+{{ config(materialized="table", meta={"workers": 3, "idle_timeout": 2}) }}
+"""
 model_base = """
 select * from {{ source('raw', 'seed') }}
 """
 base_materialized_var_sql = config_materialized_var + config_incremental_strategy + model_base
 
+table_with_custom_meta = config_materialized_with_custom_meta + model_base
 
 @pytest.mark.skip(
     reason="Fails because the test tries to fetch the table metadata during the compile step, "
@@ -65,13 +69,31 @@ def project_config_update(self):
     def models(self):
         return {
             "view_model.sql": base_view_sql,
-            "table_model.sql": base_table_sql,
+            "table_model.sql": table_with_custom_meta,
             "swappable.sql": base_materialized_var_sql,
             "schema.yml": schema_base_yml,
         }
 
     pass
 
+class TestSimpleMaterializationsWithCustomMeta(TestSimpleMaterializationsGlue):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "view_model.sql": base_view_sql,
+            "table_model.sql": table_with_custom_meta,
+            "swappable.sql": base_materialized_var_sql,
+            "schema.yml": schema_base_yml,
+        }
+    def test_base(self, project):
+        super().test_base(project)
+        catalog = run_dbt(["docs", "generate"])
+        compile_results = catalog._compile_results.results
+        assert len(compile_results) > 0, "No models were found in the compile results."
+        for result in compile_results:
+            node = result.node
+            if node.name == "table_model":
+                assert node.config.meta, f"Meta parameter is not present for table_model."
 
 class TestSingularTestsGlue(BaseSingularTests):
     pass
