From f07215b1c805a0348eeff33cd4f58f1f5a592058 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 24 Jul 2023 20:18:20 +0200 Subject: [PATCH 1/3] Rework provider manager to treat Airflow core hooks like others --- airflow/cli/commands/connection_command.py | 2 +- airflow/hooks/filesystem.py | 47 +++++++++++++++++++++- airflow/providers_manager.py | 32 +++++++++++++++ airflow/www/forms.py | 4 -- 4 files changed, 78 insertions(+), 7 deletions(-) diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py index 02251a70ec022..3a6909bc158d0 100644 --- a/airflow/cli/commands/connection_command.py +++ b/airflow/cli/commands/connection_command.py @@ -150,7 +150,7 @@ def _valid_uri(uri: str) -> bool: @cache def _get_connection_types() -> list[str]: """Returns connection types available.""" - _connection_types = ["fs", "mesos_framework-id", "email", "generic"] + _connection_types = [] providers_manager = ProvidersManager() for connection_type, provider_info in providers_manager.hooks.items(): if provider_info: diff --git a/airflow/hooks/filesystem.py b/airflow/hooks/filesystem.py index 39517e8cdc1ad..7660d796e8dd2 100644 --- a/airflow/hooks/filesystem.py +++ b/airflow/hooks/filesystem.py @@ -17,6 +17,9 @@ # under the License. from __future__ import annotations +from pathlib import Path +from typing import Any + from airflow.hooks.base import BaseHook @@ -33,9 +36,37 @@ class FSHook(BaseHook): Extra: {"path": "/tmp"} """ - def __init__(self, conn_id: str = "fs_default"): + conn_name_attr = "fs_conn_id" + default_conn_name = "fs_default" + conn_type = "fs" + hook_name = "File (path)" + + @staticmethod + def get_connection_form_widgets() -> dict[str, Any]: + """Returns connection widgets to add to connection form.""" + from flask_appbuilder.fieldwidgets import BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import StringField + from wtforms.validators import InputRequired + + return { + "path": StringField( + lazy_gettext("Path"), validators=[InputRequired()], widget=BS3TextFieldWidget() + ), + } + + @staticmethod + def get_ui_field_behaviour() -> dict[str, Any]: + """Returns custom field behaviour.""" + return { + "hidden_fields": ["host", "schema", "port", "login", "password", "extra"], + "relabeling": {}, + "placeholders": {}, + } + + def __init__(self, fs_conn_id: str = default_conn_name): super().__init__() - conn = self.get_connection(conn_id) + conn = self.get_connection(fs_conn_id) self.basepath = conn.extra_dejson.get("path", "") self.conn = conn @@ -49,3 +80,15 @@ def get_path(self) -> str: :return: the path. """ return self.basepath + + def test_connection(self): + """Test File connection.""" + try: + p = self.get_path() + if not p: + return False, "File Path is undefined." + if not Path(p).exists(): + return False, f"Path {p} does not exist." + return True, f"Path {p} is existing." + except Exception as e: + return False, str(e) diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index b7689a92da619..bba64e316e1ad 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -36,6 +36,7 @@ from packaging.utils import canonicalize_name from airflow.exceptions import AirflowOptionalProviderFeatureException +from airflow.hooks.filesystem import FSHook from airflow.typing_compat import Literal from airflow.utils import yaml from airflow.utils.entry_points import entry_points_with_dist @@ -431,6 +432,37 @@ def __init__(self): ) # Set of plugins contained in providers self._plugins_set: set[PluginInfo] = set() + self._init_airflow_core_hooks() + + def _init_airflow_core_hooks(self): + """Initializes the hooks dict with default hooks from Airflow core.""" + core_dummy_hooks = { + "generic": "Generic", + "email": "Email", + "mesos_framework-id": "Mesos Framework ID", + } + for key, display in core_dummy_hooks.items(): + self._hooks_lazy_dict[key] = HookInfo( + hook_class_name=None, + connection_id_attribute_name=None, + package_name=None, + hook_name=display, + connection_type=None, + connection_testable=False, + ) + for cls in [FSHook]: + package_name = cls.__module__ + hook_class_name = f"{cls.__module__}.{cls.__name__}" + hook_info = self._import_hook( + connection_type=None, + provider_info=None, + hook_class_name=hook_class_name, + package_name=package_name, + ) + self._hook_provider_dict[hook_info.connection_type] = HookClassProvider( + hook_class_name=hook_class_name, package_name=package_name + ) + self._hooks_lazy_dict[hook_info.connection_type] = hook_info @provider_info_cache("list") def initialize_providers_list(self): diff --git a/airflow/www/forms.py b/airflow/www/forms.py index 9e143ce7b94a6..03e2d1cc9d012 100644 --- a/airflow/www/forms.py +++ b/airflow/www/forms.py @@ -195,10 +195,6 @@ def create_connection_form_class() -> type[DynamicForm]: def _iter_connection_types() -> Iterator[tuple[str, str]]: """List available connection types.""" - yield ("email", "Email") - yield ("fs", "File (path)") - yield ("generic", "Generic") - yield ("mesos_framework-id", "Mesos Framework ID") for connection_type, provider_info in providers_manager.hooks.items(): if provider_info: yield (connection_type, provider_info.hook_name) From 55b40d98033ea0cc5ae6869e6c518f6f4b892a95 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Tue, 1 Aug 2023 22:28:36 +0200 Subject: [PATCH 2/3] Rework provider manager to treat Airflow core hooks like others, fix pytest --- tests/always/test_connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/always/test_connection.py b/tests/always/test_connection.py index b367e9897262e..390f05133a038 100644 --- a/tests/always/test_connection.py +++ b/tests/always/test_connection.py @@ -753,14 +753,14 @@ def test_connection_test_success(self): @mock.patch.dict( "os.environ", { - "AIRFLOW_CONN_TEST_URI_NO_HOOK": "fs://", + "AIRFLOW_CONN_TEST_URI_NO_HOOK": "unknown://", }, ) def test_connection_test_no_hook(self): - conn = Connection(conn_id="test_uri_no_hook", conn_type="fs") + conn = Connection(conn_id="test_uri_no_hook", conn_type="unknown") res = conn.test_connection() assert res[0] is False - assert res[1] == 'Unknown hook type "fs"' + assert res[1] == 'Unknown hook type "unknown"' @mock.patch.dict( "os.environ", From 4e649aaf8db4e3d335b981452d396073165bba46 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 3 Aug 2023 22:14:26 +0200 Subject: [PATCH 3/3] Rework provider manager to treat Airflow core hooks like others, revert validation --- airflow/hooks/filesystem.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow/hooks/filesystem.py b/airflow/hooks/filesystem.py index 7660d796e8dd2..d436e0050caa0 100644 --- a/airflow/hooks/filesystem.py +++ b/airflow/hooks/filesystem.py @@ -47,13 +47,8 @@ def get_connection_form_widgets() -> dict[str, Any]: from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import StringField - from wtforms.validators import InputRequired - return { - "path": StringField( - lazy_gettext("Path"), validators=[InputRequired()], widget=BS3TextFieldWidget() - ), - } + return {"path": StringField(lazy_gettext("Path"), widget=BS3TextFieldWidget())} @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: