Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: trino cursor #25897

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:

@classmethod
def execute_with_cursor(
cls, cursor: Any, sql: str, query: Query, session: Session
cls, cursor: Cursor, sql: str, query: Query, session: Session
) -> None:
"""
Trigger execution of a query and handle the resulting cursor.
Expand All @@ -193,34 +193,40 @@ def execute_with_cursor(
in another thread and invoke `handle_cursor` to poll for the query ID
to appear on the cursor in parallel.
"""
# Fetch the query ID beforehand, since it might fail inside the thread due to
# how the SQLAlchemy session is handled.
query_id = query.id
betodealmeida marked this conversation as resolved.
Show resolved Hide resolved

execute_result: dict[str, Any] = {}
execute_event = threading.Event()

def _execute(results: dict[str, Any]) -> None:
logger.debug("Query %d: Running query: %s", query.id, sql)
def _execute(results: dict[str, Any], event: threading.Event) -> None:
logger.debug("Query %d: Running query: %s", query_id, sql)

# Pass result / exception information back to the parent thread
try:
cls.execute(cursor, sql)
results["complete"] = True
except Exception as ex: # pylint: disable=broad-except
results["complete"] = True
results["error"] = ex
finally:
event.set()

execute_thread = threading.Thread(target=_execute, args=(execute_result,))
execute_thread = threading.Thread(
target=_execute,
args=(execute_result, execute_event),
)
execute_thread.start()

# Wait for a query ID to be available before handling the cursor, as
# it's required by that method; it may never become available on error.
while not cursor.query_id and not execute_result.get("complete"):
while not cursor.query_id and not execute_event.is_set():
time.sleep(0.1)

logger.debug("Query %d: Handling cursor", query.id)
logger.debug("Query %d: Handling cursor", query_id)
cls.handle_cursor(cursor, query, session)

# Block until the query completes; same behaviour as the client itself
logger.debug("Query %d: Waiting for query to complete", query.id)
while not execute_result.get("complete"):
time.sleep(0.5)
logger.debug("Query %d: Waiting for query to complete", query_id)
execute_event.wait()

# Unfortunately we'll mangle the stack trace due to the thread, but
# throwing the original exception allows mapping database errors as normal
Expand All @@ -234,7 +240,7 @@ def prepare_cancel_query(cls, query: Query, session: Session) -> None:
session.commit()

@classmethod
def cancel_query(cls, cursor: Any, query: Query, cancel_query_id: str) -> bool:
def cancel_query(cls, cursor: Cursor, query: Query, cancel_query_id: str) -> bool:
"""
Cancel query in the underlying database.

Expand Down