Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/apache/develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
ChineseTony committed Jun 12, 2024
2 parents 59596fa + d60198f commit 6df0255
Show file tree
Hide file tree
Showing 58 changed files with 1,157 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.rocketmq.acl.AccessResource;
import org.apache.rocketmq.acl.common.AclException;
Expand Down Expand Up @@ -268,7 +269,9 @@ public static PlainAccessResource parse(GeneratedMessageV3 messageV3, Authentica
}
} else if (NotifyClientTerminationRequest.getDescriptor().getFullName().equals(rpcFullName)) {
NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) messageV3;
accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
if (StringUtils.isNotBlank(request.getGroup().getName())) {
accessResource.addGroupResourceAndPerm(request.getGroup(), Permission.SUB);
}
} else if (QueryRouteRequest.getDescriptor().getFullName().equals(rpcFullName)) {
QueryRouteRequest request = (QueryRouteRequest) messageV3;
accessResource.addResourceAndPerm(request.getTopic(), Permission.ANY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public List<DefaultAuthorizationContext> build(Metadata metadata, GeneratedMessa
}
if (message instanceof NotifyClientTerminationRequest) {
NotifyClientTerminationRequest request = (NotifyClientTerminationRequest) message;
result = newGroupSubContexts(metadata, request.getGroup());
if (StringUtils.isNotBlank(request.getGroup().getName())) {
result = newGroupSubContexts(metadata, request.getGroup());
}
}
if (message instanceof ChangeInvisibleDurationRequest) {
ChangeInvisibleDurationRequest request = (ChangeInvisibleDurationRequest) message;
Expand Down
2 changes: 2 additions & 0 deletions broker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ java_library(
"//remoting",
"//srvutil",
"//store",
"//tieredstore",
"@maven//:ch_qos_logback_logback_classic",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
Expand Down Expand Up @@ -81,6 +82,7 @@ java_library(
"//filter",
"//remoting",
"//store",
"//tieredstore",
"@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:com_google_guava_guava",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ public BrokerController(
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);

this.brokerStatsManager.setProduerStateGetter(new BrokerStatsManager.StateGetter() {
this.brokerStatsManager.setProducerStateGetter(new BrokerStatsManager.StateGetter() {
@Override
public boolean online(String instanceId, String group, String topic) {
if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
Expand Down Expand Up @@ -803,7 +803,10 @@ private void scanAvailableControllerAddresses() {

private void updateControllerAddr() {
if (brokerConfig.isFetchControllerAddrByDnsLookup()) {
this.controllerAddresses = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr());
List<String> adders = brokerOuterAPI.dnsLookupAddressByDomain(this.brokerConfig.getControllerAddr());
if (CollectionUtils.isNotEmpty(adders)) {
this.controllerAddresses = adders;
}
} else {
final String controllerPaths = this.brokerConfig.getControllerAddr();
final String[] controllers = controllerPaths.split(";");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHol
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
this.popMessageProcessor.notifyMessageArriving(topic, queueId);
this.notificationProcessor.notifyMessageArriving(topic, queueId);

this.pullRequestHoldService.notifyMessageArriving(
topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties);
this.popMessageProcessor.notifyMessageArriving(
topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
this.notificationProcessor.notifyMessageArriving(
topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.MessageFilter;

import static org.apache.rocketmq.broker.longpolling.PollingResult.NOT_POLLING;
import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_FULL;
Expand Down Expand Up @@ -147,39 +150,61 @@ public void run() {
}

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) {
this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L, null, null);
}

public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String notifyTopic;
if (KeyBuilder.isPopRetryTopicV2(topic)) {
notifyTopic = KeyBuilder.parseNormalTopic(topic);
} else {
notifyTopic = topic;
}
notifyMessageArriving(notifyTopic, queueId);
notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
}

public void notifyMessageArriving(final String topic, final int queueId) {
public void notifyMessageArriving(final String topic, final int queueId,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
if (cids == null) {
return;
}
for (Map.Entry<String, Byte> cid : cids.entrySet()) {
if (queueId >= 0) {
notifyMessageArriving(topic, cid.getKey(), -1);
notifyMessageArriving(topic, -1, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties);
}
notifyMessageArriving(topic, cid.getKey(), queueId);
notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties);
}
}

public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) {
public boolean notifyMessageArriving(final String topic, final int queueId, final String cid,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}

PopRequest popRequest = pollRemotingCommands(remotingCommands);
if (popRequest == null) {
return false;
}

if (popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) {
boolean match = popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
if (match && properties != null) {
match = popRequest.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (!match) {
remotingCommands.add(popRequest);
totalPollingNum.incrementAndGet();
return false;
}
}

if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", popRequest);
POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", popRequest);
}
return wakeUp(popRequest);
}
Expand Down Expand Up @@ -221,6 +246,11 @@ public boolean wakeUp(final PopRequest request) {
*/
public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand,
final PollingHeader requestHeader) {
return this.polling(ctx, remotingCommand, requestHeader, null, null);
}

public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand,
final PollingHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) {
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
Expand All @@ -234,7 +264,7 @@ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand re
}
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() + requestHeader.getPollTime();
final PopRequest request = new PopRequest(remotingCommand, ctx, expired);
final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter);
boolean isFull = totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize();
if (isFull) {
POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, totalPollingNum.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,35 @@
*/
package org.apache.rocketmq.broker.longpolling;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.remoting.protocol.RemotingCommand;

import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.MessageFilter;

public class PopRequest {
private static final AtomicLong COUNTER = new AtomicLong(Long.MIN_VALUE);

private final RemotingCommand remotingCommand;
private final ChannelHandlerContext ctx;
private final long expired;
private final AtomicBoolean complete = new AtomicBoolean(false);
private final long op = COUNTER.getAndIncrement();

public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext ctx, long expired) {
private final long expired;
private final SubscriptionData subscriptionData;
private final MessageFilter messageFilter;

public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext ctx,
long expired, SubscriptionData subscriptionData, MessageFilter messageFilter) {

this.ctx = ctx;
this.remotingCommand = remotingCommand;
this.expired = expired;
this.subscriptionData = subscriptionData;
this.messageFilter = messageFilter;
}

public Channel getChannel() {
Expand All @@ -64,6 +71,14 @@ public long getExpired() {
return expired;
}

public SubscriptionData getSubscriptionData() {
return subscriptionData;
}

public MessageFilter getMessageFilter() {
return messageFilter;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("PopRequest{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ public class BrokerMetricsConstant {

public static final String GAUGE_PROCESSOR_WATERMARK = "rocketmq_processor_watermark";
public static final String GAUGE_BROKER_PERMISSION = "rocketmq_broker_permission";
public static final String GAUGE_TOPIC_NUM = "rocketmq_topic_number";
public static final String GAUGE_CONSUMER_GROUP_NUM = "rocketmq_consumer_group_number";

public static final String COUNTER_MESSAGES_IN_TOTAL = "rocketmq_messages_in_total";
public static final String COUNTER_MESSAGES_OUT_TOTAL = "rocketmq_messages_out_total";
public static final String COUNTER_THROUGHPUT_IN_TOTAL = "rocketmq_throughput_in_total";
public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "rocketmq_throughput_out_total";
public static final String HISTOGRAM_MESSAGE_SIZE = "rocketmq_message_size";
public static final String HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME = "rocketmq_topic_create_execution_time";
public static final String HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME = "rocketmq_consumer_group_create_execution_time";

public static final String GAUGE_PRODUCER_CONNECTIONS = "rocketmq_producer_connections";
public static final String GAUGE_CONSUMER_CONNECTIONS = "rocketmq_consumer_connections";
Expand All @@ -52,6 +56,7 @@ public class BrokerMetricsConstant {
public static final String LABEL_PROCESSOR = "processor";

public static final String LABEL_TOPIC = "topic";
public static final String LABEL_INVOCATION_STATUS = "invocation_status";
public static final String LABEL_IS_RETRY = "is_retry";
public static final String LABEL_IS_SYSTEM = "is_system";
public static final String LABEL_CONSUMER_GROUP = "consumer_group";
Expand Down
Loading

0 comments on commit 6df0255

Please sign in to comment.