feat(ingest): looker - reduce memory requirements #5815

Merged: 4 commits, Sep 2, 2022
@@ -784,6 +784,38 @@ def _to_metadata_events(  # noqa: C901
         return [mce, mcp]
 
 
+class LookerExploreRegistry:
+    """A caching registry of Looker Explores"""
+
+    def __init__(
+        self,
+        client: Looker31SDK,
+        report: SourceReport,
+        transport_options: Optional[TransportOptions],
+    ):
+        self.client = client
+        self.report = report
+        self.transport_options = transport_options
+        self.explore_cache: Dict[Tuple[str, str], Optional[LookerExplore]] = {}
+
+    def get_explore(self, model: str, explore: str) -> Optional[LookerExplore]:
+        if (model, explore) not in self.explore_cache:
+            looker_explore = LookerExplore.from_api(
+                model,
+                explore,
+                self.client,
+                self.report,
+                transport_options=self.transport_options,
+            )
+            self.explore_cache[(model, explore)] = looker_explore
+        return self.explore_cache[(model, explore)]
+
+    def get_all_explores(self) -> Iterable[LookerExplore]:
+        for key, value in self.explore_cache.items():
+            if value is not None:
+                yield value
+
+
 class StageLatency(Report):
     name: str
     start_time: Optional[datetime.datetime]
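The registry above memoizes LookerExplore.from_api per (model, explore) key, and a failed lookup is cached as None, so each explore is requested from the Looker API at most once no matter how many dashboards reference it. A minimal, self-contained sketch of the same pattern (ExploreCache, fake_fetch, and the string payloads are illustrative stand-ins, not code from this PR):

from typing import Callable, Dict, Optional, Tuple


class ExploreCache:
    """Sketch of the memoization pattern used by LookerExploreRegistry."""

    def __init__(self, fetch_fn: Callable[[str, str], Optional[str]]):
        self.fetch_fn = fetch_fn
        self.cache: Dict[Tuple[str, str], Optional[str]] = {}

    def get(self, model: str, explore: str) -> Optional[str]:
        key = (model, explore)
        if key not in self.cache:
            # Failed fetches (None) are cached too, so each key triggers
            # at most one upstream call.
            self.cache[key] = self.fetch_fn(model, explore)
        return self.cache[key]


calls = []


def fake_fetch(model: str, explore: str) -> Optional[str]:
    calls.append((model, explore))
    return None if explore == "missing" else f"{model}/{explore}"


cache = ExploreCache(fake_fetch)
assert cache.get("sales", "orders") == "sales/orders"   # hits the "API"
assert cache.get("sales", "orders") == "sales/orders"   # served from cache
assert cache.get("sales", "missing") is None
assert cache.get("sales", "missing") is None            # None is cached too
assert calls == [("sales", "orders"), ("sales", "missing")]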
@@ -46,6 +46,7 @@
     LookerCommonConfig,
     LookerDashboardSourceReport,
     LookerExplore,
+    LookerExploreRegistry,
     LookerUtil,
     ViewField,
     ViewFieldType,
@@ -163,8 +164,6 @@ class LookerDashboardSource(Source):
     client: Looker31SDK
     user_registry: LookerUserRegistry
     explores_to_fetch_set: Dict[Tuple[str, str], List[str]] = {}
-    resolved_explores_map: Dict[Tuple[str, str], LookerExplore] = {}
-    resolved_dashboards_map: Dict[str, LookerDashboard] = {}
     accessed_dashboards: int = 0
     resolved_user_ids: int = 0
     email_ids_missing: int = 0  # resolved users with missing email addresses
@@ -176,6 +175,13 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
         looker_api: LookerAPI = LookerAPI(self.source_config)
         self.client = looker_api.get_client()
         self.user_registry = LookerUserRegistry(looker_api)
+        self.explore_registry = LookerExploreRegistry(
+            self.client,
+            self.reporter,
+            self.source_config.transport_options.get_transport_options()
+            if self.source_config.transport_options
+            else None,
+        )
         # Keep stat generators to generate entity stat aspect later
         stat_generator_config: looker_usage.StatGeneratorConfig = (
             looker_usage.StatGeneratorConfig(
@@ -353,7 +359,6 @@ def _get_looker_dashboard_element(  # noqa: C901
                     explore=exp,
                     via=f"look:{element.look_id}:query:{element.dashboard_id}",
                 )
-                # self.explores_to_fetch_set.add((element.query.model, exp))
 
         return LookerDashboardElement(
             id=element.id,
@@ -436,9 +441,6 @@ def _get_looker_dashboard_element(  # noqa: C901
                     explore=exp,
                     via=f"Look:{element.look_id}:resultmaker:query",
                 )
-                # self.explores_to_fetch_set.add(
-                #     (element.result_maker.query.model, exp)
-                # )
 
             # In addition to the query, filters can point to fields as well
             assert element.result_maker.filterables is not None
@@ -451,7 +453,6 @@ def _get_looker_dashboard_element(  # noqa: C901
                     explore=filterable.view,
                     via=f"Look:{element.look_id}:resultmaker:filterable",
                 )
-                # self.explores_to_fetch_set.add((filterable.model, filterable.view))
                 listen = filterable.listen
                 query = element.result_maker.query
                 if listen is not None:
@@ -574,9 +575,7 @@ def _make_explore_metadata_events(
             max_workers=self.source_config.max_threads
         ) as async_executor:
             explore_futures = [
-                async_executor.submit(
-                    self.fetch_one_explore, model, explore, self.resolved_explores_map
-                )
+                async_executor.submit(self.fetch_one_explore, model, explore)
                 for (model, explore) in self.explores_to_fetch_set
             ]
             for future in concurrent.futures.as_completed(explore_futures):
@@ -590,10 +589,7 @@ def _make_explore_metadata_events(
         return explore_events
 
     def fetch_one_explore(
-        self,
-        model: str,
-        explore: str,
-        resolved_explores_map: Dict[Tuple[str, str], LookerExplore],
+        self, model: str, explore: str
     ) -> Tuple[
         List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]],
         str,
@@ -602,17 +598,8 @@ def fetch_one_explore(
     ]:
         start_time = datetime.datetime.now()
        events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = []
-        looker_explore = LookerExplore.from_api(
-            model,
-            explore,
-            self.client,
-            self.reporter,
-            transport_options=self.source_config.transport_options.get_transport_options()
-            if self.source_config.transport_options is not None
-            else None,
-        )
+        looker_explore = self.explore_registry.get_explore(model, explore)
         if looker_explore is not None:
-            resolved_explores_map[(model, explore)] = looker_explore
             events = (
                 looker_explore._to_metadata_events(
                     self.source_config, self.reporter, self.source_config.base_url
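The hunks above route all explore fetching through the shared registry: fetch_one_explore is submitted to a ThreadPoolExecutor, so several worker threads may call get_explore concurrently. The registry's plain dict is not explicitly synchronized; under CPython the per-key read and write are each atomic thanks to the GIL, and the worst case is two threads fetching the same explore once each, with one result overwriting the other. A lock-guarded variant would look roughly like this (hypothetical sketch, not part of this PR):

import threading
from typing import Callable, Dict, Optional, Tuple


class LockedExploreCache:
    """Lock-guarded variant of the registry's cache (illustrative only)."""

    def __init__(self, fetch_fn: Callable[[str, str], Optional[str]]):
        self._fetch_fn = fetch_fn
        self._cache: Dict[Tuple[str, str], Optional[str]] = {}
        self._lock = threading.Lock()

    def get(self, model: str, explore: str) -> Optional[str]:
        key = (model, explore)
        with self._lock:
            if key in self._cache:
                return self._cache[key]
        # Fetch outside the lock so one slow API call does not serialize
        # every other thread.
        value = self._fetch_fn(model, explore)
        with self._lock:
            # setdefault keeps the first writer's value if two threads raced.
            return self._cache.setdefault(key, value)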
@@ -811,13 +798,8 @@ def _get_looker_user(self, user_id: Optional[int]) -> Optional[LookerUser]:
         return user
 
     def process_metrics_dimensions_and_fields_for_dashboard(
-        self, dashboard_id: str
-    ) -> Tuple[List[MetadataWorkUnit], str, datetime.datetime, datetime.datetime]:
-        start_time = datetime.datetime.now()
-
-        dashboard = self.resolved_dashboards_map.get(dashboard_id)
-        if dashboard is None:
-            return [], dashboard_id, start_time, datetime.datetime.now()
+        self, dashboard: LookerDashboard
+    ) -> List[MetadataWorkUnit]:
 
         chart_mcps = [
             self._make_metrics_dimensions_chart_mcp(element, dashboard)
@@ -837,7 +819,7 @@ def process_metrics_dimensions_and_fields_for_dashboard(
             for mcp in mcps
         ]
 
-        return workunits, dashboard_id, start_time, datetime.datetime.now()
+        return workunits
 
     def _input_fields_from_dashboard_element(
         self, dashboard_element: LookerDashboardElement
@@ -858,10 +840,14 @@ def _input_fields_from_dashboard_element(
             view_field_for_reference = input_field.view_field
 
             if input_field.view_field is None:
-                explore = self.resolved_explores_map.get(
-                    (input_field.model, input_field.explore)
+                explore = self.explore_registry.get_explore(
+                    input_field.model, input_field.explore
                 )
                 if explore is not None:
+                    # add this to the list of explores to finally generate metadata for
+                    self.add_explore_to_fetch(
+                        input_field.model, input_field.explore, entity_urn
+                    )
                     entity_urn = explore.get_explore_urn(self.source_config)
                     explore_fields = (
                         explore.fields if explore.fields is not None else []
@@ -977,9 +963,7 @@ def process_dashboard(
             return [], None, dashboard_id, start_time, datetime.datetime.now()
 
         looker_dashboard = self._get_looker_dashboard(dashboard_object, self.client)
-        self.resolved_dashboards_map[looker_dashboard.id] = looker_dashboard
         mces = self._make_dashboard_and_chart_mces(looker_dashboard)
-        # for mce in mces:
         workunits = [
             MetadataWorkUnit(id=f"looker-{mce.proposedSnapshot.urn}", mce=mce)
             if isinstance(mce, MetadataChangeEvent)
@@ -988,6 +972,13 @@ def process_dashboard(
             )
             for mce in mces
         ]
+
+        # add on metrics, dimensions, fields events
+        metric_dim_workunits = self.process_metrics_dimensions_and_fields_for_dashboard(
+            looker_dashboard
+        )
+        workunits.extend(metric_dim_workunits)
+
         return (
             workunits,
             dashboard_object,
@@ -1183,30 +1174,5 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
                 yield workunit
             self.reporter.report_stage_end("usage_extraction")
 
-        # after fetching explores, we need to go back and enrich each chart and dashboard with
-        # metadata about the fields
-        self.reporter.report_stage_start("field_metadata")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.source_config.max_threads
-        ) as async_executor:
-            async_workunits = [
-                async_executor.submit(
-                    self.process_metrics_dimensions_and_fields_for_dashboard,  # type: ignore
-                    dashboard_id,
-                )
-                for dashboard_id in dashboard_ids
-                if dashboard_id is not None
-            ]
-            for async_workunit in concurrent.futures.as_completed(async_workunits):
-                work_units, dashboard_id, start_time, end_time = async_workunit.result()  # type: ignore
-                logger.debug(
-                    f"Running time of process_metrics_dimensions_and_fields_for_dashboard for {dashboard_id} = {(end_time - start_time).total_seconds()}"
-                )
-                self.reporter.report_upstream_latency(start_time, end_time)
-                for mwu in work_units:
-                    yield mwu
-                    self.reporter.report_workunit(mwu)
-        self.reporter.report_stage_end("field_metadata")
-
     def get_report(self) -> SourceReport:
         return self.reporter
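The deleted "field_metadata" stage above is the core of the memory reduction: dashboards are no longer parked in resolved_dashboards_map for a second enrichment pass. Instead, process_dashboard (earlier hunks) emits the metrics/dimensions workunits inline while each LookerDashboard is still in scope, so peak memory no longer grows with the total dashboard count. A minimal sketch of the before/after shape, with every name an illustrative stand-in:

from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class Dashboard:  # hypothetical stand-in for LookerDashboard
    id: str
    fields: List[str]


def load_dashboards() -> Iterable[Dashboard]:
    yield Dashboard("d1", ["f1", "f2"])
    yield Dashboard("d2", ["f3"])


def two_pass() -> Iterable[str]:
    # Before: every dashboard is pinned in a map until a separate
    # "field_metadata" stage replays them.
    resolved: Dict[str, Dashboard] = {}
    for d in load_dashboards():
        resolved[d.id] = d
        yield f"dashboard:{d.id}"
    for d in resolved.values():  # all dashboards are still alive here
        for f in d.fields:
            yield f"field:{f}"


def single_pass() -> Iterable[str]:
    # After: field workunits are emitted while the dashboard is in scope,
    # so each Dashboard is garbage-collectable as soon as its turn ends.
    for d in load_dashboards():
        yield f"dashboard:{d.id}"
        for f in d.fields:
            yield f"field:{f}"


assert set(two_pass()) == set(single_pass())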
@@ -261,21 +261,26 @@ def _fill_user_stat_aspect(
             yield self.id_vs_model[id], aspect
 
     def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
-        start_time = datetime.datetime.now()
-        rows = self.config.looker_api_wrapper.execute_query(
-            write_query=query.to_write_query()
-        )
-        end_time = datetime.datetime.now()
+        rows = []
+        try:
+            start_time = datetime.datetime.now()
+            rows = self.config.looker_api_wrapper.execute_query(
+                write_query=query.to_write_query()
+            )
+            end_time = datetime.datetime.now()
 
-        logger.debug(
-            f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
-        )
-        self.report.report_query_latency(
-            f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
-        )
-        if self.post_filter:
-            rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
-            logger.debug(f"Filtered down to {len(rows)} rows")
+            logger.debug(
+                f"{self.ctx}: Retrieved {len(rows)} rows in {(end_time - start_time).total_seconds()} seconds"
+            )
+            self.report.report_query_latency(
+                f"{self.ctx}:{query_name}", (end_time - start_time).total_seconds()
+            )
+            if self.post_filter:
+                rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
+                logger.debug(f"Filtered down to {len(rows)} rows")
+        except Exception as e:
+            logger.warning(f"Failed to execute {query_name} query", e)
+
         return rows
 
     def _append_filters(self, query: LookerQuery) -> LookerQuery:
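This final hunk wraps the usage query in a broad try/except, so a single failing system-activity query is logged and degrades to an empty row list instead of aborting the whole usage stage. Below is a compact, runnable sketch of the same pattern; run_query_defensively and its arguments are hypothetical. One nit the sketch also corrects: logger.warning(f"...", e) in the hunk passes e as a %-format argument to a message with no placeholders, which the logging module reports as a formatting error at emit time, whereas passing the message and arguments separately renders cleanly.

import datetime
import logging
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


def run_query_defensively(
    execute: Callable[[], List[Dict]], query_name: str
) -> List[Dict]:
    """Time the call, swallow failures, and always return a row list."""
    rows: List[Dict] = []
    try:
        start = datetime.datetime.now()
        rows = execute()
        elapsed = (datetime.datetime.now() - start).total_seconds()
        logger.debug("%s returned %d rows in %.2fs", query_name, len(rows), elapsed)
    except Exception as e:
        # Lazy %-style arguments: only rendered if the record is emitted.
        logger.warning("Failed to execute %s query: %s", query_name, e)
    return rows


assert run_query_defensively(lambda: [{"id": 1}], "ok") == [{"id": 1}]
assert run_query_defensively(lambda: 1 / 0, "boom") == []  # error swallowed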