fix(ingest/tableau): implement workbook_page_size parameter #7216

Merged (2 commits) on Feb 2, 2023
34 changes: 27 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -241,7 +241,24 @@ class TableauConfig(
 
     page_size: int = Field(
         default=10,
-        description="Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using Tableau api.",
+        description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
     )
+    # We've found that even with a small workbook page size (e.g. 10), the Tableau API often
+    # returns warnings like this:
+    # {
+    #     'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
+    #     'extensions': {
+    #         'severity': 'WARNING',
+    #         'code': 'NODE_LIMIT_EXCEEDED',
+    #         'properties': {
+    #             'nodeLimit': 20000
+    #         }
+    #     }
+    # }
+    # Reducing the page size for the workbook queries helps to avoid this.
+    workbook_page_size: int = Field(
+        default=1,
+        description="[advanced] Number of workbooks to query at a time using the Tableau API.",
+    )
 
     env: str = Field(
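
The net effect of the two fields above: `page_size` (default 10) still governs most metadata queries, while the new `workbook_page_size` (default 1) applies only to workbook queries. A minimal sketch of that behaviour, assuming pydantic-style parsing as in the diff (`TableauConfigSketch` is a hypothetical stand-in for the real `TableauConfig`):

```python
from pydantic import BaseModel, Field


class TableauConfigSketch(BaseModel):
    # Mirrors only the two page-size knobs from the diff above.
    page_size: int = Field(default=10)          # most metadata object queries
    workbook_page_size: int = Field(default=1)  # workbook queries only


# A recipe that only tunes page_size keeps the conservative workbook default,
# which helps avoid the 20000-node warning described in the code comment above.
config = TableauConfigSketch.parse_obj({"page_size": 25})
assert config.page_size == 25
assert config.workbook_page_size == 1
```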
@@ -363,6 +380,7 @@ def _populate_usage_stat_registry(self):
     def _authenticate(self):
         try:
             self.server = self.config.make_tableau_client()
+            logger.info("Authenticated to Tableau server")
         # Note that we're not catching ConfigurationError, since we want that to throw.
         except ValueError as e:
             self.report.report_failure(
@@ -435,20 +453,19 @@ def get_connection_objects(
         query: str,
         connection_type: str,
         query_filter: str,
+        page_size_override: Optional[int] = None,
     ) -> Iterable[dict]:
         # Calls the get_connection_object_page function to get the objects,
         # and automatically handles pagination.
 
-        count_on_query = self.config.page_size
+        page_size = page_size_override or self.config.page_size
 
-        total_count = count_on_query
+        total_count = page_size
         has_next_page = 1
         offset = 0
         while has_next_page:
             count = (
-                count_on_query
-                if offset + count_on_query < total_count
-                else total_count - offset
+                page_size if offset + page_size < total_count else total_count - offset
             )
             (
                 connection_objects,
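
The pagination in the hunk above can be read as the following standalone sketch. `fetch_page` is a hypothetical stand-in for `get_connection_object_page`; the sketch assumes it returns the page of objects, the reported total count, and a has-next flag, matching how `total_count` and `has_next_page` are refreshed each iteration in the diff:

```python
from typing import Callable, Iterable, List, Tuple


def paginate(
    fetch_page: Callable[[int, int], Tuple[List[dict], int, bool]],
    page_size: int,
) -> Iterable[dict]:
    # Optimistic initial total; the first response replaces it with the real count.
    total_count = page_size
    has_next_page = True
    offset = 0
    while has_next_page:
        # Never request more items than the last reported total says remain.
        count = page_size if offset + page_size < total_count else total_count - offset
        objects, total_count, has_next_page = fetch_page(count, offset)
        offset += count
        yield from objects
```

In the hunk below, the workbook query passes `page_size_override=self.config.workbook_page_size`, so workbooks default to pages of 1 while the other queries keep using `page_size`.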
@@ -475,7 +492,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
         )
 
         for workbook in self.get_connection_objects(
-            workbook_graphql_query, "workbooksConnection", projects
+            workbook_graphql_query,
+            "workbooksConnection",
+            projects,
+            page_size_override=self.config.workbook_page_size,
         ):
             yield from self.emit_workbook_as_container(workbook)
             for sheet in workbook.get("sheets", []):