Merge branch 'main' into feature/unit-testing-more-descriptive-sql-error-description-for-model-not-existing
AlejandroBlanco2001 authored May 14, 2024
2 parents 839666b + b0b7d29 commit 74910af
Showing 5 changed files with 108 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240503-162655.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add query recording for adapters which use SQLConnectionManager
time: 2024-05-03T16:26:55.350916-04:00
custom:
  Author: peterallenwebb
  Issue: "195"
67 changes: 67 additions & 0 deletions dbt/adapters/record.py
@@ -0,0 +1,67 @@
import dataclasses
from io import StringIO
import json
import re
from typing import Any, Optional, Mapping

from agate import Table

from dbt_common.events.contextvars import get_node_info
from dbt_common.record import Record, Recorder

from dbt.adapters.contracts.connection import AdapterResponse


@dataclasses.dataclass
class QueryRecordParams:
    sql: str
    auto_begin: bool = False
    fetch: bool = False
    limit: Optional[int] = None
    node_unique_id: Optional[str] = None

    def __post_init__(self) -> None:
        if self.node_unique_id is None:
            node_info = get_node_info()
            self.node_unique_id = node_info["unique_id"] if node_info else ""

    @staticmethod
    def _clean_up_sql(sql: str) -> str:
        sql = re.sub(r"--.*?\n", "", sql)  # Remove single-line comments (--)
        sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)  # Remove multi-line comments (/* */)
        return sql.replace(" ", "").replace("\n", "")

    def _matches(self, other: "QueryRecordParams") -> bool:
        return self.node_unique_id == other.node_unique_id and self._clean_up_sql(
            self.sql
        ) == self._clean_up_sql(other.sql)


@dataclasses.dataclass
class QueryRecordResult:
    adapter_response: Optional["AdapterResponse"]
    table: Optional[Table]

    def _to_dict(self) -> Any:
        buf = StringIO()
        self.table.to_json(buf)  # type: ignore

        return {
            "adapter_response": self.adapter_response.to_dict(),  # type: ignore
            "table": buf.getvalue(),
        }

    @classmethod
    def _from_dict(cls, dct: Mapping) -> "QueryRecordResult":
        return QueryRecordResult(
            adapter_response=AdapterResponse.from_dict(dct["adapter_response"]),
            table=Table.from_object(json.loads(dct["table"])),
        )


class QueryRecord(Record):
    params_cls = QueryRecordParams
    result_cls = QueryRecordResult


Recorder.register_record_type(QueryRecord)
3 changes: 3 additions & 0 deletions dbt/adapters/sql/connections.py
@@ -5,6 +5,7 @@
from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError, NotImplementedError
from dbt_common.record import record_function
from dbt_common.utils import cast_to_str

from dbt.adapters.base import BaseConnectionManager
@@ -19,6 +20,7 @@
    SQLQuery,
    SQLQueryStatus,
)
from dbt.adapters.record import QueryRecord

if TYPE_CHECKING:
import agate
@@ -143,6 +145,7 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab

        return table_from_data_flat(data, column_names)

    @record_function(QueryRecord, method=True, tuple_result=True)
    def execute(
        self,
        sql: str,
5 changes: 5 additions & 0 deletions docs/README.md
@@ -0,0 +1,5 @@
## Documentation

The documentation is divided into the following sub-folders:
* arch: Architecture Decision Records (ADRs) which explain and justify major architectural decisions
* guides: Informal documents which describe the code or our development practices at a high level
27 changes: 27 additions & 0 deletions docs/guides/record_replay.md
@@ -0,0 +1,27 @@
# Supporting Record/Replay in Adapters

This document describes how to implement support for dbt's Record/Replay Subsystem in adapters. Before reading it, make sure you understand the fundamental ideas behind Record/Replay, which are [documented in the dbt-common repo](https://github.com/dbt-labs/dbt-common/blob/docs/guides/record_replay.md).

## Recording and Replaying Warehouse Interaction

The goal of the Record/Replay Subsystem is to record all interactions between dbt and external systems, of which the data warehouse is the most obvious. Since warehouse interaction is mediated by adapters, full Record/Replay support requires that adapters record all interactions they have with the warehouse. (It also requires that they record access to the local filesystem or external services, if that access is not mediated by dbt itself. This includes authentication steps, opening and closing connections, beginning and ending transactions, and so forth.)

In practice, this means that any request sent to the warehouse must be recorded, along with the corresponding response. If this is done correctly, as described in the document linked in the intro, the Record portion of the Record/Replay subsystem should work as expected.

At the time of this writing, there is only an incomplete implementation of this goal, which can be found in `dbt-adapters/dbt/adapters/record.py`.

There are some important things to notice about this implementation. First, the `QueryRecordResult` class provides custom serialization methods `_to_dict()` and `_from_dict()`. This is necessary because the `AdapterResponse` and `agate.Table` types cannot be automatically converted to and from JSON by the `dataclasses` library, and JSON is the format used to persist recordings to disk and reload them for replay.
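
The serialization pattern described above can be sketched with the standard library alone. `ExampleResult` and its fields are hypothetical stand-ins, not the actual `QueryRecordResult`; `Decimal` and `datetime` play the role of types that `json` cannot encode on its own.

```python
import dataclasses
import json
from datetime import datetime
from decimal import Decimal
from typing import Any, Mapping


@dataclasses.dataclass
class ExampleResult:
    """Hypothetical result type for illustration; not part of dbt-adapters."""

    rows_affected: Decimal  # json.dumps() cannot encode Decimal directly
    finished_at: datetime  # ...nor datetime

    def _to_dict(self) -> Any:
        # Convert each field to a JSON-friendly representation.
        return {
            "rows_affected": str(self.rows_affected),
            "finished_at": self.finished_at.isoformat(),
        }

    @classmethod
    def _from_dict(cls, dct: Mapping) -> "ExampleResult":
        # Reverse the conversions applied in _to_dict().
        return cls(
            rows_affected=Decimal(dct["rows_affected"]),
            finished_at=datetime.fromisoformat(dct["finished_at"]),
        )


original = ExampleResult(Decimal("42"), datetime(2024, 5, 3, 16, 26, 55))
persisted = json.dumps(original._to_dict())  # what a recording on disk might hold
restored = ExampleResult._from_dict(json.loads(persisted))
```

The round trip only works if `_from_dict()` undoes every conversion made in `_to_dict()`, so the two methods are best written and reviewed together.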

Another important feature is that `QueryRecordParams` implements the `_matches()` method. This method allows `dbt-adapters` to customize the way the Record/Replay subsystem determines whether a query issued by dbt matches a previously recorded query. In this case, the method performs a comparison which attempts to ignore comments and whitespace which would not affect query behavior.
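
To see the matching behavior in isolation, here is a self-contained sketch that copies the normalization logic from `record.py` into a standalone function. `ExampleParams` and the node IDs are hypothetical; the real class also fills in `node_unique_id` from dbt's context, which is omitted here.

```python
import dataclasses
import re


def clean_up_sql(sql: str) -> str:
    sql = re.sub(r"--.*?\n", "", sql)  # Remove single-line comments (--)
    sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)  # Remove /* */ comments
    return sql.replace(" ", "").replace("\n", "")


@dataclasses.dataclass
class ExampleParams:
    """Hypothetical stand-in for QueryRecordParams."""

    sql: str
    node_unique_id: str = ""

    def _matches(self, other: "ExampleParams") -> bool:
        # Same node, and the same SQL once comments and whitespace are stripped.
        return self.node_unique_id == other.node_unique_id and clean_up_sql(
            self.sql
        ) == clean_up_sql(other.sql)


recorded = ExampleParams("select 1 -- recorded comment\nfrom t", "model.proj.a")
issued = ExampleParams("select 1\n/* new comment */ from   t", "model.proj.a")
other_node = ExampleParams("select 1 from t", "model.proj.b")

same_query = recorded._matches(issued)  # differs only in comments and spacing
different_node = recorded._matches(other_node)  # same SQL, different node
```

Because the normalization removes every space and newline, it also collapses whitespace inside string literals, so two queries that differ only in literal spacing would compare equal as well.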

## Misc. Notes and Suggestions

Currently, support for recording data warehouse interaction is very rudimentary. However, even rudimentary support is valuable, and we should concentrate on extending it in a way that adds the most value with the least work. Usefulness, rather than perfection, is the initial goal.

Picking the right functions to record, at the right level of abstraction, will probably be the most important part of carrying this work forward.

Not every interaction with an external system has to be recorded in full detail, and authentication might prove to be a place where we exclude sensitive secrets from the recording. For example, since replay will not actually be communicating with the warehouse, it may be possible to exclude passwords and auth keys from the parameters recorded, and to exclude auth tokens from the results.
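
One way this exclusion might look is sketched below. The key names, the `<redacted>` placeholder, and `redact_for_recording()` are all assumptions made for illustration; nothing like this exists in `dbt-adapters` at the time of writing.

```python
from typing import Any, Dict, Mapping

# Hypothetical set of sensitive keys; a real adapter would choose its own.
SENSITIVE_KEYS = frozenset({"password", "token", "private_key"})
REDACTED = "<redacted>"


def redact_for_recording(params: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a copy of params that is safe to persist in a recording."""
    return {
        key: REDACTED if key.lower() in SENSITIVE_KEYS else value
        for key, value in params.items()
    }


connect_params = {"host": "warehouse.example.com", "user": "dbt", "password": "hunter2"}
recorded_params = redact_for_recording(connect_params)
```

The original mapping is left untouched, so the live connection can still use the real credentials while only the redacted copy is written to the recording.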

In addition to adding an appropriate decorator to functions which communicate with external systems, you should check those functions for side-effects. Since the function's calls will be mocked out in replay mode, those side-effects will not be carried out during replay. At present, we are focusing on support for recording and comparing recordings, but this is worth keeping in mind.
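
The effect on side-effects can be illustrated with a toy decorator. This is not `record_function` from `dbt-common`; `record_or_replay`, `state`, `recordings`, and `audit_log` are hypothetical names used only to show that a replayed call never runs the function body.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple

state = {"mode": "record"}  # hypothetical global switch: "record" or "replay"
recordings: Dict[Tuple[str, Tuple[Any, ...]], Any] = {}
audit_log: List[str] = []


def record_or_replay(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    def wrapper(*args: Any) -> Any:
        key = (func.__name__, args)
        if state["mode"] == "replay":
            # The function body is skipped entirely during replay.
            return recordings[key]
        result = func(*args)
        recordings[key] = result
        return result

    return wrapper


@record_or_replay
def run_query(sql: str) -> int:
    audit_log.append(sql)  # side-effect: happens only when the body runs
    return len(sql)  # stand-in for a warehouse response


first = run_query("select 1")  # recorded; the side-effect occurs
state["mode"] = "replay"
second = run_query("select 1")  # replayed; the side-effect is skipped
```

After both calls, `audit_log` holds a single entry even though `run_query` was called twice, which is exactly the kind of divergence to look for when decorating a function that does more than talk to the warehouse.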

The current implementation records which dbt node issues a query, and uses that information to ensure a match during replay. The same node should issue the same query. A better model might be to monitor which connection issued which query, and associate the same connection with open/close operations, transaction starts/stops and so forth.
