diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index d53454f215d..690a47c5156 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -106,11 +106,11 @@ private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader re LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true); mappingContext.setCurrentItem(mappingItem); - if (globalOffset < mappingItem.getLogicOffset()) { + //if (globalOffset < mappingItem.getLogicOffset()) { //handleOffsetMoved //If the physical queue is reused, we should handle the PULL_OFFSET_MOVED independently //Otherwise, we could just transfer it to the physical process - } + //} //below are physical info String bname = mappingItem.getBname(); Integer phyQueueId = mappingItem.getQueueId(); @@ -536,14 +536,13 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re getMessageResult.setNextBeginOffset(broadcastInitOffset); } else { SubscriptionData finalSubscriptionData = subscriptionData; - RemotingCommand finalResponse = response; messageStore.getMessageAsync(group, topic, queueId, requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter) .thenApply(result -> { if (null == result) { - finalResponse.setCode(ResponseCode.SYSTEM_ERROR); - finalResponse.setRemark("store getMessage return null"); - return finalResponse; + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("store getMessage return null"); + return response; } brokerController.getColdDataCgCtrService().coldAcc(requestHeader.getConsumerGroup(), result.getColdDataSum()); return pullMessageResultHandler.handle( @@ -555,7 +554,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re subscriptionGroupConfig, brokerAllowSuspend, messageFilter, - finalResponse, + response, mappingContext, beginTimeMills );