Skip to content

Commit

Permalink
[Broker] Fix call sync method in async rest api for internalExpireMes…
Browse files Browse the repository at this point in the history
…sagesByPosition (#13878)

### Motivation
Avoid call sync method in async rest API for PersistentTopicsBase#internalExpireMessagesByPosition.

### Modifications
Use async instead of sync method.
  • Loading branch information
liudezhi2098 authored Jan 27, 2022
1 parent a7b34c8 commit 61f99cb
Showing 1 changed file with 58 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3505,47 +3505,61 @@ private void internalExpireMessagesByTimestampForSinglePartition(String subName,

protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
MessageIdImpl messageId, boolean isExcluded, int batchIndex) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
try {
validateGlobalNamespaceOwnership(namespaceName);
} catch (Exception e) {
log.warn("[{}][{}] Failed to expire messages on subscription {} to position {}: {}", clientAppId(),
topicName, subName, messageId, e.getMessage());
resumeAsyncResponseExceptionally(asyncResponse, e);
return;
}
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
} else {
future = CompletableFuture.completedFuture(null);
}

validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);

log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(), topicName,
subName, messageId);
future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
return getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(partitionMetadata -> {
if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
String msg = "Expire message at position is not supported for partitioned-topic";
log.warn("[{}] {} {}({}) {}", clientAppId(), msg, topicName, messageId, subName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg));
return;
} else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
String msg = "Invalid parameter for expire message by position, partition index of "
+ "passed in message position doesn't match partition index for the topic";
log.warn("[{}] {} {}({}).", clientAppId(), msg, topicName, messageId);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, msg));
return;
} else {
internalExpireMessagesNonPartitionedTopicByPosition(asyncResponse, subName,
messageId, isExcluded, batchIndex);
}
});
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}",
clientAppId(), topicName, subName, messageId, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

// If the topic name is a partition name, no need to get partition topic metadata again
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, authoritative, false).partitions > 0) {
log.warn("[{}] Not supported operation expire message up to {} on partitioned-topic {} {}",
clientAppId(), messageId, topicName, subName);
asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
"Expire message at position is not supported for partitioned-topic"));
return;
} else if (messageId.getPartitionIndex() != topicName.getPartitionIndex()) {
log.warn("[{}] Invalid parameter for expire message by position, partition index of passed in message"
+ " position {} doesn't match partition index of topic requested {}.",
clientAppId(), messageId, topicName);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
"Invalid parameter for expire message by position, partition index of message position "
+ "passed in doesn't match partition index for the topic."));
} else {
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosition(AsyncResponse asyncResponse,
String subName,
MessageIdImpl messageId,
boolean isExcluded,
int batchIndex) {
return getTopicReferenceAsync(topicName).thenAccept(t -> {
PersistentTopic topic = (PersistentTopic) t;
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
return;
}
CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
Expand All @@ -3570,23 +3584,22 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
} else {
if (log.isDebugEnabled()) {
log.debug("Expire message by position not issued on topic {} for subscription {} "
+ "due to ongoing message expiration not finished or subscription "
+ "almost catch up.", topicName, subName);
+ "due to ongoing message expiration not finished or subscription almost "
+ "catch up.", topicName, subName);
}
throw new RestException(Status.CONFLICT, "Expire message by position not issued on topic "
+ topicName + " for subscription " + subName + " due to ongoing message expiration"
+ " not finished or invalid message position provided.");
+ topicName + " for subscription " + subName + " due to ongoing"
+ " message expiration not finished or invalid message position provided.");
}
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} catch (Exception exception) {
log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}",
clientAppId(), position, topicName, subName, exception);
throw new RestException(exception);
}
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}", clientAppId(),
messageId, topicName, subName, e);
log.error("[{}] Failed to expire messages up to {} on {} with subscription {} {}",
clientAppId(), messageId, topicName, subName, e);
asyncResponse.resume(e);
return null;
});
Expand All @@ -3595,8 +3608,13 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
clientAppId(), topicName, messageId, subName, messageId, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
asyncResponse.resume(Response.noContent().build());
}).exceptionally(ex -> {
Throwable cause = ex.getCause();
log.error("[{}] Failed to expire messages up to {} on subscription {} to position {}", clientAppId(),
topicName, subName, messageId, cause);
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected void internalTriggerCompaction(AsyncResponse asyncResponse, boolean authoritative) {
Expand Down

0 comments on commit 61f99cb

Please sign in to comment.