Skip to content

Commit

Permalink
Backtrack some of the abstraction and make this simpler. Still pipe t…
Browse files Browse the repository at this point in the history
…hrough retry number to query execution.
  • Loading branch information
VersusFacit committed Dec 16, 2024
1 parent a77f5f2 commit 9147081
Showing 1 changed file with 44 additions and 45 deletions.
89 changes: 44 additions & 45 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,48 @@ def add_query(
abridge_sql_log: bool = False,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
) -> Tuple[Connection, Any]:

"""
Retry function encapsulated here to avoid commitment to some
user-facing interface. Right now, Redshift commits to a 1 second
retry timeout so this serves as a default.
"""
def __execute_query_with_retry(
cursor: Any,
sql: str,
bindings: Optional[Any],
retryable_exceptions: Tuple[Type[Exception], ...],
retry_limit: int,
attempt: int,
):
"""
A success sees the try exit cleanly and avoid any recursive
retries. Failure begins a sleep and retry routine.
"""
try:
cursor.execute(sql, bindings)
except retryable_exceptions as e:
# Cease retries and fail when limit is hit.
if attempt >= retry_limit:
raise e

fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}"
)
)
time.sleep(1)

return self.__execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=1,
)

connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
Expand Down Expand Up @@ -107,13 +147,13 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
self._retryable_cursor_execute(
execute_fn=cursor.execute,
self.__execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
retry_timeout=retry_timeout,
attempt=1,
)

result = self.get_response(cursor)
Expand Down Expand Up @@ -224,44 +264,3 @@ def commit(self):

return connection

def _retryable_cursor_execute(
self,
execute_fn: Callable,
sql: str,
bindings: Optional[Any] = None,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
_attempts: int = 0,
) -> None:
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
if timeout < 0:
raise DbtRuntimeError("retry_timeout cannot be negative or return a negative time.")

try:
execute_fn(sql, bindings)

except tuple(retryable_exceptions) as e:
retry_limit -= 1
if retry_limit <= 0:
raise e
fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)} when attempting to execute a query.\n"
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n"
f"Error:\n{e}"
)
)

time.sleep(timeout)
return self._retryable_cursor_execute(
execute_fn=execute_fn,
sql=sql,
retry_limit=retry_limit - 1,
retry_timeout=retry_timeout,
retryable_exceptions=retryable_exceptions,
_attempts=_attempts + 1,
)

except Exception as e:
raise e

0 comments on commit 9147081

Please sign in to comment.