Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make some operation deduplication methods in Namespaces async. #15608

Merged
merged 2 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -906,13 +906,13 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons
internalSetAutoSubscriptionCreation(asyncResponse, null);
}

protected void internalModifyDeduplication(Boolean enableDeduplication) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
updatePolicies(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
});
protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
}));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -2146,9 +2146,10 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
}
}

protected Boolean internalGetDeduplication() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).deduplicationEnabled;
protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.deduplicationEnabled);
}

protected Integer internalGetMaxConsumersPerTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
@ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean enableDeduplication) {
public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
internalModifyDeduplication(enableDeduplication);
internalModifyDeduplicationAsync(enableDeduplication)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDeduplication();
internalGetDeduplicationAsync()
.thenAccept(deduplication -> asyncResponse.resume(deduplication))
.exceptionally(ex -> {
log.error("Failed to get broker deduplication config for namespace {}", namespace, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or enabling broker side deduplication "
+ "for all topics in the specified namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
internalModifyDeduplication(enableDeduplication);
internalModifyDeduplicationAsync(enableDeduplication)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalModifyDeduplication(null);
internalModifyDeduplicationAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down