Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/tableau): re-authenticate if the token expires #6380

Merged
merged 5 commits into from
Nov 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ServerResponseError,
TableauAuth,
)
from tableauserverclient.server.endpoint.exceptions import NonXMLResponseError

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, ConfigurationError
Expand Down Expand Up @@ -358,17 +359,34 @@ def get_connection_object_page(
connection_type: str,
query_filter: str,
count: int = 0,
current_count: int = 0,
offset: int = 0,
retry_on_auth_error: bool = True,
) -> Tuple[dict, int, int]:
logger.debug(
f"Query {connection_type} to get {count} objects with offset {current_count}"
)
query_data = query_metadata(
self.server, query, connection_type, count, current_count, query_filter
f"Query {connection_type} to get {count} objects with offset {offset}"
)
try:
query_data = query_metadata(
self.server, query, connection_type, count, offset, query_filter
)
except NonXMLResponseError:
if not retry_on_auth_error:
raise

# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown and we need to re-authenticate and retry.
self._authenticate()
return self.get_connection_object_page(
query, connection_type, query_filter, count, offset, False
)

if "errors" in query_data:
errors = query_data["errors"]
if all(error["extensions"]["severity"] == "WARNING" for error in errors):
if all(
error.get("extensions", {}).get("severity", None) == "WARNING"
for error in errors
):
self.report.report_warning(key=connection_type, reason=f"{errors}")
else:
raise RuntimeError(f"Query {connection_type} error: {errors}")
Expand Down Expand Up @@ -396,12 +414,12 @@ def get_connection_objects(

total_count = count_on_query
has_next_page = 1
current_count = 0
offset = 0
while has_next_page:
count = (
count_on_query
if current_count + count_on_query < total_count
else total_count - current_count
if offset + count_on_query < total_count
else total_count - offset
)
(
connection_objects,
Expand All @@ -412,10 +430,10 @@ def get_connection_objects(
connection_type,
query_filter,
count,
current_count,
offset,
)

current_count += count
offset += count

for obj in connection_objects.get("nodes", []):
yield obj
Expand Down