Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ChineseTony committed Feb 29, 2024
2 parents c9630e9 + eed303d commit d242d94
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
Expand Down Expand Up @@ -370,15 +371,12 @@ protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
sendMessageContext.setCommercialOwner(owner);

Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));

if (uniqueKey == null) {
uniqueKey = "";
}
sendMessageContext.setMsgUniqueKey(uniqueKey);
String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
sendMessageContext.setMsgUniqueKey(Optional.ofNullable(uniqueKey).orElse(""));

if (properties.containsKey(MessageConst.PROPERTY_SHARDING_KEY)) {
sendMessageContext.setMsgType(MessageType.Order_Msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ private RemotingCommand sendFinalMessage(MessageExtBrokerInner msgInner) {
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum());
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.Stats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
Expand Down Expand Up @@ -56,6 +57,8 @@
@RunWith(MockitoJUnitRunner.class)
public class EndTransactionProcessorTest {

private static final String TOPIC = "trans_topic_test";

private EndTransactionProcessor endTransactionProcessor;

@Mock
Expand Down Expand Up @@ -95,20 +98,26 @@ private OperationResult createResponse(int status) {
public void testProcessRequest() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}

@Test
public void testProcessRequest_CheckMessage() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
when(messageStore.putMessage(any(MessageExtBrokerInner.class)))
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
.thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, createAppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.BROKER_PUT_NUMS, brokerController.getBrokerConfig().getBrokerClusterName()).getValue().sum()).isEqualTo(1);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_NUMS, TOPIC).getValue().sum()).isEqualTo(1L);
assertThat(brokerController.getBrokerStatsManager().getStatsItem(Stats.TOPIC_PUT_SIZE, TOPIC).getValue().sum()).isEqualTo(1L);
}

@Test
Expand Down Expand Up @@ -148,6 +157,7 @@ private MessageExt createDefaultMessageExt() {
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, TOPIC);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
Expand Down Expand Up @@ -195,4 +205,12 @@ private MessageExt createRejectMessageExt() {
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
return messageExt;
}

private AppendMessageResult createAppendMessageResult(AppendMessageStatus status) {
AppendMessageResult result = new AppendMessageResult(status);
result.setMsgId("12345678");
result.setMsgNum(1);
result.setWroteBytes(1);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ public void shutdown() {
try {
this.timer.stop();

for (String addr : this.channelTables.keySet()) {
this.channelTables.get(addr).close();
for (Map.Entry<String, ChannelWrapper> channel : this.channelTables.entrySet()) {
channel.getValue().close();
}

this.channelWrapperTables.clear();
Expand Down Expand Up @@ -943,8 +943,9 @@ protected void scanChannelTablesOfNameServer() {
return;
}

for (String addr : this.channelTables.keySet()) {
ChannelWrapper channelWrapper = this.channelTables.get(addr);
for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
String addr = entry.getKey();
ChannelWrapper channelWrapper = entry.getValue();
if (channelWrapper == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public int hashCode() {
result = prime * result + (consumeEnable ? 1231 : 1237);
result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
result = prime * result + (consumeMessageOrderly ? 1231 : 1237);
result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
result = prime * result + retryMaxTimes;
result = prime * result + retryQueueNums;
Expand All @@ -208,6 +209,7 @@ public boolean equals(Object obj) {
.append(consumeEnable, other.consumeEnable)
.append(consumeFromMinEnable, other.consumeFromMinEnable)
.append(consumeBroadcastEnable, other.consumeBroadcastEnable)
.append(consumeMessageOrderly, other.consumeMessageOrderly)
.append(retryQueueNums, other.retryQueueNums)
.append(retryMaxTimes, other.retryMaxTimes)
.append(whichBrokerWhenConsumeSlowly, other.whichBrokerWhenConsumeSlowly)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2183,7 +2183,7 @@ class CleanCommitLogService {
System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "");
private long lastRedeleteTimestamp = 0;

private volatile int manualDeleteFileSeveralTimes = 0;
private final AtomicInteger manualDeleteFileSeveralTimes = new AtomicInteger();

private volatile boolean cleanImmediately = false;

Expand Down Expand Up @@ -2226,7 +2226,7 @@ class CleanCommitLogService {
}

public void executeDeleteFilesManually() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
this.manualDeleteFileSeveralTimes.set(MAX_MANUAL_DELETE_FILE_TIMES);
DefaultMessageStore.LOGGER.info("executeDeleteFilesManually was invoked");
}

Expand All @@ -2248,12 +2248,12 @@ private void deleteExpiredFiles() {

boolean isTimeUp = this.isTimeToDelete();
boolean isUsageExceedsThreshold = this.isSpaceToDelete();
boolean isManualDelete = this.manualDeleteFileSeveralTimes > 0;
boolean isManualDelete = this.manualDeleteFileSeveralTimes.get() > 0;

if (isTimeUp || isUsageExceedsThreshold || isManualDelete) {

if (isManualDelete) {
this.manualDeleteFileSeveralTimes--;
this.manualDeleteFileSeveralTimes.decrementAndGet();
}

boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
Expand All @@ -2262,7 +2262,7 @@ private void deleteExpiredFiles() {
fileReservedTime,
isTimeUp,
isUsageExceedsThreshold,
manualDeleteFileSeveralTimes,
manualDeleteFileSeveralTimes.get(),
cleanAtOnce,
deleteFileBatchMax);

Expand Down Expand Up @@ -2407,11 +2407,11 @@ private boolean isSpaceToDelete() {
}

public int getManualDeleteFileSeveralTimes() {
return manualDeleteFileSeveralTimes;
return manualDeleteFileSeveralTimes.get();
}

public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
this.manualDeleteFileSeveralTimes.set(manualDeleteFileSeveralTimes);
}

public double calcStorePathPhysicRatio() {
Expand Down

0 comments on commit d242d94

Please sign in to comment.