Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fsspec base Hook for filesystem operations #33578

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion airflow/providers/alibaba/cloud/hooks/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from inspect import signature
from typing import TYPE_CHECKING, Callable, TypeVar, cast
from urllib.parse import urlsplit
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook

import oss2
from oss2.exceptions import ClientError
Expand Down Expand Up @@ -76,7 +77,7 @@ def get_key() -> str:
return cast(T, wrapper)


class OSSHook(BaseHook):
class OSSHook(BaseHook, FsApiHook):
"""Interact with Alibaba Cloud OSS, using the oss2 library."""

conn_name_attr = "alibabacloud_conn_id"
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/hooks/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
from urllib.parse import urlsplit
from uuid import uuid4
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook

if TYPE_CHECKING:
try:
Expand Down Expand Up @@ -145,7 +146,7 @@ def wrapper(*args, **kwargs) -> T:
return cast(T, wrapper)


class S3Hook(AwsBaseHook):
class S3Hook(AwsBaseHook, FsApiHook):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, what kind of behaviour expected here?

"""
Interact with Amazon Simple Storage Service (S3).

Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/common/filesystem/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
45 changes: 45 additions & 0 deletions airflow/providers/common/filesystem/hooks/filesystem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from urllib.parse import urlsplit

import fsspec

from airflow.hooks.base import BaseHook


class FsApiHook(BaseHook):
"""
Abstract base class for fsspec hooks.
"""

# Override to have a default connection id for a particular hook
default_fs_conn_name = "default_conn_id"

def __init__(self, *args, fs_conn_name: str = default_fs_conn_name, **kwargs):
self.fs_conn_name = fs_conn_name
super().__init__(*args, **kwargs)

def get_conn(self) -> fsspec.AbstractFileSystem:
conn = self.get_connection(self.fs_conn_name)
uri = urlsplit(conn.get_uri())
extras = conn.extra_dejson.get("fsspec", {})

fs = fsspec.filesystem(uri.scheme, *extras)
if full_path := uri.hostname or "" + uri.path:
fs = DirFileSystem(full_path, fs=fs)

return fs
40 changes: 40 additions & 0 deletions airflow/providers/common/filesystem/provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

---
package-name: apache-airflow-providers-common-fsspec
name: Common File System
description: |
`Common file system provider`

suspended: false
versions:
- 1.0.0

dependencies:
- apache-airflow>=2.4.0
- fsspec>=2023.6.0

integrations:
- integration-name: Common file system
external-doc-url: https://filesystem-spec.readthedocs.io/en/latest/
tags: [software]

hooks:
- integration-name: Common file system
python-modules:
- airflow.providers.common.filesystem.hooks.filesystem
3 changes: 2 additions & 1 deletion airflow/providers/databricks/hooks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from requests import exceptions as requests_exceptions

from airflow.exceptions import AirflowException
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook

RESTART_CLUSTER_ENDPOINT = ("POST", "api/2.0/clusters/restart")
Expand Down Expand Up @@ -108,7 +109,7 @@ def from_json(cls, data: str) -> RunState:
return RunState(**json.loads(data))


class DatabricksHook(BaseDatabricksHook):
class DatabricksHook(BaseDatabricksHook, FsApiHook):
"""
Interact with Databricks.

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/ftp/hooks/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from typing import Any, Callable

from airflow.hooks.base import BaseHook
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook


class FTPHook(BaseHook):
class FTPHook(BaseHook, FsApiHook):
"""
Interact with FTP.

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/github/hooks/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook

from github import Github as GithubClient

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook


class GithubHook(BaseHook):
class GithubHook(BaseHook, FsApiHook):
"""
Interact with GitHub.

Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/hooks/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from urllib.parse import urlsplit

from aiohttp import ClientSession
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook
from gcloud.aio.storage import Storage
from google.api_core.exceptions import GoogleAPICallError, NotFound
from google.api_core.retry import Retry
Expand Down Expand Up @@ -142,7 +143,7 @@ def _inner_wrapper(self, *args, **kwargs) -> RT:
PROVIDE_BUCKET: str = cast(str, None)


class GCSHook(GoogleBaseHook):
class GCSHook(GoogleBaseHook, FsApiHook):
"""Use the Google Cloud connection to interact with Google Cloud Storage."""

_conn: storage.Client | None = None
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/http/hooks/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import aiohttp
import requests
from airflow.providers.common.filesystem.hooks.filesystem import FsApiHook
import tenacity
from aiohttp import ClientResponseError
from asgiref.sync import sync_to_async
Expand All @@ -35,7 +36,7 @@
from aiohttp.client_reqrep import ClientResponse


class HttpHook(BaseHook):
class HttpHook(BaseHook, FsApiHook):
"""Interact with HTTP servers.

:param method: the API method to be called
Expand Down