Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add create_temp_table support #129

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions great_expectations_provider/operators/great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,18 @@ def build_snowflake_connection_config_from_hook(self) -> Dict[str, str]:
def build_configured_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = (
self.conn.extra_dejson.get("create_temp_table")
if self.conn.extra_dejson.get("create_temp_table") is not None
else True
pankajastro marked this conversation as resolved.
Show resolved Hide resolved
)

datasource_config = {
"name": f"{self.conn.conn_id}_configured_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down Expand Up @@ -416,11 +423,18 @@ def build_configured_sql_datasource_batch_request(self):
def build_runtime_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = (
self.conn.extra_dejson.get("create_temp_table")
if self.conn.extra_dejson.get("create_temp_table") is not None
else True
)
pankajastro marked this conversation as resolved.
Show resolved Hide resolved

datasource_config = {
"name": f"{self.conn.conn_id}_runtime_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down
62 changes: 62 additions & 0 deletions tests/operators/test_great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def constructed_sql_runtime_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -201,6 +202,7 @@ def constructed_sql_configured_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -223,6 +225,7 @@ def constructed_sql_configured_datasource():
@pytest.fixture()
def mock_airflow_conn():
conn = mock.Mock(conn_id="sqlite_conn", schema="my_schema", host="host", conn_type="sqlite")
conn.extra_dejson.get.return_value = True
return conn


Expand Down Expand Up @@ -1076,6 +1079,7 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
**test_conn_conf,
},
"data_connectors": {
Expand Down Expand Up @@ -1119,6 +1123,64 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
assert constructed_datasource.config == datasource_config


def test_great_expectations_operator__build_configured_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_configured_datasource,
):
constructed_sql_configured_datasource["execution_engine"]["create_temp_table"] = False

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={"create_temp_table": False},
)

constructed_datasource = operator.build_configured_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_configured_datasource


def test_great_expectations_operator__build_runtime_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_runtime_datasource,
):
constructed_sql_runtime_datasource["execution_engine"]["create_temp_table"] = False

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={"create_temp_table": False},
)

constructed_datasource = operator.build_runtime_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_runtime_datasource


def test_great_expectations_operator__make_connection_string_raise_error():
operator = GreatExpectationsOperator(
task_id="task_id",
Expand Down
Loading