From 1f662571b2133df09da22aea35936bb10b8ebffa Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 12 Dec 2021 15:44:27 +0100 Subject: [PATCH] Fix MyPy errors in google.cloud.sensors (#20228) Part of #19891 --- airflow/providers/google/cloud/hooks/datafusion.py | 2 +- airflow/providers/google/cloud/sensors/bigquery_dts.py | 4 +++- airflow/providers/google/cloud/sensors/datafusion.py | 2 +- airflow/providers/google/cloud/sensors/dataproc.py | 4 ++-- airflow/providers/google/cloud/sensors/workflows.py | 7 +++++-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/datafusion.py b/airflow/providers/google/cloud/hooks/datafusion.py index b6321dc3f5d8c..3f2828a410d57 100644 --- a/airflow/providers/google/cloud/hooks/datafusion.py +++ b/airflow/providers/google/cloud/hooks/datafusion.py @@ -407,7 +407,7 @@ def get_pipeline_workflow( instance_url: str, pipeline_id: str, namespace: str = "default", - ) -> str: + ) -> Any: url = os.path.join( self._base_url(instance_url, namespace), quote(pipeline_name), diff --git a/airflow/providers/google/cloud/sensors/bigquery_dts.py b/airflow/providers/google/cloud/sensors/bigquery_dts.py index 4b92cb4ee9cea..8a782a426bdb0 100644 --- a/airflow/providers/google/cloud/sensors/bigquery_dts.py +++ b/airflow/providers/google/cloud/sensors/bigquery_dts.py @@ -106,7 +106,9 @@ def _normalize_state_list(self, states) -> Set[TransferState]: result = set() for state in states: if isinstance(state, str): - result.add(TransferState[state.upper()]) + # The proto.Enum type is indexable (via MetaClass and aliased) but MyPy is not able to + # infer this https://github.com/python/mypy/issues/8968 + result.add(TransferState[state.upper()]) # type: ignore[misc] elif isinstance(state, int): result.add(TransferState(state)) elif isinstance(state, TransferState): diff --git a/airflow/providers/google/cloud/sensors/datafusion.py b/airflow/providers/google/cloud/sensors/datafusion.py index 54c7e4ee94b32..6b3e6e1a312a4 100644 --- a/airflow/providers/google/cloud/sensors/datafusion.py +++ b/airflow/providers/google/cloud/sensors/datafusion.py @@ -72,7 +72,7 @@ def __init__( expected_statuses: Set[str], instance_name: str, location: str, - failure_statuses: Set[str] = None, + failure_statuses: Optional[Set[str]] = None, project_id: Optional[str] = None, namespace: str = "default", gcp_conn_id: str = 'google_cloud_default', diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py index 5f2b2358132c0..0fab0e4062147 100644 --- a/airflow/providers/google/cloud/sensors/dataproc.py +++ b/airflow/providers/google/cloud/sensors/dataproc.py @@ -56,7 +56,7 @@ def __init__( *, project_id: str, dataproc_job_id: str, - region: str = None, + region: Optional[str] = None, location: Optional[str] = None, gcp_conn_id: str = 'google_cloud_default', wait_timeout: Optional[int] = None, @@ -79,7 +79,7 @@ def __init__( self.dataproc_job_id = dataproc_job_id self.region = region self.wait_timeout = wait_timeout - self.start_sensor_time = None + self.start_sensor_time: Optional[float] = None def execute(self, context: Dict): self.start_sensor_time = time.monotonic() diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py index 5950458b71f13..a1162a1d83e96 100644 --- a/airflow/providers/google/cloud/sensors/workflows.py +++ b/airflow/providers/google/cloud/sensors/workflows.py @@ -73,8 +73,11 @@ def __init__( ): super().__init__(**kwargs) - self.success_states = success_states or {Execution.State.SUCCEEDED} - self.failure_states = failure_states or {Execution.State.FAILED, Execution.State.CANCELLED} + self.success_states = success_states or {Execution.State(Execution.State.SUCCEEDED)} + self.failure_states = failure_states or { + Execution.State(Execution.State.FAILED), + Execution.State(Execution.State.CANCELLED), + } self.workflow_id = workflow_id self.execution_id = execution_id self.location = location