Skip to content

Commit

Permalink
fix(ingest/tableau): implement workbook_page_size parameter (datahub-project#7216)
Browse files Browse the repository at this point in the history

Co-authored-by: John Joyce <[email protected]>
  • Loading branch information
2 people authored and Oleg Ruban committed Feb 28, 2023
1 parent 8c3bfe8 commit 20737c2
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,24 @@ class TableauConfig(

page_size: int = Field(
default=10,
description="Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using Tableau api.",
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
)
# We've found that even with a small workbook page size (e.g. 10), the Tableau API often
# returns warnings like this:
# {
# 'message': 'Showing partial results. The request exceeded the 20000 node limit. Use pagination, additional filtering, or both in the query to adjust results.',
# 'extensions': {
# 'severity': 'WARNING',
# 'code': 'NODE_LIMIT_EXCEEDED',
# 'properties': {
# 'nodeLimit': 20000
# }
# }
# }
# Reducing the page size for the workbook queries helps to avoid this.
workbook_page_size: int = Field(
default=1,
description="[advanced] Number of workbooks to query at a time using the Tableau API.",
)

env: str = Field(
Expand Down Expand Up @@ -363,6 +380,7 @@ def _populate_usage_stat_registry(self):
def _authenticate(self):
try:
self.server = self.config.make_tableau_client()
logger.info("Authenticated to Tableau server")
# Note that we deliberately do not catch ConfigurationError here: it should propagate to the caller.
except ValueError as e:
self.report.report_failure(
Expand Down Expand Up @@ -435,20 +453,19 @@ def get_connection_objects(
query: str,
connection_type: str,
query_filter: str,
page_size_override: Optional[int] = None,
) -> Iterable[dict]:
# Calls the get_connection_object_page function to get the objects,
# and automatically handles pagination.

count_on_query = self.config.page_size
page_size = page_size_override or self.config.page_size

total_count = count_on_query
total_count = page_size
has_next_page = 1
offset = 0
while has_next_page:
count = (
count_on_query
if offset + count_on_query < total_count
else total_count - offset
page_size if offset + page_size < total_count else total_count - offset
)
(
connection_objects,
Expand All @@ -475,7 +492,10 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
)

for workbook in self.get_connection_objects(
workbook_graphql_query, "workbooksConnection", projects
workbook_graphql_query,
"workbooksConnection",
projects,
page_size_override=self.config.workbook_page_size,
):
yield from self.emit_workbook_as_container(workbook)
for sheet in workbook.get("sheets", []):
Expand Down

0 comments on commit 20737c2

Please sign in to comment.