From 133108cb520e75db0f6dce138ea872f3e2ec05a4 Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Mon, 16 Dec 2024 14:04:16 +0000 Subject: [PATCH 1/5] Move reflections to rest API calls and extend implementation --- ...source.sh => create_and_format_sources.sh} | 26 +- .github/workflows/ci.yml | 4 +- dbt/adapters/dremio/api/rest/client.py | 31 +- dbt/adapters/dremio/api/rest/url_builder.py | 62 ++- dbt/adapters/dremio/api/rest/utils.py | 22 +- dbt/adapters/dremio/connections.py | 118 +++++- dbt/adapters/dremio/impl.py | 22 +- dbt/include/dremio/dbt_project.yml | 2 +- .../dremio/macros/adapters/metadata.sql | 17 +- .../reflection/create_reflection.sql | 93 +---- .../materializations/reflection/helpers.sql | 11 +- .../reflection/reflection.sql | 52 ++- .../dremio_specific/test_reflections.py | 361 ++++++++++++++++++ 13 files changed, 670 insertions(+), 151 deletions(-) rename .github/scripts/{create_dremio_s3_source.sh => create_and_format_sources.sh} (63%) create mode 100644 tests/functional/adapter/dremio_specific/test_reflections.py diff --git a/.github/scripts/create_dremio_s3_source.sh b/.github/scripts/create_and_format_sources.sh similarity index 63% rename from .github/scripts/create_dremio_s3_source.sh rename to .github/scripts/create_and_format_sources.sh index 68820803..acc3bea9 100644 --- a/.github/scripts/create_dremio_s3_source.sh +++ b/.github/scripts/create_and_format_sources.sh @@ -50,4 +50,28 @@ curl -s -X PUT "http://localhost:9047/apiv2/source/dbt_test_source" \ -H "Authorization: _dremio$AUTH_TOKEN" \ --data 
"{\"name\":\"dbt_test_source\",\"config\":{\"credentialType\":\"ACCESS_KEY\",\"accessKey\":\"$MINIO_ROOT_USER\",\"accessSecret\":\"$MINIO_ROOT_PASSWORD\",\"secure\":false,\"externalBucketList\":[],\"enableAsync\":true,\"enableFileStatusCheck\":true,\"rootPath\":\"/\",\"defaultCtasFormat\":\"ICEBERG\",\"propertyList\":[{\"name\":\"fs.s3a.path.style.access\",\"value\":\"true\"},{\"name\":\"fs.s3a.endpoint\",\"value\":\"minio:9000\"},{\"name\":\"dremio.s3.compat\",\"value\":\"true\"}],\"whitelistedBuckets\":[],\"isCachingEnabled\":false,\"maxCacheSpacePct\":100},\"type\":\"S3\",\"metadataPolicy\":{\"deleteUnavailableDatasets\":true,\"autoPromoteDatasets\":false,\"namesRefreshMillis\":3600000,\"datasetDefinitionRefreshAfterMillis\":3600000,\"datasetDefinitionExpireAfterMillis\":10800000,\"authTTLMillis\":86400000,\"updateMode\":\"PREFETCH_QUERIED\"}}" -echo "S3 Source created in Dremio." \ No newline at end of file +echo "S3 Source created in Dremio." + +echo "Creating the Samples source in Dremio..." +curl -s -X PUT "http://localhost:9047/apiv2/source/Samples" \ + -H "Content-Type: application/json" \ + -H "Authorization: _dremio$AUTH_TOKEN" \ + --data-raw "{\"name\":\"Samples\",\"config\":{\"externalBucketList\":[\"samples.dremio.com\"],\"credentialType\":\"NONE\",\"secure\":false,\"propertyList\":[]},\"name\":\"Samples\",\"accelerationRefreshPeriod\":3600000,\"accelerationGracePeriod\":10800000,\"accelerationNeverRefresh\":true,\"accelerationNeverExpire\":true,\"accelerationActivePolicyType\":\"PERIOD\",\"accelerationRefreshSchedule\":\"0 0 8 * * *\",\"accelerationRefreshOnDataChanges\":false,\"type\":\"S3\"}" + +echo "Samples source created in Dremio." + +echo "Formatting SF_incidents2016..." 
+curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/file_format/samples.dremio.com/SF_incidents2016.json" \ + -H "Content-Type: application/json" \ + -H "Authorization: _dremio$AUTH_TOKEN" \ + --data-raw "{\"type\":\"JSON\"}" + +echo "SF_incidents2016 formatted in Dremio." + +echo "Formatting NYC-taxi-trips..." +curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/folder_format/samples.dremio.com/NYC-taxi-trips" \ + -H "Content-Type: application/json" \ + -H "Authorization: _dremio$AUTH_TOKEN" \ + --data-raw "{\"ignoreOtherFileFormats\":false,\"type\":\"Parquet\"}" + +echo "NYC-taxi-trips formatted in Dremio." \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fec1c8b8..5ef1e48d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,8 +48,8 @@ jobs: - name: Create MinIO bucket run: bash .github/scripts/create_minio_bucket.sh - - name: Create Dremio S3 Source - run: bash .github/scripts/create_dremio_s3_source.sh + - name: Create and Format Sources + run: bash .github/scripts/create_and_format_sources.sh - name: Install Dependencies uses: actions/setup-python@v4 diff --git a/dbt/adapters/dremio/api/rest/client.py b/dbt/adapters/dremio/api/rest/client.py index 26a2b307..92bfb49d 100644 --- a/dbt/adapters/dremio/api/rest/client.py +++ b/dbt/adapters/dremio/api/rest/client.py @@ -15,12 +15,11 @@ # limitations under the License. 
- import requests from dbt.adapters.dremio.api.authentication import DremioPatAuthentication from dbt.adapters.dremio.api.parameters import Parameters -from dbt.adapters.dremio.api.rest.utils import _post, _get, _delete +from dbt.adapters.dremio.api.rest.utils import _post, _get, _put, _delete from dbt.adapters.dremio.api.rest.url_builder import UrlBuilder from dbt.adapters.events.logging import AdapterLogger @@ -132,4 +131,30 @@ def delete_catalog(self, cid): url, self._parameters.authentication.get_headers(), ssl_verify=self._parameters.authentication.verify_ssl, - ) \ No newline at end of file + ) + + def get_reflections(self, dataset_id): + url = UrlBuilder.get_reflection_url(self._parameters, dataset_id) + return _get( + url, + self._parameters.authentication.get_headers(), + ssl_verify=self._parameters.authentication.verify_ssl, + ) + + def create_reflection(self, payload): + url = UrlBuilder.create_reflection_url(self._parameters) + return _post( + url, + self._parameters.authentication.get_headers(), + json=payload, + ssl_verify=self._parameters.authentication.verify_ssl, + ) + + def update_reflection(self, reflection_id, payload): + url = UrlBuilder.update_reflection_url(self._parameters, reflection_id) + return _put( + url, + self._parameters.authentication.get_headers(), + json=payload, + ssl_verify=self._parameters.authentication.verify_ssl, + ) diff --git a/dbt/adapters/dremio/api/rest/url_builder.py b/dbt/adapters/dremio/api/rest/url_builder.py index c47a6eef..a92327bf 100644 --- a/dbt/adapters/dremio/api/rest/url_builder.py +++ b/dbt/adapters/dremio/api/rest/url_builder.py @@ -34,6 +34,12 @@ class UrlBuilder: SOFTWARE_CATALOG_ENDPOINT = "/api/v3/catalog" CLOUD_CATALOG_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/catalog" + SOFTWARE_REFLECTIONS_ENDPOINT = "/api/v3/reflection" + CLOUD_REFLECTIONS_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/reflection" + + SOFTWARE_DATASET_ENDPOIT = "/api/v3/dataset" + CLOUD_DATASET_ENDPOINT = CLOUD_PROJECT_ENDPOINT + 
"/{}/dataset" + # https://docs.dremio.com/software/rest-api/jobs/get-job/ OFFSET_DEFAULT = 0 LIMIT_DEFAULT = 100 @@ -56,10 +62,10 @@ def sql_url(cls, parameters: Parameters): def job_status_url(cls, parameters: Parameters, job_id): if type(parameters) is CloudParameters: return ( - parameters.base_url - + UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id) - + "/" - + job_id + parameters.base_url + + UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id) + + "/" + + job_id ) return parameters.base_url + UrlBuilder.SOFTWARE_JOB_ENDPOINT + "/" + job_id @@ -75,11 +81,11 @@ def job_cancel_url(cls, parameters: Parameters, job_id): @classmethod def job_results_url( - cls, - parameters: Parameters, - job_id, - offset=OFFSET_DEFAULT, - limit=LIMIT_DEFAULT, + cls, + parameters: Parameters, + job_id, + offset=OFFSET_DEFAULT, + limit=LIMIT_DEFAULT, ): url_path = parameters.base_url if type(parameters) is CloudParameters: @@ -139,3 +145,41 @@ def catalog_item_by_path_url(cls, parameters: Parameters, path_list): joined_path_str = "/".join(quoted_path_list).replace('"', "") endpoint = f"/by-path/{joined_path_str}" return url_path + endpoint + + @classmethod + def create_reflection_url(cls, parameters: Parameters): + url_path = parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT + + return url_path + + @classmethod + def update_reflection_url(cls, parameters: Parameters, dataset_id): + url_path = parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT + + endpoint = "/{}".format(dataset_id) + return url_path + endpoint + + @classmethod + def get_reflection_url(cls, parameters: Parameters, dataset_id): + url_path = 
parameters.base_url + if type(parameters) is CloudParameters: + url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format( + parameters.cloud_project_id + ) + else: + url_path += UrlBuilder.SOFTWARE_DATASET_ENDPOIT + + endpoint = "/{}/reflection".format(dataset_id) + return url_path + endpoint diff --git a/dbt/adapters/dremio/api/rest/utils.py b/dbt/adapters/dremio/api/rest/utils.py index 39e8bec7..a90cc8cb 100644 --- a/dbt/adapters/dremio/api/rest/utils.py +++ b/dbt/adapters/dremio/api/rest/utils.py @@ -31,7 +31,6 @@ import json as jsonlib from requests.exceptions import HTTPError - from dbt.adapters.events.logging import AdapterLogger logger = AdapterLogger("dremio") @@ -45,12 +44,12 @@ def _get(url, request_headers, details="", ssl_verify=True): def _post( - url, - request_headers=None, - json=None, - details="", - ssl_verify=True, - timeout=None, + url, + request_headers=None, + json=None, + details="", + ssl_verify=True, + timeout=None, ): if isinstance(json, str): json = jsonlib.loads(json) @@ -64,6 +63,13 @@ def _post( return _check_error(response, details) +def _put(url, request_headers, json=None, details="", ssl_verify=True): + response = session.put( + url, headers=request_headers, verify=ssl_verify, json=json + ) + return _check_error(response, details) + + def _delete(url, request_headers, details="", ssl_verify=True): response = session.delete(url, headers=request_headers, verify=ssl_verify) return _check_error(response, details) @@ -148,5 +154,3 @@ def _check_error(response, details=""): "Gateway Timeout:" + details, error, response ) raise DremioException("Unknown error", error) - - diff --git a/dbt/adapters/dremio/connections.py b/dbt/adapters/dremio/connections.py index 24ca039e..43b92edf 100644 --- a/dbt/adapters/dremio/connections.py +++ b/dbt/adapters/dremio/connections.py @@ -13,12 +13,13 @@ # limitations under the License. 
import agate -from typing import Tuple, Optional +from typing import Tuple, Optional, List from contextlib import contextmanager from dbt.adapters.dremio.api.cursor import DremioCursor from dbt.adapters.dremio.api.handle import DremioHandle from dbt.adapters.dremio.api.parameters import ParametersBuilder +from dbt.adapters.dremio.relation import DremioRelation from dbt_common.clients import agate_helper @@ -130,8 +131,8 @@ def add_commit_query(self): # Auto_begin may not be relevant with the rest_api def add_query( - self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, - fetch=False + self, sql, auto_begin=True, bindings=None, abridge_sql_log=False, + fetch=False ): connection = self.get_thread_connection() if auto_begin and connection.transaction_open is False: @@ -174,11 +175,11 @@ def get_response(cls, cursor: DremioCursor) -> AdapterResponse: return AdapterResponse(_message=message, rows_affected=rows) def execute( - self, - sql: str, - auto_begin: bool = False, - fetch: bool = False, - limit: Optional[int] = None, + self, + sql: str, + auto_begin: bool = False, + fetch: bool = False, + limit: Optional[int] = None, ) -> Tuple[AdapterResponse, agate.Table]: sql = self._add_query_comment(sql) _, cursor = self.add_query(sql, auto_begin, fetch=fetch) @@ -230,6 +231,102 @@ def create_catalog(self, relation): self._create_folders(database, schema, rest_client) return + def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str], + date_dimensions: List[str], measures: List[str], + computations: List[str], partition_by: List[str], partition_transform: List[str], + partition_method: str, distribute_by: List[str], localsort_by: List[str], + arrow_cache: bool) -> None: + thread_connection = self.get_thread_connection() + connection = self.open(thread_connection) + rest_client = connection.handle.get_client() + + database = anchor.database + schema = anchor.schema + path = 
self._create_path_list(database, schema) + identifier = anchor.identifier + + path.append(identifier) + + catalog_info = rest_client.get_catalog_item( + catalog_id=None, + catalog_path=path, + ) + + dataset_id = catalog_info.get("id") + + payload = { + "type": type, + "name": name, + "datasetId": dataset_id, + "enabled": True, + "arrowCachingEnabled": arrow_cache, + "partitionDistributionStrategy": partition_method.upper(), + "entityType": "reflection" + } + + if display: + payload["displayFields"] = [{"name": field} for field in display] + + if dimensions: + payload["dimensionFields"] = [{"name": dimension} for dimension in dimensions] + + if date_dimensions: + payload["dateFields"] = [{"name": date_dimension, "granularity": "DATE"} for date_dimension in + date_dimensions] + + if measures and computations: + payload["measureFields"] = [{"name": measure, "measureTypeList": computation.split(',')} for + measure, computation in zip(measures, computations)] + + if partition_by: + if not partition_transform: + partition_transform = ["IDENTITY"] * len(partition_by) + + partition_fields = [] + for partition, transform in zip(partition_by, partition_transform): + transform = transform.upper() + partition_field = {"name": partition, "transform": None} + + if transform in ["YEAR", "MONTH", "DAY", "HOUR", "IDENTITY"]: + partition_field["transform"] = {"type": transform} + elif transform.startswith("BUCKET"): + bucket_count = int(transform.split("(")[1].split(")")[0]) + partition_field["transform"] = { + "type": "BUCKET", + "bucketTransform": {"bucketCount": bucket_count}, + } + elif transform.startswith("TRUNCATE"): + truncate_length = int(transform.split("(")[1].split(")")[0]) + partition_field["transform"] = { + "type": "TRUNCATE", + "truncateTransform": {"truncateLength": truncate_length}, + } + partition_fields.append(partition_field) + + payload["partitionFields"] = partition_fields + + if distribute_by: + payload["distributionFields"] = [{"name": distribute} for 
distribute in distribute_by] + + if localsort_by: + payload["sortFields"] = [{"name": sort} for sort in localsort_by] + + dataset_info = rest_client.get_reflections(dataset_id) + reflections_info = dataset_info.get("data") + + updated = False + for reflection in reflections_info: + if reflection.get("name") == name: + logger.debug(f"Reflection {name} already exists. Updating it") + payload["tag"] = reflection.get("tag") + rest_client.update_reflection(reflection.get("id"), payload) + updated = True + break + + if not updated: + logger.debug(f"Reflection {name} does not exist. Creating it") + rest_client.create_reflection(payload) + def _make_new_space_json(self, name) -> json: python_dict = {"entityType": "space", "name": name} return json.dumps(python_dict) @@ -263,6 +360,7 @@ def _create_folders(self, database, schema, rest_client: DremioRestClient): def _create_path_list(self, database, schema): path = [database] - folders = schema.split(".") - path.extend(folders) + if schema != 'no_schema': + folders = schema.split(".") + path.extend(folders) return path diff --git a/dbt/adapters/dremio/impl.py b/dbt/adapters/dremio/impl.py index c5175ab6..db139b74 100644 --- a/dbt/adapters/dremio/impl.py +++ b/dbt/adapters/dremio/impl.py @@ -20,6 +20,7 @@ from typing import List from typing import Optional +from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import ( @@ -91,16 +92,16 @@ def drop_schema(self, relation: DremioRelation) -> None: self.connections.drop_catalog(database, schema) def timestamp_add_sql( - self, add_to: str, number: int = 1, interval: str = "hour" + self, add_to: str, number: int = 1, interval: str = "hour" ) -> str: return f"DATE_ADD({add_to}, CAST({number} AS INTERVAL {interval}))" def get_rows_different_sql( - self, - relation_a: BaseRelation, - relation_b: BaseRelation, - column_names: Optional[List[str]] = ["*"], - except_operator: str = "EXCEPT", + self, + relation_a: 
BaseRelation, + relation_b: BaseRelation, + column_names: Optional[List[str]] = ["*"], + except_operator: str = "EXCEPT", ) -> str: """Generate SQL for a query that returns a single row with a two columns: the number of rows that are different between the two @@ -177,6 +178,15 @@ def run_sql_for_tests(self, sql, fetch, conn): finally: conn.transaction_open = False + @available + def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str], + date_dimensions: List[str], measures: List[str], computations: List[str], + partition_by: List[str], partition_transform: List[str], partition_method: str, + distribute_by: List[str], localsort_by: List[str], arrow_cache: bool) -> None: + self.connections.create_reflection(name, type, anchor, display, dimensions, date_dimensions, measures, + computations, partition_by, partition_transform, partition_method, + distribute_by, localsort_by, arrow_cache) + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/dbt/include/dremio/dbt_project.yml b/dbt/include/dremio/dbt_project.yml index 5d055c14..a2224fed 100644 --- a/dbt/include/dremio/dbt_project.yml +++ b/dbt/include/dremio/dbt_project.yml @@ -24,5 +24,5 @@ quoting: macro-paths: ["macros"] vars: - "dremio:reflections_enabled": false + "dremio:reflections_metadata_enabled": false "dremio:exact_search_enabled": false diff --git a/dbt/include/dremio/macros/adapters/metadata.sql b/dbt/include/dremio/macros/adapters/metadata.sql index b6a316e3..c19e00f1 100644 --- a/dbt/include/dremio/macros/adapters/metadata.sql +++ b/dbt/include/dremio/macros/adapters/metadata.sql @@ -136,16 +136,11 @@ limitations under the License.*/ {%- endmacro -%} {% macro dremio__get_catalog_relations_result_sql(relations) %} - {%- if var('dremio:reflections_enabled', default=false) %} - {{get_catalog_reflections(relations)}} - {% else %} - - select * - from t - join columns on (t.table_schema = columns.table_schema - and t.table_name = 
columns.table_name) - order by "column_index" - {% endif %} + select * + from t + join columns on (t.table_schema = columns.table_schema + and t.table_name = columns.table_name) + order by "column_index" {%- endmacro -%} {% macro get_catalog_reflections(relations) %} @@ -242,7 +237,7 @@ limitations under the License.*/ + (('.' + schema) if schema != 'no_schema' else '') -%} {% call statement('list_relations_without_caching', fetch_result=True) -%} - {%- if var('dremio:reflections_enabled', default=false) -%} + {%- if var('dremio:reflections_metadata_enabled', default=false) -%} with cte1 as ( select diff --git a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql index 065e2f88..b12f1e52 100644 --- a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql @@ -37,85 +37,18 @@ ADD EXTERNAL REFLECTION name USING target #} -{%- macro create_reflection(reflection_type, anchor, reflection, external_target=none, - display=none, dimensions=none, by_day_dimensions=none, measures=none) %} - alter dataset {{ anchor }} - create {{ reflection_type }} reflection {{ reflection.include(database=False, schema=False) }} - using - {%- if reflection_type == 'raw' %} - {{ display_clause(display) }} - {%- elif reflection_type == 'aggregate' %} - {{ dimensions_clause(dimensions=dimensions, by_day_dimensions=by_day_dimensions) }} - {{ measures_clause(measures) }} - {%- else -%} - {{ external_target }} - {% endif -%} - {%- if reflection_type in ['raw', 'aggregate'] %} - {{ partition_method() }} {{ config_cols("partition by") }} - {{ config_cols("localsort by") }} - {{ config_cols("distribute by") }} - {{ arrow_cache_clause() }} - {%- endif -%} -{% endmacro -%} - -{%- macro display_clause(display=none) %} - {%- set cols = config.get('display', validator=validation.any[list, basestring]) or 
display -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - display ( - {%- for item in cols -%} - {{ adapter.quote(item) }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} - -{%- macro dimensions_clause(dimensions=none, by_day_dimensions=none) %} - {%- set cols = config.get('dimensions', validator=validation.any[list, basestring]) or dimensions -%} - {%- set by_day_cols = config.get('by_day_dimensions', validator=validation.any[list, basestring]) or by_day_dimensions -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - {%- if by_day_cols is string -%} - {%- set by_day_cols = [by_day_cols] -%} - {%- endif -%} - dimensions ( - {%- for item in cols -%} - {{ adapter.quote(item) ~ (' by day' if item in by_day_cols else "") }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} +{%- macro create_reflection(reflection_name, reflection_type, anchor, + display=none, dimensions=none, date_dimensions=none, measures=none, computations=none, partition_by=none, partition_transform=none, partition_method=none, distribute_by=none, localsort_by=none, arrow_cache=false) %} + + {%- if reflection_type == 'raw' %} + {% set reflection_type = 'RAW' %} + {%- elif reflection_type in ['aggregate', 'aggregation'] %} + {% set reflection_type = 'AGGREGATION' %} + {%- else -%} + {% do exceptions.raise_compiler_error("invalid reflection type") %} + {%- endif -%} -{%- macro measures_clause(measures=none) %} - {%- set cols = config.get('measures', validator=validation.any[list, basestring]) or measures -%} - {%- set comp_cols = config.get('computations', validator=validation.any[list, basestring]) or [] -%} - {%- if cols is not none %} - {%- if cols is string -%} - {%- set cols = [cols] -%} - {%- endif -%} - {%- if comp_cols is string -%} - {%- set comp_cols = [comp_cols] -%} - {%- endif -%} - measures ( -
{%- for item in cols -%} - {%- set computations = (' (' ~ comp_cols[loop.index0] ~ ')') - if loop.index0 < comp_cols | length and comp_cols[loop.index0] is not none else '' -%} - {{ adapter.quote(item) ~ computations }} - {%- if not loop.last -%},{%- endif -%} - {%- endfor -%} - ) - {%- endif %} -{% endmacro -%} + {% do adapter.create_reflection(reflection_name, reflection_type, anchor, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache) %} -{%- macro arrow_cache_clause() -%} - {%- set arrow_cache = config.get('arrow_cache', validator=validation.any[boolean]) -%} - {%- if arrow_cache is not none and arrow_cache -%} - arrow cache - {%- endif -%} -{% endmacro -%} + SELECT 1 +{% endmacro -%} \ No newline at end of file diff --git a/dbt/include/dremio/macros/materializations/reflection/helpers.sql b/dbt/include/dremio/macros/materializations/reflection/helpers.sql index a2559ecb..9a0a773d 100644 --- a/dbt/include/dremio/macros/materializations/reflection/helpers.sql +++ b/dbt/include/dremio/macros/materializations/reflection/helpers.sql @@ -12,17 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License.*/ -{% macro drop_reflection_if_exists(relation, reflection) %} - {% if reflection is not none and reflection.type == 'materialized_view' %} - {% call statement('drop reflection') -%} - alter dataset {{ relation }} - drop reflection {{ reflection.include(database=False, schema=False) }} - {%- endcall %} - {% endif %} -{% endmacro %} - {% macro dbt_dremio_validate_get_reflection_type(raw_reflection_type) %} - {% set accepted_types = ['raw', 'aggregate', 'external'] %} + {% set accepted_types = ['raw', 'aggregate', 'aggregation', 'external'] %} {% set invalid_reflection_type_msg -%} Invalid reflection type provided: {{ raw_reflection_type }} Expected one of: {{ accepted_types | join(', ') }} diff --git a/dbt/include/dremio/macros/materializations/reflection/reflection.sql b/dbt/include/dremio/macros/materializations/reflection/reflection.sql index 9778fec0..2d3f56f5 100644 --- a/dbt/include/dremio/macros/materializations/reflection/reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/reflection.sql @@ -13,17 +13,31 @@ See the License for the specific language governing permissions and limitations under the License.*/ {% materialization reflection, adapter='dremio' %} - {%- if not var('dremio:reflections_enabled', default=false) -%} - {% do exceptions.CompilationError("reflections are disabled, set 'dremio:reflections_enabled' variable to true to enable them") %} - {%- endif -%} + {% set reflection_name = config.get('name', validator=validation.any[basestring]) or 'Unnamed Reflection' %} {% set raw_reflection_type = config.get('reflection_type', validator=validation.any[basestring]) or 'raw' %} {% set raw_anchor = config.get('anchor', validator=validation.any[list, basestring]) %} {% set raw_external_target = config.get('external_target', validator=validation.any[list, basestring]) %} {% set identifier = model['alias'] %} {%- set display =
config.get('display', validator=validation.any[list, basestring]) -%} {%- set dimensions = config.get('dimensions', validator=validation.any[list, basestring]) -%} + {%- set date_dimensions = config.get('date_dimensions', validator=validation.any[list, basestring]) -%} {%- set measures = config.get('measures', validator=validation.any[list, basestring]) -%} + {%- set computations = config.get('computations', validator=validation.any[list, basestring]) -%} + {%- set partition_by = config.get('partition_by', validator=validation.any[list, basestring]) -%} + {%- set partition_transform = config.get('partition_transform', validator=validation.any[list, basestring]) -%} + {%- set partition_method = config.get('partition_method', validator=validation.any[basestring]) or 'striped' -%} + {%- set distribute_by = config.get('distribute_by', validator=validation.any[list, basestring])-%} + {%- set localsort_by = config.get('localsort_by', validator=validation.any[list, basestring]) -%} + {%- set arrow_cache = config.get('arrow_cache') or false -%} + + {% set relation = this %} + + {% if measures is not none and computations is not none %} + {% if measures | length != computations | length %} + {% do exceptions.raise_compiler_error("measures and computations should match in length") %} + {%- endif -%} + {%- endif %} {% if model.refs | length + model.sources | length == 1 %} {% if model.refs | length == 1 %} @@ -60,28 +74,48 @@ limitations under the License.*/ {%- set reflection_type = dbt_dremio_validate_get_reflection_type(raw_reflection_type) -%} {% if (reflection_type == 'raw' and display is none) - or (reflection_type == 'aggregate' and (dimensions is none or measures is none)) %} + or (reflection_type in ['aggregate', 'aggregation'] and (dimensions is none or measures is none or computations is none or partition_transform is none)) %} {% set columns = adapter.get_columns_in_relation(anchor) %} {% if reflection_type == 'raw' %} {% set display = columns | map(attribute='name') | list %} - {% elif
reflection_type == 'aggregate' %} + {% elif reflection_type in ['aggregate', 'aggregation'] %} {% if dimensions is none %} {% set dimensions = columns | rejectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} - {% set by_day_dimensions = columns | selectattr('dtype', 'in', ['timestamp']) | map(attribute='name') | list %} + {% set date_dimensions = columns | selectattr('dtype', 'in', ['timestamp']) | map(attribute='name') | list %} {% endif %} {% if measures is none %} {% set measures = columns | selectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} {% endif %} + {% if computations is none %} + {% if measures is not none %} + {% set computations = [] %} + {% for measure in measures %} + {% if measure in columns | selectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} + {% set _ = computations.append("SUM, COUNT") %} + {% else %} + {% set _ = computations.append("COUNT") %} + {% endif %} + {% endfor %} + {% else %} + {{ log("measures is null or undefined; not setting default computations.") }} + {% endif %} + {% endif %} + {% if partition_transform is none %} + {% if partition_by is not none %} + {% set partition_transform = ['IDENTITY'] * (partition_by | length) %} + {% else %} + {{ log("partition_by is null or undefined; not setting default partition_transform.") }} + {% endif %} + {% endif %} {% endif %} {% endif %} {{ run_hooks(pre_hooks) }} - {{ drop_reflection_if_exists(anchor, old_relation) }} -- build model {% call statement('main') -%} - {{ create_reflection(reflection_type, anchor, target_relation, external_target, - display=display, dimensions=dimensions, by_day_dimensions=by_day_dimensions, measures=measures) }} + {{ create_reflection(reflection_name, reflection_type, anchor, + display=display, dimensions=dimensions, date_dimensions=date_dimensions, measures=measures, computations=computations, partition_by=partition_by, 
partition_transform=partition_transform, partition_method=partition_method, distribute_by=distribute_by, localsort_by=localsort_by, arrow_cache=arrow_cache) }} {%- endcall %} {{ run_hooks(post_hooks) }} diff --git a/tests/functional/adapter/dremio_specific/test_reflections.py b/tests/functional/adapter/dremio_specific/test_reflections.py new file mode 100644 index 00000000..b4bc9bbe --- /dev/null +++ b/tests/functional/adapter/dremio_specific/test_reflections.py @@ -0,0 +1,361 @@ +import pytest +from dbt.adapters.dremio.api.rest.client import DremioRestClient +from dbt.adapters.dremio import DremioCredentials +from dbt.adapters.dremio.api.parameters import ParametersBuilder +from dbt.adapters.dremio.api.authentication import DremioAuthentication +from dbt.tests.util import run_dbt + + +view1_model = """ +SELECT IncidntNum, Category, Descript, DayOfWeek, TO_DATE("SF_incidents2016.json"."Date", 'YYYY-MM-DD', 1) AS "Date", "SF_incidents2016.json"."Time" AS "Time", PdDistrict, Resolution, Address, X, Y, Location, PdId +FROM Samples."samples.dremio.com"."SF_incidents2016.json" AS "SF_incidents2016.json"; +""" + +view2_model = """ +SELECT pickup_datetime, trip_distance_mi FROM Samples."samples.dremio.com"."NYC-taxi-trips" LIMIT 100 +""" + +basic_raw_model = """ +{{ config(name='Basic Raw Reflection', + materialized='reflection', + display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + reflection_type='raw')}} +-- depends_on: {{ ref('view1') }} +""" + +basic_aggregation_model = """ +{{ config(name='Basic Aggregation Reflection', + materialized='reflection', + reflection_type='aggregation', + dimensions=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + measures=['PdId', 'Location'], + computations=['COUNT,SUM', 'COUNT'])}} +-- depends_on: {{ ref('view1') }} +""" + +partition_by_model = """ +{{ config(name='Partition By Reflection', + materialized='reflection', + display=['Date', 'DayOfWeek', 'PdDistrict',
'Category'], + reflection_type='raw', + partition_by=['DayOfWeek'])}} +-- depends_on: {{ ref('view1') }} +""" + +consolidated_partition_method_model = """ +{{ config( + name='Consolidated Partition Method Reflection', + materialized='reflection', + display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + reflection_type='raw', + partition_by=['DayOfWeek'], + partition_method='consolidated' +) }} +-- depends_on: {{ ref('view1') }} +""" + +localsort_by_model = """ +{{ config( + name='Localsort By Reflection', + materialized='reflection', + display=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + reflection_type='raw', + partition_by=['DayOfWeek'], + partition_method='striped', + localsort_by=['Date'] +) }} +-- depends_on: {{ ref('view1') }} +""" + +granular_date_dimension_model = """ +{{ config(name='Granulate Date Dimension Reflection', + materialized='reflection', + reflection_type='aggregate', + dimensions=['pickup_datetime'], + measures=['trip_distance_mi'], + date_dimensions=['pickup_datetime'])}} +-- depends_on: {{ ref('view2') }} +""" + +distribute_by_model = """ +{{ config( + name='Distribute By Reflection', + materialized='reflection', + reflection_type='aggregate', + dimensions=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + distribute_by=['DayOfWeek'] +) }} +-- depends_on: {{ ref('view1') }} +""" + +transformations_model = """ +{{ config( + name='Transformations Reflection', + materialized='reflection', + reflection_type='aggregate', + dimensions=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + partition_by=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + partition_transform=['YEAR', 'TRUNCATE(2)', 'BUCKET(5)', 'IDENTITY'], +) }} +-- depends_on: {{ ref('view1') }} +""" + +default_transformations_model = """ +{{ config( + name='Default Transformations Reflection', + materialized='reflection', + reflection_type='aggregate', + dimensions=['Date', 'DayOfWeek', 'PdDistrict', 'Category'], + partition_by=['Date', 'DayOfWeek', 'PdDistrict', 'Category'] 
+) }} +-- depends_on: {{ ref('view1') }} +""" + +default_computations_model = """ +{{ config(name='Default Computations Reflection', + materialized='reflection', + reflection_type='aggregate', + measures=['pickup_datetime', 'trip_distance_mi'])}} +-- depends_on: {{ ref('view2') }} +""" + +default_displays_model = """ +{{ config(name='Default Displays Reflection', + materialized='reflection', + reflection_type='raw')}} +-- depends_on: {{ ref('view1') }} +""" + + +class TestReflectionsDremio: + @pytest.fixture(scope="class") + def client(self, adapter): + credentials = adapter.connections.profile.credentials + parameters = ParametersBuilder.build(credentials) + client = DremioRestClient(parameters.get_parameters()) + + return client + + @pytest.fixture(scope="class") + def models(self): + return { + "view1.sql": view1_model, + "view2.sql": view2_model, + "basic_raw.sql": basic_raw_model, + "basic_aggregation.sql": basic_aggregation_model, + "partition_by.sql": partition_by_model, + "consolidated_partition_method.sql": consolidated_partition_method_model, + "localsort_by.sql": localsort_by_model, + "granular_date_dimension.sql": granular_date_dimension_model, + "distribute_by.sql": distribute_by_model, + "transformations.sql": transformations_model, + "default_transformations.sql": default_transformations_model, + "default_computations.sql": default_computations_model, + "default_displays_model.sql": default_displays_model, + } + + def _create_path_list(self, database, schema): + path = [database] + if schema != 'no_schema': + folders = schema.split(".") + path.extend(folders) + return path + + def _get_reflection(self, project, client, identifier, expected_name): + client.start() + + database = project.database + schema = project.test_schema + path = self._create_path_list(database, schema) + + path.append(identifier) + + catalog_info = client.get_catalog_item( + catalog_id=None, + catalog_path=path, + ) + + reflections = 
client.get_reflections(catalog_info['id'])['data'] + + reflection = next((x for x in reflections if x['name'] == expected_name), None) + + return reflection + + def testBasicRawReflection(self, project, client): + run_dbt(["run", "--select", "view1", "basic_raw"]) + + reflection = self._get_reflection(project, client, "view1", "Basic Raw Reflection") + + assert reflection + assert reflection["name"] == "Basic Raw Reflection" + assert reflection["type"] == "RAW" + assert reflection["displayFields"] == [{"name": x} for x in ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert "dimensionFields" not in reflection + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testBasicAggregationReflection(self, project, client): + run_dbt(["run", "--select", "view1", "basic_aggregation"]) + + reflection = self._get_reflection(project, client, "view1", "Basic Aggregation Reflection") + + assert reflection["name"] == "Basic Aggregation Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": x, "granularity": "DATE"} for x in + ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert reflection["measureFields"] == [ + {"name": "PdId", "measureTypeList": ["COUNT", "SUM"]}, + {"name": "Location", "measureTypeList": ["COUNT"]} + ] + assert "distributionFields" not in reflection + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testPartitionByReflection(self, project, client): + run_dbt(["run", "--select", "view1", "partition_by"]) + + reflection = self._get_reflection(project, client, "view1", "Partition By Reflection") + + assert reflection["name"] == "Partition By Reflection" + assert reflection["type"] == "RAW" + assert 
reflection["displayFields"] == [{"name": x} for x in ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert reflection["partitionFields"] == [{"name": "DayOfWeek", "transform": {"type": "IDENTITY"}}] + assert "dimensionFields" not in reflection + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testConsolidatedPartitionMethodReflection(self, project, client): + run_dbt(["run", "--select", "view1", "consolidated_partition_method"]) + + reflection = self._get_reflection(project, client, "view1", "Consolidated Partition Method Reflection") + + assert reflection["name"] == "Consolidated Partition Method Reflection" + assert reflection["type"] == "RAW" + assert reflection["displayFields"] == [{"name": x} for x in ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert reflection["partitionFields"] == [{"name": "DayOfWeek", "transform": {"type": "IDENTITY"}}] + assert "dimensionFields" not in reflection + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "CONSOLIDATED" + + def testLocalsortByReflection(self, project, client): + run_dbt(["run", "--select", "view1", "localsort_by"]) + + reflection = self._get_reflection(project, client, "view1", "Localsort By Reflection") + + assert reflection["name"] == "Localsort By Reflection" + assert reflection["type"] == "RAW" + assert reflection["displayFields"] == [{"name": x} for x in ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert reflection["partitionFields"] == [{"name": "DayOfWeek", "transform": {"type": "IDENTITY"}}] + assert "dimensionFields" not in reflection + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert reflection["sortFields"] == [{"name": "Date"}] + assert 
reflection["partitionDistributionStrategy"] == "STRIPED" + + def testGranularDateDimensionReflection(self, project, client): + run_dbt(["run", "--select", "view2", "granular_date_dimension"]) + + reflection = self._get_reflection(project, client, "view2", "Granulate Date Dimension Reflection") + + assert reflection["name"] == "Granulate Date Dimension Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": "pickup_datetime", "granularity": "DATE"}] + assert reflection["measureFields"] == [{"name": "trip_distance_mi", "measureTypeList": ["SUM", "COUNT"]}] + assert "distributionFields" not in reflection + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testDistributeByReflection(self, project, client): + run_dbt(["run", "--select", "view1", "distribute_by"]) + + reflection = self._get_reflection(project, client, "view1", "Distribute By Reflection") + + assert reflection["name"] == "Distribute By Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": x, "granularity": "DATE"} for x in + ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert "measureFields" not in reflection + assert reflection["distributionFields"] == [{"name": "DayOfWeek"}] + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testTransformationsReflection(self, project, client): + run_dbt(["run", "--select", "view1", "transformations"]) + + reflection = self._get_reflection(project, client, "view1", "Transformations Reflection") + + assert reflection["name"] == "Transformations Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": x, "granularity": "DATE"} for x in + ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert 
"measureFields" not in reflection + assert "distributionFields" not in reflection + assert reflection["partitionFields"] == [ + {"name": "Date", "transform": {"type": "YEAR"}}, + {"name": "DayOfWeek", "transform": {"type": "TRUNCATE", "truncateTransform": {"truncateLength": 2}}}, + {"name": "PdDistrict", "transform": {"type": "BUCKET", "bucketTransform": {"bucketCount": 5}}}, + {"name": "Category", "transform": {"type": "IDENTITY"}} + ] + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testDefaultTransformationsReflection(self, project, client): + run_dbt(["run", "--select", "view1", "default_transformations"]) + + reflection = self._get_reflection(project, client, "view1", "Default Transformations Reflection") + + assert reflection["name"] == "Default Transformations Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": x, "granularity": "DATE"} for x in + ["Date", "DayOfWeek", "PdDistrict", "Category"]] + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert reflection["partitionFields"] == [ + {"name": x, "transform": {"type": "IDENTITY"}} for x in ["Date", "DayOfWeek", "PdDistrict", "Category"] + ] + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testDefaultComputationsReflection(self, project, client): + run_dbt(["run", "--select", "view2", "default_computations"]) + + reflection = self._get_reflection(project, client, "view2", "Default Computations Reflection") + + assert reflection["name"] == "Default Computations Reflection" + assert reflection["type"] == "AGGREGATION" + assert reflection["dimensionFields"] == [{"name": "pickup_datetime", "granularity": "DATE"}] + assert reflection["measureFields"] == [ + {"name": "pickup_datetime", "measureTypeList": ["COUNT"]}, + {"name": "trip_distance_mi", "measureTypeList": ["SUM", "COUNT"]} + ] 
+ assert "distributionFields" not in reflection + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" + + def testDefaultDisplaysReflection(self, project, client): + run_dbt(["run", "--select", "view1", "default_displays_model"]) + + reflection = self._get_reflection(project, client, "view1", "Default Displays Reflection") + + assert reflection["name"] == "Default Displays Reflection" + assert reflection["type"] == "RAW" + assert reflection["displayFields"] == [{"name": x} for x in + ["IncidntNum", "Category", "Descript", "DayOfWeek", "Date", "Time", + "PdDistrict", "Resolution", "Address", "X", "Y", "Location", "PdId"]] + assert "dimensionFields" not in reflection + assert "measureFields" not in reflection + assert "distributionFields" not in reflection + assert "partitionFields" not in reflection + assert "sortFields" not in reflection + assert reflection["partitionDistributionStrategy"] == "STRIPED" From 62472305ef931b306ea8fd88845d36d0f6f1e525 Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Tue, 7 Jan 2025 10:08:48 +0000 Subject: [PATCH 2/5] address reviews --- CHANGELOG.md | 13 +- dbt/adapters/dremio/api/handle.py | 1 - dbt/adapters/dremio/api/rest/client.py | 23 ++- .../dremio/api/rest/entities/__init__.py | 0 .../dremio/api/rest/entities/reflection.py | 136 ++++++++++++++++++ dbt/adapters/dremio/connections.py | 60 +------- .../reflection/create_reflection.sql | 2 +- .../materializations/reflection/helpers.sql | 3 + .../reflection/reflection.sql | 4 +- .../dremio_specific/test_reflections.py | 2 - 10 files changed, 167 insertions(+), 77 deletions(-) create mode 100644 dbt/adapters/dremio/api/rest/entities/__init__.py create mode 100644 dbt/adapters/dremio/api/rest/entities/reflection.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b351c69..9171b06c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,18 @@ ## Changes - Added 
[DremioRestClient](dbt/adapters/dremio/api/rest/client.py) to isolate all Dremio API calls inside one class - +- [#256](https://github.com/dremio/dbt-dremio/pull/256) Reflections are now handled through the Rest API + - Non-admin users are now able to use reflections + - It is now possible to set a custom name for reflections + - If a reflection already exists in the dataset with the same name defined in the model, it will be updated instead of creating a new one + - New `date_dimensions` parameter was added to the reflection materialization, to set fields that have a `DATE` granularity + - Added Distribution Fields under `distribute_by` + - Added partition transformations under `partition_transform` + - Defaults to Original/Identity if not defined + - year/month/day/hour/bucket(n), truncate(n) + - Computations default to `SUM, COUNT` if mapped measure is numeric, `COUNT` if not + - `reflections_enabled` adapter option has been renamed to `reflections_metadata_enabled` (requires user privileges to run in dremio) + - ## Dependency - [#222](https://github.com/dremio/dbt-dremio/issues/222) Upgrade dbt-core to 1.8.8 and dbt-tests-adapter to 1.8.0 diff --git a/dbt/adapters/dremio/api/handle.py b/dbt/adapters/dremio/api/handle.py index addac087..94fd4864 100644 --- a/dbt/adapters/dremio/api/handle.py +++ b/dbt/adapters/dremio/api/handle.py @@ -34,7 +34,6 @@ def cursor(self): if self.closed: raise Exception("HandleClosed") if self._cursor is None or self._cursor.closed: - self._rest_client.start() self._cursor = DremioCursor(self._rest_client) return self._cursor diff --git a/dbt/adapters/dremio/api/rest/client.py b/dbt/adapters/dremio/api/rest/client.py index 92bfb49d..c1f0e0b6 100644 --- a/dbt/adapters/dremio/api/rest/client.py +++ b/dbt/adapters/dremio/api/rest/client.py @@ -31,29 +31,26 @@ class DremioRestClient: def __init__(self, api_parameters: Parameters): - self._parameters = api_parameters + self._parameters = self.__login(api_parameters) - def start(self): - 
self._parameters = self.__login() + def __login(self, api_parameters, timeout=10): + if isinstance(api_parameters.authentication, DremioPatAuthentication): + return api_parameters - def __login(self, timeout=10): - if isinstance(self._parameters.authentication, DremioPatAuthentication): - return self._parameters - - url = UrlBuilder.login_url(self._parameters) + url = UrlBuilder.login_url(api_parameters) response = _post( url, json={ - "userName": self._parameters.authentication.username, - "password": self._parameters.authentication.password, + "userName": api_parameters.authentication.username, + "password": api_parameters.authentication.password, }, timeout=timeout, - ssl_verify=self._parameters.authentication.verify_ssl, + ssl_verify=api_parameters.authentication.verify_ssl, ) - self._parameters.authentication.token = response["token"] + api_parameters.authentication.token = response["token"] - return self._parameters + return api_parameters def sql_endpoint(self, query, context=None): url = UrlBuilder.sql_url(self._parameters) diff --git a/dbt/adapters/dremio/api/rest/entities/__init__.py b/dbt/adapters/dremio/api/rest/entities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/dbt/adapters/dremio/api/rest/entities/reflection.py b/dbt/adapters/dremio/api/rest/entities/reflection.py new file mode 100644 index 00000000..fa58de92 --- /dev/null +++ b/dbt/adapters/dremio/api/rest/entities/reflection.py @@ -0,0 +1,136 @@ +from enum import Enum + +class TransformType(Enum): + YEAR = "YEAR" + MONTH = "MONTH" + DAY = "DAY" + HOUR = "HOUR" + IDENTITY = "IDENTITY" + BUCKET = "BUCKET" + TRUNCATE = "TRUNCATE" + + @classmethod + def from_string(cls, transform_str): + transform_str = transform_str.upper() + + if transform_str.startswith("BUCKET("): + return cls.BUCKET + elif transform_str.startswith("TRUNCATE("): + return cls.TRUNCATE + + try: + return cls(transform_str) + except ValueError: + return cls.IDENTITY + + def to_transform(self, raw_str): + if 
self in ( + TransformType.YEAR, + TransformType.MONTH, + TransformType.DAY, + TransformType.HOUR, + TransformType.IDENTITY + ): + return {"type": self.value} + + if self == TransformType.BUCKET: + bucket_count = int(raw_str.split("(")[1].split(")")[0]) + return { + "type": "BUCKET", + "bucketTransform": {"bucketCount": bucket_count}, + } + + if self == TransformType.TRUNCATE: + truncate_length = int(raw_str.split("(")[1].split(")")[0]) + return { + "type": "TRUNCATE", + "truncateTransform": {"truncateLength": truncate_length}, + } + + return {"type": TransformType.IDENTITY.value} + +class ReflectionEntity: + def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache): + self.__name = name + self.__type = reflection_type + self.__dataset_id = dataset_id + self.__partition_method = partition_method.upper() + self.__display_fields = display_fields + self.__dimensions_fields = dimensions + self.__date_dimensions_fields = date_dimensions + self.__measure_fields = measures + self.__computation_fields = computations + self.__partition_by_fields = partition_by + self.__partition_transformations = partition_transform + self.__partition_method = partition_method + self.__distribute_by_fields = distribute_by + self.__local_sort_fields = localsort_by + self.__arrow_cache = arrow_cache + + def buildDisplayFields(self): + return [{"name": field} for field in self.__display_fields] + + def buildDimensionFields(self): + return [{"name": field} for field in self.__dimensions_fields] + + def buildDateFields(self): + return [{"name": date_dimension, "granularity": "DATE"} for date_dimension in self.__date_dimensions_fields] + + def buildMeasureFields(self): + return [{"name": measure, "measureTypeList": computation.split(',')} for + measure, computation in zip(self.__measure_fields, self.__computation_fields)] + + def 
buildPartitionFields(self): + if not self.__partition_transformations: + self.__partition_transformations = ["IDENTITY"] * len(self.__partition_by_fields) + + partition_fields = [] + + for partition, transform in zip(self.__partition_by_fields, self.__partition_transformations): + transform_type = TransformType.from_string(transform) + partition_fields.append({ + "name": partition, + "transform": transform_type.to_transform(transform) + }) + + return partition_fields + + def buildDistributionFields(self): + return [{"name": distribute} for distribute in self.__distribute_by_fields] + + def buildSortFields(self): + return [{"name": sort} for sort in self.__local_sort_fields] + + def build_payload(self): + payload = { + "type": self.__type, + "name": self.__name, + "datasetId": self.__dataset_id, + "enabled": True, + "arrowCachingEnabled": self.__arrow_cache, + "partitionDistributionStrategy": self.__partition_method.upper(), + "entityType": "reflection" + } + + if self.__display_fields: + payload["displayFields"] = self.buildDisplayFields() + + if self.__dimensions_fields: + payload["dimensionFields"] = self.buildDimensionFields() + + if self.__date_dimensions_fields: + payload["dateFields"] = self.buildDateFields() + + if self.__measure_fields and self.__computation_fields: + payload["measureFields"] = self.buildMeasureFields() + + if self.__partition_by_fields: + payload["partitionFields"] = self.buildPartitionFields() + + if self.__distribute_by_fields: + payload["distributionFields"] = self.buildDistributionFields() + + if self.__local_sort_fields: + payload["sortFields"] = self.buildSortFields() + + return payload diff --git a/dbt/adapters/dremio/connections.py b/dbt/adapters/dremio/connections.py index 43b92edf..eaebf05f 100644 --- a/dbt/adapters/dremio/connections.py +++ b/dbt/adapters/dremio/connections.py @@ -19,6 +19,7 @@ from dbt.adapters.dremio.api.cursor import DremioCursor from dbt.adapters.dremio.api.handle import DremioHandle from 
dbt.adapters.dremio.api.parameters import ParametersBuilder +from dbt.adapters.dremio.api.rest.entities.reflection import ReflectionEntity from dbt.adapters.dremio.relation import DremioRelation from dbt_common.clients import agate_helper @@ -231,7 +232,7 @@ def create_catalog(self, relation): self._create_folders(database, schema, rest_client) return - def create_reflection(self, name: str, type: str, anchor: DremioRelation, display: List[str], dimensions: List[str], + def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelation, display: List[str], dimensions: List[str], date_dimensions: List[str], measures: List[str], computations: List[str], partition_by: List[str], partition_transform: List[str], partition_method: str, distribute_by: List[str], localsort_by: List[str], @@ -254,62 +255,7 @@ def create_reflection(self, name: str, type: str, anchor: DremioRelation, displa dataset_id = catalog_info.get("id") - payload = { - "type": type, - "name": name, - "datasetId": dataset_id, - "enabled": True, - "arrowCachingEnabled": arrow_cache, - "partitionDistributionStrategy": partition_method.upper(), - "entityType": "reflection" - } - - if display: - payload["displayFields"] = [{"name": field} for field in display] - - if dimensions: - payload["dimensionFields"] = [{"name": dimension} for dimension in dimensions] - - if date_dimensions: - payload["dateFields"] = [{"name": date_dimension, "granularity": "DATE"} for date_dimension in - date_dimensions] - - if measures and computations: - payload["measureFields"] = [{"name": measure, "measureTypeList": computation.split(',')} for - measure, computation in zip(measures, computations)] - - if partition_by: - if not partition_transform: - partition_transform = ["IDENTITY"] * len(partition_by) - - partition_fields = [] - for partition, transform in zip(partition_by, partition_transform): - transform = transform.upper() - partition_field = {"name": partition, "transform": None} - - if transform in 
["YEAR", "MONTH", "DAY", "HOUR", "IDENTITY"]: - partition_field["transform"] = {"type": transform} - elif transform.startswith("BUCKET"): - bucket_count = int(transform.split("(")[1].split(")")[0]) - partition_field["transform"] = { - "type": "BUCKET", - "bucketTransform": {"bucketCount": bucket_count}, - } - elif transform.startswith("TRUNCATE"): - truncate_length = int(transform.split("(")[1].split(")")[0]) - partition_field["transform"] = { - "type": "TRUNCATE", - "truncateTransform": {"truncateLength": truncate_length}, - } - partition_fields.append(partition_field) - - payload["partitionFields"] = partition_fields - - if distribute_by: - payload["distributionFields"] = [{"name": distribute} for distribute in distribute_by] - - if localsort_by: - payload["sortFields"] = [{"name": sort} for sort in localsort_by] + payload = ReflectionEntity(name, reflection_type, dataset_id, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache).build_payload() dataset_info = rest_client.get_reflections(dataset_id) reflections_info = dataset_info.get("data") diff --git a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql index b12f1e52..e82dd1f7 100644 --- a/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/create_reflection.sql @@ -42,7 +42,7 @@ USING target {%- if reflection_type == 'raw' %} {% set reflection_type = 'RAW' %} - {%- elif reflection_type in ['aggregate', 'aggregation'] %} + {%- elif reflection_type == 'aggregate' %} {% set reflection_type = 'AGGREGATION' %} {%- else -%} {% do exceptions.CompilationError("invalid reflection type") %} diff --git a/dbt/include/dremio/macros/materializations/reflection/helpers.sql b/dbt/include/dremio/macros/materializations/reflection/helpers.sql 
index 9a0a773d..56ad6a68 100644 --- a/dbt/include/dremio/macros/materializations/reflection/helpers.sql +++ b/dbt/include/dremio/macros/materializations/reflection/helpers.sql @@ -21,5 +21,8 @@ limitations under the License.*/ {% if raw_reflection_type not in accepted_types %} {% do exceptions.CompilationError(invalid_reflection_type_msg) %} {% endif %} + {% if raw_reflection_type in ['aggregate', 'aggregation'] %} + {% do return('aggregate') %} + {% endif %} {% do return(raw_reflection_type) %} {% endmacro %} diff --git a/dbt/include/dremio/macros/materializations/reflection/reflection.sql b/dbt/include/dremio/macros/materializations/reflection/reflection.sql index 2d3f56f5..1aa4fcae 100644 --- a/dbt/include/dremio/macros/materializations/reflection/reflection.sql +++ b/dbt/include/dremio/macros/materializations/reflection/reflection.sql @@ -74,11 +74,11 @@ limitations under the License.*/ {%- set reflection_type = dbt_dremio_validate_get_reflection_type(raw_reflection_type) -%} {% if (reflection_type == 'raw' and display is none) - or (reflection_type in ['aggregate', 'aggregation'] and (dimensions is none or measures is none or computations is none or partition_transform is none)) %} + or (reflection_type == 'aggregate' and (dimensions is none or measures is none or computations is none or partition_transform is none)) %} {% set columns = adapter.get_columns_in_relation(anchor) %} {% if reflection_type == 'raw' %} {% set display = columns | map(attribute='name') | list %} - {% elif reflection_type in ['aggregate', 'aggregation'] %} + {% elif reflection_type == 'aggregate' %} {% if dimensions is none %} {% set dimensions = columns | rejectattr('dtype', 'in', ['decimal', 'float', 'double']) | map(attribute='name') | list %} {% set date_dimensions = columns | selectattr('dtype', 'in', ['timestamp']) | map(attribute='name') | list %} diff --git a/tests/functional/adapter/dremio_specific/test_reflections.py 
b/tests/functional/adapter/dremio_specific/test_reflections.py index b4bc9bbe..d98b059c 100644 --- a/tests/functional/adapter/dremio_specific/test_reflections.py +++ b/tests/functional/adapter/dremio_specific/test_reflections.py @@ -162,8 +162,6 @@ def _create_path_list(self, database, schema): return path def _get_reflection(self, project, client, identifier, expected_name): - client.start() - database = project.database schema = project.test_schema path = self._create_path_list(database, schema) From 81026104321721d70f6dbc8470f1dfe60842971c Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Tue, 7 Jan 2025 11:15:41 +0000 Subject: [PATCH 3/5] fix --- dbt/adapters/dremio/api/handle.py | 1 + dbt/adapters/dremio/api/rest/client.py | 23 +++++++++++-------- .../dremio_specific/test_reflections.py | 1 + 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/dremio/api/handle.py b/dbt/adapters/dremio/api/handle.py index 94fd4864..addac087 100644 --- a/dbt/adapters/dremio/api/handle.py +++ b/dbt/adapters/dremio/api/handle.py @@ -34,6 +34,7 @@ def cursor(self): if self.closed: raise Exception("HandleClosed") if self._cursor is None or self._cursor.closed: + self._rest_client.start() self._cursor = DremioCursor(self._rest_client) return self._cursor diff --git a/dbt/adapters/dremio/api/rest/client.py b/dbt/adapters/dremio/api/rest/client.py index c1f0e0b6..92bfb49d 100644 --- a/dbt/adapters/dremio/api/rest/client.py +++ b/dbt/adapters/dremio/api/rest/client.py @@ -31,26 +31,29 @@ class DremioRestClient: def __init__(self, api_parameters: Parameters): - self._parameters = self.__login(api_parameters) + self._parameters = api_parameters - def __login(self, api_parameters, timeout=10): - if isinstance(api_parameters.authentication, DremioPatAuthentication): - return api_parameters + def start(self): + self._parameters = self.__login() - url = UrlBuilder.login_url(api_parameters) + def __login(self, timeout=10): + if 
isinstance(self._parameters.authentication, DremioPatAuthentication): + return self._parameters + + url = UrlBuilder.login_url(self._parameters) response = _post( url, json={ - "userName": api_parameters.authentication.username, - "password": api_parameters.authentication.password, + "userName": self._parameters.authentication.username, + "password": self._parameters.authentication.password, }, timeout=timeout, - ssl_verify=api_parameters.authentication.verify_ssl, + ssl_verify=self._parameters.authentication.verify_ssl, ) - api_parameters.authentication.token = response["token"] + self._parameters.authentication.token = response["token"] - return api_parameters + return self._parameters def sql_endpoint(self, query, context=None): url = UrlBuilder.sql_url(self._parameters) diff --git a/tests/functional/adapter/dremio_specific/test_reflections.py b/tests/functional/adapter/dremio_specific/test_reflections.py index d98b059c..3cc686c0 100644 --- a/tests/functional/adapter/dremio_specific/test_reflections.py +++ b/tests/functional/adapter/dremio_specific/test_reflections.py @@ -133,6 +133,7 @@ def client(self, adapter): credentials = adapter.connections.profile.credentials parameters = ParametersBuilder.build(credentials) client = DremioRestClient(parameters.get_parameters()) + client.start() return client From 40d40b27f4c7ffbea10e105968240bfe456a29e4 Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Tue, 7 Jan 2025 12:15:26 +0000 Subject: [PATCH 4/5] formatting --- .../dremio/api/rest/entities/reflection.py | 17 +++++++++++------ dbt/adapters/dremio/connections.py | 7 +++++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dbt/adapters/dremio/api/rest/entities/reflection.py b/dbt/adapters/dremio/api/rest/entities/reflection.py index fa58de92..5ae81ebb 100644 --- a/dbt/adapters/dremio/api/rest/entities/reflection.py +++ b/dbt/adapters/dremio/api/rest/entities/reflection.py @@ -1,5 +1,6 @@ from enum import Enum + class TransformType(Enum): YEAR = 
"YEAR" MONTH = "MONTH" @@ -25,11 +26,11 @@ def from_string(cls, transform_str): def to_transform(self, raw_str): if self in ( - TransformType.YEAR, - TransformType.MONTH, - TransformType.DAY, - TransformType.HOUR, - TransformType.IDENTITY + TransformType.YEAR, + TransformType.MONTH, + TransformType.DAY, + TransformType.HOUR, + TransformType.IDENTITY ): return {"type": self.value} @@ -49,8 +50,12 @@ def to_transform(self, raw_str): return {"type": TransformType.IDENTITY.value} + +# https://docs.dremio.com/24.3.x/reference/api/reflections/ class ReflectionEntity: - def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache): + def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures, + computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, + arrow_cache): self.__name = name self.__type = reflection_type self.__dataset_id = dataset_id diff --git a/dbt/adapters/dremio/connections.py b/dbt/adapters/dremio/connections.py index eaebf05f..6cbeaea4 100644 --- a/dbt/adapters/dremio/connections.py +++ b/dbt/adapters/dremio/connections.py @@ -232,7 +232,8 @@ def create_catalog(self, relation): self._create_folders(database, schema, rest_client) return - def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelation, display: List[str], dimensions: List[str], + def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelation, display: List[str], + dimensions: List[str], date_dimensions: List[str], measures: List[str], computations: List[str], partition_by: List[str], partition_transform: List[str], partition_method: str, distribute_by: List[str], localsort_by: List[str], @@ -255,7 +256,9 @@ def create_reflection(self, name: str, reflection_type: str, anchor: DremioRelat dataset_id = 
catalog_info.get("id") - payload = ReflectionEntity(name, reflection_type, dataset_id, display, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by, arrow_cache).build_payload() + payload = ReflectionEntity(name, reflection_type, dataset_id, display, dimensions, date_dimensions, measures, + computations, partition_by, partition_transform, partition_method, distribute_by, + localsort_by, arrow_cache).build_payload() dataset_info = rest_client.get_reflections(dataset_id) reflections_info = dataset_info.get("data") From 4564e700dba391b27e6a8f829b422bd2b7ddf3b2 Mon Sep 17 00:00:00 2001 From: Bernardo Meireles Date: Tue, 7 Jan 2025 13:21:24 +0000 Subject: [PATCH 5/5] latest api docs --- dbt/adapters/dremio/api/rest/entities/reflection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/dremio/api/rest/entities/reflection.py b/dbt/adapters/dremio/api/rest/entities/reflection.py index 5ae81ebb..cb5cd7d4 100644 --- a/dbt/adapters/dremio/api/rest/entities/reflection.py +++ b/dbt/adapters/dremio/api/rest/entities/reflection.py @@ -51,7 +51,7 @@ def to_transform(self, raw_str): return {"type": TransformType.IDENTITY.value} -# https://docs.dremio.com/24.3.x/reference/api/reflections/ +# https://docs.dremio.com/current/reference/api/reflections/ class ReflectionEntity: def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures, computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by,