From 399af244323a99653649556a0ba6b75d6161d1b0 Mon Sep 17 00:00:00 2001 From: Arun Bhima Date: Fri, 22 Dec 2023 03:44:41 +0530 Subject: [PATCH 1/3] Changes for handling S3 requests This handles: 1. PUT, GET, DELETE, HEAD and LIST requests 2. Multipart uploads 3. Changes in the frontend.properties file to start the frontend on the SSL port and talk to MySQL --- .../InMemoryUnknownAccountService.java | 10 +- .../com/github/ambry/account/Container.java | 12 + .../github/ambry/frontend/NamedBlobPath.java | 20 +- .../com/github/ambry/rest/RequestPath.java | 36 ++ .../java/com/github/ambry/rest/RestUtils.java | 17 +- .../frontend/AmbryIdConverterFactory.java | 25 +- .../ambry/frontend/AmbrySecurityService.java | 7 +- .../frontend/AmbryUrlSigningService.java | 4 +- .../frontend/CompleteMultipartUpload.java | 44 ++ .../CompleteMultipartUploadResult.java | 63 ++ .../com/github/ambry/frontend/Contents.java | 72 +++ .../frontend/FrontendRestRequestService.java | 57 +- .../ambry/frontend/HeadBlobHandler.java | 96 ++- .../InitiateMultipartUploadResult.java | 53 ++ .../ambry/frontend/ListBucketResult.java | 106 ++++ .../ambry/frontend/ListPartsResult.java | 53 ++ .../ambry/frontend/NamedBlobListHandler.java | 77 ++- .../ambry/frontend/NamedBlobPostHandler.java | 566 ++++++++++++++++++ .../ambry/frontend/NamedBlobPutHandler.java | 58 +- .../java/com/github/ambry/frontend/Part.java | 47 ++ .../ambry/frontend/PostBlobHandler.java | 2 +- .../ambry/router/OperationController.java | 65 +- .../com/github/ambry/router/PutOperation.java | 12 +- config/frontend.properties | 13 +- config/log4j2.xml | 4 + 25 files changed, 1474 insertions(+), 45 deletions(-) create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUpload.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUploadResult.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/Contents.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/InitiateMultipartUploadResult.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/ListBucketResult.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/ListPartsResult.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/Part.java diff --git a/ambry-account/src/main/java/com/github/ambry/account/InMemoryUnknownAccountService.java b/ambry-account/src/main/java/com/github/ambry/account/InMemoryUnknownAccountService.java index c870b02a47..10b067ecf5 100644 --- a/ambry-account/src/main/java/com/github/ambry/account/InMemoryUnknownAccountService.java +++ b/ambry-account/src/main/java/com/github/ambry/account/InMemoryUnknownAccountService.java @@ -31,14 +31,18 @@ class InMemoryUnknownAccountService implements AccountService { Account.ACL_INHERITED_BY_CONTAINER_DEFAULT_VALUE, Account.SNAPSHOT_VERSION_DEFAULT_VALUE, Arrays.asList(Container.UNKNOWN_CONTAINER, Container.DEFAULT_PUBLIC_CONTAINER, Container.DEFAULT_PRIVATE_CONTAINER), Account.QUOTA_RESOURCE_TYPE_DEFAULT_VALUE); + // Create a hardcoded Account "named-blob-sandbox" which will be used for S3 prototype tests + static final Account NAMED_BLOB_ACCOUNT = new Account((short) 101, "named-blob-sandbox", Account.AccountStatus.ACTIVE, + Account.ACL_INHERITED_BY_CONTAINER_DEFAULT_VALUE, Account.SNAPSHOT_VERSION_DEFAULT_VALUE, + Collections.singletonList(Container.NAMED_BLOB_CONTAINER), 
Account.QUOTA_RESOURCE_TYPE_DEFAULT_VALUE); private static final Collection accounts = - Collections.unmodifiableCollection(Collections.singletonList(UNKNOWN_ACCOUNT)); + Collections.unmodifiableCollection(Collections.singletonList(NAMED_BLOB_ACCOUNT)); private volatile boolean isOpen = true; @Override public Account getAccountById(short accountId) { checkOpen(); - return accountId == Account.UNKNOWN_ACCOUNT_ID ? UNKNOWN_ACCOUNT : null; + return NAMED_BLOB_ACCOUNT; } @Override @@ -59,7 +63,7 @@ public boolean removeAccountUpdateConsumer(Consumer> account public Account getAccountByName(String accountName) { checkOpen(); Objects.requireNonNull(accountName, "accountName cannot be null."); - return UNKNOWN_ACCOUNT; + return NAMED_BLOB_ACCOUNT; } @Override diff --git a/ambry-api/src/main/java/com/github/ambry/account/Container.java b/ambry-api/src/main/java/com/github/ambry/account/Container.java index bf868e1cd2..4abbc1cfc7 100644 --- a/ambry-api/src/main/java/com/github/ambry/account/Container.java +++ b/ambry-api/src/main/java/com/github/ambry/account/Container.java @@ -288,6 +288,18 @@ public class Container { SNAPSHOT_VERSION_DEFAULT_VALUE, ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT_VALUE, CACHE_TTL_IN_SECOND_DEFAULT_VALUE, USER_METADATA_KEYS_TO_NOT_PREFIX_IN_RESPONSE_DEFAULT_VALUE); + // Create a container 'container-a' which will be used for s3 prototype tests + public static final Container NAMED_BLOB_CONTAINER = + new Container((short) 8, "container-a", UNKNOWN_CONTAINER_STATUS, UNKNOWN_CONTAINER_DESCRIPTION, + UNKNOWN_CONTAINER_ENCRYPTED_SETTING, UNKNOWN_CONTAINER_PREVIOUSLY_ENCRYPTED_SETTING, + UNKNOWN_CONTAINER_CACHEABLE_SETTING, UNKNOWN_CONTAINER_MEDIA_SCAN_DISABLED_SETTING, null, + UNKNOWN_CONTAINER_TTL_REQUIRED_SETTING, SECURE_PATH_REQUIRED_DEFAULT_VALUE, + CONTENT_TYPE_WHITELIST_FOR_FILENAMES_ON_DOWNLOAD_DEFAULT_VALUE, BACKUP_ENABLED_DEFAULT_VALUE, + OVERRIDE_ACCOUNT_ACL_DEFAULT_VALUE, NamedBlobMode.OPTIONAL, (short) 101, + UNKNOWN_CONTAINER_DELETE_TRIGGER_TIME, LAST_MODIFIED_TIME_DEFAULT_VALUE, SNAPSHOT_VERSION_DEFAULT_VALUE, + ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT_VALUE, CACHE_TTL_IN_SECOND_DEFAULT_VALUE, + USER_METADATA_KEYS_TO_NOT_PREFIX_IN_RESPONSE_DEFAULT_VALUE); + /** * A container defined specifically for the blobs put without specifying target container but isPrivate flag is * set to {@code false}. diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java index 50d4a113d5..e5b1770737 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.Objects; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; + /** * Represents the blob url parsing results for named blob. @@ -51,23 +53,31 @@ public static NamedBlobPath parse(String path, Map args) throws String[] splitPath = path.split("/", 4); String blobNamePrefix = RestUtils.getHeader(args, PREFIX_PARAM, false); boolean isListRequest = blobNamePrefix != null; + // S3 can issue "HEAD /s3/named-blob-sandbox" on the bucket-name. + // The converted named blob would be /named/named-blob-sandbox/container-a. So, don't check for number of expected + // segments for S3 as of now + /* int expectedSegments = isListRequest ? 
3 : 4; if (splitPath.length != expectedSegments || !Operations.NAMED_BLOB.equalsIgnoreCase(splitPath[0])) { throw new RestServiceException(String.format( "Path must have format '/named//%s. Received path='%s', blobNamePrefix='%s'", isListRequest ? "" : "/'", path, blobNamePrefix), RestServiceErrorCode.BadRequest); } + */ String accountName = splitPath[1]; String containerName = splitPath[2]; if (isListRequest) { String pageToken = RestUtils.getHeader(args, PAGE_PARAM, false); return new NamedBlobPath(accountName, containerName, null, blobNamePrefix, pageToken); } else { - String blobName = splitPath[3]; - if (blobName.length() > MAX_BLOB_NAME_LENGTH) { - throw new RestServiceException( - String.format("Blob name maximum length should be less than %s", MAX_BLOB_NAME_LENGTH), - RestServiceErrorCode.BadRequest); + String blobName = null; + if (splitPath.length == 4) { + blobName = splitPath[3]; + if (blobName.length() > MAX_BLOB_NAME_LENGTH) { + throw new RestServiceException( + String.format("Blob name maximum length should be less than %s", MAX_BLOB_NAME_LENGTH), + RestServiceErrorCode.BadRequest); + } } return new NamedBlobPath(accountName, containerName, blobName, null, null); } diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java b/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java index 81ac7e3385..a1ccb8bbb2 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java @@ -19,8 +19,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static com.github.ambry.rest.RestUtils.*; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; import static com.github.ambry.router.GetBlobOptions.*; @@ -35,6 +38,7 @@ public class RequestPath { private static final char PATH_SEPARATOR_CHAR = '/'; private static final String PATH_SEPARATOR_STRING = String.valueOf(PATH_SEPARATOR_CHAR); private static final String SEGMENT = SubResource.Segment.toString(); + private static final Logger logger = LoggerFactory.getLogger(RequestPath.class); /** * Parse the request path (and additional headers in some cases). The path will match the following regex-like @@ -74,6 +78,34 @@ public static RequestPath parse(RestRequest restRequest, List prefixesTo } catch (IllegalArgumentException e) { throw new RestServiceException("Invalid URI path", e, RestServiceErrorCode.BadRequest); } + + // S3 requests to Ambry are in the form "PUT /s3/named-blob-sandbox/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" + // where "named-blob-sandbox" is the account name (no container name is added) and + // "checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" is the key name. + + // We convert it to named blob request in the form "/named/named-blob-sandbox/container-a/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" + // i.e. 
we hardcode container name to 'container-a' + + logger.info("S3 API | Input path: {}", path); + if (path.startsWith("/s3")) { + // Convert to named blob request internally + int accountStart = "/s3/".length(); + int accountEnd = path.indexOf("/", accountStart); + if (accountEnd == -1) { + accountEnd = path.length(); + } + String accountName = path.substring(accountStart, accountEnd); + String containerName = "container-a"; + String remainingPath = path.substring(accountEnd); + String namedPath = + "/named/" + accountName + "/" + containerName + (remainingPath.length() > 0 ? remainingPath : ""); + logger.info("S3 API | Converting S3 path to Named path. S3 path: {}, Named path: {}", path, namedPath); + path = namedPath; // Store the converted path + restRequest.setArg(S3_REQUEST, "true"); // signifies this is a s3 request + restRequest.setArg(S3_BUCKET, accountName); // store the bucket name + restRequest.setArg(S3_KEY, remainingPath); // store the key name + } + return parse(path, restRequest.getArgs(), prefixesToRemove, clusterName); } @@ -94,6 +126,10 @@ public static RequestPath parse(String path, Map args, List args) throw * @throws RestServiceException if required arguments aren't present or if they aren't in the format expected. */ public static long getTtlFromRequestHeader(Map args) throws RestServiceException { - long ttl = Utils.Infinite_Time; + long ttl = 86400; Long ttlFromHeader = getLongHeader(args, Headers.TTL, false); if (ttlFromHeader != null) { if (ttlFromHeader < -1) { diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java index bc23380baf..0cbea72178 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; + /** * Factory that instantiates an {@link IdConverter} implementation for the frontend. @@ -106,10 +108,26 @@ public Future convert(RestRequest restRequest, String input, BlobInfo bl frontendMetrics.idConverterRequestRate.mark(); long startTimeInMs = System.currentTimeMillis(); try { + LOGGER.info("AmbryIdConverter | convert method. Rest request: {}", restRequest); if (!isOpen) { exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable); - } else if (restRequest.getRestMethod().equals(RestMethod.POST)) { + } else if (restRequest.getRestMethod().equals(RestMethod.POST) && !restRequest.getArgs() + .containsKey(S3_REQUEST)) { + // For S3, POST requests with ?uploadId=<> are used in completion of multipart uploads. For eg, + // POST /s3/named-blob-sandbox/checkpoints/246cd68fa3480b2b0f9e6524fa473bca?uploadId=. + // For such use-case, we want to treat it as named blob upload convertedId = "/" + signIdIfRequired(restRequest, input); + } else if (restRequest.getRestMethod().equals(RestMethod.PUT) && RestUtils.isChunkUpload(restRequest.getArgs()) + && restRequest.getArgs().containsKey(S3_REQUEST)) { + // For S3, PUT requests with ?uploadId=<> are used in adding individual part of multipart upload. 
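The uploadId value here is the signed URL that NamedBlobPostHandler handed out when the multipart upload was initiated. 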
E.g., + // PUT /s3_named-blob-sandbox_container-a/checkpoints/42b6b3f29b2f9e0b629ff03dac4e9302/shared/ + // c29b1701-de55-463d-a129-adaa90c1fc23?uploadId= + // http%3A%2F%2Flocalhost%3A1174%2F%3Fx-ambry-ttl%3D2419200%26x-ambry-service-id%3DFlink-S3-Client + // %26x-ambry-content-type%3Dapplication%252Foctet-stream%26x-ambry-chunk-upload%3Dtrue%26x-ambry-url-type%3D + // POST%26x-ambry-session%3D3a2aeb6f-aeed-4944-881e-19d41a6b7a22%26et%3D1703180930&partNumber=1 + // In that case, we want to hand out the chunk ID. + convertedId = signIdIfRequired(restRequest, input); + LOGGER.info("chunk upload for S3. Converted id {}", convertedId); } else { CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo), (id, e) -> completeConversion(id, e, future, callback)); } @@ -180,8 +198,9 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest, conversionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId); } - } else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest) - .matchesOperation(Operations.NAMED_BLOB)) { + } else if ((restRequest.getRestMethod() == RestMethod.PUT || restRequest.getRestMethod() == RestMethod.POST) + && RestUtils.getRequestPath(restRequest).matchesOperation(Operations.NAMED_BLOB)) { + // For S3, multipart upload completion is a POST request, hence the POST method check added above. Objects.requireNonNull(blobInfo, "blobInfo cannot be null."); NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs()); String blobId = RestUtils.stripSlashAndExtensionFromId(input); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbrySecurityService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbrySecurityService.java index 6fbddcc955..1338d5e960 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbrySecurityService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbrySecurityService.java @@ -284,7 +284,12 @@ public void processResponse(RestRequest restRequest, RestResponseChannel respons case OPTIONS: case PUT: if (requestPath.matchesOperation(Operations.NAMED_BLOB)) { - responseChannel.setStatus(ResponseStatus.Created); + if (restRequest.getArgs().containsKey("uploadId")) { + // If PUT + ?uploadId is present, this is an S3 multipart chunk upload. Return 200 instead of 201. + responseChannel.setStatus(ResponseStatus.Ok); + } else { + responseChannel.setStatus(ResponseStatus.Created); + } responseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0); responseChannel.setHeader(RestUtils.Headers.CREATION_TIME, new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryUrlSigningService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryUrlSigningService.java index 18a12cbdbe..664f2049ce 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryUrlSigningService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryUrlSigningService.java @@ -180,7 +180,9 @@ public String getSignedUrl(RestRequest restRequest) throws RestServiceException argsForUrl.put(RestUtils.Headers.RESERVED_METADATA_ID, reservedMetadataBlobId); } } - argsForUrl.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize); + // Commented out to allow multipart uploads with 5 MB parts. 
But if ambry "router.max.put.chunk.size.bytes" is + // changed to 5 MB, we don't need to comment this. + //argsForUrl.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize); } argsForUrl.put(LINK_EXPIRY_TIME, time.seconds() + urlTtlSecs); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUpload.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUpload.java new file mode 100644 index 0000000000..3ba37a53ef --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUpload.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class CompleteMultipartUpload { + + @JacksonXmlProperty(localName = "Part") + @JacksonXmlElementWrapper(useWrapping = false) + private Part[] part; + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Part p : part) { + sb.append(p).append(","); + } + return sb.toString(); + } + + public Part[] getPart() { + return part; + } + + public void setPart(Part[] part) { + this.part = part; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUploadResult.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUploadResult.java new file mode 100644 index 0000000000..0440bdc656 --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/CompleteMultipartUploadResult.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class CompleteMultipartUploadResult { + + @JacksonXmlProperty(localName = "Bucket") + private String bucket; + @JacksonXmlProperty(localName = "Key") + private String key; + @JacksonXmlProperty(localName = "Location") + private String location; + @JacksonXmlProperty(localName = "ETag") + private String eTag; + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + public String getLocation() { + return location; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public void setKey(String key) { + this.key = key; + } + + public void setLocation(String location) { + this.location = location; + } + + public String geteTag() { + return eTag; + } + + public void seteTag(String eTag) { + this.eTag = eTag; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/Contents.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/Contents.java new file mode 100644 index 0000000000..4b33224fcb --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/Contents.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class Contents { + @JacksonXmlProperty(localName = "Key") + private String key; + @JacksonXmlProperty(localName = "LastModified") + private String lastModified; + @JacksonXmlProperty(localName = "ETag") + private String eTag; + @JacksonXmlProperty(localName = "Size") + private long size; + @JacksonXmlProperty(localName = "StorageClass") + private String storageClass; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getLastModified() { + return lastModified; + } + + public void setLastModified(String lastModified) { + this.lastModified = lastModified; + } + + public String geteTag() { + return eTag; + } + + public void seteTag(String eTag) { + this.eTag = eTag; + } + + public long getSize() { + return size; + } + + public void setSize(long size) { + this.size = size; + } + + public String getStorageClass() { + return storageClass; + } + + public void setStorageClass(String storageClass) { + this.storageClass = storageClass; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java index 3b375c6a0c..f34fd7dcf9 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java @@ -88,6 +88,7 @@ class FrontendRestRequestService implements RestRequestService { //private NamedBlobGetHandler namedBlobGetHandler; //private NamedBlobDeleteHandler namedBlobDeleteHandler; private NamedBlobPutHandler 
namedBlobPutHandler; + private NamedBlobPostHandler namedBlobPostHandler; private GetBlobHandler getBlobHandler; private PostBlobHandler postBlobHandler; private TtlUpdateHandler ttlUpdateHandler; @@ -189,24 +190,32 @@ public void start() throws InstantiationException { new DeleteBlobHandler(router, securityService, idConverter, accountAndContainerInjector, frontendMetrics, clusterMap, quotaManager, accountService); deleteDatasetHandler = - new DeleteDatasetHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector, deleteBlobHandler); + new DeleteDatasetHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector, + deleteBlobHandler); headBlobHandler = new HeadBlobHandler(frontendConfig, router, securityService, idConverter, accountAndContainerInjector, - frontendMetrics, clusterMap, quotaManager); + frontendMetrics, clusterMap, quotaManager, accountService); undeleteHandler = new UndeleteHandler(router, securityService, idConverter, accountAndContainerInjector, frontendMetrics, clusterMap, quotaManager); namedBlobListHandler = new NamedBlobListHandler(securityService, namedBlobDb, accountAndContainerInjector, frontendMetrics); - namedBlobPutHandler = - new NamedBlobPutHandler(securityService, namedBlobDb, idConverter, idSigningService, router, accountAndContainerInjector, - frontendConfig, frontendMetrics, clusterName, quotaManager, accountService, deleteBlobHandler); + namedBlobPutHandler = new NamedBlobPutHandler(securityService, namedBlobDb, idConverter, idSigningService, router, + accountAndContainerInjector, frontendConfig, frontendMetrics, clusterName, quotaManager, accountService, + deleteBlobHandler); + + // Adding named blob post handler to handle S3 POST requests during multipart uploads + namedBlobPostHandler = new NamedBlobPostHandler(securityService, namedBlobDb, idConverter, idSigningService, router, + accountAndContainerInjector, frontendConfig, frontendMetrics, clusterName, quotaManager, accountService, + deleteBlobHandler, urlSigningService); getClusterMapSnapshotHandler = new GetClusterMapSnapshotHandler(securityService, frontendMetrics, clusterMap); getAccountsHandler = new GetAccountsHandler(securityService, accountService, frontendMetrics); - getDatasetsHandler = new GetDatasetsHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector); - listDatasetsHandler = new ListDatasetsHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector); + getDatasetsHandler = + new GetDatasetsHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector); + listDatasetsHandler = + new ListDatasetsHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector); listDatasetVersionHandler = new ListDatasetVersionHandler(securityService, accountService, frontendMetrics, accountAndContainerInjector); getStatsReportHandler = new GetStatsReportHandler(securityService, frontendMetrics, accountStatsStore); @@ -288,12 +297,22 @@ public void handleGet(final RestRequest restRequest, final RestResponseChannel r } else if (requestPath.matchesOperation(Operations.STATS_REPORT)) { getStatsReportHandler.handle(restRequest, restResponseChannel, (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); + } else if (requestPath.matchesOperation(Operations.NAMED_BLOB) && restRequest.getArgs().containsKey("uploadId")) { + // If GET with ?uploadId is present, this is a request from s3 to list all chunks of multipart 
upload. + // Eg URL:- GET /s3/named-blob-sandbox/checkpoints/246cd68fa3480b2b0f9e6524fa473bca/shared/ + // 28774e7f-f490-4c25-9fb6-90eacbb3645f + // ?uploadId=http%3A%2F%2Flocalhost%3A1174%2F%3Fx-ambry-ttl%3D2419200%26x-ambry-service-id% + // 3DFlink-S3-Client%26x-ambry-content-type%3Dapplication%252Foctet-stream%26x-ambry-chunk-upload% + // 3Dtrue%26x-ambry-url-type%3DPOST%26x-ambry-session%3Da7a8586f-91a3-4f0b-a8cd-5a03813d6969%26et%3D1703187666 + // &part-number-marker=0 HTTP/1.1 + namedBlobListHandler.handle(restRequest, restResponseChannel, + (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); } else if (requestPath.matchesOperation(Operations.NAMED_BLOB) && NamedBlobPath.parse(requestPath, restRequest.getArgs()).getBlobName() == null) { namedBlobListHandler.handle(restRequest, restResponseChannel, (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); } else if (RestUtils.getBooleanHeader(restRequest.getArgs(), ENABLE_DATASET_VERSION_LISTING, false) - && DatasetVersionPath.parse(requestPath, restRequest.getArgs()).getVersion() == null){ + && DatasetVersionPath.parse(requestPath, restRequest.getArgs()).getVersion() == null) { listDatasetVersionHandler.handle(restRequest, restResponseChannel, (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); } else { @@ -314,6 +333,11 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse } else if (requestPath.matchesOperation(Operations.ACCOUNTS)) { postAccountsHandler.handle(restRequest, restResponseChannel, (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); + } else if (requestPath.matchesOperation(NAMED_BLOB)) { + // S3 sends POST for multipart uploads. Eg url:- + // POST /s3/named-blob-sandbox/checkpoints/246cd68fa3480b2b0f9e6524fa473bca/shared/28774e7f-f490-4c25-9fb6-90eacbb3645f?uploads HTTP/1.1 + namedBlobPostHandler.handle(restRequest, restResponseChannel, + (r, e) -> submitResponse(restRequest, restResponseChannel, r, e)); } else { postBlobHandler.handle(restRequest, restResponseChannel, (result, exception) -> submitResponse(restRequest, restResponseChannel, null, exception)); @@ -373,10 +397,18 @@ public void handleHead(RestRequest restRequest, RestResponseChannel restResponse RestRequestMetrics requestMetrics = frontendMetrics.headBlobMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), false); restRequest.getMetricsTracker().injectMetrics(requestMetrics); - - headBlobHandler.handle(restRequest, restResponseChannel, (r, e) -> { - submitResponse(restRequest, restResponseChannel, null, e); - }); + if (requestPath.matchesOperation(Operations.NAMED_BLOB) + && NamedBlobPath.parse(requestPath, restRequest.getArgs()).getBlobName() == null) { + // S3 could send HEAD requests on accounts. 
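These map to the bucket itself (no key) and are routed to handleAccounts() rather than the blob HEAD path. 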
Eg url: HEAD /s3/named-blob-sandbox + logger.info("Handling head request for named blob account"); + headBlobHandler.handleAccounts(restRequest, restResponseChannel, (r, e) -> { + submitResponse(restRequest, restResponseChannel, r, e); + }); + } else { + headBlobHandler.handle(restRequest, restResponseChannel, (r, e) -> { + submitResponse(restRequest, restResponseChannel, null, e); + }); + } }; preProcessAndRouteRequest(restRequest, restResponseChannel, frontendMetrics.headPreProcessingMetrics, routingAction); @@ -481,6 +513,7 @@ private void preProcessAndRouteRequest(RestRequest restRequest, RestResponseChan checkAvailable(); securityService.preProcessRequest(restRequest, FrontendUtils.buildCallback(preProcessingMetrics, r -> { RequestPath requestPath = RequestPath.parse(restRequest, frontendConfig.pathPrefixesToRemove, clusterName); + logger.info("FrontendRestRequestService | PreProcessAndRouteRequest. Request path is {}", requestPath); restRequest.setArg(REQUEST_PATH, requestPath); routingAction.accept(requestPath); }, restRequest.getUri(), logger, errorCallback)); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/HeadBlobHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/HeadBlobHandler.java index fcebc9505e..a4fcc2b991 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/HeadBlobHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/HeadBlobHandler.java @@ -13,22 +13,33 @@ */ package com.github.ambry.frontend; +import com.github.ambry.account.AccountCollectionSerde; +import com.github.ambry.account.AccountService; +import com.github.ambry.account.AccountServiceException; +import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.Callback; import com.github.ambry.config.FrontendConfig; import com.github.ambry.protocol.GetOption; import com.github.ambry.quota.QuotaManager; import com.github.ambry.quota.QuotaUtils; +import com.github.ambry.rest.RequestPath; import com.github.ambry.rest.RestRequest; import com.github.ambry.rest.RestRequestMetrics; import com.github.ambry.rest.RestResponseChannel; +import com.github.ambry.rest.RestServiceErrorCode; import com.github.ambry.rest.RestServiceException; import com.github.ambry.rest.RestUtils; import com.github.ambry.router.GetBlobOptions; import com.github.ambry.router.GetBlobOptionsBuilder; import com.github.ambry.router.GetBlobResult; +import com.github.ambry.router.ReadableStreamChannel; import com.github.ambry.router.Router; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.GregorianCalendar; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +62,11 @@ public class HeadBlobHandler { private final FrontendMetrics metrics; private final ClusterMap clusterMap; private final QuotaManager quotaManager; + private final AccountService accountService; HeadBlobHandler(FrontendConfig frontendConfig, Router router, SecurityService securityService, IdConverter idConverter, AccountAndContainerInjector accountAndContainerInjector, FrontendMetrics metrics, - ClusterMap clusterMap, QuotaManager quotaManager) { + ClusterMap clusterMap, QuotaManager quotaManager, AccountService accountService) { this.frontendConfig = frontendConfig; this.router = router; this.securityService = securityService; @@ -63,6 +75,7 @@ public class HeadBlobHandler { this.metrics = metrics; 
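+ // NOTE: accountService is injected below so that handleAccounts() can resolve the bucket's container when serving S3 HEAD-on-bucket requests. 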
this.clusterMap = clusterMap; this.quotaManager = quotaManager; + this.accountService = accountService; } void handle(RestRequest restRequest, RestResponseChannel restResponseChannel, Callback callback) @@ -74,10 +87,22 @@ void handle(RestRequest restRequest, RestResponseChannel restResponseChannel, Ca new CallbackChain(restRequest, restResponseChannel, callback).start(); } + // Handles HEAD /<account-name> from S3, i.e. a HEAD on the bucket itself. + void handleAccounts(RestRequest restRequest, RestResponseChannel restResponseChannel, + Callback callback) throws RestServiceException { + // Named blob requests have their account/container in the URI, so checks can be done prior to ID conversion. + if (getRequestPath(restRequest).matchesOperation(Operations.NAMED_BLOB)) { + accountAndContainerInjector.injectAccountContainerForNamedBlob(restRequest, metrics.headBlobMetricsGroup); + } + new CallbackChain(restRequest, restResponseChannel, callback, false).start(); + } + private class CallbackChain { private final RestRequest restRequest; private final RestResponseChannel restResponseChannel; private final Callback finalCallback; + private final Callback finalReadableStreamChannelCallback; + private boolean isAccountRequest = false; /** * @param restRequest the {@link RestRequest}. @@ -89,6 +114,21 @@ private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseC this.restRequest = restRequest; this.restResponseChannel = restResponseChannel; this.finalCallback = finalCallback; + this.finalReadableStreamChannelCallback = null; + } + + /** + * @param restRequest the {@link RestRequest}. + * @param restResponseChannel the {@link RestResponseChannel}. + * @param finalCallback the {@link Callback} to call on completion. + * @param unused only distinguishes this overload from the blob-HEAD constructor; this overload marks the request as an account (bucket) request. + */ + private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseChannel, + Callback finalCallback, boolean unused) { + this.restRequest = restRequest; + this.restResponseChannel = restResponseChannel; + this.finalReadableStreamChannelCallback = finalCallback; + this.finalCallback = null; + this.isAccountRequest = true; } /** @@ -106,8 +146,12 @@ private void start() { */ private Callback securityProcessRequestCallback() { return buildCallback(metrics.headBlobSecurityProcessRequestMetrics, result -> { - String blobIdStr = getRequestPath(restRequest).getOperationOrBlobId(false); - idConverter.convert(restRequest, blobIdStr, idConverterCallback()); + if (!isAccountRequest) { + String blobIdStr = getRequestPath(restRequest).getOperationOrBlobId(false); + idConverter.convert(restRequest, blobIdStr, idConverterCallback()); + } else { + securityService.postProcessRequest(restRequest, securityPostProcessRequestCallback()); + } }, restRequest.getUri(), LOGGER, finalCallback); } @@ -177,5 +221,51 @@ private Callback securityProcessResponseCallback() { return buildCallback(metrics.headBlobSecurityProcessResponseMetrics, securityCheckResult -> finalCallback.onCompletion(null, null), restRequest.getUri(), LOGGER, finalCallback); } + + /** + * After {@link SecurityService#postProcessRequest} finishes, call the final callback with the response body to + * send. + * @return a {@link Callback} to be used with {@link SecurityService#postProcessRequest}. 
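+ * (The body is the bucket's container serialized to JSON via {@link AccountCollectionSerde}.) 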
+ */ + private Callback securityPostProcessRequestCallback() { + return buildCallback(metrics.getAccountsSecurityPostProcessRequestMetrics, securityCheckResult -> { + byte[] serialized; + RequestPath requestPath = (RequestPath) restRequest.getArgs().get(REQUEST_PATH); + NamedBlobPath namedBlobPath = NamedBlobPath.parse(requestPath, restRequest.getArgs()); + restRequest.setArg(Headers.TARGET_ACCOUNT_NAME, namedBlobPath.getAccountName()); + restRequest.setArg(Headers.TARGET_CONTAINER_NAME, namedBlobPath.getContainerName()); + LOGGER.debug("Received head request for named blob account and container: {}", + restRequest.getArgs().get(REQUEST_PATH)); + Container container = getContainer(); + serialized = AccountCollectionSerde.serializeContainersInJson(Collections.singletonList(container)); + restResponseChannel.setHeader(RestUtils.Headers.TARGET_ACCOUNT_ID, container.getParentAccountId()); + ReadableStreamChannel channel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(serialized)); + restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, RestUtils.JSON_CONTENT_TYPE); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); + finalReadableStreamChannelCallback.onCompletion(channel, null); + }, restRequest.getUri(), LOGGER, finalReadableStreamChannelCallback); + } + + /** + * @return requested container. + * @throws RestServiceException + */ + private Container getContainer() throws RestServiceException { + String accountName = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.TARGET_ACCOUNT_NAME, true); + String containerName = RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.TARGET_CONTAINER_NAME, true); + Container container; + try { + container = accountService.getContainerByName(accountName, containerName); + } catch (AccountServiceException e) { + throw new RestServiceException("Failed to get container " + containerName + " from account " + accountName, + RestServiceErrorCode.getRestServiceErrorCode(e.getErrorCode())); + } + if (container == null) { + throw new RestServiceException("Container " + containerName + " in account " + accountName + " is not found.", + RestServiceErrorCode.NotFound); + } + return container; + } } } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/InitiateMultipartUploadResult.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/InitiateMultipartUploadResult.java new file mode 100644 index 0000000000..14cb334875 --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/InitiateMultipartUploadResult.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class InitiateMultipartUploadResult { + + @JacksonXmlProperty(localName = "Bucket") + private String bucket; + @JacksonXmlProperty(localName = "Key") + private String key; + @JacksonXmlProperty(localName = "UploadId") + private String uploadId; + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + public String getUploadId() { + return uploadId; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public void setKey(String key) { + this.key = key; + } + + public void setUploadId(String uploadId) { + this.uploadId = uploadId; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/ListBucketResult.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/ListBucketResult.java new file mode 100644 index 0000000000..a81801da1d --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/ListBucketResult.java @@ -0,0 +1,106 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import java.util.List; + + +public class ListBucketResult { + + @JacksonXmlProperty(localName = "Name") + private String name; + @JacksonXmlProperty(localName = "Prefix") + private String prefix; + @JacksonXmlProperty(localName = "Marker") + private String marker; + @JacksonXmlProperty(localName = "MaxKeys") + private int maxKeys; + @JacksonXmlProperty(localName = "Delimiter") + private String delimiter; + @JacksonXmlProperty(localName = "IsTruncated") + private boolean isTruncated; + @JacksonXmlProperty(localName = "Contents") + @JacksonXmlElementWrapper(useWrapping = false) + private List contents; + @JacksonXmlProperty(localName = "EncodingType") + private String encodingType; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + public String getMarker() { + return marker; + } + + public void setMarker(String marker) { + this.marker = marker; + } + + public int getMaxKeys() { + return maxKeys; + } + + public void setMaxKeys(int maxKeys) { + this.maxKeys = maxKeys; + } + + public String getDelimiter() { + return delimiter; + } + + public void setDelimiter(String delimiter) { + this.delimiter = delimiter; + } + + public boolean isTruncated() { + return isTruncated; + } + + public void setTruncated(boolean truncated) { + isTruncated = truncated; + } + + public List getContents() { + return contents; + } + + public void setContents(List contents) { + this.contents = contents; + } + + public String getEncodingType() { + return encodingType; + } + + public void setEncodingType(String encodingType) { + this.encodingType = 
encodingType; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/ListPartsResult.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/ListPartsResult.java new file mode 100644 index 0000000000..9e92190b8d --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/ListPartsResult.java @@ -0,0 +1,53 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class ListPartsResult { + + @JacksonXmlProperty(localName = "Bucket") + private String bucket; + @JacksonXmlProperty(localName = "Key") + private String key; + @JacksonXmlProperty(localName = "UploadId") + private String uploadId; + + public String getBucket() { + return bucket; + } + + public String getKey() { + return key; + } + + public String getUploadId() { + return uploadId; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public void setKey(String key) { + this.key = key; + } + + public void setUploadId(String uploadId) { + this.uploadId = uploadId; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java index e1c5dcadfe..142f81197b 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java @@ -15,6 +15,9 @@ package com.github.ambry.frontend; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.Callback; import com.github.ambry.commons.CallbackUtils; import com.github.ambry.named.NamedBlobDb; @@ -26,11 +29,19 @@ import com.github.ambry.rest.RestServiceException; import com.github.ambry.rest.RestUtils; import com.github.ambry.router.ReadableStreamChannel; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; import java.util.GregorianCalendar; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.github.ambry.frontend.FrontendUtils.*; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; /** @@ -141,13 +152,67 @@ private Callback securityPostProcessRequestCallback() { */ private Callback> listBlobsCallback() { return buildCallback(frontendMetrics.listDbLookupMetrics, page -> { - ReadableStreamChannel channel = - serializeJsonToChannel(page.toJson(record -> new NamedBlobListEntry(record).toJson())); - restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); - restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, RestUtils.JSON_CONTENT_TYPE); - restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); - 
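// NOTE: ListPartsResult carries only bucket, key and uploadId; individual parts are not tracked in this prototype. + 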
finalCallback.onCompletion(channel, null); + //ReadableStreamChannel channel = + // serializeJsonToChannel(page.toJson(record -> new NamedBlobListEntry(record).toJson())); + + // S3 expects listing of blobs in xml format. + + if (restRequest.getArgs().containsKey("uploadId")) { + String bucket = (String) restRequest.getArgs().get(S3_BUCKET); + String key = (String) restRequest.getArgs().get(S3_KEY); + String uploadId = (String) restRequest.getArgs().get("uploadId"); + LOGGER.info( + "NamedBlobListHandler | Sending response for list upload parts. Bucket = {}, Key = {}, Upload Id = {}", + bucket, key, uploadId); + ListPartsResult listPartsResult = new ListPartsResult(); + listPartsResult.setBucket(bucket); + listPartsResult.setKey(key); + listPartsResult.setUploadId(uploadId); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ObjectMapper objectMapper = new XmlMapper(); + objectMapper.writeValue(outputStream, listPartsResult); + ReadableStreamChannel channel = + new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray())); + restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml"); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); + finalCallback.onCompletion(channel, null); + } else { + ReadableStreamChannel channel = serializeAsXml(page); + restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml"); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); + finalCallback.onCompletion(channel, null); + } }, uri, LOGGER, finalCallback); } + + private ReadableStreamChannel serializeAsXml(Page namedBlobRecordPage) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + ListBucketResult listBucketResult = new ListBucketResult(); + listBucketResult.setName(restRequest.getPath()); + listBucketResult.setPrefix(restRequest.getArgs().get("prefix").toString()); + listBucketResult.setMaxKeys(1); + listBucketResult.setDelimiter("/"); + listBucketResult.setEncodingType("url"); + + List contentsList = new ArrayList<>(); + List namedBlobRecords = namedBlobRecordPage.getEntries(); + for (NamedBlobRecord namedBlobRecord : namedBlobRecords) { + Contents contents = new Contents(); + contents.setKey(namedBlobRecord.getBlobName()); + String todayDate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").format(Calendar.getInstance().getTime()); + contents.setLastModified(todayDate); + contentsList.add(contents); + } + + listBucketResult.setContents(contentsList); + + ObjectMapper objectMapper = new XmlMapper(); + objectMapper.writeValue(outputStream, listBucketResult); + + return new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray())); + } } } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java new file mode 100644 index 0000000000..17297365f1 --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java @@ -0,0 +1,566 @@ +/* + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.frontend; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.github.ambry.account.AccountService; +import com.github.ambry.account.Container; +import com.github.ambry.account.Dataset; +import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; +import com.github.ambry.commons.Callback; +import com.github.ambry.commons.RetainingAsyncWritableChannel; +import com.github.ambry.commons.RetryExecutor; +import com.github.ambry.commons.RetryPolicies; +import com.github.ambry.commons.RetryPolicy; +import com.github.ambry.config.FrontendConfig; +import com.github.ambry.messageformat.BlobInfo; +import com.github.ambry.messageformat.BlobProperties; +import com.github.ambry.named.NamedBlobDb; +import com.github.ambry.named.NamedBlobRecord; +import com.github.ambry.quota.QuotaManager; +import com.github.ambry.quota.QuotaUtils; +import com.github.ambry.rest.RequestPath; +import com.github.ambry.rest.RestRequest; +import com.github.ambry.rest.RestResponseChannel; +import com.github.ambry.rest.RestServiceErrorCode; +import com.github.ambry.rest.RestServiceException; +import com.github.ambry.rest.RestUtils; +import com.github.ambry.router.ChunkInfo; +import com.github.ambry.router.ReadableStreamChannel; +import com.github.ambry.router.Router; +import com.github.ambry.router.RouterErrorCode; +import com.github.ambry.router.RouterException; +import com.github.ambry.utils.Pair; +import com.github.ambry.utils.Utils; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.GregorianCalendar; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.github.ambry.frontend.FrontendUtils.*; +import static com.github.ambry.rest.RestUtils.*; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; +import static com.github.ambry.router.RouterErrorCode.*; + +// This class is copied from NamedBlobPutHandler with some modifications to handle S3 multipart upload requests + +public class NamedBlobPostHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(NamedBlobPostHandler.class); + /** + * Key to represent the time at which a blob will expire in ms. Used within the metadata map in signed IDs. 
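+ * It also surfaces as the et query parameter inside the signed upload URLs that serve as S3 uploadIds. 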
+ * + */ + static final String EXPIRATION_TIME_MS_KEY = "et"; + private final SecurityService securityService; + private final NamedBlobDb namedBlobDb; + private final IdConverter idConverter; + private final IdSigningService idSigningService; + private final AccountService accountService; + private final Router router; + private final AccountAndContainerInjector accountAndContainerInjector; + private final FrontendConfig frontendConfig; + private final FrontendMetrics frontendMetrics; + private final String clusterName; + private final QuotaManager quotaManager; + private final RetryPolicy retryPolicy = RetryPolicies.defaultPolicy(); + private final RetryExecutor retryExecutor = new RetryExecutor(Executors.newScheduledThreadPool(2)); + private final Set<RouterErrorCode> retriableRouterError = + EnumSet.of(AmbryUnavailable, ChannelClosed, UnexpectedInternalError, OperationTimedOut); + private final DeleteBlobHandler deleteBlobHandler; + private final UrlSigningService urlSigningService; + + /** + * Constructs a handler for handling requests for uploading or stitching blobs. + * + * @param securityService the {@link SecurityService} to use. + * @param namedBlobDb the {@link NamedBlobDb} to use. + * @param idConverter the {@link IdConverter} to use. + * @param idSigningService the {@link IdSigningService} to use. + * @param router the {@link Router} to use. + * @param accountAndContainerInjector helper to resolve account and container for a given request. + * @param frontendConfig the {@link FrontendConfig} to use. + * @param frontendMetrics {@link FrontendMetrics} instance where metrics should be recorded. + * @param clusterName the name of the storage cluster that the router communicates with. + * @param quotaManager The {@link QuotaManager} class to account for quota usage in serving requests. + * @param accountService The {@link AccountService} to get the account and container id based on names. + * @param deleteBlobHandler the {@link DeleteBlobHandler} to use. + * @param urlSigningService the {@link UrlSigningService} used to generate the signed upload URLs. + */ + NamedBlobPostHandler(SecurityService securityService, NamedBlobDb namedBlobDb, IdConverter idConverter, + IdSigningService idSigningService, Router router, AccountAndContainerInjector accountAndContainerInjector, + FrontendConfig frontendConfig, FrontendMetrics frontendMetrics, String clusterName, QuotaManager quotaManager, + AccountService accountService, DeleteBlobHandler deleteBlobHandler, UrlSigningService urlSigningService) { + this.securityService = securityService; + this.namedBlobDb = namedBlobDb; + this.idConverter = idConverter; + this.idSigningService = idSigningService; + this.router = router; + this.accountAndContainerInjector = accountAndContainerInjector; + this.frontendConfig = frontendConfig; + this.frontendMetrics = frontendMetrics; + this.clusterName = clusterName; + this.quotaManager = quotaManager; + this.accountService = accountService; + this.deleteBlobHandler = deleteBlobHandler; + this.urlSigningService = urlSigningService; + } + + /** + * Handles a POST request for a blob. + * @param restRequest the {@link RestRequest} that contains the request parameters. + * @param restResponseChannel the {@link RestResponseChannel} where headers should be set. + * @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception). 
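+ * For S3 this covers two multipart-upload flows (URLs illustrative): {@code POST /s3/<bucket>/<key>?uploads} initiates the upload and returns an InitiateMultipartUploadResult whose UploadId is a signed PUT URL, and {@code POST /s3/<bucket>/<key>?uploadId=<signed url>} completes it by stitching the parts that were PUT against that signed URL with {@code &partNumber=N}. 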
+ */ + void handle(RestRequest restRequest, RestResponseChannel restResponseChannel, + Callback callback) { + restRequest.setArg(SEND_FAILURE_REASON, Boolean.TRUE); + new NamedBlobPostHandler.CallbackChain(restRequest, restResponseChannel, callback).start(); + } + + /** + * Represents the chain of actions to take. Keeps request context that is relevant to all callback stages. + */ + private class CallbackChain { + private final RestRequest restRequest; + private final RestResponseChannel restResponseChannel; + private final Callback finalCallback; + private final Callback deleteDatasetCallback; + private final String uri; + + /** + * @param restRequest the {@link RestRequest}. + * @param restResponseChannel the {@link RestResponseChannel}. + * @param finalCallback the {@link Callback} to call on completion. + */ + private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseChannel, + Callback finalCallback) { + this.restRequest = restRequest; + this.restResponseChannel = restResponseChannel; + this.finalCallback = finalCallback; + this.deleteDatasetCallback = deleteDatasetVersionIfUploadFailedCallBack(finalCallback); + this.uri = restRequest.getUri(); + } + + /** + * Start the chain by calling {@link SecurityService#processRequest}. + */ + private void start() { + restRequest.getMetricsTracker() + .injectMetrics(frontendMetrics.putBlobMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), false)); + try { + // Start the callback chain by parsing blob info headers and performing request security processing. + securityService.processRequest(restRequest, securityProcessRequestCallback()); + } catch (Exception e) { + finalCallback.onCompletion(null, e); + } + } + + /** + * After {@link SecurityService#processRequest} finishes, call {@link SecurityService#postProcessRequest} to perform + * request time security checks that rely on the request being fully parsed and any additional arguments set. + * @return a {@link Callback} to be used with {@link SecurityService#processRequest}. + */ + private Callback securityProcessRequestCallback() { + return buildCallback(frontendMetrics.putSecurityProcessRequestMetrics, securityCheckResult -> { + BlobInfo blobInfo = getBlobInfoFromRequest(); + securityService.postProcessRequest(restRequest, securityPostProcessRequestCallback(blobInfo)); + }, uri, LOGGER, finalCallback); + } + + /** + * After {@link SecurityService#postProcessRequest} finishes, call {@link Router#putBlob} to persist the blob in the + * storage layer. + * @param blobInfo the {@link BlobInfo} to make the router call with. + * @return a {@link Callback} to be used with {@link SecurityService#postProcessRequest}. 
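+ * Note: URL_TTL and MAX_UPLOAD_SIZE are pinned below to 300 seconds and 5242880 bytes (5 MB); the 5 MB value matches the part size discussed in AmbryUrlSigningService. 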
+ */
+ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
+ return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> {
+ if (restRequest.getArgs().containsKey("uploads")) {
+ // Initiate multipart upload: set up the args needed to sign a chunk-upload URL. The URL TTL is in seconds
+ // and the max part size is 5 MiB (5242880 bytes), S3's minimum part size.
+ restRequest.setArg(RestUtils.Headers.URL_TYPE, "POST");
+ restRequest.setArg(RestUtils.Headers.URL_TTL, "300");
+ restRequest.setArg(RestUtils.Headers.MAX_UPLOAD_SIZE, "5242880");
+ restRequest.setArg(RestUtils.Headers.CHUNK_UPLOAD, "true");
+
+ // Create signed URL, which doubles as the multipart upload id.
+ String signedUrl = urlSigningService.getSignedUrl(restRequest);
+ LOGGER.debug("NamedBlobPostHandler | Generated {} from {}", signedUrl, restRequest);
+
+ // Create xml response
+ String bucket = (String) restRequest.getArgs().get(S3_BUCKET);
+ String key = (String) restRequest.getArgs().get(S3_KEY);
+ LOGGER.info(
+ "NamedBlobPostHandler | Sending response for Multipart begin upload. Bucket = {}, Key = {}, Upload Id = {}",
+ bucket, key, signedUrl);
+ InitiateMultipartUploadResult initiateMultipartUploadResult = new InitiateMultipartUploadResult();
+ initiateMultipartUploadResult.setBucket(bucket);
+ initiateMultipartUploadResult.setKey(key);
+ initiateMultipartUploadResult.setUploadId(signedUrl);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ObjectMapper objectMapper = new XmlMapper();
+ objectMapper.writeValue(outputStream, initiateMultipartUploadResult);
+ ReadableStreamChannel channel =
+ new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
+ restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime());
+ restResponseChannel.setHeader(RestUtils.Headers.SIGNED_URL, signedUrl);
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml");
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize());
+
+ finalCallback.onCompletion(channel, null);
+ } else if (restRequest.getArgs().containsKey("uploadId")) {
+ // Complete multipart upload: read the XML body listing the uploaded parts, then stitch them together.
+ LOGGER.info("NamedBlobPostHandler | Received complete multipart upload request");
+ RetainingAsyncWritableChannel channel =
+ new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes);
+ restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
+ }
+ }, uri, LOGGER, deleteDatasetCallback);
+ }
+
+ /**
+ * After reading the body of the stitch request, parse the request body,
+ * and make a call to {@link Router#stitchBlob}.
+ * @param channel the {@link RetainingAsyncWritableChannel} that will contain the request body.
+ * @param blobInfo the {@link BlobInfo} to make the router call with.
+ * @return a {@link Callback} to be used with {@link RestRequest#readInto}.
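+ * <p>The body is the standard S3 multipart-completion XML, roughly (values illustrative; see
+ * {@link CompleteMultipartUpload} and {@link Part} for the exact mapping):
+ * <pre>{@code
+ * <CompleteMultipartUpload>
+ *   <Part><PartNumber>1</PartNumber><ETag>signed-chunk-id-1</ETag></Part>
+ *   <Part><PartNumber>2</PartNumber><ETag>signed-chunk-id-2</ETag></Part>
+ * </CompleteMultipartUpload>
+ * }</pre>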
+ */ + private Callback fetchStitchRequestBodyCallback(RetainingAsyncWritableChannel channel, BlobInfo blobInfo) { + return buildCallback(frontendMetrics.putReadStitchRequestMetrics, + bytesRead -> router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), + getChunksToStitch(blobInfo.getBlobProperties(), deserializeXml(channel)), + routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)), + uri, LOGGER, deleteDatasetCallback); + } + + private CompleteMultipartUpload deserializeXml(RetainingAsyncWritableChannel channel) throws RestServiceException { + CompleteMultipartUpload completeMultipartUpload; + try (InputStream inputStream = channel.consumeContentAsInputStream()) { + ObjectMapper objectMapper = new XmlMapper(); + completeMultipartUpload = objectMapper.readValue(inputStream, CompleteMultipartUpload.class); + LOGGER.info("NamedBlobPostHandler | deserialized xml {}", completeMultipartUpload); + } catch (IOException e) { + throw new RestServiceException("Could not parse xml request body", e, RestServiceErrorCode.BadRequest); + } + return completeMultipartUpload; + } + + /** + * After {@link Router#putBlob} finishes, call {@link IdConverter#convert} to convert the returned ID into a format + * that will be returned in the "Location" header. + * @param blobInfo the {@link BlobInfo} to use for security checks. + * @return a {@link Callback} to be used with {@link Router#putBlob}. + */ + private Callback routerStitchBlobCallback(BlobInfo blobInfo) { + return buildCallback(frontendMetrics.putRouterStitchBlobMetrics, + blobId -> idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId)), uri, + LOGGER, deleteDatasetCallback); + } + + /** + * After {@link IdConverter#convert} finishes, call {@link SecurityService#postProcessRequest} to perform + * request time security checks that rely on the request being fully parsed and any additional arguments set. + * @param blobInfo the {@link BlobInfo} to use for security checks. + * @param blobId the blob ID returned by the router (without decoration or obfuscation by id converter). + * @return a {@link Callback} to be used with {@link IdConverter#convert}. + */ + private Callback idConverterCallback(BlobInfo blobInfo, String blobId) { + return buildCallback(frontendMetrics.putIdConversionMetrics, convertedBlobId -> { + restResponseChannel.setHeader(Headers.LOCATION, convertedBlobId); + if (blobInfo.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) { + // Do ttl update with retryExecutor. Use the blob ID returned from the router instead of the converted ID + // since the converted ID may be changed by the ID converter. + String serviceId = blobInfo.getBlobProperties().getServiceId(); + retryExecutor.runWithRetries(retryPolicy, + callback -> router.updateBlobTtl(blobId, serviceId, Utils.Infinite_Time, callback, + QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)), this::isRetriable, + routerTtlUpdateCallback(blobInfo, blobId)); + } else { + securityService.processResponse(restRequest, restResponseChannel, blobInfo, + securityProcessResponseCallback()); + } + }, uri, LOGGER, deleteDatasetCallback); + } + + /** + * @param throwable the error to check. + * @return true if the router error is retriable. 
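+ * <p>The retriable codes are those in {@code retriableRouterError}: {@code AmbryUnavailable},
+ * {@code ChannelClosed}, {@code UnexpectedInternalError} and {@code OperationTimedOut}.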
+ */
+ private boolean isRetriable(Throwable throwable) {
+ return throwable instanceof RouterException && retriableRouterError.contains(
+ ((RouterException) throwable).getErrorCode());
+ }
+
+ /**
+ * After the TTL update finishes, mark the named blob record 'READY' in the database and call
+ * {@link SecurityService#processResponse}.
+ * @param blobInfo the {@link BlobInfo} to use for security checks.
+ * @param blobId the {@link String} to use for blob id.
+ * @return a {@link Callback} to be used with {@link Router#updateBlobTtl}.
+ */
+ private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo, String blobId) {
+ return buildCallback(frontendMetrics.updateBlobTtlRouterMetrics, convertedBlobId -> {
+ // Set the named blob state to 'READY' after the TTL update succeeds
+ if (!restRequest.getArgs().containsKey(InternalKeys.NAMED_BLOB_VERSION)) {
+ throw new RestServiceException(
+ "Internal key " + InternalKeys.NAMED_BLOB_VERSION + " is required in Named Blob TTL update callback!",
+ RestServiceErrorCode.InternalServerError);
+ }
+ long namedBlobVersion = (long) restRequest.getArgs().get(NAMED_BLOB_VERSION);
+ String blobIdClean = RestUtils.stripSlashAndExtensionFromId(blobId);
+ NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
+ NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
+ namedBlobPath.getBlobName(), blobIdClean, Utils.Infinite_Time, namedBlobVersion);
+ // Note: this blocks the callback thread until the database update completes (a prototype shortcut).
+ namedBlobDb.updateBlobTtlAndStateToReady(record).get();
+ securityService.processResponse(restRequest, restResponseChannel, blobInfo, securityProcessResponseCallback());
+ }, uri, LOGGER, deleteDatasetCallback);
+ }
+
+ /**
+ * After {@link SecurityService#processResponse}, call {@code finalCallback}.
+ * @return a {@link Callback} to be used with {@link SecurityService#processResponse}.
+ */
+ private Callback<Void> securityProcessResponseCallback() {
+ return buildCallback(frontendMetrics.putBlobSecurityProcessResponseMetrics, securityCheckResult -> {
+ if (restRequest.getArgs().containsKey("uploadId") && restRequest.getArgs().containsKey(S3_REQUEST)) {
+ // Create xml response
+ String bucket = (String) restRequest.getArgs().get(S3_BUCKET);
+ String key = (String) restRequest.getArgs().get(S3_KEY);
+ LOGGER.info(
+ "NamedBlobPostHandler | Sending response for Multipart upload complete. Bucket = {}, Key = {}, etag = {}",
+ bucket, key, restResponseChannel.getHeader(Headers.LOCATION));
+
+ CompleteMultipartUploadResult completeMultipartUploadResult = new CompleteMultipartUploadResult();
+ completeMultipartUploadResult.setBucket(bucket);
+ completeMultipartUploadResult.setKey(key);
+ completeMultipartUploadResult.seteTag((String) restResponseChannel.getHeader(Headers.LOCATION));
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ObjectMapper objectMapper = new XmlMapper();
+ objectMapper.writeValue(outputStream, completeMultipartUploadResult);
+ ReadableStreamChannel channel =
+ new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
+ restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime());
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml");
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize());
+ finalCallback.onCompletion(channel, null);
+ } else {
+ finalCallback.onCompletion(null, null);
+ }
+ }, restRequest.getUri(), LOGGER, finalCallback);
+ }
+
+ /**
+ * Parse {@link BlobInfo} from the request arguments. This method will also ensure that the correct account and
+ * container objects are attached to the request.
+ * @return the {@link BlobInfo} parsed from the request arguments.
+ * @throws RestServiceException if there is an error while parsing the {@link BlobInfo} arguments.
+ */
+ private BlobInfo getBlobInfoFromRequest() throws RestServiceException {
+ long propsBuildStartTime = System.currentTimeMillis();
+ accountAndContainerInjector.injectAccountContainerForNamedBlob(restRequest, frontendMetrics.putBlobMetricsGroup);
+ if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
+ accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
+ }
+ // Hardcoded service id for the S3 prototype client.
+ restRequest.setArg(Headers.SERVICE_ID, "Flink-S3-Client");
+ restRequest.setArg(Headers.AMBRY_CONTENT_TYPE, restRequest.getArgs().get(Headers.CONTENT_TYPE));
+ BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
+ Container container = RestUtils.getContainerFromArgs(restRequest.getArgs());
+ if (blobProperties.getTimeToLiveInSeconds() + TimeUnit.MILLISECONDS.toSeconds(
+ blobProperties.getCreationTimeInMs()) > Integer.MAX_VALUE) {
+ LOGGER.debug("TTL set to very large value in PUT request with BlobProperties {}", blobProperties);
+ frontendMetrics.ttlTooLargeError.inc();
+ } else if (container.isTtlRequired() && (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time
+ || blobProperties.getTimeToLiveInSeconds() > frontendConfig.maxAcceptableTtlSecsIfTtlRequired)) {
+ String descriptor = RestUtils.getAccountFromArgs(restRequest.getArgs()).getName() + ":" + container.getName();
+ if (frontendConfig.failIfTtlRequiredButNotProvided) {
+ throw new RestServiceException(
+ "TTL < " + frontendConfig.maxAcceptableTtlSecsIfTtlRequired + " is required for upload to " + descriptor,
+ RestServiceErrorCode.InvalidArgs);
+ } else {
+ LOGGER.debug("{} attempted an upload with ttl {} to {}", blobProperties.getServiceId(),
+ blobProperties.getTimeToLiveInSeconds(), descriptor);
+ frontendMetrics.ttlNotCompliantError.inc();
+ restResponseChannel.setHeader(Headers.NON_COMPLIANCE_WARNING,
+ "TTL < " + frontendConfig.maxAcceptableTtlSecsIfTtlRequired + " will be required for future uploads");
+ }
+ }
+ // inject encryption frontendMetrics if applicable
+ if (blobProperties.isEncrypted()) {
+ restRequest.getMetricsTracker()
+ .injectMetrics(frontendMetrics.putBlobMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), true));
+ }
+ Map<String, Object> userMetadataFromRequest = new HashMap<>(restRequest.getArgs());
+ byte[] userMetadata = RestUtils.buildUserMetadata(userMetadataFromRequest);
+ frontendMetrics.blobPropsBuildForNameBlobPutTimeInMs.update(System.currentTimeMillis() - propsBuildStartTime);
+ LOGGER.trace("Blob properties of blob being PUT - {}", blobProperties);
+ return new BlobInfo(blobProperties, userMetadata);
+ }
+
+ /**
+ * Parse and verify the signed chunk IDs in the body of a stitch request.
+ *
+ * @param stitchedBlobProperties the {@link BlobProperties} for the final stitched blob.
+ * @param completeMultipartUpload the parsed multipart-completion request listing the parts to stitch.
+ * @return a list of chunks to stitch that can be provided to the router.
+ * @throws RestServiceException if the part list is empty or a chunk ID fails verification.
+ */
+ List<ChunkInfo> getChunksToStitch(BlobProperties stitchedBlobProperties,
+ CompleteMultipartUpload completeMultipartUpload) throws RestServiceException {
+ String reservedMetadataBlobId = null;
+ List<String> signedChunkIds = new ArrayList<>();
+
+ LOGGER.info("NamedBlobPostHandler | received parts are {}", completeMultipartUpload.getPart());
+
+ for (Part part : completeMultipartUpload.getPart()) {
+ LOGGER.info("NamedBlobPostHandler | part is {}", part);
+ signedChunkIds.add(part.geteTag());
+ }
+
+ if (signedChunkIds.isEmpty()) {
+ throw new RestServiceException("Must provide at least one ID in stitch request",
+ RestServiceErrorCode.MissingArgs);
+ }
+ List<ChunkInfo> chunksToStitch = new ArrayList<>(signedChunkIds.size());
+ String expectedSession = null;
+ long totalStitchedBlobSize = 0;
+ for (String signedChunkId : signedChunkIds) {
+ signedChunkId =
+ RequestPath.parse(signedChunkId, Collections.emptyMap(), frontendConfig.pathPrefixesToRemove, clusterName)
+ .getOperationOrBlobId(false);
+ if (!idSigningService.isIdSigned(signedChunkId)) {
+ throw new RestServiceException("All chunk IDs must be signed: " + signedChunkId,
+ RestServiceErrorCode.BadRequest);
+ }
+ Pair<String, Map<String, String>> idAndMetadata = idSigningService.parseSignedId(signedChunkId);
+ String blobId = idAndMetadata.getFirst();
+ Map<String, String> metadata = idAndMetadata.getSecond();
+
+ expectedSession = RestUtils.verifyChunkUploadSession(metadata, expectedSession);
+ @SuppressWarnings("ConstantConditions")
+ long chunkSizeBytes = RestUtils.getLongHeader(metadata, Headers.BLOB_SIZE, true);
+
+ totalStitchedBlobSize += chunkSizeBytes;
+ // Expiration time is sent to the router, but not verified in this handler. The router is responsible for making
+ // checks related to internal ambry requirements, like making sure that the chunks do not expire before the
+ // metadata blob.
+ @SuppressWarnings("ConstantConditions")
+ long expirationTimeMs = RestUtils.getLongHeader(metadata, EXPIRATION_TIME_MS_KEY, true);
+ verifyChunkAccountAndContainer(blobId, stitchedBlobProperties);
+ reservedMetadataBlobId = getAndVerifyReservedMetadataBlobId(metadata, reservedMetadataBlobId, blobId);
+
+ chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, expirationTimeMs, reservedMetadataBlobId));
+ }
+ //the actual blob size for stitched blob is the sum of all the chunk sizes
+ restResponseChannel.setHeader(Headers.BLOB_SIZE, totalStitchedBlobSize);
+ return chunksToStitch;
+ }
+
+ /**
+ * Check that the account and container IDs encoded in a chunk's blob ID match those in the properties for the
+ * stitched blob.
+ * @param chunkBlobId the blob ID for the chunk.
+ * @param stitchedBlobProperties the {@link BlobProperties} for the stitched blob.
+ * @throws RestServiceException if the account or container ID does not match. + */ + private void verifyChunkAccountAndContainer(String chunkBlobId, BlobProperties stitchedBlobProperties) + throws RestServiceException { + Pair accountAndContainer; + try { + accountAndContainer = BlobId.getAccountAndContainerIds(chunkBlobId); + } catch (Exception e) { + throw new RestServiceException("Invalid blob ID in signed chunk ID", RestServiceErrorCode.BadRequest); + } + if (stitchedBlobProperties.getAccountId() != accountAndContainer.getFirst() + || stitchedBlobProperties.getContainerId() != accountAndContainer.getSecond()) { + throw new RestServiceException("Account and container for chunk: (" + accountAndContainer.getFirst() + ", " + + accountAndContainer.getSecond() + ") does not match account and container for stitched blob: (" + + stitchedBlobProperties.getAccountId() + ", " + stitchedBlobProperties.getContainerId() + ")", + RestServiceErrorCode.BadRequest); + } + } + + /** + * Verify that the reserved metadata id for the specified chunkId is same as seen for previous chunks. + * Also return the chunk's reserved metadata id. + * @param metadata {@link Map} of metadata set in the signed ids. + * @param reservedMetadataBlobId Reserved metadata id for the chunks. Can be {@code null}. + * @param chunkId The chunk id. + * @return The reserved metadata id. + * @throws RestServiceException in case of any exception. + */ + private String getAndVerifyReservedMetadataBlobId(Map metadata, String reservedMetadataBlobId, + String chunkId) throws RestServiceException { + String chunkReservedMetadataBlobId = RestUtils.getHeader(metadata, Headers.RESERVED_METADATA_ID, false); + if (chunkReservedMetadataBlobId == null) { + ReservedMetadataIdMetrics.getReservedMetadataIdMetrics( + frontendMetrics.getMetricRegistry()).noReservedMetadataForChunkedUploadCount.inc(); + throwRestServiceExceptionIfEnabled( + new RestServiceException(String.format("No reserved metadata id present in chunk %s signed url", chunkId), + RestServiceErrorCode.BadRequest), router.getRouterConfig().routerReservedMetadataEnabled); + } + if (reservedMetadataBlobId != null && !reservedMetadataBlobId.equals(chunkReservedMetadataBlobId)) { + ReservedMetadataIdMetrics.getReservedMetadataIdMetrics( + frontendMetrics.getMetricRegistry()).mismatchedReservedMetadataForChunkedUploadCount.inc(); + throwRestServiceExceptionIfEnabled(new RestServiceException(String.format( + "Reserved metadata id for the chunks are not same. For chunk: %s the reserved metadata id is %s. But reserved metadata id %s was found earlier.", + chunkId, chunkReservedMetadataBlobId, reservedMetadataBlobId), RestServiceErrorCode.BadRequest), + router.getRouterConfig().routerReservedMetadataEnabled); + } + return chunkReservedMetadataBlobId; + } + + /** + * Create a {@link BlobProperties} for the router upload (putBlob or stitchBlob) with a finite TTL such that + * orphaned blobs will not be created if the write to the named blob metadata DB fails. + * @param blobInfoFromRequest the {@link BlobInfo} parsed from the request. + * @return a {@link BlobProperties} for a TTL-ed initial router call. 
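+ * <p>For a permanent blob the sequence is: router put with {@code permanentNamedBlobInitialPutTtl}, insert of the
+ * database record with infinite TTL, then a router TTL update (see {@code routerTtlUpdateCallback}) to make the
+ * stored blob permanent.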
+ */
+ BlobProperties getPropertiesForRouterUpload(BlobInfo blobInfoFromRequest) {
+ BlobProperties properties;
+ if (blobInfoFromRequest.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) {
+ properties = new BlobProperties(blobInfoFromRequest.getBlobProperties());
+ // For blob with infinite time, the procedure is putBlob with a TTL, record insert to database with
+ // infinite TTL, and ttlUpdate.
+ properties.setTimeToLiveInSeconds(frontendConfig.permanentNamedBlobInitialPutTtl);
+ } else {
+ properties = blobInfoFromRequest.getBlobProperties();
+ }
+ return properties;
+ }
+
+ /**
+ * If the upload of a named blob fails, make a best effort to delete the dataset version that was created before
+ * uploading.
+ * @param callback the final callback which submits the response.
+ */
+ private Callback<ReadableStreamChannel> deleteDatasetVersionIfUploadFailedCallBack(
+ Callback<ReadableStreamChannel> callback) {
+ return (r, e) -> {
+ // Note: dataset-version cleanup is not implemented in this prototype; the callback simply forwards the result.
+ if (callback != null) {
+ callback.onCompletion(r, e);
+ }
+ };
+ }
+ }
+}
diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
index 33f03d794b..87dfceef4d 100644
--- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
+++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
@@ -13,6 +13,7 @@
 */
package com.github.ambry.frontend;

+import com.azure.core.util.UrlBuilder;
import com.github.ambry.account.AccountService;
import com.github.ambry.account.AccountServiceException;
import com.github.ambry.account.Container;
@@ -47,6 +48,7 @@ import com.github.ambry.router.RouterErrorCode;
import com.github.ambry.router.RouterException;
import com.github.ambry.utils.Pair;
+import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Utils;
import java.util.ArrayList;
import java.util.Collections;
@@ -181,6 +183,20 @@ private void start() {
.injectMetrics(frontendMetrics.putBlobMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), false));
try {
// Start the callback chain by parsing blob info headers and performing request security processing.
+
+ if (restRequest.getArgs().containsKey("uploadId")) {
+ // This is a put-chunk request in an S3 multipart upload; the uploadId is the signed URL generated at
+ // multipart initiation, so copy its query parameters onto this request.
+ String signedUrl = (String) restRequest.getArgs().get("uploadId");
+ LOGGER.info("Multipart chunk upload for S3. URL string is {}", signedUrl);
+ UrlBuilder urlBuilder = UrlBuilder.parse(signedUrl);
+ LOGGER.info("Built URL from string. URL is {}", urlBuilder);
+ for (String parameter : urlBuilder.getQuery().keySet()) {
+ String value = urlBuilder.getQuery().get(parameter);
+ LOGGER.info("Multipart chunk upload for S3. 
Adding query parameter {} and value {}", parameter, value); + restRequest.setArg(parameter, value); + } + } + securityService.processRequest(restRequest, securityProcessRequestCallback()); } catch (Exception e) { finalCallback.onCompletion(null, e); @@ -223,6 +239,7 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { frontendMetrics.addDatasetVersionRate.mark(); addDatasetVersion(blobInfo.getBlobProperties(), restRequest); } + PutBlobOptions options = getPutBlobOptionsFromRequest(); router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options, routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); @@ -238,7 +255,7 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { */ private Callback routerPutBlobCallback(BlobInfo blobInfo) { return buildCallback(frontendMetrics.putRouterPutBlobMetrics, blobId -> { - restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived()); + setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties()); idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId)); }, uri, LOGGER, deleteDatasetCallback); } @@ -280,6 +297,9 @@ private Callback routerStitchBlobCallback(BlobInfo blobInfo) { private Callback idConverterCallback(BlobInfo blobInfo, String blobId) { return buildCallback(frontendMetrics.putIdConversionMetrics, convertedBlobId -> { restResponseChannel.setHeader(RestUtils.Headers.LOCATION, convertedBlobId); + if (restRequest.getArgs().containsKey("uploadId")) { + restResponseChannel.setHeader("ETag", convertedBlobId); + } if (blobInfo.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) { // Do ttl update with retryExecutor. Use the blob ID returned from the router instead of the converted ID // since the converted ID may be changed by the ID converter. @@ -373,6 +393,8 @@ private BlobInfo getBlobInfoFromRequest() throws RestServiceException { if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) { accountAndContainerInjector.injectDatasetForNamedBlob(restRequest); } + restRequest.setArg(Headers.SERVICE_ID, "Flink-S3-Client"); + restRequest.setArg(Headers.AMBRY_CONTENT_TYPE, restRequest.getArgs().get(Headers.CONTENT_TYPE)); BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs()); Container container = RestUtils.getContainerFromArgs(restRequest.getArgs()); if (blobProperties.getTimeToLiveInSeconds() + TimeUnit.MILLISECONDS.toSeconds( @@ -438,7 +460,9 @@ private Map getDatasetUserTags(RestRequest restRequest) throws R * @return the {@link PutBlobOptions} to use, parsed from the request. 
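+   * @throws RestServiceException if the {@link RestUtils.Headers#MAX_UPLOAD_SIZE} header is present but cannot be
+   * parsed as a long.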
*/ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceException { - PutBlobOptionsBuilder builder = new PutBlobOptionsBuilder().chunkUpload(false).restRequest(restRequest); + boolean chunkUpload = isChunkUpload(restRequest.getArgs()); + LOGGER.info("Is chunk upload {}", chunkUpload); + PutBlobOptionsBuilder builder = new PutBlobOptionsBuilder().chunkUpload(chunkUpload).restRequest(restRequest); Long maxUploadSize = RestUtils.getLongHeader(restRequest.getArgs(), RestUtils.Headers.MAX_UPLOAD_SIZE, false); if (maxUploadSize != null) { builder.maxUploadSize(maxUploadSize); @@ -446,6 +470,36 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio return builder.build(); } + /** + * Attach the metadata to include in a signed ID to the {@link RestRequest} if the request is for a chunk upload. + * This will tell the ID converter that it needs to produce a signed ID to give back to the client. + * @param blobProperties the {@link BlobProperties} from the request. + * @throws RestServiceException + */ + private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException { + if (RestUtils.isChunkUpload(restRequest.getArgs())) { + Map metadata = new HashMap<>(2); + metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived())); + metadata.put(RestUtils.Headers.SESSION, + RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true)); + metadata.put(EXPIRATION_TIME_MS_KEY, Long.toString( + Utils.addSecondsToEpochTime(SystemTime.getInstance().milliseconds(), + blobProperties.getTimeToLiveInSeconds()))); + if (blobProperties.getReservedMetadataBlobId() != null) { + metadata.put(RestUtils.Headers.RESERVED_METADATA_ID, blobProperties.getReservedMetadataBlobId()); + } else { + ReservedMetadataIdMetrics.getReservedMetadataIdMetrics( + frontendMetrics.getMetricRegistry()).noReservedMetadataFoundForChunkedUploadResponseCount.inc(); + throwRestServiceExceptionIfEnabled( + new RestServiceException("No reserved metadata id present to set in chunked upload response", + RestServiceErrorCode.BadRequest), router.getRouterConfig().routerReservedMetadataEnabled); + } + restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata); + } + //the actual blob size is the number of bytes read + restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived()); + } + /** * Parse and verify the signed chunk IDs in the body of a stitch request. * @param stitchedBlobProperties the {@link BlobProperties} for the final stitched blob. diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/Part.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/Part.java new file mode 100644 index 0000000000..31d1f105f6 --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/Part.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * + */ + +package com.github.ambry.frontend; + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; + + +public class Part { + @JacksonXmlProperty(localName = "PartNumber") + private String partNumber; + @JacksonXmlProperty(localName = "ETag") + private String eTag; + + public String getPartNumber() { + return partNumber; + } + + public void setPartNumber(String partNumber) { + this.partNumber = partNumber; + } + + public String geteTag() { + return eTag; + } + + public void seteTag(String eTag) { + this.eTag = eTag; + } + + @Override + public String toString() { + return "Part number = " + partNumber + ", eTag = " + eTag; + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java index a0e4358f29..959e768f04 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java @@ -346,7 +346,7 @@ private void checkUploadRequirements(BlobProperties blobProperties) throws RestS // ensure that the x-ambry-session header is present. RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true); // validate that a max chunk size is set. - RestUtils.getLongHeader(restRequest.getArgs(), RestUtils.Headers.MAX_UPLOAD_SIZE, true); + //RestUtils.getLongHeader(restRequest.getArgs(), RestUtils.Headers.MAX_UPLOAD_SIZE, true); // validate that the TTL for the chunk is set correctly. long chunkTtl = blobProperties.getTimeToLiveInSeconds(); if (chunkTtl <= 0 || chunkTtl > frontendConfig.chunkUploadMaxChunkTtlSecs) { diff --git a/ambry-router/src/main/java/com/github/ambry/router/OperationController.java b/ambry-router/src/main/java/com/github/ambry/router/OperationController.java index 346f24393a..e44e3b72b6 100644 --- a/ambry-router/src/main/java/com/github/ambry/router/OperationController.java +++ b/ambry-router/src/main/java/com/github/ambry/router/OperationController.java @@ -16,6 +16,7 @@ import com.github.ambry.account.AccountService; import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.commons.BlobId; import com.github.ambry.commons.Callback; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.RouterConfig; @@ -30,6 +31,7 @@ import com.github.ambry.protocol.RequestOrResponse; import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.quota.QuotaChargeCallback; +import com.github.ambry.store.StoreKey; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; import java.io.IOException; @@ -41,6 +43,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -48,6 +51,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.github.ambry.commons.BlobId.BlobDataType.*; + /** * OperationController is the scaling unit for the NonBlockingRouter. 
The NonBlockingRouter can have multiple @@ -68,6 +73,7 @@ public class OperationController implements Runnable { final ReplicateBlobManager replicateBlobManager; private final NetworkClient networkClient; private final ResponseHandler responseHandler; + private final ClusterMap clusterMap; private final RouterConfig routerConfig; private final Thread requestResponseHandlerThread; private final CountDownLatch shutDownLatch = new CountDownLatch(1); @@ -103,6 +109,7 @@ public OperationController(String suffix, String defaultPartitionClass, AccountS KeyManagementService kms, CryptoService cryptoService, CryptoJobHandler cryptoJobHandler, Time time, NonBlockingRouter nonBlockingRouter) throws IOException { networkClient = networkClientFactory.getNetworkClient(); + this.clusterMap = clusterMap; this.routerConfig = routerConfig; this.routerMetrics = routerMetrics; this.responseHandler = responseHandler; @@ -214,12 +221,64 @@ protected void stitchBlob(BlobProperties blobProperties, byte[] userMetadata, Li if (!putManager.isOpen()) { handlePutManagerClosed(blobProperties, true, futureResult, callback); } else { - putManager.submitStitchBlobOperation(blobProperties, userMetadata, chunksToStitch, futureResult, callback, - quotaChargeCallback); - routerCallback.onPollReady(); + try { + // We don't need this change if we go with changing router.max.put.chunk.size.bytes to 5 MB + List chunkInfos = flattenChunkIds(chunksToStitch, quotaChargeCallback); + putManager.submitStitchBlobOperation(blobProperties, userMetadata, chunkInfos, futureResult, callback, + quotaChargeCallback); + routerCallback.onPollReady(); + } catch (Exception e) { + nonBlockingRouter.completeOperation(futureResult, callback, null, e); + } } } + private List flattenChunkIds(List chunksToStitch, QuotaChargeCallback quotaChargeCallback) + throws RouterException, ExecutionException, InterruptedException { + + List chunkInfos = new ArrayList<>(); + logger.info("Operation controller | Flattening chunk ids. Received chunks to stitch {}", chunksToStitch); + for (ChunkInfo chunkInfo : chunksToStitch) { + BlobId blobId = RouterUtils.getBlobIdFromString(chunkInfo.getBlobId(), clusterMap); + logger.info("Operation controller | Flattening chunk ids. Blob id {}, Blob data type {}", blobId, + blobId.getBlobDataType()); + if (blobId.getBlobDataType() == METADATA) { + final FutureResult futureResult = new FutureResult<>(); + GetBlobOptions options = new GetBlobOptionsBuilder().operationType(GetBlobOptions.OperationType.All).build(); + GetBlobOptionsInternal optionsInternal = new GetBlobOptionsInternal(options, true, routerMetrics.ageAtGet); + getBlob(chunkInfo.getBlobId(), optionsInternal, (futureResult::done), quotaChargeCallback); + GetBlobResult getBlobResult = futureResult.get(); + logger.info("Operation controller | Flattening chunk ids. Received list of chunk ids for composite blob {}, {}", + blobId, getBlobResult.getBlobChunkIds()); + for (StoreKey blobChunkId : getBlobResult.getBlobChunkIds()) { + // Get blob Info + logger.info("Operation controller | Flattening chunk ids. 
Getting blob info for nested chunk id {}", + blobChunkId); + final FutureResult futureResultBlobInfo = new FutureResult<>(); + GetBlobOptions optionsBlobInfo = + new GetBlobOptionsBuilder().operationType(GetBlobOptions.OperationType.BlobInfo).build(); + GetBlobOptionsInternal optionsInternalBlobInfo = + new GetBlobOptionsInternal(optionsBlobInfo, false, routerMetrics.ageAtGet); + getBlob(blobChunkId.getID(), optionsInternalBlobInfo, (futureResultBlobInfo::done), quotaChargeCallback); + GetBlobResult blobInfo = futureResultBlobInfo.get(); + logger.info("Operation controller | Flattening chunk ids. Got blob info for nested chunk id {}. Blob size {}", + blobChunkId, blobInfo.getBlobInfo().getBlobProperties().getBlobSize()); + ChunkInfo subChunkInfo = + new ChunkInfo(blobChunkId.getID(), blobInfo.getBlobInfo().getBlobProperties().getBlobSize(), + chunkInfo.getExpirationTimeInMs(), chunkInfo.getReservedMetadataId()); + logger.info("Operation controller | Flattening chunk ids. Adding individual chunk id {} for stitching", + subChunkInfo); + chunkInfos.add(subChunkInfo); + } + } else { + logger.info("Operation controller | Flattening chunk ids. Adding simple blob id {} directly", blobId); + chunkInfos.add(chunkInfo); + } + } + + return chunkInfos; + } + /** * Requests for a blob to be deleted asynchronously and invokes the {@link Callback} when the request completes. * @param blobIdStr The ID of the blob that needs to be deleted in string form diff --git a/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java b/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java index 68d63f7763..d15a590935 100644 --- a/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java +++ b/ambry-router/src/main/java/com/github/ambry/router/PutOperation.java @@ -360,9 +360,13 @@ void startOperation() { Exception exception = null; try { if (options.isChunkUpload() && options.getMaxUploadSize() > routerConfig.routerMaxPutChunkSizeBytes) { - exception = new RouterException("Invalid max upload size for chunk upload: " + options.getMaxUploadSize(), - RouterErrorCode.InvalidPutArgument); - } else if (isStitchOperation()) { + logger.info("Chunk upload has size {} greater than 4 MB", options.getMaxUploadSize()); + // If going with "router.max.put.chunk.size.bytes" = 5 MB, we can uncomment below + //exception = new RouterException("Invalid max upload size for chunk upload: " + options.getMaxUploadSize(), + // RouterErrorCode.InvalidPutArgument); + } + + if (isStitchOperation()) { processChunksToStitch(); } else { startReadingFromChannel(); @@ -670,6 +674,7 @@ void fillChunks() { if (chunkFillingCompletedSuccessfully) { // If the blob size is less than 4MB or the last chunk size is less than 4MB, than this lastChunk will be // the chunk above. + logger.info("Put operation | Chunk filling completed successfully. Building last chunk"); PutChunk lastChunk = getChunkInState(ChunkState.Building); // The last chunk could be the second chunk, if the blob size is less than 8MB. We need to check if any chunk is @@ -1522,6 +1527,7 @@ int fillFrom(ByteBuf channelReadBuf) { // If first put chunk is full, but not yet prepared then mark it awaiting resolution instead of completing it. // This chunk will be prepared as soon as either more bytes are read from the channel, or the chunk filling // is complete. At that point we will have enough information to mark this blob as simple or composite blob. + logger.info("Put operation | Finished reading first 4 MB from channel. 
Awaiting blob type resolution"); state = ChunkState.AwaitingBlobTypeResolution; } else { onFillComplete(true); diff --git a/config/frontend.properties b/config/frontend.properties index 8a9d024719..3b41f22328 100644 --- a/config/frontend.properties +++ b/config/frontend.properties @@ -14,13 +14,24 @@ rest.server.rest.request.service.factory=com.github.ambry.frontend.FrontendRestRequestServiceFactory # rest.server.account.service.factory=com.github.ambry.account.HelixAccountServiceFactory +mysql.named.blob.db.info=[{"url":"jdbc:mysql://localhost/AmbryNamedBlobs?serverTimezone=UTC","datacenter":"Datacenter","isWriteable":"true","username":"root","password":"password"}] +frontend.named.blob.db.factory=com.github.ambry.named.MySqlNamedBlobDbFactory +netty.server.enable.ssl=true +ssl.keystore.path=/tmp/ambry-client/certs/identity.p12 +ssl.keystore.password=work_around_jdk-6879539 +ssl.truststore.path=/etc/riddler/cacerts +ssl.truststore.password=changeit +ssl.keystore.type=PKCS12 +router.metadata.content.version=3 + # router router.hostname=localhost router.datacenter.name=Datacenter router.enable.http2.network.client=true router.put.success.target=1 router.delete.success.target=1 -# router.max.put.chunk.size.bytes=1048576 +# Putting chunk size to 6 MB (instead of 5 MB) because S3 adds signature to 5 MB chunks over HTTP. +router.max.put.chunk.size.bytes=6291456 # enable metadata cache for localhost testing router.blob.metadata.cache.enabled=true router.blob.metadata.cache.id=LocalhostMetadataCache diff --git a/config/log4j2.xml b/config/log4j2.xml index 266c0fa21d..30ca93b2c6 100644 --- a/config/log4j2.xml +++ b/config/log4j2.xml @@ -18,5 +18,9 @@ + + + + From 0a0b8711d43aed013b9dbee8a8bdf35754937fce Mon Sep 17 00:00:00 2001 From: Arun Bhima Date: Tue, 23 Jan 2024 14:57:06 -0800 Subject: [PATCH 2/3] Add S3ListHandler --- .../com/github/ambry/account/Container.java | 6 + .../com/github/ambry/frontend/Operations.java | 5 + .../com/github/ambry/rest/RequestPath.java | 54 +++++---- .../java/com/github/ambry/rest/RestUtils.java | 11 ++ .../frontend/FrontendRestRequestService.java | 11 +- .../ambry/frontend/NamedBlobListHandler.java | 45 ++----- .../ambry/frontend/NamedBlobPostHandler.java | 12 +- .../github/ambry/frontend/S3ListHandler.java | 114 ++++++++++++++++++ 8 files changed, 192 insertions(+), 66 deletions(-) create mode 100644 ambry-frontend/src/main/java/com/github/ambry/frontend/S3ListHandler.java diff --git a/ambry-api/src/main/java/com/github/ambry/account/Container.java b/ambry-api/src/main/java/com/github/ambry/account/Container.java index 4abbc1cfc7..3c58c31cfa 100644 --- a/ambry-api/src/main/java/com/github/ambry/account/Container.java +++ b/ambry-api/src/main/java/com/github/ambry/account/Container.java @@ -125,6 +125,12 @@ public class Container { */ public static final String DEFAULT_PRIVATE_CONTAINER_NAME = "default-private-container"; + /** + * Default name for the containers associated with S3 APIs. Since, S3 requests on client side only take Account + * (i.e. Bucket) name, we use a default name for containers. + */ + public static final String DEFAULT_S3_CONTAINER_NAME = "container-a"; + /** * The status of {@link #UNKNOWN_CONTAINER}. 
*/ diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/Operations.java b/ambry-api/src/main/java/com/github/ambry/frontend/Operations.java index 303fdda0be..c1a7f4b934 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/Operations.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/Operations.java @@ -31,4 +31,9 @@ public class Operations { * First path segment for any operation on a named blob. */ public static final String NAMED_BLOB = "named"; + + /** + * First path segment for any s3 api operation. + */ + public static final String S3 = "s3"; } diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java b/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java index a1ccb8bbb2..08e9448774 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java @@ -13,6 +13,8 @@ */ package com.github.ambry.rest; +import com.github.ambry.account.Container; +import com.github.ambry.frontend.Operations; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; @@ -39,6 +41,7 @@ public class RequestPath { private static final String PATH_SEPARATOR_STRING = String.valueOf(PATH_SEPARATOR_CHAR); private static final String SEGMENT = SubResource.Segment.toString(); private static final Logger logger = LoggerFactory.getLogger(RequestPath.class); + private static final String S3_PATH = PATH_SEPARATOR_CHAR + Operations.S3; /** * Parse the request path (and additional headers in some cases). The path will match the following regex-like @@ -79,31 +82,11 @@ public static RequestPath parse(RestRequest restRequest, List prefixesTo throw new RestServiceException("Invalid URI path", e, RestServiceErrorCode.BadRequest); } - // S3 requests to Ambry are in the form "PUT /s3/named-blob-sandbox/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" - // where "named-blob-sandbox" is the account name (no container name is added) and - // "checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" is the key name. - - // We convert it to named blob request in the form "/named/named-blob-sandbox/container-a/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata" - // i.e. we hardcode container name to 'container-a' - logger.info("S3 API | Input path: {}", path); - if (path.startsWith("/s3")) { + if (path.startsWith(S3_PATH)) { // Convert to named blob request internally - int accountStart = "/s3/".length(); - int accountEnd = path.indexOf("/", accountStart); - if (accountEnd == -1) { - accountEnd = path.length(); - } - String accountName = path.substring(accountStart, accountEnd); - String containerName = "container-a"; - String remainingPath = path.substring(accountEnd); - String namedPath = - "/named/" + accountName + "/" + containerName + (remainingPath.length() > 0 ? remainingPath : ""); - logger.info("S3 API | Converting S3 path to Named path. 
S3 path: {}, Named path: {}", path, namedPath); - path = namedPath; // Store the converted path - restRequest.setArg(S3_REQUEST, "true"); // signifies this is a s3 request - restRequest.setArg(S3_BUCKET, accountName); // store the bucket name - restRequest.setArg(S3_KEY, remainingPath); // store the key name + path = getNamedBlobPath(path); + restRequest.setArg(S3_REQUEST, "true"); } return parse(path, restRequest.getArgs(), prefixesToRemove, clusterName); @@ -334,4 +317,29 @@ private static int matchPathSegments(String path, int pathOffset, String pathSeg } return nextPathSegmentOffset; } + + /** + * Get named blob path from S3 request path + * @param path s3 request path + * @return named blob request path + */ + private static String getNamedBlobPath(String path) { + // S3 requests to Ambry are in the form "/s3/account-name/key-name". We convert it to named blob path + // "/named/account-name/container-name/key-name" internally. + // For ex: for S3 path, "/s3/named-blob-sandbox/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata", + // the corresponding named blob path is "/named/named-blob-sandbox/container-a/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata". + // Please note that we hardcode container-name to 'container-a'. + + int accountStart = S3_PATH.length() + PATH_SEPARATOR_STRING.length(); + int accountEnd = path.indexOf(PATH_SEPARATOR_CHAR, accountStart); + if (accountEnd == -1) { + accountEnd = path.length(); + } + String accountName = path.substring(accountStart, accountEnd); + String containerName = Container.DEFAULT_S3_CONTAINER_NAME; + String keyName = path.substring(accountEnd); + String namedBlobPath = "/named/" + accountName + "/" + containerName + (keyName.length() > 0 ? keyName : ""); + logger.info("S3 API | Converted S3 request path {} to NamedBlob path {}", path, namedBlobPath); + return namedBlobPath; + } } diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java index aecfbb18a0..5b749e015b 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java @@ -49,6 +49,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.github.ambry.rest.RestUtils.InternalKeys.*; + /** * Common utility functions that will be used across implementations of REST interfaces. @@ -822,6 +824,15 @@ public static boolean isNamedBlobStitchRequest(RestRequest restRequest) { } } + /** + * Determines if the input is a S3 API request + * @param restRequest rest request + * @return {@code true} if the request is a S3 API request. 
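+ * <p>The {@code S3_REQUEST} internal arg is set by {@link RequestPath#parse} when the raw request path starts with
+ * {@code /s3}.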
+ */ + public static boolean isS3Request(RestRequest restRequest) { + return restRequest.getArgs().containsKey(S3_REQUEST); + } + /** * Fetch time in ms for the {@code dateString} passed in, since epoch * @param dateString the String representation of the date that needs to be parsed diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java index f34fd7dcf9..63ba950d7d 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java @@ -104,6 +104,7 @@ class FrontendRestRequestService implements RestRequestService { private PostAccountsHandler postAccountsHandler; private PostDatasetsHandler postDatasetsHandler; private GetStatsReportHandler getStatsReportHandler; + private S3ListHandler s3ListHandler; private QuotaManager quotaManager; private boolean isUp = false; private final Random random = new Random(); @@ -222,6 +223,7 @@ public void start() throws InstantiationException { postAccountsHandler = new PostAccountsHandler(securityService, accountService, frontendConfig, frontendMetrics); postDatasetsHandler = new PostDatasetsHandler(securityService, accountService, frontendConfig, frontendMetrics, accountAndContainerInjector); + s3ListHandler = new S3ListHandler(namedBlobListHandler); namedBlobsCleanupRunner = new NamedBlobsCleanupRunner(router, namedBlobDb); if (frontendConfig.enableNamedBlobCleanupTask) { namedBlobsCleanupScheduler = Utils.newScheduler(1, "named-blobs-cleanup-", false); @@ -309,8 +311,13 @@ public void handleGet(final RestRequest restRequest, final RestResponseChannel r (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); } else if (requestPath.matchesOperation(Operations.NAMED_BLOB) && NamedBlobPath.parse(requestPath, restRequest.getArgs()).getBlobName() == null) { - namedBlobListHandler.handle(restRequest, restResponseChannel, - (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); + if (isS3Request(restRequest)) { + s3ListHandler.handle(restRequest, restResponseChannel, + (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); + } else { + namedBlobListHandler.handle(restRequest, restResponseChannel, + (result, exception) -> submitResponse(restRequest, restResponseChannel, result, exception)); + } } else if (RestUtils.getBooleanHeader(restRequest.getArgs(), ENABLE_DATASET_VERSION_LISTING, false) && DatasetVersionPath.parse(requestPath, restRequest.getArgs()).getVersion() == null) { listDatasetVersionHandler.handle(restRequest, restResponseChannel, diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java index 142f81197b..8154296af8 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobListHandler.java @@ -22,6 +22,7 @@ import com.github.ambry.commons.CallbackUtils; import com.github.ambry.named.NamedBlobDb; import com.github.ambry.named.NamedBlobRecord; +import com.github.ambry.rest.RequestPath; import com.github.ambry.rest.RestRequest; import com.github.ambry.rest.RestRequestMetrics; import com.github.ambry.rest.RestResponseChannel; @@ -152,14 +153,11 @@ private Callback 
securityPostProcessRequestCallback() { */ private Callback> listBlobsCallback() { return buildCallback(frontendMetrics.listDbLookupMetrics, page -> { - //ReadableStreamChannel channel = - // serializeJsonToChannel(page.toJson(record -> new NamedBlobListEntry(record).toJson())); - - // S3 expects listing of blobs in xml format. - if (restRequest.getArgs().containsKey("uploadId")) { - String bucket = (String) restRequest.getArgs().get(S3_BUCKET); - String key = (String) restRequest.getArgs().get(S3_KEY); + RequestPath requestPath = (RequestPath) restRequest.getArgs().get(REQUEST_PATH); + NamedBlobPath namedBlobPath = NamedBlobPath.parse(requestPath, restRequest.getArgs()); + String bucket = namedBlobPath.getAccountName(); + String key = namedBlobPath.getBlobName(); String uploadId = (String) restRequest.getArgs().get("uploadId"); LOGGER.info( "NamedBlobListHandler | Sending response for list upload parts. Bucket = {}, Key = {}, Upload Id = {}", @@ -178,41 +176,14 @@ private Callback> listBlobsCallback() { restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); finalCallback.onCompletion(channel, null); } else { - ReadableStreamChannel channel = serializeAsXml(page); + ReadableStreamChannel channel = + serializeJsonToChannel(page.toJson(record -> new NamedBlobListEntry(record).toJson())); restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); - restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml"); + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/json"); restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize()); finalCallback.onCompletion(channel, null); } }, uri, LOGGER, finalCallback); } - - private ReadableStreamChannel serializeAsXml(Page namedBlobRecordPage) throws IOException { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - ListBucketResult listBucketResult = new ListBucketResult(); - listBucketResult.setName(restRequest.getPath()); - listBucketResult.setPrefix(restRequest.getArgs().get("prefix").toString()); - listBucketResult.setMaxKeys(1); - listBucketResult.setDelimiter("/"); - listBucketResult.setEncodingType("url"); - - List contentsList = new ArrayList<>(); - List namedBlobRecords = namedBlobRecordPage.getEntries(); - for (NamedBlobRecord namedBlobRecord : namedBlobRecords) { - Contents contents = new Contents(); - contents.setKey(namedBlobRecord.getBlobName()); - String todayDate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").format(Calendar.getInstance().getTime()); - contents.setLastModified(todayDate); - contentsList.add(contents); - } - - listBucketResult.setContents(contentsList); - - ObjectMapper objectMapper = new XmlMapper(); - objectMapper.writeValue(outputStream, listBucketResult); - - return new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray())); - } } } diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java index 17297365f1..e6d17586a3 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java @@ -211,8 +211,10 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { LOGGER.debug("NamedBlobPostHandler | Generated {} from {}", signedUrl, restRequest); // Create xml response - String bucket = (String) 
restRequest.getArgs().get(S3_BUCKET); - String key = (String) restRequest.getArgs().get(S3_KEY); + RequestPath requestPath = (RequestPath) restRequest.getArgs().get(REQUEST_PATH); + NamedBlobPath namedBlobPath = NamedBlobPath.parse(requestPath, restRequest.getArgs()); + String bucket = namedBlobPath.getAccountName(); + String key = namedBlobPath.getBlobName(); LOGGER.info( "NamedBlobPostHandler | Sending response for Multipart begin upload. Bucket = {}, Key = {}, Upload Id = {}", bucket, key, signedUrl); @@ -346,8 +348,10 @@ private Callback securityProcessResponseCallback() { return buildCallback(frontendMetrics.putBlobSecurityProcessResponseMetrics, securityCheckResult -> { if (restRequest.getArgs().containsKey("uploadId") && restRequest.getArgs().containsKey(S3_REQUEST)) { // Create xml response - String bucket = (String) restRequest.getArgs().get(S3_BUCKET); - String key = (String) restRequest.getArgs().get(S3_KEY); + RequestPath requestPath = (RequestPath) restRequest.getArgs().get(REQUEST_PATH); + NamedBlobPath namedBlobPath = NamedBlobPath.parse(requestPath, restRequest.getArgs()); + String bucket = namedBlobPath.getAccountName(); + String key = namedBlobPath.getBlobName(); LOGGER.info( "NamedBlobPostHandler | Sending response for Multipart upload complete. Bucket = {}, Key = {}, etag = {}", bucket, key, restResponseChannel.getHeader(Headers.LOCATION)); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/S3ListHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/S3ListHandler.java new file mode 100644 index 0000000000..0004e2865f --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/S3ListHandler.java @@ -0,0 +1,114 @@ +/* + * Copyright 2021 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ +package com.github.ambry.frontend; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; +import com.github.ambry.commons.Callback; +import com.github.ambry.commons.RetainingAsyncWritableChannel; +import com.github.ambry.rest.RestRequest; +import com.github.ambry.rest.RestResponseChannel; +import com.github.ambry.rest.RestUtils; +import com.github.ambry.router.ReadableStreamChannel; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Handles S3 requests for listing blobs that start with a provided prefix. + */ +public class S3ListHandler { + private final NamedBlobListHandler namedBlobListHandler; + private RestRequest restRequest; + private static final Logger LOGGER = LoggerFactory.getLogger(S3ListHandler.class); + + /** + * Constructs a handler for handling s3 requests for listing blobs. 
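+ * <p>A representative response produced by this handler (element names follow {@link ListBucketResult}; values are
+ * illustrative):
+ * <pre>{@code
+ * <ListBucketResult>
+ *   <Name>/s3/named-blob-sandbox</Name>
+ *   <Prefix>checkpoints</Prefix>
+ *   <MaxKeys>1</MaxKeys>
+ *   <Delimiter>/</Delimiter>
+ *   <EncodingType>url</EncodingType>
+ *   <Contents>
+ *     <Key>checkpoints/chk-1/_metadata</Key>
+ *     <LastModified>2024-01-23T14:57:06-0800</LastModified>
+ *   </Contents>
+ * </ListBucketResult>
+ * }</pre>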
+ * @param namedBlobListHandler the {@link NamedBlobListHandler} to which S3 list requests are delegated.
+ */
+ S3ListHandler(NamedBlobListHandler namedBlobListHandler) {
+ this.namedBlobListHandler = namedBlobListHandler;
+ }
+
+ /**
+ * Asynchronously handles an S3 list request by delegating to the named blob list handler and converting its JSON
+ * response into S3 ListObjectsV2 XML.
+ * @param restRequest the {@link RestRequest} that contains the request parameters and body.
+ * @param restResponseChannel the {@link RestResponseChannel} where headers should be set.
+ * @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
+ */
+ void handle(RestRequest restRequest, RestResponseChannel restResponseChannel,
+ Callback<ReadableStreamChannel> callback) {
+ // Note: stores per-request state on a shared handler; not thread-safe, acceptable only for this prototype.
+ this.restRequest = restRequest;
+ namedBlobListHandler.handle(restRequest, restResponseChannel, ((result, exception) -> {
+ if (exception != null) {
+ callback.onCompletion(result, exception);
+ return;
+ }
+
+ try {
+ // Convert from json response to S3 xml response as defined in
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_ResponseSyntax.
+ RetainingAsyncWritableChannel writableChannel = new RetainingAsyncWritableChannel();
+ result.readInto(writableChannel, null).get(1, TimeUnit.SECONDS);
+ JSONObject jsonObject = FrontendUtils.readJsonFromChannel(writableChannel);
+ Page<NamedBlobListEntry> page = Page.fromJson(jsonObject, NamedBlobListEntry::new);
+ ReadableStreamChannel readableStreamChannel = serializeAsXml(page);
+ restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime());
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml");
+ restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, readableStreamChannel.getSize());
+ callback.onCompletion(readableStreamChannel, null);
+ } catch (Exception e) {
+ // Surface conversion failures through the callback instead of throwing out of the lambda.
+ callback.onCompletion(null, e);
+ }
+ }));
+ }
+
+ private ReadableStreamChannel serializeAsXml(Page<NamedBlobListEntry> namedBlobRecordPage) throws IOException {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ ListBucketResult listBucketResult = new ListBucketResult();
+ listBucketResult.setName(restRequest.getPath());
+ listBucketResult.setPrefix(restRequest.getArgs().get("prefix").toString());
+ List<Contents> contentsList = new ArrayList<>();
+ List<NamedBlobListEntry> namedBlobRecords = namedBlobRecordPage.getEntries();
+ for (NamedBlobListEntry namedBlobRecord : namedBlobRecords) {
+ Contents contents = new Contents();
+ contents.setKey(namedBlobRecord.getBlobName());
+ String todayDate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").format(Calendar.getInstance().getTime());
+ contents.setLastModified(todayDate);
+ contentsList.add(contents);
+ }
+ listBucketResult.setMaxKeys(namedBlobRecords.size());
+ listBucketResult.setDelimiter("/");
+ listBucketResult.setEncodingType("url");
+ listBucketResult.setContents(contentsList);
+ LOGGER.info("ListBucketResult is {}", listBucketResult);
+
+ ObjectMapper objectMapper = new XmlMapper();
+ objectMapper.writeValue(outputStream, listBucketResult);
+ return new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
+ }
+}
\ No newline at end of file

From e7a7eb28f9e11a786c2f70056a4bd8bb804884da Mon Sep 17 00:00:00 2001
From: Arun Bhima
Date: Wed, 31 Jan 2024 21:52:03 -0800
Subject: [PATCH 3/3] Use UUID as multipart upload id instead of signed url

---
.../java/com/github/ambry/rest/RestUtils.java | 12 ++++
.../frontend/AmbryIdConverterFactory.java | 20 ++----
.../ambry/frontend/NamedBlobPostHandler.java | 26 ++------
.../ambry/frontend/NamedBlobPutHandler.java | 66 +++++++++----------
4 files changed, 54 insertions(+), 70
deletions(-) diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java index 5b749e015b..8f41b7a90a 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java @@ -254,6 +254,8 @@ public static final class Headers { * stitched together. */ public static final String CHUNK_UPLOAD = "x-ambry-chunk-upload"; + + public static final String S3_CHUNK_UPLOAD = "x-ambry-chunk-upload-s3"; /** * The reserved blobid for metadata chunk of a stitched upload. */ @@ -911,6 +913,16 @@ public static boolean isChunkUpload(Map args) throws RestService return getBooleanHeader(args, Headers.CHUNK_UPLOAD, false); } + /** + * Determine if this is an chunk upload for S3 + * @param args + * @return + * @throws RestServiceException + */ + public static boolean isS3ChunkUpload(Map args) throws RestServiceException { + return getBooleanHeader(args, Headers.S3_CHUNK_UPLOAD, false); + } + /** * Return the reserved metadata id if set in the request args. Return {@code null} if not set in args. * @param args The request arguments. diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java index 0cbea72178..a278bcbb08 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/AmbryIdConverterFactory.java @@ -111,20 +111,12 @@ public Future convert(RestRequest restRequest, String input, BlobInfo bl LOGGER.info("AmbryIdConverter | convert method. Rest request: {}", restRequest); if (!isOpen) { exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable); - } else if (restRequest.getRestMethod().equals(RestMethod.POST) && !restRequest.getArgs() - .containsKey(S3_REQUEST)) { - // For S3, POST requests with ?uploadId=<> are used in completion of multipart uploads. For eg, - // POST /s3/named-blob-sandbox/checkpoints/246cd68fa3480b2b0f9e6524fa473bca?uploadId=. - // For such use-case, we want to treat it as named blob upload + } else if (RestUtils.isChunkUpload(restRequest.getArgs())) { convertedId = "/" + signIdIfRequired(restRequest, input); - } else if (restRequest.getRestMethod().equals(RestMethod.PUT) && RestUtils.isChunkUpload(restRequest.getArgs()) - && restRequest.getArgs().containsKey(S3_REQUEST)) { + } else if (RestUtils.isS3ChunkUpload(restRequest.getArgs())) { // For S3, PUT requests with ?uploadId=<> are used in adding individual part of multipart upload. For eg, // PUT /s3_named-blob-sandbox_container-a/checkpoints/42b6b3f29b2f9e0b629ff03dac4e9302/shared/ - // c29b1701-de55-463d-a129-adaa90c1fc23?uploadId= - // http%3A%2F%2Flocalhost%3A1174%2F%3Fx-ambry-ttl%3D2419200%26x-ambry-service-id%3DFlink-S3-Client - // %26x-ambry-content-type%3Dapplication%252Foctet-stream%26x-ambry-chunk-upload%3Dtrue%26x-ambry-url-type%3D - // POST%26x-ambry-session%3D3a2aeb6f-aeed-4944-881e-19d41a6b7a22%26et%3D1703180930&partNumber=1 + // c29b1701-de55-463d-a129-adaa90c1fc23?uploadId=D3a2aeb6f-aeed-4944-881e-19d41a6b7a22&partNumber=1 // For such case, we want to give out chunk ID. convertedId = signIdIfRequired(restRequest, input); LOGGER.info("chunk upload for S3. 
Converted id {}", convertedId); @@ -185,7 +177,7 @@ private CompletionStage convertId(String input, RestRequest restRequest, // on delete requests we can soft delete the record from NamedBlobDb and get the blob ID in one step. conversionFuture = getNamedBlobDb().delete(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), namedBlobPath.getBlobName()).thenApply(DeleteResult::getBlobId); - } else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest) + } else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest) .matchesOperation(Operations.UPDATE_TTL)) { //If operation == UPDATE_TTL, we will get the version and blobId info from named blob first //and do update ttl in routerCallBack. @@ -214,8 +206,8 @@ private CompletionStage convertId(String input, RestRequest restRequest, // Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback state = NamedBlobState.IN_PROGRESS; } - conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs())).thenApply( - result -> { + conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs())) + .thenApply(result -> { restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, result.getInsertedRecord().getVersion()); return result.getInsertedRecord().getBlobId(); }); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java index e6d17586a3..e65cd28c67 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPostHandler.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -201,34 +202,24 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) { return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> { if (restRequest.getArgs().containsKey("uploads")) { - restRequest.setArg(RestUtils.Headers.URL_TYPE, "POST"); - restRequest.setArg(RestUtils.Headers.URL_TTL, "300"); - restRequest.setArg(RestUtils.Headers.MAX_UPLOAD_SIZE, "5242880"); - restRequest.setArg(RestUtils.Headers.CHUNK_UPLOAD, "true"); - - // Create signed url - String signedUrl = urlSigningService.getSignedUrl(restRequest); - LOGGER.debug("NamedBlobPostHandler | Generated {} from {}", signedUrl, restRequest); - - // Create xml response + String uploadId = UUID.randomUUID().toString(); RequestPath requestPath = (RequestPath) restRequest.getArgs().get(REQUEST_PATH); NamedBlobPath namedBlobPath = NamedBlobPath.parse(requestPath, restRequest.getArgs()); String bucket = namedBlobPath.getAccountName(); String key = namedBlobPath.getBlobName(); LOGGER.info( "NamedBlobPostHandler | Sending response for Multipart begin upload. 
            "NamedBlobPostHandler | Sending response for Multipart begin upload. Bucket = {}, Key = {}, Upload Id = {}",
-            bucket, key, signedUrl);
+            bucket, key, uploadId);
         InitiateMultipartUploadResult initiateMultipartUploadResult = new InitiateMultipartUploadResult();
         initiateMultipartUploadResult.setBucket(bucket);
         initiateMultipartUploadResult.setKey(key);
-        initiateMultipartUploadResult.setUploadId(signedUrl);
+        initiateMultipartUploadResult.setUploadId(uploadId);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         ObjectMapper objectMapper = new XmlMapper();
         objectMapper.writeValue(outputStream, initiateMultipartUploadResult);
         ReadableStreamChannel channel = new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
         restResponseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime());
-        restResponseChannel.setHeader(RestUtils.Headers.SIGNED_URL, signedUrl);
         restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, "application/xml");
         restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, channel.getSize());
@@ -436,7 +427,7 @@ List getChunksToStitch(BlobProperties stitchedBlobProperties,
     String reservedMetadataBlobId = null;
     List<String> signedChunkIds = new ArrayList<>();
-    LOGGER.info("NamedBlobPostHandler | received parts are {}", completeMultipartUpload.getPart());
+    LOGGER.info("NamedBlobPostHandler | received parts are {}", (Object) completeMultipartUpload.getPart());
 
     for (Part part : completeMultipartUpload.getPart()) {
       LOGGER.info("NamedBlobPostHandler | part is {}", part);
@@ -467,15 +458,10 @@ List getChunksToStitch(BlobProperties stitchedBlobProperties,
       long chunkSizeBytes = RestUtils.getLongHeader(metadata, Headers.BLOB_SIZE, true);
       totalStitchedBlobSize += chunkSizeBytes;
-      // Expiration time is sent to the router, but not verified in this handler. The router is responsible for making
-      // checks related to internal ambry requirements, like making sure that the chunks do not expire before the
-      // metadata blob.
-      @SuppressWarnings("ConstantConditions")
-      long expirationTimeMs = RestUtils.getLongHeader(metadata, EXPIRATION_TIME_MS_KEY, true);
       verifyChunkAccountAndContainer(blobId, stitchedBlobProperties);
       reservedMetadataBlobId = getAndVerifyReservedMetadataBlobId(metadata, reservedMetadataBlobId, blobId);
-      chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, expirationTimeMs, reservedMetadataBlobId));
+      chunksToStitch.add(new ChunkInfo(blobId, chunkSizeBytes, -1, reservedMetadataBlobId));
     }
     //the actual blob size for stitched blob is the sum of all the chunk sizes
     restResponseChannel.setHeader(Headers.BLOB_SIZE, totalStitchedBlobSize);
diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
index 87dfceef4d..a9efdbc14d 100644
--- a/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
+++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/NamedBlobPutHandler.java
@@ -66,6 +66,8 @@
 import static com.github.ambry.frontend.FrontendUtils.*;
 import static com.github.ambry.rest.RestUtils.*;
+import static com.github.ambry.rest.RestUtils.Headers.*;
+import static com.github.ambry.rest.RestUtils.Headers.TARGET_DATASET_VERSION;
 import static com.github.ambry.rest.RestUtils.InternalKeys.*;
 import static com.github.ambry.router.RouterErrorCode.*;
 
@@ -183,18 +185,11 @@ private void start() {
         .injectMetrics(frontendMetrics.putBlobMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), false));
     try {
       // Start the callback chain by parsing blob info headers and performing request security processing.
-
-      if (restRequest.getArgs().containsKey("uploadId")) {
-        // This is a put chunk request in s3 multipart uploads
-        String signedUrl = (String) restRequest.getArgs().get("uploadId");
-        LOGGER.info("Multipart chunk upload for S3. URL string is {}", signedUrl);
-        UrlBuilder urlBuilder = UrlBuilder.parse(signedUrl);
-        LOGGER.info("Built URL from string. URL is {}", urlBuilder);
-        for (String parameter : urlBuilder.getQuery().keySet()) {
-          String value = urlBuilder.getQuery().get(parameter);
-          LOGGER.info("Multipart chunk upload for S3. Adding query parameter {} and value {}", parameter, value);
-          restRequest.setArg(parameter, value);
-        }
+      String uploadId = RestUtils.getHeader(restRequest.getArgs(), "uploadId", false);
+      if (uploadId != null) {
+        LOGGER.info("This is a multipart chunk upload for S3. Session Id = {}", uploadId);
+        restRequest.setArg(S3_CHUNK_UPLOAD, true);
+        restRequest.setArg(SESSION, uploadId);
       }
 
       securityService.processRequest(restRequest, securityProcessRequestCallback());
@@ -255,7 +250,9 @@ private Callback securityPostProcessRequestCallback(BlobInfo blobInfo) {
    */
   private Callback routerPutBlobCallback(BlobInfo blobInfo) {
     return buildCallback(frontendMetrics.putRouterPutBlobMetrics, blobId -> {
-      setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties());
+      if (RestUtils.isS3ChunkUpload(restRequest.getArgs())) {
+        setSignedIdMetadataAndBlobSize(blobInfo.getBlobProperties());
+      }
       idConverter.convert(restRequest, blobId, blobInfo, idConverterCallback(blobInfo, blobId));
     }, uri, LOGGER, deleteDatasetCallback);
   }
@@ -460,8 +457,7 @@ private Map getDatasetUserTags(RestRequest restRequest) throws R
    * @return the {@link PutBlobOptions} to use, parsed from the request.
   */
  private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceException {
-    boolean chunkUpload = isChunkUpload(restRequest.getArgs());
-    LOGGER.info("Is chunk upload {}", chunkUpload);
+    boolean chunkUpload = isS3ChunkUpload(restRequest.getArgs());
     PutBlobOptionsBuilder builder = new PutBlobOptionsBuilder().chunkUpload(chunkUpload).restRequest(restRequest);
     Long maxUploadSize = RestUtils.getLongHeader(restRequest.getArgs(), RestUtils.Headers.MAX_UPLOAD_SIZE, false);
     if (maxUploadSize != null) {
@@ -477,25 +473,23 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio
    * @throws RestServiceException
    */
   private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException {
-    if (RestUtils.isChunkUpload(restRequest.getArgs())) {
-      Map<String, String> metadata = new HashMap<>(2);
-      metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived()));
-      metadata.put(RestUtils.Headers.SESSION,
-          RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true));
-      metadata.put(EXPIRATION_TIME_MS_KEY, Long.toString(
-          Utils.addSecondsToEpochTime(SystemTime.getInstance().milliseconds(),
-              blobProperties.getTimeToLiveInSeconds())));
-      if (blobProperties.getReservedMetadataBlobId() != null) {
-        metadata.put(RestUtils.Headers.RESERVED_METADATA_ID, blobProperties.getReservedMetadataBlobId());
-      } else {
-        ReservedMetadataIdMetrics.getReservedMetadataIdMetrics(
-            frontendMetrics.getMetricRegistry()).noReservedMetadataFoundForChunkedUploadResponseCount.inc();
-        throwRestServiceExceptionIfEnabled(
-            new RestServiceException("No reserved metadata id present to set in chunked upload response",
-                RestServiceErrorCode.BadRequest), router.getRouterConfig().routerReservedMetadataEnabled);
-      }
-      restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata);
-    }
+    Map<String, String> metadata = new HashMap<>(2);
+    metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived()));
+    metadata.put(RestUtils.Headers.SESSION,
+        RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true));
+    metadata.put(EXPIRATION_TIME_MS_KEY, Long.toString(
+        Utils.addSecondsToEpochTime(SystemTime.getInstance().milliseconds(),
+            blobProperties.getTimeToLiveInSeconds())));
+    if (blobProperties.getReservedMetadataBlobId() != null) {
+      metadata.put(RestUtils.Headers.RESERVED_METADATA_ID, blobProperties.getReservedMetadataBlobId());
+    } else {
+      ReservedMetadataIdMetrics.getReservedMetadataIdMetrics(
+          frontendMetrics.getMetricRegistry()).noReservedMetadataFoundForChunkedUploadResponseCount.inc();
+      throwRestServiceExceptionIfEnabled(
+          new RestServiceException("No reserved metadata id present to set in chunked upload response",
+              RestServiceErrorCode.BadRequest), router.getRouterConfig().routerReservedMetadataEnabled);
+    }
+    restRequest.setArg(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY, metadata);
     //the actual blob size is the number of bytes read
     restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived());
   }
@@ -654,7 +648,7 @@ private void addDatasetVersion(BlobProperties blobProperties, RestRequest restRe
     restResponseChannel.setHeader(RestUtils.Headers.TARGET_ACCOUNT_NAME, accountName);
     restResponseChannel.setHeader(RestUtils.Headers.TARGET_CONTAINER_NAME, containerName);
     restResponseChannel.setHeader(RestUtils.Headers.TARGET_DATASET_NAME, datasetName);
-    restResponseChannel.setHeader(RestUtils.Headers.TARGET_DATASET_VERSION, datasetVersionRecord.getVersion());
+    restResponseChannel.setHeader(TARGET_DATASET_VERSION, datasetVersionRecord.getVersion());
     if (datasetVersionRecord.getExpirationTimeMs() != Utils.Infinite_Time) {
       restResponseChannel.setHeader(RestUtils.Headers.DATASET_EXPIRATION_TIME,
           new Date(datasetVersionRecord.getExpirationTimeMs()));
@@ -687,7 +681,7 @@ private void updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount(Callba
     String accountName = dataset.getAccountName();
     String containerName = dataset.getContainerName();
     String datasetName = dataset.getDatasetName();
-    String targetVersion = (String) restResponseChannel.getHeader(Headers.TARGET_DATASET_VERSION);
+    String targetVersion = (String) restResponseChannel.getHeader(TARGET_DATASET_VERSION);
     try {
       accountService.updateDatasetVersionState(accountName, containerName, datasetName, targetVersion,
           DatasetVersionState.READY);
@@ -778,7 +772,7 @@ private Callback deleteDatasetVersionIfUploadFailedCallBack(Callback c
     if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
       frontendMetrics.deleteDatasetVersionIfUploadFailCount.inc();
       Dataset dataset = (Dataset) restRequest.getArgs().get(RestUtils.InternalKeys.TARGET_DATASET);
-      String version = (String) restResponseChannel.getHeader(Headers.TARGET_DATASET_VERSION);
+      String version = (String) restResponseChannel.getHeader(TARGET_DATASET_VERSION);
       accountService.deleteDatasetVersion(dataset.getAccountName(), dataset.getContainerName(),
           dataset.getDatasetName(), version);
     }
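
Taken together, the three patches map the S3 multipart protocol onto named blobs: POST ?uploads answers with an InitiateMultipartUploadResult whose UploadId is now a plain UUID, each PUT ?uploadId=<uuid>&partNumber=<n> is flagged internally as an S3 chunk upload (x-ambry-chunk-upload-s3) and yields a signed chunk id, and the closing POST ?uploadId=<uuid> stitches the parts in NamedBlobPostHandler.getChunksToStitch(). A minimal client-side sketch of that flow with the AWS SDK for Java v1 follows; the endpoint URL, region, and credentials are assumptions for a local frontend started from config/frontend.properties, not values taken from these patches.

  import com.amazonaws.auth.AWSStaticCredentialsProvider;
  import com.amazonaws.auth.BasicAWSCredentials;
  import com.amazonaws.client.builder.AwsClientBuilder;
  import com.amazonaws.services.s3.AmazonS3;
  import com.amazonaws.services.s3.AmazonS3ClientBuilder;
  import com.amazonaws.services.s3.model.*;
  import java.io.ByteArrayInputStream;
  import java.util.Collections;

  public class MultipartSmokeTest {
    public static void main(String[] args) {
      // Assumed local endpoint; the frontend's SSL certificate must be in the JVM trust store.
      AmazonS3 s3 = AmazonS3ClientBuilder.standard()
          .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("https://localhost:1443", "us-east-1"))
          .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("unused", "unused")))
          .withPathStyleAccessEnabled(true) // the bucket rides in the path, matching /s3/named-blob-sandbox/... routing
          .build();
      String bucket = "named-blob-sandbox"; // the hardcoded prototype account
      String key = "checkpoints/example-blob";

      // POST /s3/<bucket>/<key>?uploads -> InitiateMultipartUploadResult carrying the UUID upload id.
      InitiateMultipartUploadResult init = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key));

      // PUT /s3/<bucket>/<key>?uploadId=<uuid>&partNumber=1 -> handled as an S3 chunk upload by NamedBlobPutHandler.
      byte[] partData = new byte[5 * 1024 * 1024]; // one 5 MiB part
      UploadPartResult part = s3.uploadPart(new UploadPartRequest().withBucketName(bucket).withKey(key)
          .withUploadId(init.getUploadId()).withPartNumber(1)
          .withInputStream(new ByteArrayInputStream(partData)).withPartSize(partData.length));

      // POST /s3/<bucket>/<key>?uploadId=<uuid> -> NamedBlobPostHandler stitches the listed parts.
      s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, init.getUploadId(),
          Collections.singletonList(part.getPartETag())));
    }
  }

Path-style access matters here because virtual-host-style bucket addressing would never reach the /s3 operation prefix that the frontend routes on.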
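
Listing the uploaded blobs exercises the new S3ListHandler: the GET is delegated to NamedBlobListHandler and the resulting JSON page is re-serialized as ListObjectsV2-style XML. A sketch continuing with the s3 client (and imports) from the snippet above:

  // Continues from the client built in the previous sketch.
  static void listCheckpoints(AmazonS3 s3) {
    ListObjectsV2Result listing = s3.listObjectsV2(
        new ListObjectsV2Request().withBucketName("named-blob-sandbox").withPrefix("checkpoints/"));
    for (S3ObjectSummary summary : listing.getObjectSummaries()) {
      // Keys are the blob names from NamedBlobListEntry; LastModified is stamped at serialization time.
      System.out.println(summary.getKey() + " " + summary.getLastModified());
    }
  }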
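
The XML itself is produced by jackson-dataformat-xml's XmlMapper, a drop-in ObjectMapper subclass that writes a POJO as an XML document rooted at the class name. The ListBucketResult and Contents classes added in patch 1 are not part of this excerpt, so the self-contained sketch below uses hypothetical stand-in classes purely to show the serialization mechanics.

  import com.fasterxml.jackson.dataformat.xml.XmlMapper;
  import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
  import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
  import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
  import java.util.Collections;
  import java.util.List;

  public class XmlShapeSketch {
    @JacksonXmlRootElement(localName = "ListBucketResult")
    public static class Result {
      public String name;
      public String prefix;
      @JacksonXmlElementWrapper(useWrapping = false) // repeat the elements without a wrapper, as S3 does
      @JacksonXmlProperty(localName = "Contents")
      public List<Entry> contents;
    }

    public static class Entry {
      public String key;
      public String lastModified;
    }

    public static void main(String[] args) throws Exception {
      Result r = new Result();
      r.name = "/s3/named-blob-sandbox";
      r.prefix = "checkpoints/";
      Entry e = new Entry();
      e.key = "checkpoints/example-blob";
      e.lastModified = "2024-01-31T21:52:03-0800";
      r.contents = Collections.singletonList(e);
      // Prints <ListBucketResult><name>...</name><prefix>...</prefix><Contents>...</Contents></ListBucketResult>
      System.out.println(new XmlMapper().writeValueAsString(r));
    }
  }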