Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support retryable exceptions during query execution #368

Merged
merged 13 commits into from
Dec 17, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20241204-185912.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Add retry logic for retryable exceptions.
time: 2024-12-04T18:59:12.48816-08:00
custom:
Author: 'colin-rogers-dbt '
Issue: "368"
65 changes: 63 additions & 2 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Tuple,
TYPE_CHECKING,
Type,
)

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
Expand All @@ -18,6 +28,7 @@
SQLCommit,
SQLQuery,
SQLQueryStatus,
AdapterEventDebug,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -61,7 +72,50 @@ def add_query(
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Tuple[Type[Exception], ...] = tuple(),
retry_limit: int = 1,
) -> Tuple[Connection, Any]:
"""
Retry function encapsulated here to avoid commitment to some
user-facing interface. Right now, Redshift commits to a 1 second
retry timeout so this serves as a default.
"""

def _execute_query_with_retry(
cursor: Any,
sql: str,
bindings: Optional[Any],
retryable_exceptions: Tuple[Type[Exception], ...],
retry_limit: int,
attempt: int,
):
"""
A success sees the try exit cleanly and avoid any recursive
retries. Failure begins a sleep and retry routine.
"""
try:
cursor.execute(sql, bindings)
except retryable_exceptions as e:
# Cease retries and fail when limit is hit.
if attempt >= retry_limit:
raise e

fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)}. {retry_limit-attempt} retries left. Retrying in 1 second.\nError:\n{e}"
)
)
time.sleep(1)

return _execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=attempt+1,
)

connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
self.begin()
Expand Down Expand Up @@ -90,7 +144,14 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
_execute_query_with_retry(
cursor=cursor,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
attempt=1,
)

result = self.get_response(cursor)

Expand Down
Loading