Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make 'cloud.name' and 'cloud.region-code' mutable. #3

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ void testPropertyMeta() {
Assertions.assertFalse(propertyEntryMap.get(CHECK_INTERVAL_SEC).isRequired());
Assertions.assertFalse(propertyEntryMap.get(FETCH_TIMEOUT_SEC).isRequired());
Assertions.assertFalse(propertyEntryMap.get(CLOUD_NAME).isRequired());
Assertions.assertFalse(propertyEntryMap.get(CLOUD_NAME).isImmutable());
Assertions.assertFalse(propertyEntryMap.get(CLOUD_REGION_CODE).isRequired());
Assertions.assertFalse(propertyEntryMap.get(CLOUD_REGION_CODE).isImmutable());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,34 @@ void testUpdateCatalogWithNullableComment() {

metalake.dropCatalog(catalogName);
}

/**
 * Verifies that the {@code Catalog.CLOUD_NAME} and {@code Catalog.CLOUD_REGION_CODE} properties
 * of an existing catalog can be changed through {@code alterCatalog}.
 */
@Test
public void testAlterCatalogProperties() {
  String cloudName = "aws";
  String alterCloudName = "azure";
  String regionCode = "us-east-1";
  String alterRegionCode = "us-west-2";

  String catalogName = GravitinoITUtils.genRandomName("test_catalog");
  ImmutableMap<String, String> props =
      ImmutableMap.of(Catalog.CLOUD_NAME, cloudName, Catalog.CLOUD_REGION_CODE, regionCode);
  Catalog catalog =
      metalake.createCatalog(
          catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", props);

  // Drop the catalog in a finally block so a failing assertion does not leave it behind.
  try {
    Assertions.assertTrue(metalake.catalogExists(catalogName));
    Assertions.assertFalse(catalog.properties().isEmpty());
    Assertions.assertEquals(cloudName, catalog.properties().get(Catalog.CLOUD_NAME));
    Assertions.assertEquals(regionCode, catalog.properties().get(Catalog.CLOUD_REGION_CODE));

    Catalog alteredCatalog =
        metalake.alterCatalog(
            catalogName,
            CatalogChange.setProperty(Catalog.CLOUD_NAME, alterCloudName),
            CatalogChange.setProperty(Catalog.CLOUD_REGION_CODE, alterRegionCode));

    Assertions.assertEquals(alterCloudName, alteredCatalog.properties().get(Catalog.CLOUD_NAME));
    Assertions.assertEquals(
        alterRegionCode, alteredCatalog.properties().get(Catalog.CLOUD_REGION_CODE));
  } finally {
    metalake.dropCatalog(catalogName);
  }
}
}
3 changes: 2 additions & 1 deletion clients/client-python/gravitino/audit/caller_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ def get():
@staticmethod
def remove():
    """Remove the CallerContext from the thread local.

    Calling this when no caller context is currently stored is a no-op:
    the attribute is deleted only if it is present on the holder, so no
    ``AttributeError`` is raised.
    """
    if hasattr(caller_context_holder, "caller_context"):
        del caller_context_holder.caller_context
34 changes: 33 additions & 1 deletion clients/client-python/gravitino/catalog/fileset_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
from gravitino.api.catalog import Catalog
from gravitino.api.fileset import Fileset
from gravitino.api.fileset_change import FilesetChange
from gravitino.audit.caller_context import CallerContextHolder, CallerContext
from gravitino.catalog.base_schema_catalog import BaseSchemaCatalog
from gravitino.dto.audit_dto import AuditDTO
from gravitino.dto.requests.fileset_create_request import FilesetCreateRequest
from gravitino.dto.requests.fileset_update_request import FilesetUpdateRequest
from gravitino.dto.requests.fileset_updates_request import FilesetUpdatesRequest
from gravitino.dto.responses.drop_response import DropResponse
from gravitino.dto.responses.entity_list_response import EntityListResponse
from gravitino.dto.responses.file_location_response import FileLocationResponse
from gravitino.dto.responses.fileset_response import FilesetResponse
from gravitino.name_identifier import NameIdentifier
from gravitino.namespace import Namespace
Expand Down Expand Up @@ -244,7 +246,29 @@ def get_file_location(self, ident: NameIdentifier, sub_path: str) -> str:
Returns:
The actual location of the file or directory.
"""
raise NotImplementedError("Not implemented yet")
self.check_fileset_name_identifier(ident)

full_namespace = self._get_fileset_full_namespace(ident.namespace())
try:
caller_context: CallerContext = CallerContextHolder.get()
params = {"sub_path": encode_string(sub_path)}

resp = self.rest_client.get(
self.format_file_location_request_path(full_namespace, ident.name()),
params=params,
headers=(
caller_context.context() if caller_context is not None else None
),
error_handler=FILESET_ERROR_HANDLER,
)
file_location_resp = FileLocationResponse.from_json(
resp.body, infer_missing=True
)
file_location_resp.validate()

return file_location_resp.file_location()
finally:
CallerContextHolder.remove()

@staticmethod
def check_fileset_namespace(namespace: Namespace):
Expand Down Expand Up @@ -272,6 +296,14 @@ def format_fileset_request_path(namespace: Namespace) -> str:
schema_ns = Namespace.of(namespace.level(0), namespace.level(1))
return f"{BaseSchemaCatalog.format_schema_request_path(schema_ns)}/{encode_string(namespace.level(2))}/filesets"

@staticmethod
def format_file_location_request_path(namespace: Namespace, name: str) -> str:
    """Build the request path of a fileset's ``location`` endpoint.

    Args:
        namespace: Three-level namespace. Levels 0 and 1 are used to build
            the schema request path; level 2 is URL-encoded and appended
            to it.
        name: Fileset name; URL-encoded and placed after ``/filesets/``.

    Returns:
        ``<schema request path>/<level 2>/filesets/<name>/location``.
    """
    schema_request_path = BaseSchemaCatalog.format_schema_request_path(
        Namespace.of(namespace.level(0), namespace.level(1))
    )
    encoded_level = encode_string(namespace.level(2))
    encoded_name = encode_string(name)
    return f"{schema_request_path}/{encoded_level}/filesets/{encoded_name}/location"

@staticmethod
def to_fileset_update_request(change: FilesetChange):
if isinstance(change, FilesetChange.RenameFileset):
Expand Down
9 changes: 7 additions & 2 deletions clients/client-python/gravitino/utils/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,14 @@ def _request(
f"Error handler {type(error_handler).__name__} can't handle this response, error response body: {resp}"
) from None

def get(self, endpoint, params=None, headers=None, error_handler=None, **kwargs):
    """Issue a ``get`` request by delegating to ``self._request``.

    Args:
        endpoint: Endpoint of the request, forwarded unchanged.
        params: Optional value forwarded as the ``params`` keyword.
        headers: Optional value forwarded as the ``headers`` keyword.
        error_handler: Optional value forwarded as the ``error_handler``
            keyword.
        **kwargs: Extra keyword arguments passed through to ``_request``.

    Returns:
        Whatever ``self._request`` returns.
    """
    return self._request(
        "get",
        endpoint,
        params=params,
        headers=headers,
        error_handler=error_handler,
        **kwargs,
    )

def delete(self, endpoint, error_handler=None, **kwargs):
Expand Down
48 changes: 47 additions & 1 deletion clients/client-python/tests/integration/test_fileset_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
Fileset,
FilesetChange,
)
from gravitino.exceptions.base import NoSuchFilesetException, GravitinoRuntimeException
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
from gravitino.exceptions.base import (
NoSuchFilesetException,
GravitinoRuntimeException,
)
from tests.integration.integration_test_env import IntegrationTestEnv

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -155,6 +161,18 @@ def create_fileset(self) -> Fileset:
properties=self.fileset_properties,
)

def create_custom_fileset(
    self, ident: NameIdentifier, storage_location: str
) -> Fileset:
    """Create a MANAGED fileset with a caller-chosen identifier and location.

    The catalog named ``self.catalog_name`` is loaded through
    ``self.gravitino_client``; the comment and properties come from the
    test fixture attributes ``fileset_comment`` and ``fileset_properties``.

    Args:
        ident: Identifier of the fileset to create.
        storage_location: Storage location passed to ``create_fileset``.

    Returns:
        The fileset returned by ``create_fileset``.
    """
    fileset_catalog = self.gravitino_client.load_catalog(
        name=self.catalog_name
    ).as_fileset_catalog()
    created = fileset_catalog.create_fileset(
        ident=ident,
        fileset_type=Fileset.Type.MANAGED,
        comment=self.fileset_comment,
        storage_location=storage_location,
        properties=self.fileset_properties,
    )
    return created

def test_create_fileset(self):
fileset = self.create_fileset()
self.assertIsNotNone(fileset)
Expand Down Expand Up @@ -223,3 +241,31 @@ def test_alter_fileset(self):
)
self.assertEqual(fileset_comment_removed.name(), self.fileset_name)
self.assertIsNone(fileset_comment_removed.comment())

def test_get_file_location(self):
    """Check sub-path resolution and that RENAME with an empty sub path fails."""
    location = "/tmp/test_get_file_location"
    ident: NameIdentifier = NameIdentifier.of(
        self.schema_name, "test_get_file_location"
    )
    self.create_custom_fileset(ident, location)

    loaded_catalog = self.gravitino_client.load_catalog(name=self.catalog_name)
    resolved = loaded_catalog.as_fileset_catalog().get_file_location(
        ident, "/test/test.txt"
    )
    self.assertEqual(resolved, f"file:{location}/test/test.txt")

    # A RENAME data operation combined with an empty sub path must raise.
    rename_context = CallerContext(
        {
            FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name
        }
    )
    with self.assertRaises(GravitinoRuntimeException):
        CallerContextHolder.set(rename_context)
        reloaded_catalog = self.gravitino_client.load_catalog(
            name=self.catalog_name
        )
        reloaded_catalog.as_fileset_catalog().get_file_location(ident, "")
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,6 @@ def _set_thread_local_context(self, thread_name, context: Dict[str, str]):
CallerContextHolder.remove()

self.assertIsNone(CallerContextHolder.get())

# removing a context that does not exist must not throw an exception
CallerContextHolder.remove()
80 changes: 80 additions & 0 deletions clients/client-python/tests/unittests/test_fileset_catalog_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import unittest
from http.client import HTTPResponse
from unittest.mock import patch, Mock

from gravitino import GravitinoClient, Catalog, NameIdentifier
from gravitino.audit.caller_context import CallerContext, CallerContextHolder
from gravitino.audit.fileset_audit_constants import FilesetAuditConstants
from gravitino.audit.fileset_data_operation import FilesetDataOperation
from gravitino.exceptions.handlers.fileset_error_handler import FILESET_ERROR_HANDLER
from gravitino.namespace import Namespace
from gravitino.utils import Response
from tests.unittests import mock_base


@mock_base.mock_data
class TestFilesetCatalogApi(unittest.TestCase):
    """Unit tests for the fileset catalog API with ``HTTPClient.get`` patched."""

    def test_get_file_location(self, *mock_method):
        """get_file_location forwards its inputs to ``get`` and clears the context."""
        # Canned HTTP response carrying the file location payload.
        payload = json.dumps({"code": 0, "fileLocation": "file:/test/1"})
        raw_http_resp = Mock(HTTPResponse)
        raw_http_resp.getcode.return_value = 200
        raw_http_resp.read.return_value = payload
        raw_http_resp.info.return_value = None
        raw_http_resp.url = None
        canned_resp = Response(raw_http_resp)

        metalake_name: str = "metalake_demo"
        catalog_name: str = "fileset_catalog"
        client = GravitinoClient(
            uri="http://localhost:8090", metalake_name=metalake_name
        )
        catalog: Catalog = client.load_catalog(catalog_name)

        with patch(
            "gravitino.utils.http_client.HTTPClient.get",
            return_value=canned_resp,
        ) as patched_get:
            fileset_ident: NameIdentifier = NameIdentifier.of(
                "test", "test_get_file_location"
            )
            audit_headers = {
                FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION: FilesetDataOperation.RENAME.name
            }
            CallerContextHolder.set(CallerContext(audit_headers))
            file_location: str = catalog.as_fileset_catalog().get_file_location(
                fileset_ident, "/test/1"
            )

            # The request must target the location endpoint with the given inputs.
            expected_path = (
                catalog.as_fileset_catalog().format_file_location_request_path(
                    Namespace.of("metalake_demo", "fileset_catalog", "test"),
                    fileset_ident.name(),
                )
            )
            patched_get.assert_called_once_with(
                expected_path,
                params={"sub_path": "/test/1"},
                headers=audit_headers,
                error_handler=FILESET_ERROR_HANDLER,
            )
            # The caller context must have been removed by the call.
            self.assertIsNone(CallerContextHolder.get())
            # The parsed location must match the canned payload.
            self.assertEqual(file_location, "file:/test/1")
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ public abstract class BaseCatalogPropertiesMetadata extends BasePropertiesMetada
CLOUD_NAME,
"The cloud that the catalog is running on",
false /* required */,
true /* immutable */,
false /* immutable */,
Catalog.CloudName.class,
null /* The default value does not work because if the user does not set it, this property will not be displayed */,
false /* hidden */,
false /* reserved */),
PropertyEntry.stringOptionalPropertyEntry(
CLOUD_REGION_CODE,
"The region code of the cloud that the catalog is running on",
false /* required */,
false /* immutable */,
null /* The default value does not work because if the user does not set it, this property will not be displayed */,
false /* hidden */)),
PropertyEntry::getName);
Expand Down
Loading