Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a base python job helper class #5802

Merged
merged 3 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220909-115220.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add PythonJobHelper base class in core and add more type checking
time: 2022-09-09T11:52:20.419364-07:00
custom:
Author: ChenyuLInx
Issue: "5802"
PR: "5802"
2 changes: 1 addition & 1 deletion core/dbt/adapters/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
SchemaSearchMap,
)
from dbt.adapters.base.column import Column # noqa
from dbt.adapters.base.impl import AdapterConfig, BaseAdapter # noqa
from dbt.adapters.base.impl import AdapterConfig, BaseAdapter, PythonJobHelper # noqa
from dbt.adapters.base.plugin import AdapterPlugin # noqa
83 changes: 60 additions & 23 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
SchemaSearchMap,
)
from dbt.adapters.base import Column as BaseColumn
from dbt.adapters.base import Credentials
from dbt.adapters.cache import RelationsCache, _make_key


Expand Down Expand Up @@ -127,6 +128,35 @@ def _relation_name(rel: Optional[BaseRelation]) -> str:
return str(rel)


def log_code_execution(code_execution_function):
# decorator to log code and execution time
if code_execution_function.__name__ != "submit_python_job":
raise ValueError("this should be only used to log submit_python_job now")

def execution_with_log(*args):
self = args[0]
connection_name = self.connections.get_thread_connection().name
fire_event(CodeExecution(conn_name=connection_name, code_content=args[2]))
start_time = time.time()
response = code_execution_function(*args)
fire_event(
CodeExecutionStatus(
status=response._message, elapsed=round((time.time() - start_time), 2)
)
)
return response

return execution_with_log


class PythonJobHelper:
def __init__(self, parsed_model: Dict, credential: Credentials) -> None:
raise NotImplementedError("PythonJobHelper is not implemented yet")

def submit(self, compiled_code: str) -> Any:
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.

Expand Down Expand Up @@ -1182,9 +1212,37 @@ def get_rows_different_sql(

return sql

@property
def python_submission_helpers(self) -> Dict[str, Type[PythonJobHelper]]:
raise NotImplementedError("python_submission_helpers is not specified")

@property
def default_python_submission_method(self) -> str:
raise NotImplementedError("default_python_submission_method is not specified")

@available.parse_none
def submit_python_job(self, parsed_model: dict, compiled_code: str):
raise NotImplementedException("`submit_python_job` is not implemented for this adapter!")
@log_code_execution
def submit_python_job(self, parsed_model: dict, compiled_code: str) -> AdapterResponse:
submission_method = parsed_model["config"].get(
"submission_method", self.default_python_submission_method
)
if submission_method not in self.python_submission_helpers:
raise NotImplementedError(
"Submission method {} is not supported for current adapter".format(
submission_method
)
)
job_helper = self.python_submission_helpers[submission_method](
parsed_model, self.connections.profile.credentials
)
submission_result = job_helper.submit(compiled_code)
# process submission result to generate adapter response
return self.generate_python_submission_response(submission_result)

def generate_python_submission_response(self, submission_result: Any) -> AdapterResponse:
raise NotImplementedException(
"Your adapter need to implement generate_python_submission_response"
)

def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Expand Down Expand Up @@ -1274,24 +1332,3 @@ def catch_as_completed(
# exc is not None, derives from Exception, and isn't ctrl+c
exceptions.append(exc)
return merge_tables(tables), exceptions


def log_code_execution(code_execution_function):
# decorator to log code and execution time
if code_execution_function.__name__ != "submit_python_job":
raise ValueError("this should be only used to log submit_python_job now")

def execution_with_log(*args):
self = args[0]
connection_name = self.connections.get_thread_connection().name
fire_event(CodeExecution(conn_name=connection_name, code_content=args[2]))
start_time = time.time()
response = code_execution_function(*args)
fire_event(
CodeExecutionStatus(
status=response._message, elapsed=round((time.time() - start_time), 2)
)
)
return response

return execution_with_log
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{%- set ref_dict = {} -%}
{%- for _ref in model.refs -%}
{%- set resolved = ref(*_ref) -%}
{%- do ref_dict.update({_ref | join("."): resolved | string}) -%}
{%- do ref_dict.update({_ref | join("."): resolved.quote(database=False, schema=False, identifier=False) | string}) -%}
{%- endfor -%}

def ref(*args,dbt_load_df_function):
Expand All @@ -18,7 +18,7 @@ def ref(*args,dbt_load_df_function):
{%- set source_dict = {} -%}
{%- for _source in model.sources -%}
{%- set resolved = source(*_source) -%}
{%- do source_dict.update({_source | join("."): resolved | string}) -%}
{%- do source_dict.update({_source | join("."): resolved.quote(database=False, schema=False, identifier=False) | string}) -%}
{%- endfor -%}

def source(*args, dbt_load_df_function):
Expand Down