Skip to content

Commit

Permalink
Merge branch 'apache:develop' into test/coverage-to-50
Browse files Browse the repository at this point in the history
  • Loading branch information
HardX8 authored Aug 15, 2024
2 parents eb10447 + 7964f06 commit 7d818a7
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.tieredstore.TieredMessageStore;

public class EscapeBridge {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -99,7 +101,7 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) {

try {
messageExt.setWaitStoreMsgOK(false);
final SendResult sendResult = putMessageToRemoteBroker(messageExt);
final SendResult sendResult = putMessageToRemoteBroker(messageExt, null);
return transformSendResult2PutResult(sendResult);
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
Expand All @@ -112,7 +114,10 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) {
}
}

private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) {
public SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt, String brokerNameToSend) {
if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) { // not remote broker
return null;
}
final boolean isTransHalfMessage = TransactionalMessageUtil.buildHalfTopic().equals(messageExt.getTopic());
MessageExtBrokerInner messageToPut = messageExt;
if (isTransHalfMessage) {
Expand All @@ -125,12 +130,26 @@ private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) {
return null;
}

final MessageQueue mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());

messageToPut.setQueueId(mqSelected.getQueueId());
final MessageQueue mqSelected;
if (StringUtils.isEmpty(brokerNameToSend)) {
mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
messageToPut.setQueueId(mqSelected.getQueueId());
brokerNameToSend = mqSelected.getBrokerName();
if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) {
LOG.warn("putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}
} else {
mqSelected = new MessageQueue(messageExt.getTopic(), brokerNameToSend, messageExt.getQueueId());
}

final String brokerNameToSend = mqSelected.getBrokerName();
final String brokerAddrToSend = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
if (null == brokerAddrToSend) {
LOG.warn("putMessageToRemoteBroker failed, remote broker address not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}

final long beginTimestamp = System.currentTimeMillis();
try {
Expand Down Expand Up @@ -279,8 +298,12 @@ public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(St
}
List<MessageExt> list = decodeMsgList(result, deCompressBody);
if (list == null || list.isEmpty()) {
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result);
return Triple.of(null, "Can not get msg", false); // local store, so no retry
// OFFSET_FOUND_NULL returned by TieredMessageStore indicates exception occurred
boolean needRetry = GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
&& messageStore instanceof TieredMessageStore;
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, needRetry {}, result is {}",
topic, offset, queueId, needRetry, result);
return Triple.of(null, "Can not get msg", needRetry);
}
return Triple.of(list.get(0), "", false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ private boolean reachTail(PullResult pullResult, long offset) {
}

// Triple<MessageExt, info, needRetry>
private CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(String topic, long offset, int queueId,
String brokerName) {
return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false);
public CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(PopCheckPoint popCheckPoint, long offset) {
return this.brokerController.getEscapeBridge().getMessageAsync(popCheckPoint.getTopic(), offset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName(), false);
}

public PullResult getMessage(String group, String topic, int queueId, long offset, int nums,
Expand Down Expand Up @@ -358,7 +357,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
if (point.getTopic() == null || point.getCId() == null) {
continue;
}
map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime() + point.getBrokerName(), point);
PopMetricsManager.incPopReviveCkGetCount(point, queueId);
point.setReviveOffset(messageExt.getQueueOffset());
if (firstRt == 0) {
Expand All @@ -371,7 +370,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime();
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand All @@ -396,7 +395,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {

BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime();
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + bAckMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand Down Expand Up @@ -528,7 +527,7 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {

// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint, msgOffset)
.thenApply(rst -> {
MessageExt message = rst.getLeft();
if (message == null) {
Expand Down Expand Up @@ -568,9 +567,9 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {

private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
int rePutTimes = oldCK.parseRePutTimes();
if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
if (rePutTimes >= ckRewriteIntervalsInSeconds.length && brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes);
return;
}

Expand All @@ -588,7 +587,8 @@ private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes;
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[intervalIndex] * 1000);
}
MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -58,6 +62,9 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class EscapeBridgeTest {
Expand All @@ -75,6 +82,9 @@ public class EscapeBridgeTest {
@Mock
private DefaultMessageStore defaultMessageStore;

@Mock
private TieredMessageStore tieredMessageStore;

private GetMessageResult getMessageResult;

@Mock
Expand Down Expand Up @@ -200,14 +210,37 @@ public void getMessageAsyncTest_localStore_getMessageAsync_null() {
}

@Test
public void getMessageAsyncTest_localStore_decodeNothing() throws Exception {
public void getMessageAsyncTest_localStore_decodeNothing_DefaultMessageStore() throws Exception {
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, TEST_TOPIC, null)));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
Assert.assertFalse(rst.getRight()); // no retry
for (GetMessageStatus status : GetMessageStatus.values()) {
GetMessageResult getMessageResult = mockGetMessageResult(0, TEST_TOPIC, null);
getMessageResult.setStatus(status);
when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
Assert.assertFalse(rst.getRight()); // DefaultMessageStore, no retry
}
}

@Test
public void getMessageAsyncTest_localStore_decodeNothing_TieredMessageStore() throws Exception {
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(tieredMessageStore);
for (GetMessageStatus status : GetMessageStatus.values()) {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(status);
when(tieredMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
if (GetMessageStatus.OFFSET_FOUND_NULL.equals(status)) {
Assert.assertTrue(rst.getRight()); // TieredMessageStore returns OFFSET_FOUND_NULL, need retry
} else {
Assert.assertFalse(rst.getRight()); // other status, like DefaultMessageStore, no retry
}
}
}

@Test
Expand Down Expand Up @@ -320,6 +353,57 @@ public void decodeMsgListTest_messageNotNull() throws Exception {
Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
}

@Test
public void testPutMessageToRemoteBroker_noSpecificBrokerName_hasRemoteBroker() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
String anotherBrokerName = "broker_b";
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
escapeBridge.putMessageToRemoteBroker(message, null);
verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

@Test
public void testPutMessageToRemoteBroker_noSpecificBrokerName_noRemoteBroker() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
escapeBridge.putMessageToRemoteBroker(message, null);
verify(topicRouteInfoManager, times(0)).findBrokerAddressInPublish(anyString());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_equals() throws Exception {
escapeBridge.putMessageToRemoteBroker(new MessageExtBrokerInner(), BROKER_NAME);
verify(topicRouteInfoManager, times(0)).tryToFindTopicPublishInfo(anyString());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_addressNotFound() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
escapeBridge.putMessageToRemoteBroker(message, "whatever");
verify(topicRouteInfoManager).findBrokerAddressInPublish(eq("whatever"));
verify(brokerOuterAPI, times(0)).sendMessageToSpecificBroker(anyString(), anyString(), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_addressFound() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
String anotherBrokerName = "broker_b";
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
escapeBridge.putMessageToRemoteBroker(message, anotherBrokerName);
verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

private GetMessageResult mockGetMessageResult(int count, String topic, byte[] body) throws Exception {
GetMessageResult result = new GetMessageResult();
for (int i = 0; i < count; i++) {
Expand All @@ -337,4 +421,12 @@ private GetMessageResult mockGetMessageResult(int count, String topic, byte[] bo
return result;
}

private TopicPublishInfo mockTopicPublishInfo(String... brokerNames) {
TopicPublishInfo topicPublishInfo = new TopicPublishInfo();
for (String brokerName : brokerNames) {
topicPublishInfo.getMessageQueueList().add(new MessageQueue(TEST_TOPIC, brokerName, 0));
}
return topicPublishInfo;
}

}
Loading

0 comments on commit 7d818a7

Please sign in to comment.