fix(ingest): compat with mypy 0.981 (#6056)
hsheth2 authored Sep 27, 2022
1 parent 3f1d47c commit 7da2e98
Showing 8 changed files with 59 additions and 84 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -354,7 +354,7 @@ def get_long_description():
     "flake8>=3.8.3",
     "flake8-tidy-imports>=4.3.0",
     "isort>=5.7.0",
-    "mypy>=0.950",
+    "mypy>=0.981",
     # pydantic 1.8.2 is incompatible with mypy 0.910.
     # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
     # Restricting top version to <1.10 until we can fix our types.
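A quick sketch of what the bumped pin means in practice, using the packaging library to evaluate the specifier (illustrative only, not part of the commit):

from packaging.specifiers import SpecifierSet
from packaging.version import Version

# The dev constraint after this change: mypy must be at least 0.981,
# while the adjacent comments keep pydantic below 1.10 for now.
mypy_spec = SpecifierSet(">=0.981")
print(Version("0.981") in mypy_spec)  # True
print(Version("0.950") in mypy_spec)  # False: the old floor no longer satisfies it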
@@ -15,7 +15,7 @@
 from looker_sdk.sdk.api31.models import Dashboard, LookWithQuery

 import datahub.emitter.mce_builder as builder
-from datahub.emitter.mce_builder import Aspect
+from datahub.emitter.mce_builder import Aspect, AspectAbstract
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.source.looker import looker_common
 from datahub.ingestion.source.looker.looker_common import (
@@ -192,11 +192,13 @@ def get_filter(self) -> Dict[ViewField, str]:
         pass

     @abstractmethod
-    def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+    def to_entity_absolute_stat_aspect(
+        self, looker_object: ModelForUsage
+    ) -> AspectAbstract:
         pass

     @abstractmethod
-    def to_entity_timeseries_stat_aspect(self, row: Dict) -> Aspect:
+    def to_entity_timeseries_stat_aspect(self, row: Dict) -> AspectAbstract:
         pass

     @abstractmethod
@@ -242,9 +244,9 @@ def _get_user_identifier(self, row: Dict) -> int:

     def _process_entity_timeseries_rows(
         self, rows: List[Dict]
-    ) -> Dict[Tuple[str, str], Aspect]:
+    ) -> Dict[Tuple[str, str], AspectAbstract]:
         # Convert Looker entity stat i.e. rows to DataHub stat aspect
-        entity_stat_aspect: Dict[Tuple[str, str], Aspect] = {}
+        entity_stat_aspect: Dict[Tuple[str, str], AspectAbstract] = {}

         for row in rows:
             logger.debug(row)
@@ -254,8 +256,8 @@ def _process_entity_timeseries_rows(

         return entity_stat_aspect

-    def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, Aspect]]:
-        aspects: List[Tuple[ModelForUsage, Aspect]] = []
+    def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, AspectAbstract]]:
+        aspects: List[Tuple[ModelForUsage, AspectAbstract]] = []
         for looker_object in self.looker_models:
             aspects.append(
                 (looker_object, self.to_entity_absolute_stat_aspect(looker_object))
@@ -463,20 +465,19 @@ def _get_mcp_attributes(self, model: ModelForUsage) -> Dict:
             "aspectName": "dashboardUsageStatistics",
         }

-    def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+    def to_entity_absolute_stat_aspect(
+        self, looker_object: ModelForUsage
+    ) -> DashboardUsageStatisticsClass:
         looker_dashboard: LookerDashboardForUsage = cast(
             LookerDashboardForUsage, looker_object
         )
         if looker_dashboard.view_count:
             self.report.dashboards_with_activity.add(str(looker_dashboard.id))
-        return cast(
-            Aspect,
-            DashboardUsageStatisticsClass(
-                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-                favoritesCount=looker_dashboard.favorite_count,
-                viewsCount=looker_dashboard.view_count,
-                lastViewedAt=looker_dashboard.last_viewed_at,
-            ),
+        return DashboardUsageStatisticsClass(
+            timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+            favoritesCount=looker_dashboard.favorite_count,
+            viewsCount=looker_dashboard.view_count,
+            lastViewedAt=looker_dashboard.last_viewed_at,
         )

     def get_entity_timeseries_query(self) -> LookerQuery:
@@ -485,20 +486,19 @@ def get_entity_timeseries_query(self) -> LookerQuery:
     def get_entity_user_timeseries_query(self) -> LookerQuery:
         return query_collection[QueryId.DASHBOARD_PER_USER_PER_DAY_USAGE_STAT]

-    def to_entity_timeseries_stat_aspect(self, row: dict) -> Aspect:
+    def to_entity_timeseries_stat_aspect(
+        self, row: dict
+    ) -> DashboardUsageStatisticsClass:
         self.report.dashboards_with_activity.add(
             row[HistoryViewField.HISTORY_DASHBOARD_ID]
         )
-        return cast(
-            Aspect,
-            DashboardUsageStatisticsClass(
-                timestampMillis=self._round_time(
-                    row[HistoryViewField.HISTORY_CREATED_DATE]
-                ),
-                eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
-                uniqueUserCount=row[HistoryViewField.HISTORY_DASHBOARD_USER],
-                executionsCount=row[HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT],
-            ),
+        return DashboardUsageStatisticsClass(
+            timestampMillis=self._round_time(
+                row[HistoryViewField.HISTORY_CREATED_DATE]
+            ),
+            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
+            uniqueUserCount=row[HistoryViewField.HISTORY_DASHBOARD_USER],
+            executionsCount=row[HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT],
         )

     def append_user_stat(
@@ -584,18 +584,17 @@ def _get_mcp_attributes(self, model: ModelForUsage) -> Dict:
             "aspectName": "chartUsageStatistics",
         }

-    def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+    def to_entity_absolute_stat_aspect(
+        self, looker_object: ModelForUsage
+    ) -> ChartUsageStatisticsClass:
         looker_look: LookerChartForUsage = cast(LookerChartForUsage, looker_object)
         assert looker_look.id
         if looker_look.view_count:
             self.report.charts_with_activity.add(looker_look.id)

-        return cast(
-            Aspect,
-            ChartUsageStatisticsClass(
-                timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-                viewsCount=looker_look.view_count,
-            ),
+        return ChartUsageStatisticsClass(
+            timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+            viewsCount=looker_look.view_count,
         )

     def get_entity_timeseries_query(self) -> LookerQuery:
@@ -604,18 +603,15 @@ def get_entity_timeseries_query(self) -> LookerQuery:
     def get_entity_user_timeseries_query(self) -> LookerQuery:
         return query_collection[QueryId.LOOK_PER_USER_PER_DAY_USAGE_STAT]

-    def to_entity_timeseries_stat_aspect(self, row: dict) -> Aspect:
+    def to_entity_timeseries_stat_aspect(self, row: dict) -> ChartUsageStatisticsClass:
         self.report.charts_with_activity.add(str(row[LookViewField.LOOK_ID]))

-        return cast(
-            Aspect,
-            ChartUsageStatisticsClass(
-                timestampMillis=self._round_time(
-                    row[HistoryViewField.HISTORY_CREATED_DATE]
-                ),
-                eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
-                viewsCount=row[HistoryViewField.HISTORY_COUNT],
-            ),
+        return ChartUsageStatisticsClass(
+            timestampMillis=self._round_time(
+                row[HistoryViewField.HISTORY_CREATED_DATE]
+            ),
+            eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
+            viewsCount=row[HistoryViewField.HISTORY_COUNT],
        )

     def append_user_stat(
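Note on the usage-source changes above: to_entity_absolute_stat_aspect and to_entity_timeseries_stat_aspect now declare the concrete aspect class they build instead of the generic Aspect, which is what lets every cast(Aspect, ...) wrapper disappear. A minimal, self-contained sketch of why this satisfies mypy; all names below are illustrative stand-ins, not DataHub's classes:

from abc import ABC, abstractmethod


class BaseAspect:  # stand-in for AspectAbstract
    pass


class DashboardUsageAspect(BaseAspect):  # stand-in for DashboardUsageStatisticsClass
    def __init__(self, views_count: int) -> None:
        self.views_count = views_count


class StatHandler(ABC):
    @abstractmethod
    def to_aspect(self) -> BaseAspect:
        ...


class DashboardStatHandler(StatHandler):
    # Return types are covariant in overrides: narrowing BaseAspect to
    # DashboardUsageAspect is valid, so no cast() is needed at the return site.
    def to_aspect(self) -> DashboardUsageAspect:
        return DashboardUsageAspect(views_count=1)


handler: StatHandler = DashboardStatHandler()
aspect: BaseAspect = handler.to_aspect()  # callers of the subclass see the narrow type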
@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional, Union, cast
+from typing import Callable, List, Optional, cast

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import (
@@ -22,12 +22,7 @@


 class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
-    # Workaround for https://github.com/python/mypy/issues/708.
-    # Suggested by https://stackoverflow.com/a/64528725/5004662.
-    get_owners_to_add: Union[
-        Callable[[str], List[OwnerClass]],
-        Callable[[str], List[OwnerClass]],
-    ]
+    get_owners_to_add: Callable[[str], List[OwnerClass]]
     default_actor: str = builder.make_user_urn("etl")

     _resolve_owner_fn = pydantic_resolve_key("get_owners_to_add")
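Note on this and the four transformer configs that follow: the deleted comments reference python/mypy#708, where older mypy treated a bare Callable annotation on a class attribute as an unbound method, so assigning a plain function to the field produced spurious "self" errors; wrapping the signature in a Union of two identical members (the cited StackOverflow workaround) suppressed that binding. This commit drops the trick alongside the mypy>=0.981 bump and declares the field directly. A minimal before/after sketch; the ConfigWithWorkaround/ConfigPlain classes and the get_names field are hypothetical, not DataHub code:

from typing import Callable, List, Union


class ConfigWithWorkaround:
    # Old shape: a Union of two identical Callables, kept only so older mypy
    # would not treat the attribute as a method (python/mypy#708).
    get_names: Union[
        Callable[[str], List[str]],
        Callable[[str], List[str]],
    ]


class ConfigPlain:
    # New shape: the plain annotation the transformers now use.
    get_names: Callable[[str], List[str]]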
@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional, Union, cast
+from typing import Callable, List, Optional, cast

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import (
@@ -20,12 +20,7 @@


 class AddDatasetSchemaTagsConfig(TransformerSemanticsConfigModel):
-    # Workaround for https://github.com/python/mypy/issues/708.
-    # Suggested by https://stackoverflow.com/a/64528725/5004662.
-    get_tags_to_add: Union[
-        Callable[[str], List[TagAssociationClass]],
-        Callable[[str], List[TagAssociationClass]],
-    ]
+    get_tags_to_add: Callable[[str], List[TagAssociationClass]]

     _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add")

@@ -1,4 +1,4 @@
-from typing import Callable, Dict, List, Optional, Union, cast
+from typing import Callable, Dict, List, Optional, cast

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import (
@@ -21,12 +21,7 @@


 class AddDatasetSchemaTermsConfig(TransformerSemanticsConfigModel):
-    # Workaround for https://github.com/python/mypy/issues/708.
-    # Suggested by https://stackoverflow.com/a/64528725/5004662.
-    get_terms_to_add: Union[
-        Callable[[str], List[GlossaryTermAssociationClass]],
-        Callable[[str], List[GlossaryTermAssociationClass]],
-    ]
+    get_terms_to_add: Callable[[str], List[GlossaryTermAssociationClass]]

     _resolve_term_fn = pydantic_resolve_key("get_terms_to_add")

@@ -1,4 +1,4 @@
-from typing import Callable, List, Optional, Union, cast
+from typing import Callable, List, Optional, cast

 from datahub.configuration.common import (
     KeyValuePattern,
@@ -14,12 +14,7 @@


 class AddDatasetTagsConfig(TransformerSemanticsConfigModel):
-    # Workaround for https://github.com/python/mypy/issues/708.
-    # Suggested by https://stackoverflow.com/a/64528725/5004662.
-    get_tags_to_add: Union[
-        Callable[[str], List[TagAssociationClass]],
-        Callable[[str], List[TagAssociationClass]],
-    ]
+    get_tags_to_add: Callable[[str], List[TagAssociationClass]]

     _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add")

@@ -1,5 +1,5 @@
 import logging
-from typing import Callable, List, Optional, Union, cast
+from typing import Callable, List, Optional, cast

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import (
@@ -20,12 +20,7 @@


 class AddDatasetTermsConfig(TransformerSemanticsConfigModel):
-    # Workaround for https://github.com/python/mypy/issues/708.
-    # Suggested by https://stackoverflow.com/a/64528725/5004662.
-    get_terms_to_add: Union[
-        Callable[[str], List[GlossaryTermAssociationClass]],
-        Callable[[str], List[GlossaryTermAssociationClass]],
-    ]
+    get_terms_to_add: Callable[[str], List[GlossaryTermAssociationClass]]

     _resolve_term_fn = pydantic_resolve_key("get_terms_to_add")

16 changes: 10 additions & 6 deletions metadata-ingestion/src/datahub/utilities/memory_leak_detector.py
@@ -7,8 +7,12 @@
 from functools import wraps
 from typing import Any, Callable, Dict, List, TypeVar, Union, cast

+import click
+from typing_extensions import Concatenate, ParamSpec
+
 logger = logging.getLogger(__name__)
 T = TypeVar("T")
+P = ParamSpec("P")


 def _trace_has_file(trace: tracemalloc.Traceback, file_pattern: str) -> bool:
@@ -75,20 +79,20 @@ def _perform_leak_detection() -> None:
     tracemalloc.stop()


-# TODO: Transition to ParamSpec with the first arg being click.Context (using typing_extensions.Concatenate)
-# once fully supported by mypy.
-def with_leak_detection(func: Callable[..., T]) -> Callable[..., T]:
+def with_leak_detection(
+    func: Callable[Concatenate[click.Context, P], T]
+) -> Callable[Concatenate[click.Context, P], T]:
     @wraps(func)
-    def wrapper(*args: Any, **kwargs: Any) -> Any:
-        detect_leaks: bool = args[0].obj.get("detect_memory_leaks", False)
+    def wrapper(ctx: click.Context, *args: P.args, **kwargs: P.kwargs) -> Any:
+        detect_leaks: bool = ctx.obj.get("detect_memory_leaks", False)
         if detect_leaks:
             logger.info(
                 f"Initializing memory leak detection on command: {func.__module__}.{func.__name__}"
             )
             _init_leak_detection()

         try:
-            return func(*args, **kwargs)
+            return func(ctx, *args, **kwargs)
         finally:
             if detect_leaks:
                 logger.info(
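Note on the decorator above: the old wrapper erased the command's signature to Callable[..., T] and dug the click.Context out of args[0]; Concatenate/ParamSpec (PEP 612) lets it pin the first parameter to click.Context while forwarding the rest of the signature for mypy to check. A self-contained sketch of the same pattern with illustrative names (log_first_arg and greet are hypothetical):

from functools import wraps
from typing import Callable, TypeVar

from typing_extensions import Concatenate, ParamSpec

P = ParamSpec("P")
T = TypeVar("T")


def log_first_arg(
    func: Callable[Concatenate[str, P], T]
) -> Callable[Concatenate[str, P], T]:
    # The first parameter is pinned to str; *args/**kwargs carry the rest of
    # the wrapped function's signature through unchanged.
    @wraps(func)
    def wrapper(first: str, *args: P.args, **kwargs: P.kwargs) -> T:
        print(f"calling with first argument {first!r}")
        return func(first, *args, **kwargs)

    return wrapper


@log_first_arg
def greet(name: str, punctuation: str = "!") -> str:
    return f"Hello, {name}{punctuation}"


print(greet("DataHub"))  # OK: mypy knows this returns str
# greet(123) would be rejected by mypy: int is not compatible with str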