Skip to content

Commit

Permalink
[#4708] improvement(client-java/server): Add implementations for the …
Browse files Browse the repository at this point in the history
…`getFileLocation` interface in Java Client / Server (#4858)

### What changes were proposed in this pull request?

Add implementations of the `getFileLocation` interface to the Java client
and the server.

### Why are the changes needed?

Fix: #4708 

### How was this patch tested?

Added unit tests (UTs) and integration tests (ITs).

---------

Co-authored-by: xiaojiebao <[email protected]>
  • Loading branch information
xloya and xiaojiebao authored Sep 6, 2024
1 parent 0403f31 commit 3731e53
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.requests.FilesetCreateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdateRequest;
import org.apache.gravitino.dto.requests.FilesetUpdatesRequest;
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
Expand Down Expand Up @@ -238,7 +241,23 @@ public boolean dropFileset(NameIdentifier ident) {
@Override
public String getFileLocation(NameIdentifier ident, String subPath)
    throws NoSuchFilesetException {
  // Asks the server for the location that `subPath` maps to inside the fileset `ident`.
  //
  // ident:   fileset identifier; validated by checkFilesetNameIdentifier before use.
  // subPath: sub path inside the fileset; sent (encoded) as the "sub_path" query parameter.
  // returns: the file location carried by the server's FileLocationResponse.
  //
  // NOTE: the former `throw new UnsupportedOperationException("Not implemented")` placeholder
  // has been removed; leaving it in front of the implementation made the rest unreachable.
  checkFilesetNameIdentifier(ident);
  Namespace fullNamespace = getFilesetFullNamespace(ident.namespace());

  // The caller context is optional; when none has been set, no extra entries are sent.
  CallerContext callerContext = CallerContext.CallerContextHolder.get();

  Map<String, String> params = new HashMap<>();
  params.put("sub_path", RESTUtils.encodeString(subPath));

  FileLocationResponse resp =
      restClient.get(
          formatFileLocationRequestPath(fullNamespace, ident.name()),
          params,
          FileLocationResponse.class,
          callerContext != null ? callerContext.context() : Collections.emptyMap(),
          ErrorHandlers.filesetErrorHandler());
  // Reject malformed responses before handing the location back to the caller.
  resp.validate();

  return resp.getFileLocation();
}

@VisibleForTesting
Expand All @@ -252,6 +271,19 @@ static String formatFilesetRequestPath(Namespace ns) {
.toString();
}

/**
 * Builds the REST request path of the file-location endpoint for a fileset.
 *
 * <p>The result is the schema request path for the first two namespace levels, followed by the
 * encoded third level, {@code /filesets/}, the encoded fileset name and {@code /location}.
 *
 * @param ns the full namespace of the fileset; levels 0, 1 and 2 are read
 * @param name the fileset name
 * @return the request path for the file-location endpoint
 */
@VisibleForTesting
static String formatFileLocationRequestPath(Namespace ns, String name) {
  String schemasPath = formatSchemaRequestPath(Namespace.of(ns.level(0), ns.level(1)));
  String encodedSchema = RESTUtils.encodeString(ns.level(2));
  String encodedFileset = RESTUtils.encodeString(name);
  return schemasPath + "/" + encodedSchema + "/filesets/" + encodedFileset + "/location";
}

/**
* Check whether the namespace of a fileset is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@
import com.google.common.collect.ImmutableMap;
import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.dto.CatalogDTO;
import org.apache.gravitino.dto.file.FilesetDTO;
Expand All @@ -43,17 +52,23 @@
import org.apache.gravitino.dto.responses.DropResponse;
import org.apache.gravitino.dto.responses.EntityListResponse;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.FileLocationResponse;
import org.apache.gravitino.dto.responses.FilesetResponse;
import org.apache.gravitino.exceptions.AlreadyExistsException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NotFoundException;
import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.rest.RESTUtils;
import org.apache.hc.core5.http.Method;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockserver.matchers.Times;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.Parameter;

public class TestFilesetCatalog extends TestBase {

Expand Down Expand Up @@ -400,6 +415,127 @@ public void testAlterFileset() throws JsonProcessingException {
"internal error");
}

@Test
public void testGetFileLocation() throws JsonProcessingException {
  // Exercises getFileLocation against the mock server: one successful lookup followed by
  // three error responses (schema not found, generic not found, internal error).
  NameIdentifier fullIdent =
      NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
  String subPath = "mock_location/test";
  String requestPath =
      withSlash(
          FilesetCatalog.formatFileLocationRequestPath(
              Namespace.of(metalakeName, catalogName, "schema1"), fullIdent.name()));
  Map<String, String> query = new HashMap<>();
  query.put("sub_path", RESTUtils.encodeString(subPath));

  // Successful lookup: the client must return exactly the location served by the mock.
  String expectedLocation =
      String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", subPath);
  FileLocationResponse okResponse = new FileLocationResponse(expectedLocation);
  buildMockResource(Method.GET, requestPath, query, null, okResponse, SC_OK);

  String returnedLocation =
      catalog
          .asFilesetCatalog()
          .getFileLocation(
              NameIdentifier.of(fullIdent.namespace().level(2), fullIdent.name()), subPath);
  Assertions.assertTrue(StringUtils.isNotBlank(returnedLocation));
  Assertions.assertEquals(expectedLocation, returnedLocation);

  // The identifier handed to the client in every error case below: schema + fileset name.
  NameIdentifier ident = NameIdentifier.of(fullIdent.namespace().level(2), fullIdent.name());

  // Throw schema not found exception
  ErrorResponse schemaMissing =
      ErrorResponse.notFound(NoSuchSchemaException.class.getSimpleName(), "schema not found");
  buildMockResource(Method.GET, requestPath, null, schemaMissing, SC_NOT_FOUND);
  Assertions.assertThrows(
      NoSuchSchemaException.class,
      () -> catalog.asFilesetCatalog().getFileLocation(ident, subPath),
      "schema not found");

  // Generic not-found error
  ErrorResponse filesetMissing =
      ErrorResponse.notFound(NotFoundException.class.getSimpleName(), "fileset not found");
  buildMockResource(Method.GET, requestPath, null, filesetMissing, SC_NOT_FOUND);
  Assertions.assertThrows(
      NotFoundException.class,
      () -> catalog.asFilesetCatalog().getFileLocation(ident, subPath),
      "fileset not found");

  // Server-side failure
  ErrorResponse serverFailure = ErrorResponse.internalError("internal error");
  buildMockResource(Method.GET, requestPath, null, serverFailure, SC_SERVER_ERROR);
  Assertions.assertThrows(
      RuntimeException.class,
      () -> catalog.asFilesetCatalog().getFileLocation(ident, subPath),
      "internal error");
}

@Test
public void testCallerContextToHeader() throws JsonProcessingException {
  // Verifies that the entries of the thread-local CallerContext reach the server as HTTP
  // headers when getFileLocation is called.
  NameIdentifier fileset = NameIdentifier.of(metalakeName, catalogName, "schema1", "fileset1");
  String mockSubPath = "mock_location/test";
  String filesetPath =
      withSlash(
          FilesetCatalog.formatFileLocationRequestPath(
              Namespace.of(metalakeName, catalogName, "schema1"), fileset.name()));
  Map<String, String> queryParams = new HashMap<>();
  queryParams.put("sub_path", RESTUtils.encodeString(mockSubPath));
  String mockFileLocation =
      String.format("file:/fileset/%s/%s/%s/%s", catalogName, "schema1", "fileset1", mockSubPath);
  FileLocationResponse resp = new FileLocationResponse(mockFileLocation);
  String respJson = MAPPER.writeValueAsString(resp);

  List<Parameter> parameters =
      queryParams.entrySet().stream()
          .map(kv -> new Parameter(kv.getKey(), kv.getValue()))
          .collect(Collectors.toList());
  HttpRequest mockRequest =
      HttpRequest.request(filesetPath)
          .withMethod(Method.GET.name())
          .withQueryStringParameters(parameters);
  HttpResponse mockResponse = HttpResponse.response().withStatusCode(SC_OK).withBody(respJson);

  // Build the context that is expected to show up as request headers.
  Map<String, String> context = new HashMap<>();
  context.put(
      FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
      InternalClientType.HADOOP_GVFS.name());
  context.put(
      FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION,
      FilesetDataOperation.GET_FILE_STATUS.name());
  CallerContext callerContext = CallerContext.builder().withContext(context).build();

  // Set the thread local context, remembering the previous value so that this test does not
  // leak its context into other tests running on the same thread.
  CallerContext previousContext = CallerContext.CallerContextHolder.get();
  CallerContext.CallerContextHolder.set(callerContext);
  try {
    // Using Times.exactly(1) will only match once for the request, so we could set different
    // responses for the same request and path.
    AtomicReference<String> internalClientType = new AtomicReference<>(null);
    AtomicReference<String> dataOperation = new AtomicReference<>(null);
    mockServer
        .when(mockRequest, Times.exactly(1))
        .respond(
            httpRequest -> {
              // Capture the audit headers the client actually sent.
              internalClientType.set(
                  httpRequest.getFirstHeader(
                      FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE));
              dataOperation.set(
                  httpRequest.getFirstHeader(
                      FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION));
              return mockResponse;
            });
    catalog
        .asFilesetCatalog()
        .getFileLocation(
            NameIdentifier.of(fileset.namespace().level(2), fileset.name()), mockSubPath);
    Assertions.assertEquals(FilesetDataOperation.GET_FILE_STATUS.name(), dataOperation.get());
    Assertions.assertEquals(InternalClientType.HADOOP_GVFS.name(), internalClientType.get());
  } finally {
    // Restore whatever context (possibly none) was present before this test ran.
    CallerContext.CallerContextHolder.set(previousContext);
  }
}

private FilesetDTO mockFilesetDTO(
String name,
Fileset.Type type,
Expand Down
65 changes: 65 additions & 0 deletions docs/open-api/filesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,46 @@ paths:
"5xx":
$ref: "./openapi.yaml#/components/responses/ServerErrorResponse"

# File-location endpoint of a fileset: resolves a sub path to a file location.
/metalakes/{metalake}/catalogs/{catalog}/schemas/{schema}/filesets/{fileset}/location:
parameters:
- $ref: "./openapi.yaml#/components/parameters/metalake"
- $ref: "./openapi.yaml#/components/parameters/catalog"
- $ref: "./openapi.yaml#/components/parameters/schema"
- $ref: "./openapi.yaml#/components/parameters/fileset"

get:
tags:
- location
summary: Get file location
operationId: getFileLocation
description: Returns the specified file location object
parameters:
# The sub path is passed as a mandatory query parameter, not as part of the URL path.
- name: sub_path
in: query
required: true
schema:
type: string
description: The sub path to the file or directory
responses:
"200":
$ref: "#/components/responses/FileLocationResponse"
# A 404 may be caused by any missing level of the hierarchy; see the examples below.
"404":
description: Not Found - The target fileset does not exist
content:
application/vnd.gravitino.v1+json:
schema:
$ref: "./openapi.yaml#/components/schemas/ErrorModel"
examples:
NoSuchMetalakeException:
$ref: "./metalakes.yaml#/components/examples/NoSuchMetalakeException"
NoSuchCatalogException:
$ref: "./catalogs.yaml#/components/examples/NoSuchCatalogException"
NoSuchSchemaException:
$ref: "./schemas.yaml#/components/examples/NoSuchSchemaException"
NoSuchFilesetException:
$ref: "#/components/examples/NoSuchFilesetException"
"5xx":
$ref: "./openapi.yaml#/components/responses/ServerErrorResponse"

components:

Expand Down Expand Up @@ -356,6 +396,25 @@ components:
FilesetResponse:
$ref: "#/components/examples/FilesetResponse"

# Response body of the file-location endpoint: a status code plus the resolved location.
FileLocationResponse:
description: The response of the file location object
content:
application/vnd.gravitino.v1+json:
schema:
type: object
properties:
# The only value allowed for a successful response is 0.
code:
type: integer
format: int32
description: Status code of the response
enum:
- 0
fileLocation:
type: string
description: The actual file location
examples:
FileLocationResponse:
$ref: "#/components/examples/FileLocationResponse"

examples:
FilesetCreateRequest:
Expand Down Expand Up @@ -385,6 +444,12 @@ components:
}
}

# Example payload for the FileLocationResponse response.
FileLocationResponse:
value: {
"code": 0,
"fileLocation": "hdfs://host/user/fileset/schema/fileset1/test.parquet"
}

FilesetAlreadyExistsException:
value: {
"code": 1004,
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/apache/gravitino/server/web/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
*/
package org.apache.gravitino.server.web;

import com.google.common.collect.Maps;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.UserPrincipal;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.audit.InternalClientType;
import org.apache.gravitino.auth.AuthConstants;
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.utils.PrincipalUtils;
Expand Down Expand Up @@ -148,4 +154,25 @@ public static Response doAs(
}
return PrincipalUtils.doAs(principal, action);
}

/**
 * Extracts the fileset audit headers from an HTTP request, keeping only valid values.
 *
 * <p>Two headers are inspected: {@link FilesetAuditConstants#HTTP_HEADER_INTERNAL_CLIENT_TYPE}
 * and {@link FilesetAuditConstants#HTTP_HEADER_FILESET_DATA_OPERATION}. A header is copied into
 * the result only when its value is not blank and passes the corresponding {@code checkValid}
 * check; otherwise it is silently left out.
 *
 * @param httpRequest the incoming request whose headers are read
 * @return a new map containing the accepted audit headers; empty when none qualify
 */
public static Map<String, String> filterFilesetAuditHeaders(HttpServletRequest httpRequest) {
  Map<String, String> filteredHeaders = Maps.newHashMap();

  String internalClientType =
      httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE);
  if (StringUtils.isNotBlank(internalClientType)
      && InternalClientType.checkValid(internalClientType)) {
    filteredHeaders.put(
        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE, internalClientType);
  }

  // Reuse the value fetched once instead of looking the header up a second time, so the
  // blank check and the validity check are guaranteed to see the same value.
  String dataOperation =
      httpRequest.getHeader(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION);
  if (StringUtils.isNotBlank(dataOperation)
      && FilesetDataOperation.checkValid(dataOperation)) {
    filteredHeaders.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, dataOperation);
  }
  return filteredHeaders;
}
}
Loading

0 comments on commit 3731e53

Please sign in to comment.