-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#5221] feat(python-client): Support OSS for fileset python client #5225
Changes from all commits
d2447a2
7e5a8b5
f53c5ef
36fedcd
e93fba5
b1e04b6
db00e65
c793582
013f5cb
dba5753
16dfc73
278fcd8
3fb55ad
cd04666
d0bf13e
ffaa064
32d7f3d
d82bf76
dfdb772
8708a8a
ba9f8fa
dae99f7
e22053b
4fb89e0
e5746c0
b2d7bed
380717b
f4041ec
66247ab
3cfb94f
7d1150f
608081b
9edfe82
3079bf0
da49e60
b621d89
05dd006
9d5b8dc
e58f9a0
c521daf
46e996a
da0b7ca
e9ccda4
ba1fe5f
4ffe389
7c44a57
992ba0a
5dbca5f
f27520a
e29e47b
bc1e76f
8a9d3bf
2115e31
c2e55d4
557aa02
5c3fa5c
408eca7
a02065d
8762bae
dc7a915
c230991
dc54880
7ecc040
da46321
27bc2ab
017c42e
9dc0f5a
41ff00d
1fee1e4
1789bd2
2ee1709
05e5d20
8f28211
35cba1e
bcf2f12
f25a37d
a3da011
3517996
27a911a
e34dbea
6bae7e5
fe13f5e
0181632
3ff9eef
2ce660c
f0fa87b
d2921a8
dc68dd1
6431e2f
242888f
3ec2dcc
f754997
2cdfb35
67dbc3a
70a545e
4d54acf
15bbf99
cfcc544
acf51e1
11f9992
4f00a2f
b9ef8f0
9f65fb5
4defcc6
3a907f4
76912b7
4478673
5a194df
c8b5c7c
6ff9353
1dac0f0
f592289
7069b6b
1281e72
f022d6e
0d2ccab
55633e8
217cc5f
6958aa8
b6837bb
92961ea
8ed6298
e256c8d
73e00f0
d56cf30
8b9b8d7
b4d5728
0306ac5
a35a40d
cfd8a89
a131564
34e3ff4
2cc234a
ba0237e
4df5ea1
4e49e6e
354418f
d22c2f6
fc3c40a
015b788
c414b4d
895937c
d261b70
798b4d1
aa96f63
456ba31
8d04cd4
804622c
b399365
120adc3
72e8822
e7b4da7
1c19261
8914b5b
89c79d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,7 @@ class StorageType(Enum): | |
LOCAL = "file" | ||
GCS = "gs" | ||
S3A = "s3a" | ||
OSS = "oss" | ||
|
||
|
||
class FilesetContextPair: | ||
|
@@ -318,6 +319,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): | |
StorageType.HDFS, | ||
StorageType.GCS, | ||
StorageType.S3A, | ||
StorageType.OSS, | ||
]: | ||
src_context_pair.filesystem().mv( | ||
self._strip_storage_protocol(storage_type, src_actual_path), | ||
|
@@ -567,6 +569,14 @@ def _convert_actual_path( | |
or storage_location.startswith(f"{StorageType.S3A.value}://") | ||
): | ||
actual_prefix = infer_storage_options(storage_location)["path"] | ||
elif storage_location.startswith(f"{StorageType.OSS.value}:/"): | ||
ops = infer_storage_options(storage_location) | ||
if "host" not in ops or "path" not in ops: | ||
raise GravitinoRuntimeException( | ||
f"Storage location:{storage_location} doesn't support now." | ||
) | ||
|
||
actual_prefix = ops["host"] + ops["path"] | ||
elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"): | ||
actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :] | ||
else: | ||
|
@@ -733,6 +743,8 @@ def _recognize_storage_type(path: str): | |
return StorageType.GCS | ||
if path.startswith(f"{StorageType.S3A.value}://"): | ||
return StorageType.S3A | ||
if path.startswith(f"{StorageType.OSS.value}://"): | ||
return StorageType.OSS | ||
raise GravitinoRuntimeException( | ||
f"Storage type doesn't support now. Path:{path}" | ||
) | ||
|
@@ -756,12 +768,46 @@ def _strip_storage_protocol(storage_type: StorageType, path: str): | |
:param storage_type: The storage type | ||
:param path: The path | ||
:return: The stripped path | ||
|
||
We will handle OSS differently from S3 and GCS, because OSS has different behavior than S3 and GCS. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You'd better explain the difference behavior clearly, otherwise someone may not be able to spot the difference at first glance when looking at the examples. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add sentence
|
||
Please see the following example: | ||
|
||
``` | ||
>> oss = context_pair.filesystem() | ||
>> oss.ls('oss://bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls') | ||
DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/ | ||
test_gvfs_schema/test_gvfs_fileset | ||
DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema | ||
/test_gvfs_fileset/', 'delimiter': '/'} | ||
[] | ||
>> oss.ls('bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls') | ||
DEBUG:ossfs:Get directory listing page for bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema | ||
/test_gvfs_fileset/test_ls | ||
DEBUG:ossfs:CALL: ObjectIterator - () - {'prefix': 'test_gvfs_catalog678/test_gvfs_schema | ||
/test_gvfs_fileset/test_ls/', 'delimiter': '/'} | ||
[{'name': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/test_gvfs_fileset/test_ls | ||
/test.file', 'type': 'file', 'size': 0, 'LastModified': 1729754793, | ||
'Size': 0, 'Key': 'bucket-xiaoyu/test_gvfs_catalog678/test_gvfs_schema/ | ||
test_gvfs_fileset/test_ls/test.file'}] | ||
|
||
``` | ||
|
||
Please take a look at the above example: if we do not remove the protocol (starts with oss://), | ||
it will always return an empty array when we call `oss.ls`, however, if we remove the protocol, | ||
it will produce the correct result as expected. | ||
""" | ||
if storage_type in (StorageType.HDFS, StorageType.GCS, StorageType.S3A): | ||
return path | ||
if storage_type == StorageType.LOCAL: | ||
return path[len(f"{StorageType.LOCAL.value}:") :] | ||
|
||
# OSS has different behavior than S3 and GCS, if we do not remove the | ||
# protocol, it will always return an empty array. | ||
if storage_type == StorageType.OSS: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better add the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, please explain more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added. |
||
if path.startswith(f"{StorageType.OSS.value}://"): | ||
return path[len(f"{StorageType.OSS.value}://") :] | ||
return path | ||
|
||
raise GravitinoRuntimeException( | ||
f"Storage type:{storage_type} doesn't support now." | ||
) | ||
|
@@ -835,6 +881,8 @@ def _get_filesystem(self, actual_file_location: str): | |
fs = self._get_gcs_filesystem() | ||
elif storage_type == StorageType.S3A: | ||
fs = self._get_s3_filesystem() | ||
elif storage_type == StorageType.OSS: | ||
fs = self._get_oss_filesystem() | ||
else: | ||
raise GravitinoRuntimeException( | ||
f"Storage type: `{storage_type}` doesn't support now." | ||
|
@@ -887,5 +935,35 @@ def _get_s3_filesystem(self): | |
endpoint_url=aws_endpoint_url, | ||
) | ||
|
||
def _get_oss_filesystem(self): | ||
# get 'oss_access_key_id' from oss options, if the key is not found, throw an exception | ||
oss_access_key_id = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ACCESS_KEY) | ||
if oss_access_key_id is None: | ||
raise GravitinoRuntimeException( | ||
"OSS access key id is not found in the options." | ||
) | ||
|
||
# get 'oss_secret_access_key' from oss options, if the key is not found, throw an exception | ||
oss_secret_access_key = self._options.get( | ||
GVFSConfig.GVFS_FILESYSTEM_OSS_SECRET_KEY | ||
) | ||
if oss_secret_access_key is None: | ||
raise GravitinoRuntimeException( | ||
"OSS secret access key is not found in the options." | ||
) | ||
|
||
# get 'oss_endpoint_url' from oss options, if the key is not found, throw an exception | ||
oss_endpoint_url = self._options.get(GVFSConfig.GVFS_FILESYSTEM_OSS_ENDPOINT) | ||
if oss_endpoint_url is None: | ||
raise GravitinoRuntimeException( | ||
"OSS endpoint url is not found in the options." | ||
) | ||
|
||
return importlib.import_module("ossfs").OSSFileSystem( | ||
key=oss_access_key_id, | ||
secret=oss_secret_access_key, | ||
endpoint=oss_endpoint_url, | ||
) | ||
|
||
|
||
fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's better to check the
ops
whether hashost
andpath
attributes.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK