From bf43a07edd18bc5b47dc6e94708b636369dfd596 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
<41898282+github-actions[bot]@users.noreply.github.com>
Date: Fri, 25 Oct 2024 10:09:39 +0800
Subject: [PATCH] [#5221] feat(python-client): Support OSS for fileset python
client (#5239)
### What changes were proposed in this pull request?
Add Aliyun OSS support to the fileset Python client.
### Why are the changes needed?
It is needed so that the fileset Python client can access filesets stored on Aliyun OSS.
Fix: #5221
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
Tested locally. Apply the following change to `test_gvfs_with_oss.py` and run
`./gradlew :clients:client-python:test -PskipDockerTests=false`.
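For reference, a minimal usage sketch of the new OSS support (every name, credential, and endpoint below is a placeholder; the option keys are the ones this patch adds to `GVFSConfig`):
```
from gravitino import gvfs
from gravitino.filesystem.gvfs_config import GVFSConfig

# OSS credentials and endpoint are passed through the GVFS options.
options = {
    GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY: "your_access_key",
    GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY: "your_secret_key",
    GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT: "your_endpoint",
}
fs = gvfs.GravitinoVirtualFileSystem(
    server_uri="http://localhost:8090",
    metalake_name="your_metalake",
    options=options,
)
# List the files of an OSS-backed fileset through the virtual file system.
fs.ls("gvfs://fileset/your_catalog/your_schema/your_fileset")
```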
Co-authored-by: Qi Yu
---
.../gravitino/filesystem/gvfs.py | 78 ++++
.../gravitino/filesystem/gvfs_config.py | 4 +
clients/client-python/requirements.txt | 3 +-
.../tests/integration/test_gvfs_with_oss.py | 353 ++++++++++++++++++
4 files changed, 437 insertions(+), 1 deletion(-)
create mode 100644 clients/client-python/tests/integration/test_gvfs_with_oss.py
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py
index a9201a83326..8176c325a81 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -49,6 +49,7 @@ class StorageType(Enum):
LOCAL = "file"
GCS = "gs"
S3A = "s3a"
+ OSS = "oss"
class FilesetContextPair:
@@ -318,6 +319,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
StorageType.HDFS,
StorageType.GCS,
StorageType.S3A,
+ StorageType.OSS,
]:
src_context_pair.filesystem().mv(
self._strip_storage_protocol(storage_type, src_actual_path),
@@ -567,6 +569,14 @@ def _convert_actual_path(
or storage_location.startswith(f"{StorageType.S3A.value}://")
):
actual_prefix = infer_storage_options(storage_location)["path"]
+ elif storage_location.startswith(f"{StorageType.OSS.value}:/"):
+ ops = infer_storage_options(storage_location)
+ if "host" not in ops or "path" not in ops:
+ raise GravitinoRuntimeException(
+ f"Storage location:{storage_location} doesn't support now."
+ )
+
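+ # For example, for "oss://bucket/a/b", ops["host"] is "bucket" and
+ # ops["path"] is "/a/b", so the actual prefix becomes "bucket/a/b".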
+ actual_prefix = ops["host"] + ops["path"]
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
else:
@@ -733,6 +743,8 @@ def _recognize_storage_type(path: str):
return StorageType.GCS
if path.startswith(f"{StorageType.S3A.value}://"):
return StorageType.S3A
+ if path.startswith(f"{StorageType.OSS.value}://"):
+ return StorageType.OSS
raise GravitinoRuntimeException(
f"Storage type doesn't support now. Path:{path}"
)
@@ -756,12 +768,46 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
:param storage_type: The storage type
:param path: The path
:return: The stripped path
+
+ We handle OSS differently from S3 and GCS because its `ls` behavior differs.
+ Please see the following example:
+
+ ```
+ >> oss = context_pair.filesystem()
+ >> oss.ls('oss://bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls')
+ DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/
+ test_gvfs_schema/test_gvfs_fileset
+ DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema
+ /test_gvfs_fileset/', 'delimiter': '/'}
+ []
+ >> oss.ls('bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls')
+ DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema
+ /test_gvfs_fileset/test_ls
+ DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema
+ /test_gvfs_fileset/test_ls/', 'delimiter': '/'}
+ [{'name': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls
+ /test.file', 'type': 'file', 'size': 0, 'LastModified': 1729754793,
+ 'Size': 0, 'Key': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/
+ test_gvfs_fileset/test_ls/test.file'}]
+
+ ```
+
+ As the example above shows, if we do not strip the protocol (the `oss://`
+ prefix), `oss.ls` always returns an empty array; once the protocol is
+ stripped, it returns the expected result.
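+
+ For example, 'oss://bucket-xiaoyu/a/b' is stripped to 'bucket-xiaoyu/a/b'.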
"""
if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A):
return path
if storage_type == StorageType.LOCAL:
return path[len(f"{StorageType.LOCAL.value}:") :]
+ # OSS behaves differently from S3 and GCS: if we do not strip the
+ # protocol, `ls` always returns an empty array.
+ if storage_type == StorageType.OSS:
+ if path.startswith(f"{StorageType.OSS.value}://"):
+ return path[len(f"{StorageType.OSS.value}://") :]
+ return path
+
raise GravitinoRuntimeException(
f"Storage type:{storage_type} doesn't support now."
)
@@ -835,6 +881,8 @@ def _get_filesystem(self, actual_file_location: str):
fs = self._get_gcs_filesystem()
elif storage_type == StorageType.S3A:
fs = self._get_s3_filesystem()
+ elif storage_type == StorageType.OSS:
+ fs = self._get_oss_filesystem()
else:
raise GravitinoRuntimeException(
f"Storage type: `{storage_type}` doesn't support now."
@@ -887,5 +935,35 @@ def _get_s3_filesystem(self):
endpoint_url=aws_endpoint_url,
)
+ def _get_oss_filesystem(self):
+ # Get 'oss_access_key' from the OSS options; raise an exception if it is missing.
+ oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY)
+ if oss_access_key_id is None:
+ raise GravitinoRuntimeException(
+ "OSS access key id is not found in the options."
+ )
+
+ # Get 'oss_secret_key' from the OSS options; raise an exception if it is missing.
+ oss_secret_access_key = self._options.get(
+ GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY
+ )
+ if oss_secret_access_key is None:
+ raise GravitinoRuntimeException(
+ "OSS secret access key is not found in the options."
+ )
+
+ # Get 'oss_endpoint' from the OSS options; raise an exception if it is missing.
+ oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT)
+ if oss_endpoint_url is None:
+ raise GravitinoRuntimeException(
+ "OSS endpoint url is not found in the options."
+ )
+
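+ # Import ossfs lazily so the dependency is only needed when OSS is actually used.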
+ return importlib.import_module("ossfs").OSSFileSystem(
+ key=oss_access_key_id,
+ secret=oss_secret_access_key,
+ endpoint=oss_endpoint_url,
+ )
+
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem)
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py
index 7ffacdb095d..00ae8c6419e 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -37,3 +37,7 @@ class GVFSConfig:
GVFS_FILESYSTEM_S3_ACCESS_KEY = "s3_access_key"
GVFS_FILESYSTEM_S3_SECRET_KEY = "s3_secret_key"
GVFS_FILESYSTEM_S3_ENDPOINT = "s3_endpoint"
+
+ GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key"
+ GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_key"
+ GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint"
diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt
index 1d0f4fadd5d..8eebd572770 100644
--- a/clients/client-python/requirements.txt
+++ b/clients/client-python/requirements.txt
@@ -24,4 +24,5 @@ fsspec==2024.3.1
pyarrow==15.0.2
cachetools==5.3.3
gcsfs==2024.3.1
-s3fs==2024.3.1
\ No newline at end of file
+s3fs==2024.3.1
+ossfs==2023.12.0
\ No newline at end of file
diff --git a/clients/client-python/tests/integration/test_gvfs_with_oss.py b/clients/client-python/tests/integration/test_gvfs_with_oss.py
new file mode 100644
index 00000000000..95b385ea925
--- /dev/null
+++ b/clients/client-python/tests/integration/test_gvfs_with_oss.py
@@ -0,0 +1,353 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+from random import randint
+import unittest
+
+
+from ossfs import OSSFileSystem
+
+from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS
+from gravitino import (
+ gvfs,
+ GravitinoClient,
+ Catalog,
+ Fileset,
+)
+from gravitino.exceptions.base import GravitinoRuntimeException
+from gravitino.filesystem.gvfs_config import GVFSConfig
+
+
+logger = logging.getLogger(__name__)
+
+
+@unittest.skip("This test require oss service account")
+class TestGvfsWithOSS(TestGvfsWithHDFS):
+ # Before running this test, please make sure that aliyun-bundle-x.jar has
+ # been copied to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory.
+ oss_access_key = "your_access_key"
+ oss_secret_key = "your_secret_key"
+ oss_endpoint = "your_endpoint"
+ bucket_name = "your_bucket_name"
+
+ metalake_name: str = "TestGvfsWithOSS_metalake" + str(randint(1, 10000))
+
+ def setUp(self):
+ self.options = {
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY}": self.oss_access_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY}": self.oss_secret_key,
+ f"{GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT}": self.oss_endpoint,
+ }
+
+ def tearDown(self):
+ self.options = {}
+
+ @classmethod
+ def setUpClass(cls):
+ cls._get_gravitino_home()
+
+ cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf"
+ # restart the server
+ cls.restart_server()
+ # create entity
+ cls._init_test_entities()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls._clean_test_data()
+ # Reset the server conf in case other ITs (e.g., HDFS) have changed it and
+ # failed to reset it.
+ cls._reset_conf(cls.config, cls.hadoop_conf_path)
+ # restart server
+ cls.restart_server()
+
+ # clear all config in the conf_path
+ @classmethod
+ def _reset_conf(cls, config, conf_path):
+ logger.info("Reset %s.", conf_path)
+ if not os.path.exists(conf_path):
+ raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.")
+ filtered_lines = []
+ with open(conf_path, mode="r", encoding="utf-8") as file:
+ origin_lines = file.readlines()
+
+ for line in origin_lines:
+ line = line.strip()
+ if line.startswith("#"):
+ # keep comment lines as-is
+ filtered_lines.append(line + "\n")
+
+ with open(conf_path, mode="w", encoding="utf-8") as file:
+ for line in filtered_lines:
+ file.write(line)
+
+ @classmethod
+ def _init_test_entities(cls):
+ cls.gravitino_admin_client.create_metalake(
+ name=cls.metalake_name, comment="", properties={}
+ )
+ cls.gravitino_client = GravitinoClient(
+ uri="http://localhost:8090", metalake_name=cls.metalake_name
+ )
+
+ cls.config = {}
+ cls.conf = {}
+ catalog = cls.gravitino_client.create_catalog(
+ name=cls.catalog_name,
+ catalog_type=Catalog.Type.FILESET,
+ provider=cls.catalog_provider,
+ comment="",
+ properties={
+ "filesystem-providers": "oss",
+ "gravitino.bypass.fs.oss.accessKeyId": cls.oss_access_key,
+ "gravitino.bypass.fs.oss.accessKeySecret": cls.oss_secret_key,
+ "gravitino.bypass.fs.oss.endpoint": cls.oss_endpoint,
+ "gravitino.bypass.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
+ },
+ )
+ catalog.as_schemas().create_schema(
+ schema_name=cls.schema_name, comment="", properties={}
+ )
+
+ cls.fileset_storage_location: str = (
+ f"oss://{cls.bucket_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ cls.fileset_gvfs_location = (
+ f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}"
+ )
+ catalog.as_fileset_catalog().create_fileset(
+ ident=cls.fileset_ident,
+ fileset_type=Fileset.Type.MANAGED,
+ comment=cls.fileset_comment,
+ storage_location=cls.fileset_storage_location,
+ properties=cls.fileset_properties,
+ )
+
+ cls.fs = OSSFileSystem(
+ key=cls.oss_access_key,
+ secret=cls.oss_secret_key,
+ endpoint=cls.oss_endpoint,
+ )
+
+ def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance):
+ # mkdir takes no effect on OSS: no directory object is created, so the directory will not exist.
+ self.fs.mkdir(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+ def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance):
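+ # Like mkdir, makedirs takes no effect on OSS, so the directory will not exist.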
+ self.fs.makedirs(actual_dir)
+ self.assertFalse(self.fs.exists(actual_dir))
+ self.assertFalse(gvfs_instance.exists(gvfs_dir))
+
+ def test_modified(self):
+ modified_dir = self.fileset_gvfs_location + "/test_modified"
+ modified_actual_dir = self.fileset_storage_location + "/test_modified"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(modified_dir, modified_actual_dir, fs)
+ # Object storages only support getting the modification time of an `object`,
+ # so the modified time will be None if it's a directory. For example (GCS):
+ # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3')
+ # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3')
+ # >>> print(r)
+ # None
+ # self.assertIsNone(fs.modified(modified_dir))
+
+ # create a file under the dir 'modified_dir'.
+ file_path = modified_dir + "/test.txt"
+ fs.touch(file_path)
+ self.assertTrue(fs.exists(file_path))
+ self.assertIsNotNone(fs.modified(file_path))
+
+ def test_rm(self):
+ rm_dir = self.fileset_gvfs_location + "/test_rm"
+ rm_actual_dir = self.fileset_storage_location + "/test_rm"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rm_dir, rm_actual_dir, fs)
+
+ rm_file = self.fileset_gvfs_location + "/test_rm/test.file"
+ rm_actual_file = self.fileset_storage_location + "/test_rm/test.file"
+ fs.touch(rm_file)
+ self.assertTrue(self.fs.exists(rm_actual_file))
+ self.assertTrue(fs.exists(rm_file))
+
+ # test delete file
+ fs.rm(rm_file)
+ self.assertFalse(fs.exists(rm_file))
+
+ # test delete dir with recursive = false
+ rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file"
+ rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file"
+ self.fs.touch(rm_new_actual_file)
+ self.assertTrue(self.fs.exists(rm_new_actual_file))
+ self.assertTrue(fs.exists(rm_new_file))
+
+ def test_rmdir(self):
+ rmdir_dir = self.fileset_gvfs_location + "/test_rmdir"
+ rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs)
+
+ rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file"
+ rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file"
+ self.fs.touch(rmdir_actual_file)
+ self.assertTrue(self.fs.exists(rmdir_actual_file))
+ self.assertTrue(fs.exists(rmdir_file))
+
+ fs.rm_file(rmdir_file)
+
+ def test_mkdir(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if 'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
+ fs.mkdir(mkdir_dir, create_parents=True)
+
+ with self.assertRaises(FileNotFoundError):
+ self.fs.exists(mkdir_actual_dir)
+
+ self.assertFalse(fs.exists(mkdir_dir))
+
+ with self.assertRaises(FileNotFoundError):
+ self.fs.exists("oss://" + new_bucket)
+
+ def test_makedirs(self):
+ mkdir_dir = self.fileset_gvfs_location + "/test_mkdir"
+ mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ # it actually takes no effect.
+ self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs)
+
+ # check whether it will automatically create the bucket if 'create_parents'
+ # is set to True.
+ new_bucket = self.bucket_name + "1"
+ mkdir_dir = mkdir_dir.replace(self.bucket_name, new_bucket)
+ mkdir_actual_dir = mkdir_actual_dir.replace(self.bucket_name, new_bucket)
+
+ # it takes no effect.
+ fs.makedirs(mkdir_dir)
+
+ with self.assertRaises(FileNotFoundError):
+ self.fs.exists(mkdir_actual_dir)
+
+ self.assertFalse(fs.exists(mkdir_dir))
+ with self.assertRaises(FileNotFoundError):
+ self.fs.exists("oss://" + new_bucket)
+
+ def test_rm_file(self):
+ rm_file_dir = self.fileset_gvfs_location + "/test_rm_file"
+ rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+ self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs)
+
+ rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file"
+ rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file"
+ self.fs.touch(rm_file_actual_file)
+ self.assertTrue(self.fs.exists(rm_file_actual_file))
+ self.assertTrue(fs.exists(rm_file_file))
+
+ # test delete file
+ fs.rm_file(rm_file_file)
+ self.assertFalse(fs.exists(rm_file_file))
+
+ # test delete dir
+ fs.rm_file(rm_file_dir)
+
+ def test_info(self):
+ info_dir = self.fileset_gvfs_location + "/test_info"
+ info_actual_dir = self.fileset_storage_location + "/test_info"
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:8090",
+ metalake_name=self.metalake_name,
+ options=self.options,
+ **self.conf,
+ )
+
+ self.check_mkdir(info_dir, info_actual_dir, fs)
+
+ info_file = self.fileset_gvfs_location + "/test_info/test.file"
+ info_actual_file = self.fileset_storage_location + "/test_info/test.file"
+ self.fs.touch(info_actual_file)
+ self.assertTrue(self.fs.exists(info_actual_file))
+
+ # OSS `info` behaves differently from S3 `info`: for OSS, the name of a
+ # directory has a trailing slash if the path passed in does not end with a
+ # slash, while S3 never returns a trailing slash for a directory.
+
+ # >> > oss.info('bucket-xiaoyu/lisi')
+ # {'name': 'bucket-xiaoyu/lisi/', 'type': 'directory',
+ # 'size': 0, 'Size': 0, 'Key': 'bucket-xiaoyu/lisi/'}
+ # >> > oss.info('bucket-xiaoyu/lisi/')
+ # {'name': 'bucket-xiaoyu/lisi', 'size': 0,
+ # 'type': 'directory', 'Size': 0,
+ # 'Key': 'bucket-xiaoyu/lisi'
+
+ # >> > s3.info('paimon-bucket/lisi');
+ # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+ # 'StorageClass': 'DIRECTORY'}
+ # >> > s3.info('paimon-bucket/lisi/');
+ # {'name': 'paimon-bucket/lisi', 'type': 'directory', 'size': 0,
+ # 'StorageClass': 'DIRECTORY'}
+
+ dir_info = fs.info(info_dir)
+ self.assertEqual(dir_info["name"][:-1], info_dir[len("gvfs://") :])
+
+ file_info = fs.info(info_file)
+ self.assertEqual(file_info["name"], info_file[len("gvfs://") :])