Skip to content

Commit

Permalink
[ISSUE #8483] Optimize unnecessary broker reverse notification (notif…
Browse files Browse the repository at this point in the history
…yConsumerIdsChanged) in broadcast mode (#8484)

* [ISSUE #8483] Optimize unnecessary broker reverse notification (notifyConsumerIdsChanged) in broadcast mode

* Update

* Update test

* Update test
  • Loading branch information
yx9o authored Aug 30, 2024
1 parent e5e3839 commit 71ec1ed
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}

callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
if (!isBroadcastMode(info.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
return removed;
Expand Down Expand Up @@ -196,7 +197,7 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
}

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
Expand All @@ -219,7 +220,7 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
if (updateChannelRst && isNotifyConsumerIdsChangedEnable) {
if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
if (null != this.brokerStatsManager) {
Expand All @@ -244,7 +245,7 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
Expand Down Expand Up @@ -334,4 +335,8 @@ protected void callConsumerIdsChangeListener(ConsumerGroupEvent event, String gr
}
}
}

private boolean isBroadcastMode(final MessageModel messageModel) {
return MessageModel.BROADCASTING.equals(messageModel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
package org.apache.rocketmq.broker.client;

import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
Expand All @@ -30,13 +27,25 @@
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.HashSet;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -49,19 +58,10 @@ public class ConsumerManagerTest {

private ConsumerManager consumerManager;

private DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener;

@Mock
private BrokerController brokerController;

@Mock
private ConsumerFilterManager consumerFilterManager;

private BrokerConfig brokerConfig = new BrokerConfig();

private Broker2Client broker2Client;

private BrokerStatsManager brokerStatsManager;
private final BrokerConfig brokerConfig = new BrokerConfig();

private static final String GROUP = "DEFAULT_GROUP";

Expand All @@ -74,40 +74,38 @@ public class ConsumerManagerTest {
@Before
public void before() {
clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION);
defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController);
brokerStatsManager = new BrokerStatsManager(brokerConfig);
consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig);
broker2Client = new Broker2Client(brokerController);
DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController);
BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig);
consumerManager = spy(new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig));
ConsumerFilterManager consumerFilterManager = mock(ConsumerFilterManager.class);
when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getBroker2Client()).thenReturn(broker2Client);
}

@Test
public void compensateBasicConsumerInfoTest() {
ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
Assertions.assertThat(consumerGroupInfo).isNull();
assertThat(consumerGroupInfo).isNull();

consumerManager.compensateBasicConsumerInfo(GROUP, ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING);
consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
Assertions.assertThat(consumerGroupInfo).isNotNull();
Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
assertThat(consumerGroupInfo).isNotNull();
assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
}

@Test
public void compensateSubscribeDataTest() {
ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
Assertions.assertThat(consumerGroupInfo).isNull();
assertThat(consumerGroupInfo).isNull();

consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
Assertions.assertThat(consumerGroupInfo).isNotNull();
Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
assertThat(consumerGroupInfo).isNotNull();
assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
SubscriptionData subscriptionData = consumerGroupInfo.getSubscriptionTable().get(TOPIC);
Assertions.assertThat(subscriptionData).isNotNull();
Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
assertThat(subscriptionData).isNotNull();
assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
}

@Test
Expand All @@ -118,7 +116,8 @@ public void registerConsumerTest() {
subList.add(subscriptionData);
consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY,
MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any());
assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
}

@Test
Expand All @@ -128,63 +127,65 @@ public void unregisterConsumerTest() {

// unregister
consumerManager.unregisterConsumer(GROUP, clientChannelInfo, true);
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any());
assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
}

@Test
public void findChannelTest() {
register();
final ClientChannelInfo consumerManagerChannel = consumerManager.findChannel(GROUP, CLIENT_ID);
Assertions.assertThat(consumerManagerChannel).isNotNull();
assertThat(consumerManagerChannel).isNotNull();
}

@Test
public void findSubscriptionDataTest() {
register();
final SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC);
Assertions.assertThat(subscriptionData).isNotNull();
assertThat(subscriptionData).isNotNull();
}

@Test
public void findSubscriptionDataCountTest() {
register();
final int count = consumerManager.findSubscriptionDataCount(GROUP);
assert count > 0;
assertTrue(count > 0);
}

@Test
public void findSubscriptionTest() {
SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
Assertions.assertThat(subscriptionData).isNull();
assertThat(subscriptionData).isNull();

consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
Assertions.assertThat(subscriptionData).isNotNull();
Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
assertThat(subscriptionData).isNotNull();
assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);

subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, false);
Assertions.assertThat(subscriptionData).isNull();
assertThat(subscriptionData).isNull();
}

@Test
public void scanNotActiveChannelTest() {
clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - brokerConfig.getChannelExpiredTimeout() * 2);
consumerManager.scanNotActiveChannel();
Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
}

@Test
public void queryTopicConsumeByWhoTest() {
register();
final HashSet<String> consumeGroup = consumerManager.queryTopicConsumeByWho(TOPIC);
assert consumeGroup.size() > 0;
assertFalse(consumeGroup.isEmpty());
}

@Test
public void doChannelCloseEventTest() {
consumerManager.doChannelCloseEvent("127.0.0.1", channel);
assert consumerManager.getConsumerTable().size() == 0;
verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any());
assertEquals(0, consumerManager.getConsumerTable().size());
}

private void register() {
Expand All @@ -203,8 +204,8 @@ public void removeExpireConsumerGroupInfo() {
consumerManager.compensateSubscribeData(GROUP, TOPIC, subscriptionData);
consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
consumerManager.removeExpireConsumerGroupInfo();
Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull();
Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull();
assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
}
}

0 comments on commit 71ec1ed

Please sign in to comment.