Skip to content

Commit

Permalink
[StorageQuota] Unify interface with QuotaManager (#1801)
Browse files Browse the repository at this point in the history
Integrate storage quota into unified quota manager interface.
  • Loading branch information
justinlin-linkedin authored Apr 13, 2021
1 parent fc6181e commit da4f106
Show file tree
Hide file tree
Showing 33 changed files with 565 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface QuotaEnforcer {
/**
* Method to initialize the {@link QuotaEnforcer}.
*/
void init();
void init() throws Exception;

/**
* Makes an {@link QuotaRecommendation} using the information in {@link BlobInfo} and {@link RestRequest}. This
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface QuotaManager {
/**
* Method to initialize the {@link QuotaManager}.
*/
void init();
void init() throws InstantiationException;

/**
* Computes the overall boolean recommendation to throttle a request or not for all the types of request quotas supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestUtils.InternalKeys;


/**
* {@link StorageQuotaService} is the component to handles storage quota for different {@link Account} and {@link Container}.
* It keeps track of the storage usage of different {@link Container}s and decides to throttle the Frontend operations
Expand Down
41 changes: 39 additions & 2 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.ambry.account.Account;
import com.github.ambry.account.Container;
import com.github.ambry.frontend.Operations;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.protocol.GetOption;
import com.github.ambry.quota.QuotaName;
Expand Down Expand Up @@ -671,6 +672,42 @@ public static RequestPath getRequestPath(RestRequest restRequest) {
InternalKeys.REQUEST_PATH + " not set in " + restRequest);
}

/**
* Return true if this request is uploading a blob. We now have two ways of uploading a blob
* 1. A POST request to root path
* 2. A PUT request to namedBlob path
* Notice that stitch requests are not uploads since chunks are uploaded through signed url post.
* @param restRequest The {@link RestRequest}.
* @return
*/
public static boolean isUploadRequest(RestRequest restRequest) {
RequestPath requestPath = RestUtils.getRequestPath(restRequest);
RestMethod method = restRequest.getRestMethod();
// For POST request, when the operation is "", it's upload
// For PUT request, when the operation is named blob, it's named upload upload. However, we have to exclude the
// case for stitch named blob.
return method == RestMethod.POST && requestPath.getOperationOrBlobId(true).isEmpty()
|| method == RestMethod.PUT && requestPath.matchesOperation(Operations.NAMED_BLOB) && !isNamedBlobStitchRequest(
restRequest);
}

/**
* Return true when the given named blob request is a stitch request. The {@code restRequest} has to be a named blob upload
* request, which means it's PUT request and the operation in requestPath is namedBlob.
* Notice that this method doesn't enforce this precondition.
* @param restRequest
* @return
*/
public static boolean isNamedBlobStitchRequest(RestRequest restRequest) {
// This request has to be NamedBlob Request, which means it's PUT request and the operation in requestPath is namedBlob.
final String STITCH = "STITCH";
try {
return STITCH.equals(RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.UPLOAD_NAMED_BLOB_MODE, false));
} catch (RestServiceException e) {
return false;
}
}

/**
* Fetch time in ms for the {@code dateString} passed in, since epoch
* @param dateString the String representation of the date that needs to be parsed
Expand Down Expand Up @@ -1010,10 +1047,10 @@ public static Map<String, String> buildUserQuotaHeadersMap(ThrottlingRecommendat
.stream()
.collect(Collectors.toMap(e -> e.getKey().name(), e -> String.valueOf(e.getValue())))));


// set retry header if present.
if (throttlingRecommendation.getRetryAfterMs() != ThrottlingRecommendation.NO_RETRY_AFTER_MS) {
quotaHeadersMap.put(RequestQuotaHeaders.RETRY_AFTER_MS, String.valueOf(throttlingRecommendation.getRetryAfterMs()));
quotaHeadersMap.put(RequestQuotaHeaders.RETRY_AFTER_MS,
String.valueOf(throttlingRecommendation.getRetryAfterMs()));
}

// set the warning header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MockRestRequestServiceFactory implements RestRequestServiceFactory
private final Router router;

public MockRestRequestServiceFactory(VerifiableProperties verifiableProperties, ClusterMap clusterMap, Router router,
AccountService accountService, QuotaManager quotaManager) {
AccountService accountService) {
this.verifiableProperties = verifiableProperties;
this.router = router;
}
Expand Down
43 changes: 43 additions & 0 deletions ambry-api/src/test/java/com/github/ambry/rest/RestUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.github.ambry.account.Account;
import com.github.ambry.account.Container;
import com.github.ambry.account.InMemAccountService;
import com.github.ambry.frontend.Operations;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.protocol.GetOption;
import com.github.ambry.quota.QuotaName;
Expand All @@ -37,11 +38,13 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.Test;
Expand Down Expand Up @@ -907,6 +910,46 @@ public void decodeKVHeaderValueTest() {
}
}

/**
* Test request is upload request.
* @throws Exception
*/
@Test
public void testIsUploadRequest() throws Exception {
List<String> operations = Utils.getStaticFieldValuesAsStrings(Operations.class).collect(Collectors.toList());
RestMethod[] methods = RestMethod.values();

for (String operation : operations) {
for (RestMethod method : methods) {
JSONObject header = new JSONObject();
header.put(RestUtils.InternalKeys.REQUEST_PATH,
RequestPath.parse("/" + operation, Collections.emptyMap(), Collections.emptyList(), "ambry-test"));
RestRequest request = createRestRequest(method, "/" + operation, header);
boolean isUpload = RestUtils.isUploadRequest(request);
assertEquals(operation.equals(Operations.NAMED_BLOB) && method == RestMethod.PUT, isUpload);
}
}
// One exception for PUT named blob
{
JSONObject header = new JSONObject();
header.put(RestUtils.InternalKeys.REQUEST_PATH,
RequestPath.parse("/" + Operations.NAMED_BLOB, Collections.emptyMap(), Collections.emptyList(),
"ambry-test"));
header.put(RestUtils.Headers.UPLOAD_NAMED_BLOB_MODE, "STITCH");
RestRequest request = createRestRequest(RestMethod.PUT, "/" + Operations.NAMED_BLOB, header);
assertFalse(RestUtils.isUploadRequest(request));
}

for (RestMethod method : methods) {
JSONObject header = new JSONObject();
header.put(RestUtils.InternalKeys.REQUEST_PATH,
RequestPath.parse("/", Collections.emptyMap(), Collections.emptyList(), "ambry-test"));
RestRequest request = createRestRequest(method, "/", header);
boolean isUpload = RestUtils.isUploadRequest(request);
assertEquals(method == RestMethod.POST, isUpload);
}
}

// helpers.
// general.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,16 @@ class AmbrySecurityService implements SecurityService {
private final FrontendMetrics frontendMetrics;
private final UrlSigningService urlSigningService;
private final HostLevelThrottler hostLevelThrottler;
private final StorageQuotaService storageQuotaService;
private final QuotaManager quotaManager;
private final RequestCostPolicy requestCostPolicy;
private boolean isOpen;

AmbrySecurityService(FrontendConfig frontendConfig, FrontendMetrics frontendMetrics,
UrlSigningService urlSigningService, HostLevelThrottler hostLevelThrottler,
StorageQuotaService storageQuotaService, QuotaManager quotaManager) {
UrlSigningService urlSigningService, HostLevelThrottler hostLevelThrottler, QuotaManager quotaManager) {
this.frontendConfig = frontendConfig;
this.frontendMetrics = frontendMetrics;
this.urlSigningService = urlSigningService;
this.hostLevelThrottler = hostLevelThrottler;
this.storageQuotaService = storageQuotaService;
this.quotaManager = quotaManager;
this.requestCostPolicy = new UserQuotaRequestCostPolicy();
isOpen = true;
Expand Down Expand Up @@ -132,15 +129,13 @@ public void postProcessRequest(RestRequest restRequest, Callback<Void> callback)
exception = new RestServiceException("SecurityService is closed", RestServiceErrorCode.ServiceUnavailable);
} else if (hostLevelThrottler.shouldThrottle(restRequest)) {
exception = new RestServiceException("Too many requests", RestServiceErrorCode.TooManyRequests);
} else if (storageQuotaService != null && storageQuotaService.shouldThrottle(restRequest)) {
exception = new RestServiceException("StorageQuotaExceeded", RestServiceErrorCode.TooManyRequests);
} else {
if (quotaManager != null) {
ThrottlingRecommendation throttlingRecommendation = quotaManager.getThrottleRecommendation(restRequest);
if (throttlingRecommendation != null && throttlingRecommendation.shouldThrottle()) {
Map<String, String> quotaHeaderMap = RestUtils.buildUserQuotaHeadersMap(throttlingRecommendation);
throw new RestServiceException("User Quota Exceeded", RestServiceErrorCode.TooManyRequests, true, true,
quotaHeaderMap);
quotaHeaderMap);
}
}
if (restRequest.getRestMethod() == RestMethod.DELETE || restRequest.getRestMethod() == RestMethod.PUT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.github.ambry.config.HostThrottleConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.quota.storage.StorageQuotaService;


/**
Expand All @@ -34,24 +33,21 @@ public class AmbrySecurityServiceFactory implements SecurityServiceFactory {
private final HostThrottleConfig hostThrottleConfig;
private final FrontendMetrics frontendMetrics;
private final UrlSigningService urlSigningService;
private final StorageQuotaService storageQuotaService;
private final QuotaManager quotaManager;

public AmbrySecurityServiceFactory(VerifiableProperties verifiableProperties, ClusterMap clusterMap,
AccountService accountService, UrlSigningService urlSigningService, IdSigningService idSigningService,
AccountAndContainerInjector accountAndContainerInjector, StorageQuotaService storageQuotaService,
QuotaManager quotaManager) {
AccountAndContainerInjector accountAndContainerInjector, QuotaManager quotaManager) {
frontendConfig = new FrontendConfig(verifiableProperties);
hostThrottleConfig = new HostThrottleConfig(verifiableProperties);
frontendMetrics = new FrontendMetrics(clusterMap.getMetricRegistry());
this.urlSigningService = urlSigningService;
this.storageQuotaService = storageQuotaService;
this.quotaManager = quotaManager;
}

@Override
public SecurityService getSecurityService() {
return new AmbrySecurityService(frontendConfig, frontendMetrics, urlSigningService,
new HostLevelThrottler(hostThrottleConfig), storageQuotaService, quotaManager);
new HostLevelThrottler(hostThrottleConfig), quotaManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.protocol.GetOption;
import com.github.ambry.quota.storage.StorageQuotaService;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.rest.RequestPath;
import com.github.ambry.rest.ResponseStatus;
import com.github.ambry.rest.RestMethod;
Expand Down Expand Up @@ -103,7 +103,7 @@ class FrontendRestRequestService implements RestRequestService {
private GetAccountsHandler getAccountsHandler;
private PostAccountsHandler postAccountsHandler;
private GetStatsReportHandler getStatsReportHandler;
private StorageQuotaService storageQuotaService;
private QuotaManager quotaManager;
private boolean isUp = false;

/**
Expand All @@ -123,15 +123,13 @@ class FrontendRestRequestService implements RestRequestService {
* @param datacenterName the local datacenter name for this frontend.
* @param hostname the hostname for this frontend.
* @param clusterName the name of the storage cluster that the router communicates with.
* @param storageQuotaService the {@link StorageQuotaService} used to throttle traffics.
* @param accountStatsStore the {@link AccountStatsStore} used to fetch aggregated stats reports.
*/
FrontendRestRequestService(FrontendConfig frontendConfig, FrontendMetrics frontendMetrics, Router router,
ClusterMap clusterMap, IdConverterFactory idConverterFactory, SecurityServiceFactory securityServiceFactory,
UrlSigningService urlSigningService, IdSigningService idSigningService, NamedBlobDb namedBlobDb,
AccountService accountService, AccountAndContainerInjector accountAndContainerInjector, String datacenterName,
String hostname, String clusterName, StorageQuotaService storageQuotaService,
AccountStatsStore accountStatsStore) {
String hostname, String clusterName, AccountStatsStore accountStatsStore, QuotaManager quotaManager) {
this.frontendConfig = frontendConfig;
this.frontendMetrics = frontendMetrics;
this.router = router;
Expand All @@ -146,9 +144,9 @@ class FrontendRestRequestService implements RestRequestService {
this.accountStatsStore = accountStatsStore;
this.datacenterName = datacenterName;
this.hostname = hostname;
this.quotaManager = quotaManager;
this.clusterName = clusterName.toLowerCase();
getReplicasHandler = new GetReplicasHandler(frontendMetrics, clusterMap);
this.storageQuotaService = storageQuotaService;
logger.trace("Instantiated FrontendRestRequestService");
}

Expand All @@ -166,16 +164,9 @@ public void start() throws InstantiationException {
throw new InstantiationException("ResponseHandler is not set.");
}
long startupBeginTime = System.currentTimeMillis();
quotaManager.init();
idConverter = idConverterFactory.getIdConverter();
securityService = securityServiceFactory.getSecurityService();
if (storageQuotaService != null) {
try {
storageQuotaService.start();
} catch (Exception e) {
logger.error("Failed to start storage quota service", e);
throw new InstantiationException("StorageQuotaService fail to start");
}
}
getPeersHandler = new GetPeersHandler(clusterMap, securityService, frontendMetrics);
getSignedUrlHandler =
new GetSignedUrlHandler(urlSigningService, securityService, idConverter, accountAndContainerInjector,
Expand Down Expand Up @@ -208,6 +199,10 @@ public void shutdown() {
long shutdownBeginTime = System.currentTimeMillis();
isUp = false;
try {
if (quotaManager != null) {
quotaManager.shutdown();
quotaManager = null;
}
if (securityService != null) {
securityService.close();
securityService = null;
Expand All @@ -216,9 +211,6 @@ public void shutdown() {
idConverter.close();
idConverter = null;
}
if (storageQuotaService != null) {
storageQuotaService.shutdown();
}
logger.info("FrontendRestRequestService shutdown complete");
} catch (IOException e) {
logger.error("Downstream service close failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.QuotaConfig;
import com.github.ambry.config.StatsManagerConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.named.NamedBlobDbFactory;
import com.github.ambry.quota.MaxThrottlePolicy;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.quota.storage.StorageQuotaService;
import com.github.ambry.quota.storage.StorageQuotaServiceFactory;
import com.github.ambry.quota.QuotaManagerFactory;
import com.github.ambry.rest.RestRequestService;
import com.github.ambry.rest.RestRequestServiceFactory;
import com.github.ambry.router.Router;
Expand All @@ -50,7 +51,6 @@ public class FrontendRestRequestServiceFactory implements RestRequestServiceFact
private final ClusterMapConfig clusterMapConfig;
private final Router router;
private final AccountService accountService;
private final QuotaManager quotaManager;

/**
* Creates a new instance of FrontendRestRequestServiceFactory.
Expand All @@ -61,12 +61,11 @@ public class FrontendRestRequestServiceFactory implements RestRequestServiceFact
* @throws IllegalArgumentException if any of the arguments are null.
*/
public FrontendRestRequestServiceFactory(VerifiableProperties verifiableProperties, ClusterMap clusterMap,
Router router, AccountService accountService, QuotaManager quotaManager) {
Router router, AccountService accountService) {
this.verifiableProperties = Objects.requireNonNull(verifiableProperties, "Provided VerifiableProperties is null");
this.clusterMap = Objects.requireNonNull(clusterMap, "Provided ClusterMap is null");
this.router = Objects.requireNonNull(router, "Provided Router is null");
this.accountService = Objects.requireNonNull(accountService, "Provided AccountService is null");
this.quotaManager = quotaManager;
clusterMapConfig = new ClusterMapConfig(verifiableProperties);
frontendConfig = new FrontendConfig(verifiableProperties);
frontendMetrics = new FrontendMetrics(clusterMap.getMetricRegistry());
Expand Down Expand Up @@ -94,23 +93,21 @@ public RestRequestService getRestRequestService() {
clusterMap.getMetricRegistry()).getUrlSigningService();
AccountAndContainerInjector accountAndContainerInjector =
new AccountAndContainerInjector(accountService, frontendMetrics, frontendConfig);
StorageQuotaService storageQuotaService = null;
AccountStatsStore accountStatsStore =
Utils.<AccountStatsStoreFactory>getObj(frontendConfig.accountStatsStoreFactory, verifiableProperties,
clusterMapConfig, new StatsManagerConfig(verifiableProperties),
clusterMap.getMetricRegistry()).getAccountStatsStore();
if (frontendConfig.enableStorageQuotaService) {
storageQuotaService =
Utils.<StorageQuotaServiceFactory>getObj(frontendConfig.storageQuotaServiceFactory, verifiableProperties,
accountStatsStore, clusterMap.getMetricRegistry()).getStorageQuotaService();
}
QuotaConfig quotaConfig = new QuotaConfig(verifiableProperties);
QuotaManager quotaManager =
((QuotaManagerFactory) Utils.getObj(quotaConfig.quotaManagerFactory, quotaConfig, new MaxThrottlePolicy(),
accountService, accountStatsStore)).getQuotaManager();
SecurityServiceFactory securityServiceFactory =
Utils.getObj(frontendConfig.securityServiceFactory, verifiableProperties, clusterMap, accountService,
urlSigningService, idSigningService, accountAndContainerInjector, storageQuotaService, quotaManager);
urlSigningService, idSigningService, accountAndContainerInjector, quotaManager);
return new FrontendRestRequestService(frontendConfig, frontendMetrics, router, clusterMap, idConverterFactory,
securityServiceFactory, urlSigningService, idSigningService, namedBlobDb, accountService,
accountAndContainerInjector, clusterMapConfig.clusterMapDatacenterName, clusterMapConfig.clusterMapHostName,
clusterMapConfig.clusterMapClusterName, storageQuotaService, accountStatsStore);
clusterMapConfig.clusterMapClusterName, accountStatsStore, quotaManager);
} catch (Exception e) {
throw new IllegalStateException("Could not instantiate FrontendRestRequestService", e);
}
Expand Down
Loading

0 comments on commit da4f106

Please sign in to comment.