diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 3886205d77603..0472a048f9c60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -857,17 +857,19 @@ protected CompletableFuture internalSetNamespaceMessageTTLAsync(Integer me })); } - protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { - validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - - if (expirationTime != null && expirationTime < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for subscription expiration time"); - } - updatePolicies(namespaceName, policies -> { - policies.subscription_expiration_time_minutes = expirationTime; - return policies; - }); + protected CompletableFuture internalSetSubscriptionExpirationTimeAsync(Integer expirationTime) { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_EXPIRATION_TIME, + PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + if (expirationTime != null && expirationTime < 0) { + throw new RestException(Status.PRECONDITION_FAILED, + "Invalid value for subscription expiration time"); + } + }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.subscription_expiration_time_minutes = expirationTime; + return policies; + })); } protected CompletableFuture internalGetAutoTopicCreationAsync() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index e523b0a843ef7..a2e3970a9ac38 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -470,13 +470,19 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @ApiOperation(hidden = true, value = "Get the subscription expiration time for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public Integer getSubscriptionExpirationTime(@PathParam("property") String property, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { - validateAdminAccessForTenant(property); + public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.subscription_expiration_time_minutes; + validateAdminAccessForTenantAsync(property) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes)) + .exceptionally(ex -> { + log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -485,10 +491,18 @@ public Integer getSubscriptionExpirationTime(@PathParam("property") String prope @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid expiration time") }) - public void setSubscriptionExpirationTime(@PathParam("property") String property, + public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, int expirationTime) { validateNamespaceName(property, cluster, namespace); - internalSetSubscriptionExpirationTime(expirationTime); + internalSetSubscriptionExpirationTimeAsync(expirationTime) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -496,10 +510,18 @@ public void setSubscriptionExpirationTime(@PathParam("property") String property @ApiOperation(hidden = true, value = "Remove subscription expiration time for namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public void removeSubscriptionExpirationTime(@PathParam("property") String property, + public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalSetSubscriptionExpirationTime(null); + internalSetSubscriptionExpirationTimeAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 2325eb704a29f..370466bf8c188 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -410,13 +410,19 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @ApiOperation(value = "Get the subscription expiration time for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public Integer getSubscriptionExpirationTime(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { - validateAdminAccessForTenant(tenant); + public void getSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.subscription_expiration_time_minutes; + validateAdminAccessForTenantAsync(tenant) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.subscription_expiration_time_minutes)) + .exceptionally(ex -> { + log.error("[{}] Failed to get subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -425,13 +431,21 @@ public Integer getSubscriptionExpirationTime(@PathParam("tenant") String tenant, @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid expiration time")}) - public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant, + public void setSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @ApiParam(value = "Expiration time in minutes for the specified namespace", required = true) int expirationTime) { validateNamespaceName(tenant, namespace); - internalSetSubscriptionExpirationTime(expirationTime); + internalSetSubscriptionExpirationTimeAsync(expirationTime) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to set subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -439,10 +453,18 @@ public void setSubscriptionExpirationTime(@PathParam("tenant") String tenant, @ApiOperation(value = "Remove subscription expiration time for namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) - public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant, + public void removeSubscriptionExpirationTime(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalSetSubscriptionExpirationTime(null); + internalSetSubscriptionExpirationTimeAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to remove subscription expiration time for namespace {}: {} ", clientAppId(), + namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET