From b7ceca005830bd0b098c2b036b18f8a48914a8f1 Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Thu, 26 Sep 2024 14:13:24 +0800 Subject: [PATCH 1/2] [#4709] improvement(client-python): Add the implementation for `getFileLocation` interface in python client (#5017) ### What changes were proposed in this pull request? Add the implementations for `getFileLocation` interface in Python Client. ### Why are the changes needed? Fix: #4709 ### How was this patch tested? Add some UTs and ITs. --- .../gravitino/audit/caller_context.py | 3 +- .../gravitino/catalog/fileset_catalog.py | 34 +++++++- .../gravitino/utils/http_client.py | 9 ++- .../tests/integration/test_fileset_catalog.py | 48 ++++++++++- .../unittests/audit/test_caller_context.py | 3 + .../unittests/test_fileset_catalog_api.py | 80 +++++++++++++++++++ 6 files changed, 172 insertions(+), 5 deletions(-) create mode 100644 clients/client-python/tests/unittests/test_fileset_catalog_api.py diff --git a/clients/client-python/gravitino/audit/caller_context.py b/clients/client-python/gravitino/audit/caller_context.py index 07e30a4d373..0b57bb3b4c6 100644 --- a/clients/client-python/gravitino/audit/caller_context.py +++ b/clients/client-python/gravitino/audit/caller_context.py @@ -68,4 +68,5 @@ def get(): @staticmethod def remove(): """Remove the CallerContext from the thread local.""" - del caller_context_holder.caller_context + if hasattr(caller_context_holder, "caller_context"): + del caller_context_holder.caller_context diff --git a/clients/client-python/gravitino/catalog/fileset_catalog.py b/clients/client-python/gravitino/catalog/fileset_catalog.py index 3b2f0f71707..cf91dbdfb5c 100644 --- a/clients/client-python/gravitino/catalog/fileset_catalog.py +++ b/clients/client-python/gravitino/catalog/fileset_catalog.py @@ -21,6 +21,7 @@ from gravitino.api.catalog import Catalog from gravitino.api.fileset import Fileset from gravitino.api.fileset_change import FilesetChange +from gravitino.audit.caller_context import CallerContextHolder, CallerContext from gravitino.catalog.base_schema_catalog import BaseSchemaCatalog from gravitino.dto.audit_dto import AuditDTO from gravitino.dto.requests.fileset_create_request import FilesetCreateRequest @@ -28,6 +29,7 @@ from gravitino.dto.requests.fileset_updates_request import FilesetUpdatesRequest from gravitino.dto.responses.drop_response import DropResponse from gravitino.dto.responses.entity_list_response import EntityListResponse +from gravitino.dto.responses.file_location_response import FileLocationResponse from gravitino.dto.responses.fileset_response import FilesetResponse from gravitino.name_identifier import NameIdentifier from gravitino.namespace import Namespace @@ -244,7 +246,29 @@ def get_file_location(self, ident: NameIdentifier, sub_path: str) -> str: Returns: The actual location of the file or directory. """ - raise NotImplementedError("Not implemented yet") + self.check_fileset_name_identifier(ident) + + full_namespace = self._get_fileset_full_namespace(ident.namespace()) + try: + caller_context: CallerContext = CallerContextHolder.get() + params = {"sub_path": encode_string(sub_path)} + + resp = self.rest_client.get( + self.format_file_location_request_path(full_namespace, ident.name()), + params=params, + headers=( + caller_context.context() if caller_context is not None else None + ), + error_handler=FILESET_ERROR_HANDLER, + ) + file_location_resp = FileLocationResponse.from_json( + resp.body, infer_missing=True + ) + file_location_resp.validate() + + return file_location_resp.file_location() + finally: + CallerContextHolder.remove() @staticmethod def check_fileset_namespace(namespace: Namespace): @@ -272,6 +296,14 @@ def format_fileset_request_path(namespace: Namespace) -> str: schema_ns = Namespace.of(namespace.level(0), namespace.level(1)) return f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}/filesets" + @staticmethod + def format_file_location_request_path(namespace: Namespace, name: str) -> str: + schema_ns = Namespace.of(namespace.level(0), namespace.level(1)) + return ( + f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}" + f"/filesets/{encode_string(name)}/location" + ) + @staticmethod def to_fileset_update_request(change: FilesetChange): if isinstance(change, FilesetChange.RenameFileset): diff --git a/clients/client-python/gravitino/utils/http_client.py b/clients/client-python/gravitino/utils/http_client.py index 696fe415cce..262c73c2b4c 100644 --- a/clients/client-python/gravitino/utils/http_client.py +++ b/clients/client-python/gravitino/utils/http_client.py @@ -217,9 +217,14 @@ def _request( f"Error handler {type(error_handler).__name__} can't handle this response, error response body: {resp}" ) from None - def get(self, endpoint, params=None, error_handler=None, **kwargs): + def get(self, endpoint, params=None, headers=None, error_handler=None, **kwargs): return self._request( - "get", endpoint, params=params, error_handler=error_handler, **kwargs + "get", + endpoint, + params=params, + headers=headers, + error_handler=error_handler, + **kwargs, ) def delete(self, endpoint, error_handler=None, **kwargs): diff --git a/clients/client-python/tests/integration/test_fileset_catalog.py b/clients/client-python/tests/integration/test_fileset_catalog.py index 6ea1831b97e..0e92ec1b090 100644 --- a/clients/client-python/tests/integration/test_fileset_catalog.py +++ b/clients/client-python/tests/integration/test_fileset_catalog.py @@ -27,7 +27,13 @@ Fileset, FilesetChange, ) -from gravitino.exceptions.base import NoSuchFilesetException, GravitinoRuntimeException +from gravitino.audit.caller_context import CallerContext, CallerContextHolder +from gravitino.audit.fileset_audit_constants import FilesetAuditConstants +from gravitino.audit.fileset_data_operation import FilesetDataOperation +from gravitino.exceptions.base import ( + NoSuchFilesetException, + GravitinoRuntimeException, +) from tests.integration.integration_test_env import IntegrationTestEnv logger = logging.getLogger(__name__) @@ -155,6 +161,18 @@ def create_fileset(self) -> Fileset: properties=self.fileset_properties, ) + def create_custom_fileset( + self, ident: NameIdentifier, storage_location: str + ) -> Fileset: + catalog = self.gravitino_client.load_catalog(name=self.catalog_name) + return catalog.as_fileset_catalog().create_fileset( + ident=ident, + fileset_type=Fileset.Type.MANAGED, + comment=self.fileset_comment, + storage_location=storage_location, + properties=self.fileset_properties, + ) + def test_create_fileset(self): fileset = self.create_fileset() self.assertIsNotNone(fileset) @@ -223,3 +241,31 @@ def test_alter_fileset(self): ) self.assertEqual(fileset_comment_removed.name(), self.fileset_name) self.assertIsNone(fileset_comment_removed.comment()) + + def test_get_file_location(self): + fileset_ident: NameIdentifier = NameIdentifier.of( + self.schema_name, "test_get_file_location" + ) + fileset_location = "/tmp/test_get_file_location" + self.create_custom_fileset(fileset_ident, fileset_location) + actual_file_location = ( + self.gravitino_client.load_catalog(name=self.catalog_name) + .as_fileset_catalog() + .get_file_location(fileset_ident, "/test/test.txt") + ) + + self.assertEqual(actual_file_location, f"file:{fileset_location}/test/test.txt") + + # test rename without sub path should throw an exception + caller_context = CallerContext( + { + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name + } + ) + with self.assertRaises(GravitinoRuntimeException): + CallerContextHolder.set(caller_context) + ( + self.gravitino_client.load_catalog(name=self.catalog_name) + .as_fileset_catalog() + .get_file_location(fileset_ident, "") + ) diff --git a/clients/client-python/tests/unittests/audit/test_caller_context.py b/clients/client-python/tests/unittests/audit/test_caller_context.py index 93031a256f4..4243d2d5c2f 100644 --- a/clients/client-python/tests/unittests/audit/test_caller_context.py +++ b/clients/client-python/tests/unittests/audit/test_caller_context.py @@ -62,3 +62,6 @@ def _set_thread_local_context(self, thread_name, context: Dict[str, str]): CallerContextHolder.remove() self.assertIsNone(CallerContextHolder.get()) + + # will not throw an exception if the context is not exists + CallerContextHolder.remove() diff --git a/clients/client-python/tests/unittests/test_fileset_catalog_api.py b/clients/client-python/tests/unittests/test_fileset_catalog_api.py new file mode 100644 index 00000000000..06c27d3dfab --- /dev/null +++ b/clients/client-python/tests/unittests/test_fileset_catalog_api.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import unittest +from http.client import HTTPResponse +from unittest.mock import patch, Mock + +from gravitino import GravitinoClient, Catalog, NameIdentifier +from gravitino.audit.caller_context import CallerContext, CallerContextHolder +from gravitino.audit.fileset_audit_constants import FilesetAuditConstants +from gravitino.audit.fileset_data_operation import FilesetDataOperation +from gravitino.exceptions.handlers.fileset_error_handler import FILESET_ERROR_HANDLER +from gravitino.namespace import Namespace +from gravitino.utils import Response +from tests.unittests import mock_base + + +@mock_base.mock_data +class TestFilesetCatalogApi(unittest.TestCase): + + def test_get_file_location(self, *mock_method): + json_data = {"code": 0, "fileLocation": "file:/test/1"} + json_str = json.dumps(json_data) + + mock_http_resp = Mock(HTTPResponse) + mock_http_resp.getcode.return_value = 200 + mock_http_resp.read.return_value = json_str + mock_http_resp.info.return_value = None + mock_http_resp.url = None + mock_resp = Response(mock_http_resp) + + metalake_name: str = "metalake_demo" + catalog_name: str = "fileset_catalog" + gravitino_client = GravitinoClient( + uri="http://localhost:8090", metalake_name=metalake_name + ) + catalog: Catalog = gravitino_client.load_catalog(catalog_name) + + with patch( + "gravitino.utils.http_client.HTTPClient.get", + return_value=mock_resp, + ) as mock_get: + fileset_ident: NameIdentifier = NameIdentifier.of( + "test", "test_get_file_location" + ) + context = { + FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name + } + CallerContextHolder.set(CallerContext(context)) + file_location: str = catalog.as_fileset_catalog().get_file_location( + fileset_ident, "/test/1" + ) + # check the get input params as expected + mock_get.assert_called_once_with( + catalog.as_fileset_catalog().format_file_location_request_path( + Namespace.of("metalake_demo", "fileset_catalog", "test"), + fileset_ident.name(), + ), + params={"sub_path": "/test/1"}, + headers=context, + error_handler=FILESET_ERROR_HANDLER, + ) + # check the caller context is removed + self.assertIsNone(CallerContextHolder.get()) + # check the response is as expected + self.assertEqual(file_location, "file:/test/1") From b55b6674faf789b03fe0641a537840693c9d0926 Mon Sep 17 00:00:00 2001 From: edward Date: Wed, 25 Sep 2024 22:06:14 +0800 Subject: [PATCH 2/2] make 'cloud.name' and 'cloud.region-code' mutable. --- .../hive/TestHiveCatalogOperations.java | 2 ++ .../client/integration/test/CatalogIT.java | 30 +++++++++++++++++++ .../BaseCatalogPropertiesMetadata.java | 4 +-- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java index bc488e20be8..0acb4a345a1 100644 --- a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java @@ -103,7 +103,9 @@ void testPropertyMeta() { Assertions.assertFalse(propertyEntryMap.get(CHECK_INTERVAL_SEC).isRequired()); Assertions.assertFalse(propertyEntryMap.get(FETCH_TIMEOUT_SEC).isRequired()); Assertions.assertFalse(propertyEntryMap.get(CLOUD_NAME).isRequired()); + Assertions.assertFalse(propertyEntryMap.get(CLOUD_NAME).isImmutable()); Assertions.assertFalse(propertyEntryMap.get(CLOUD_REGION_CODE).isRequired()); + Assertions.assertFalse(propertyEntryMap.get(CLOUD_REGION_CODE).isImmutable()); } @Test diff --git a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java index 94b9eea68e9..5458ea2b1a1 100644 --- a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java +++ b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java @@ -300,4 +300,34 @@ void testUpdateCatalogWithNullableComment() { metalake.dropCatalog(catalogName); } + + @Test + public void testAlterCatalogProperties() { + String cloudName = "aws"; + String alterCloudName = "azure"; + String regionCode = "us-east-1"; + String alterRegionCode = "us-west-2"; + + String catalogName = GravitinoITUtils.genRandomName("test_catalog"); + ImmutableMap props = + ImmutableMap.of(Catalog.CLOUD_NAME, cloudName, Catalog.CLOUD_REGION_CODE, regionCode); + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", props); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + Assertions.assertFalse(catalog.properties().isEmpty()); + Assertions.assertEquals(cloudName, catalog.properties().get(Catalog.CLOUD_NAME)); + Assertions.assertEquals(regionCode, catalog.properties().get(Catalog.CLOUD_REGION_CODE)); + + Catalog alteredCatalog = + metalake.alterCatalog( + catalogName, + CatalogChange.setProperty(Catalog.CLOUD_NAME, alterCloudName), + CatalogChange.setProperty(Catalog.CLOUD_REGION_CODE, alterRegionCode)); + + Assertions.assertEquals(alterCloudName, alteredCatalog.properties().get(Catalog.CLOUD_NAME)); + Assertions.assertEquals( + alterRegionCode, alteredCatalog.properties().get(Catalog.CLOUD_REGION_CODE)); + metalake.dropCatalog(catalogName); + } } diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java b/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java index b667b6dc795..1ba3e560640 100644 --- a/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java +++ b/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java @@ -63,7 +63,7 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada CLOUD_NAME, "The cloud that the catalog is running on", false /* required */, - true /* immutable */, + false /* immutable */, Catalog.CloudName.class, null /* The default value does not work because if the user does not set it, this property will not be displayed */, false /* hidden */, @@ -71,7 +71,7 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada PropertyEntry.stringOptionalPropertyEntry( CLOUD_REGION_CODE, "The region code of the cloud that the catalog is running on", - false /* required */, + false /* immutable */, null /* The default value does not work because if the user does not set it, this property will not be displayed */, false /* hidden */)), PropertyEntry::getName);