POC for S3 API experiments (Not to be checked in) #2664

Closed
@@ -31,14 +31,18 @@ class InMemoryUnknownAccountService implements AccountService {
Account.ACL_INHERITED_BY_CONTAINER_DEFAULT_VALUE, Account.SNAPSHOT_VERSION_DEFAULT_VALUE,
Arrays.asList(Container.UNKNOWN_CONTAINER, Container.DEFAULT_PUBLIC_CONTAINER,
Container.DEFAULT_PRIVATE_CONTAINER), Account.QUOTA_RESOURCE_TYPE_DEFAULT_VALUE);
+  // Create a hardcoded Account "named-blob-sandbox" which will be used for S3 prototype tests
+  static final Account NAMED_BLOB_ACCOUNT = new Account((short) 101, "named-blob-sandbox", Account.AccountStatus.ACTIVE,
+      Account.ACL_INHERITED_BY_CONTAINER_DEFAULT_VALUE, Account.SNAPSHOT_VERSION_DEFAULT_VALUE,
+      Collections.singletonList(Container.NAMED_BLOB_CONTAINER), Account.QUOTA_RESOURCE_TYPE_DEFAULT_VALUE);
private static final Collection<Account> accounts =
-      Collections.unmodifiableCollection(Collections.singletonList(UNKNOWN_ACCOUNT));
+      Collections.unmodifiableCollection(Collections.singletonList(NAMED_BLOB_ACCOUNT));
private volatile boolean isOpen = true;

@Override
public Account getAccountById(short accountId) {
checkOpen();
-    return accountId == Account.UNKNOWN_ACCOUNT_ID ? UNKNOWN_ACCOUNT : null;
+    return NAMED_BLOB_ACCOUNT;
}

@Override
@@ -59,7 +63,7 @@ public boolean removeAccountUpdateConsumer(Consumer<Collection<Account>> account
public Account getAccountByName(String accountName) {
checkOpen();
Objects.requireNonNull(accountName, "accountName cannot be null.");
-    return UNKNOWN_ACCOUNT;
+    return NAMED_BLOB_ACCOUNT;
}

@Override
18 changes: 18 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/account/Container.java
@@ -125,6 +125,12 @@ public class Container {
*/
public static final String DEFAULT_PRIVATE_CONTAINER_NAME = "default-private-container";

+  /**
+   * Default name for the containers associated with S3 APIs. Since S3 requests on the client side only take an
+   * Account (i.e. bucket) name, we use a default name for containers.
+   */
+  public static final String DEFAULT_S3_CONTAINER_NAME = "container-a";

/**
* The status of {@link #UNKNOWN_CONTAINER}.
*/
@@ -288,6 +294,18 @@ public class Container {
SNAPSHOT_VERSION_DEFAULT_VALUE, ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT_VALUE, CACHE_TTL_IN_SECOND_DEFAULT_VALUE,
USER_METADATA_KEYS_TO_NOT_PREFIX_IN_RESPONSE_DEFAULT_VALUE);

+  // Create a container 'container-a' which will be used for S3 prototype tests
+  public static final Container NAMED_BLOB_CONTAINER =
+      new Container((short) 8, "container-a", UNKNOWN_CONTAINER_STATUS, UNKNOWN_CONTAINER_DESCRIPTION,
+          UNKNOWN_CONTAINER_ENCRYPTED_SETTING, UNKNOWN_CONTAINER_PREVIOUSLY_ENCRYPTED_SETTING,
+          UNKNOWN_CONTAINER_CACHEABLE_SETTING, UNKNOWN_CONTAINER_MEDIA_SCAN_DISABLED_SETTING, null,
+          UNKNOWN_CONTAINER_TTL_REQUIRED_SETTING, SECURE_PATH_REQUIRED_DEFAULT_VALUE,
+          CONTENT_TYPE_WHITELIST_FOR_FILENAMES_ON_DOWNLOAD_DEFAULT_VALUE, BACKUP_ENABLED_DEFAULT_VALUE,
+          OVERRIDE_ACCOUNT_ACL_DEFAULT_VALUE, NamedBlobMode.OPTIONAL, (short) 101,
+          UNKNOWN_CONTAINER_DELETE_TRIGGER_TIME, LAST_MODIFIED_TIME_DEFAULT_VALUE, SNAPSHOT_VERSION_DEFAULT_VALUE,
+          ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT_VALUE, CACHE_TTL_IN_SECOND_DEFAULT_VALUE,
+          USER_METADATA_KEYS_TO_NOT_PREFIX_IN_RESPONSE_DEFAULT_VALUE);

/**
* A container defined specifically for the blobs put without specifying target container but isPrivate flag is
* set to {@code false}.
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.Objects;

+import static com.github.ambry.rest.RestUtils.InternalKeys.*;


/**
* Represents the blob url parsing results for named blob.
@@ -51,23 +53,31 @@ public static NamedBlobPath parse(String path, Map<String, Object> args) throws
String[] splitPath = path.split("/", 4);
String blobNamePrefix = RestUtils.getHeader(args, PREFIX_PARAM, false);
boolean isListRequest = blobNamePrefix != null;
+    // S3 can issue "HEAD /s3/named-blob-sandbox" on the bucket name.
+    // The converted named blob path would be /named/named-blob-sandbox/container-a. So, don't check the number of
+    // expected segments for S3 as of now.
+    /*

[Collaborator comment]
What's the case where we hit the exception? After the naming translation from S3 to "named":
  • For GET, we do have four parts: "named", account, container, and key.
  • For list, we have three parts: "named", account, and container. At least for the Flink case, this still looks
    true. If we support listing under any key, that is another case.
One small issue I saw was that for listing, the path is "/named/named-blob-sandbox/container-a/". With the trailing
"/", we split it into four parts. But if we trim the trailing "/", it works fine.

int expectedSegments = isListRequest ? 3 : 4;
if (splitPath.length != expectedSegments || !Operations.NAMED_BLOB.equalsIgnoreCase(splitPath[0])) {
throw new RestServiceException(String.format(
"Path must have format '/named/<account_name>/<container_name>%s. Received path='%s', blobNamePrefix='%s'",
isListRequest ? "" : "/<blob_name>'", path, blobNamePrefix), RestServiceErrorCode.BadRequest);
}
+    */
String accountName = splitPath[1];
String containerName = splitPath[2];
if (isListRequest) {
String pageToken = RestUtils.getHeader(args, PAGE_PARAM, false);
return new NamedBlobPath(accountName, containerName, null, blobNamePrefix, pageToken);
} else {
-      String blobName = splitPath[3];
-      if (blobName.length() > MAX_BLOB_NAME_LENGTH) {
-        throw new RestServiceException(
-            String.format("Blob name maximum length should be less than %s", MAX_BLOB_NAME_LENGTH),
-            RestServiceErrorCode.BadRequest);
+      String blobName = null;
+      if (splitPath.length == 4) {
+        blobName = splitPath[3];
+        if (blobName.length() > MAX_BLOB_NAME_LENGTH) {
+          throw new RestServiceException(
+              String.format("Blob name maximum length should be less than %s", MAX_BLOB_NAME_LENGTH),
+              RestServiceErrorCode.BadRequest);
+        }
+      }
return new NamedBlobPath(accountName, containerName, blobName, null, null);
}
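
To make the trailing-slash issue from the review comment concrete, here is a minimal standalone sketch (not part of the PR) of how String.split with a limit behaves on a list path, and how trimming the trailing separator would restore the expected segment count:

    public class SplitCheck {
      public static void main(String[] args) {
        // A list-request path after the S3 -> named translation (leading slash already stripped by parse()).
        String withSlash = "named/named-blob-sandbox/container-a/";
        // With limit 4, the trailing empty segment is kept, so the path splits into 4 parts, not 3.
        System.out.println(withSlash.split("/", 4).length); // 4 -> would fail an expectedSegments == 3 check
        // Trimming the trailing separator first restores the expected three segments.
        String trimmed = withSlash.endsWith("/") ? withSlash.substring(0, withSlash.length() - 1) : withSlash;
        System.out.println(trimmed.split("/", 4).length); // 3
      }
    }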
@@ -31,4 +31,9 @@ public class Operations {
* First path segment for any operation on a named blob.
*/
public static final String NAMED_BLOB = "named";

+  /**
+   * First path segment for any S3 API operation.
+   */
+  public static final String S3 = "s3";
}
44 changes: 44 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java
@@ -13,14 +13,19 @@
*/
package com.github.ambry.rest;

+import com.github.ambry.account.Container;
+import com.github.ambry.frontend.Operations;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import static com.github.ambry.rest.RestUtils.*;
+import static com.github.ambry.rest.RestUtils.InternalKeys.*;
import static com.github.ambry.router.GetBlobOptions.*;


@@ -35,6 +40,8 @@ public class RequestPath {
private static final char PATH_SEPARATOR_CHAR = '/';
private static final String PATH_SEPARATOR_STRING = String.valueOf(PATH_SEPARATOR_CHAR);
private static final String SEGMENT = SubResource.Segment.toString();
+  private static final Logger logger = LoggerFactory.getLogger(RequestPath.class);
+  private static final String S3_PATH = PATH_SEPARATOR_CHAR + Operations.S3;

/**
* Parse the request path (and additional headers in some cases). The path will match the following regex-like
@@ -74,6 +81,14 @@ public static RequestPath parse(RestRequest restRequest, List<String> prefixesTo
} catch (IllegalArgumentException e) {
throw new RestServiceException("Invalid URI path", e, RestServiceErrorCode.BadRequest);
}

logger.info("S3 API | Input path: {}", path);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As another comment, probably we can have one function to parse all S3 command and generate context for each S3 commands.

if (path.startsWith(S3_PATH)) {
// Convert to named blob request internally
path = getNamedBlobPath(path);
restRequest.setArg(S3_REQUEST, "true");
}

return parse(path, restRequest.getArgs(), prefixesToRemove, clusterName);
}
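
Sketching the reviewer's suggestion above: a single parser that classifies each S3 call and carries the parsed pieces in one context object. This is purely illustrative; S3RequestContext, its fields, and the operation names are hypothetical, not part of this PR or of Ambry:

    final class S3RequestContext {
      enum S3Operation { PUT_OBJECT, GET_OBJECT, LIST_OBJECTS, UPLOAD_PART, COMPLETE_MULTIPART_UPLOAD }

      final S3Operation operation;
      final String bucket;   // maps to the Ambry account name
      final String key;      // maps to the named blob name
      final String uploadId; // non-null only for multipart requests

      private S3RequestContext(S3Operation operation, String bucket, String key, String uploadId) {
        this.operation = operation;
        this.bucket = bucket;
        this.key = key;
        this.uploadId = uploadId;
      }

      // Classifies "/s3/<bucket>/<key>" plus the HTTP method and the optional uploadId query arg.
      static S3RequestContext parse(String path, String method, String uploadId) {
        if (!path.startsWith("/s3/")) {
          return null; // not an S3 request
        }
        String[] parts = path.substring("/s3/".length()).split("/", 2);
        String bucket = parts[0];
        String key = parts.length > 1 && !parts[1].isEmpty() ? parts[1] : null;
        S3Operation op;
        if (uploadId != null) {
          op = "POST".equals(method) ? S3Operation.COMPLETE_MULTIPART_UPLOAD : S3Operation.UPLOAD_PART;
        } else if ("PUT".equals(method)) {
          op = S3Operation.PUT_OBJECT;
        } else if (key == null) {
          op = S3Operation.LIST_OBJECTS;
        } else {
          op = S3Operation.GET_OBJECT;
        }
        return new S3RequestContext(op, bucket, key, uploadId);
      }
    }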

@@ -94,6 +109,10 @@ public static RequestPath parse(String path, Map<String, Object> args, List<Stri
String clusterName) throws RestServiceException {
int offset = 0;

+    for (String arg : args.keySet()) {
+      System.out.println(arg + "=" + args.get(arg));
+    }

// remove prefix.
String prefixFound = "";
if (prefixesToRemove != null) {
@@ -298,4 +317,29 @@ private static int matchPathSegments(String path, int pathOffset, String pathSeg
}
return nextPathSegmentOffset;
}

+  /**
+   * Get the named blob path from an S3 request path.
+   * @param path the S3 request path
+   * @return the named blob request path
+   */
+  private static String getNamedBlobPath(String path) {
+    // S3 requests to Ambry are in the form "/s3/account-name/key-name". We convert it to the named blob path
+    // "/named/account-name/container-name/key-name" internally.
+    // For example, for the S3 path "/s3/named-blob-sandbox/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata",
+    // the corresponding named blob path is
+    // "/named/named-blob-sandbox/container-a/checkpoints/87833badf879a3fc7bf151adfe928eac/chk-1/_metadata".
+    // Please note that we hardcode the container name to 'container-a'.
+    int accountStart = S3_PATH.length() + PATH_SEPARATOR_STRING.length();
+    int accountEnd = path.indexOf(PATH_SEPARATOR_CHAR, accountStart);
+    if (accountEnd == -1) {
+      accountEnd = path.length();
+    }
+    String accountName = path.substring(accountStart, accountEnd);
+    String containerName = Container.DEFAULT_S3_CONTAINER_NAME;
+    String keyName = path.substring(accountEnd);
+    String namedBlobPath = "/named/" + accountName + "/" + containerName + (keyName.length() > 0 ? keyName : "");
+    logger.info("S3 API | Converted S3 request path {} to NamedBlob path {}", path, namedBlobPath);
+    return namedBlobPath;
+  }
}
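
As a quick sanity check of the conversion, here is a standalone re-implementation for illustration only (getNamedBlobPath is private, so this mirrors its logic rather than calling it):

    public class S3PathConversionDemo {
      public static void main(String[] args) {
        // Mirrors getNamedBlobPath(...) above, with the container name hardcoded the same way.
        String path = "/s3/named-blob-sandbox/checkpoints/chk-1/_metadata";
        int accountStart = "/s3/".length();
        int accountEnd = path.indexOf('/', accountStart);
        if (accountEnd == -1) {
          accountEnd = path.length();
        }
        String accountName = path.substring(accountStart, accountEnd);
        String keyName = path.substring(accountEnd);
        // Prints: /named/named-blob-sandbox/container-a/checkpoints/chk-1/_metadata
        System.out.println("/named/" + accountName + "/container-a" + keyName);
      }
    }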
40 changes: 39 additions & 1 deletion ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
@@ -49,6 +49,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import static com.github.ambry.rest.RestUtils.InternalKeys.*;


/**
* Common utility functions that will be used across implementations of REST interfaces.
@@ -252,6 +254,8 @@ public static final class Headers {
* stitched together.
*/
public static final String CHUNK_UPLOAD = "x-ambry-chunk-upload";

+    public static final String S3_CHUNK_UPLOAD = "x-ambry-chunk-upload-s3";
/**
* The reserved blobid for metadata chunk of a stitched upload.
*/
@@ -452,6 +456,21 @@ public static final class InternalKeys {
* The version for the NamedBlob record in MySQL DB
*/
public static final String NAMED_BLOB_VERSION = KEY_PREFIX + "named-blob-version";

+    /**
+     * Boolean field set to "true" if this is an S3 request.
+     */
+    public static final String S3_REQUEST = KEY_PREFIX + "is-s3-request";
+
+    /**
+     * Stores the S3 bucket name, i.e. the Ambry account name.
+     */
+    public static final String S3_BUCKET = KEY_PREFIX + "s3-bucket";
+
+    /**
+     * Stores the file name (the S3 object key).
+     */
+    public static final String S3_KEY = KEY_PREFIX + "s3-key";
}

/**
@@ -541,7 +560,7 @@ public static BlobProperties buildBlobProperties(Map<String, Object> args) throw
* @throws RestServiceException if required arguments aren't present or if they aren't in the format expected.
*/
public static long getTtlFromRequestHeader(Map<String, Object> args) throws RestServiceException {
-    long ttl = Utils.Infinite_Time;
+    long ttl = 86400;

[Collaborator comment]
Did we ever discuss life-cycle management with the Flink and TiKV teams?


Long ttlFromHeader = getLongHeader(args, Headers.TTL, false);
if (ttlFromHeader != null) {
if (ttlFromHeader < -1) {
@@ -807,6 +826,15 @@ public static boolean isNamedBlobStitchRequest(RestRequest restRequest) {
}
}

+  /**
+   * Determines if the input is an S3 API request.
+   * @param restRequest the rest request
+   * @return {@code true} if the request is an S3 API request.
+   */
+  public static boolean isS3Request(RestRequest restRequest) {
+    return restRequest.getArgs().containsKey(S3_REQUEST);
+  }

/**
* Fetch time in ms for the {@code dateString} passed in, since epoch
* @param dateString the String representation of the date that needs to be parsed
@@ -885,6 +913,16 @@ public static boolean isChunkUpload(Map<String, Object> args) throws RestService
return getBooleanHeader(args, Headers.CHUNK_UPLOAD, false);
}

+  /**
+   * Determines if this is a chunk upload for S3.
+   * @param args the request arguments.
+   * @return {@code true} if the {@link Headers#S3_CHUNK_UPLOAD} header is set to true.
+   * @throws RestServiceException if the header value cannot be parsed as a boolean.
+   */
+  public static boolean isS3ChunkUpload(Map<String, Object> args) throws RestServiceException {
+    return getBooleanHeader(args, Headers.S3_CHUNK_UPLOAD, false);
+  }

/**
* Return the reserved metadata id if set in the request args. Return {@code null} if not set in args.
* @param args The request arguments.
@@ -42,6 +42,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import static com.github.ambry.rest.RestUtils.InternalKeys.*;


/**
* Factory that instantiates an {@link IdConverter} implementation for the frontend.
@@ -106,10 +108,18 @@ public Future<String> convert(RestRequest restRequest, String input, BlobInfo bl
frontendMetrics.idConverterRequestRate.mark();
long startTimeInMs = System.currentTimeMillis();
try {
LOGGER.info("AmbryIdConverter | convert method. Rest request: {}", restRequest);
if (!isOpen) {
exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable);
} else if (restRequest.getRestMethod().equals(RestMethod.POST)) {
} else if (RestUtils.isChunkUpload(restRequest.getArgs())) {
convertedId = "/" + signIdIfRequired(restRequest, input);
} else if (RestUtils.isS3ChunkUpload(restRequest.getArgs())) {
// For S3, PUT requests with ?uploadId=<> are used in adding individual part of multipart upload. For eg,
// PUT /s3_named-blob-sandbox_container-a/checkpoints/42b6b3f29b2f9e0b629ff03dac4e9302/shared/
// c29b1701-de55-463d-a129-adaa90c1fc23?uploadId=D3a2aeb6f-aeed-4944-881e-19d41a6b7a22&partNumber=1
// For such case, we want to give out chunk ID.
convertedId = signIdIfRequired(restRequest, input);
LOGGER.info("chunk upload for S3. Converted id {}", convertedId);
} else {
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo),
(id, e) -> completeConversion(id, e, future, callback));
@@ -167,7 +177,7 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
// on delete requests we can soft delete the record from NamedBlobDb and get the blob ID in one step.
conversionFuture = getNamedBlobDb().delete(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName()).thenApply(DeleteResult::getBlobId);
      } else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest)
.matchesOperation(Operations.UPDATE_TTL)) {
//If operation == UPDATE_TTL, we will get the version and blobId info from named blob first
//and do update ttl in routerCallBack.
@@ -180,8 +190,9 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
conversionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId);
}
-      } else if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest)
-          .matchesOperation(Operations.NAMED_BLOB)) {
+      } else if ((restRequest.getRestMethod() == RestMethod.PUT || restRequest.getRestMethod() == RestMethod.POST)
+          && RestUtils.getRequestPath(restRequest).matchesOperation(Operations.NAMED_BLOB)) {
+        // For S3, multipart upload completion is a POST method. So, adding the POST method check above.
Objects.requireNonNull(blobInfo, "blobInfo cannot be null.");
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
String blobId = RestUtils.stripSlashAndExtensionFromId(input);
@@ -195,8 +206,8 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
state = NamedBlobState.IN_PROGRESS;
}
-      conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs())).thenApply(
-          result -> {
+      conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs()))
+          .thenApply(result -> {
restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, result.getInsertedRecord().getVersion());
return result.getInsertedRecord().getBlobId();
});
@@ -284,7 +284,12 @@ public void processResponse(RestRequest restRequest, RestResponseChannel respons
case OPTIONS:
case PUT:
if (requestPath.matchesOperation(Operations.NAMED_BLOB)) {
-          responseChannel.setStatus(ResponseStatus.Created);
+          if (restRequest.getArgs().containsKey("uploadId")) {
+            // If PUT + ?uploadId is present, this is an S3 multipart part upload. Return 200 instead of 201.
+            responseChannel.setStatus(ResponseStatus.Ok);
+          } else {
+            responseChannel.setStatus(ResponseStatus.Created);
+          }
responseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0);
responseChannel.setHeader(RestUtils.Headers.CREATION_TIME,
new Date(blobInfo.getBlobProperties().getCreationTimeInMs()));
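
For reference, S3 clients expect UploadPart to answer 200 OK rather than 201 Created, which is what the uploadId branch above preserves. A minimal sketch of the client side (the host, port, and path here are assumptions for illustration, not values from this PR):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class UploadPartDemo {
      public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Hypothetical Ambry frontend endpoint; uploadId/partNumber mimic the S3 UploadPart query args.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:1174/s3/named-blob-sandbox/part-demo?uploadId=abc&partNumber=1"))
            .PUT(HttpRequest.BodyPublishers.ofString("part bytes"))
            .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        // An S3-compatible frontend should answer 200 here, not 201.
        System.out.println(response.statusCode());
      }
    }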
@@ -180,7 +180,9 @@ public String getSignedUrl(RestRequest restRequest) throws RestServiceException
argsForUrl.put(RestUtils.Headers.RESERVED_METADATA_ID, reservedMetadataBlobId);
}
}
-      argsForUrl.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize);
+      // Commenting this out to allow multipart uploads with 5 MB parts. But if Ambry's
+      // "router.max.put.chunk.size.bytes" is changed to 5 MB, we don't need to comment this out.
+      //argsForUrl.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize);
}
argsForUrl.put(LINK_EXPIRY_TIME, time.seconds() + urlTtlSecs);
