Skip to content

Commit

Permalink
Add attribute for SubscriptionGroupConfig (#6891)
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma authored Jun 14, 2023
1 parent 4291971 commit aaa4a4b
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.rocketmq.broker.subscription;

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -26,11 +29,13 @@
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
import org.apache.rocketmq.common.attribute.AttributeUtil;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;

Expand Down Expand Up @@ -111,6 +116,17 @@ private void init() {
}

public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
Map<String, String> newAttributes = request(config);
Map<String, String> currentAttributes = current(config.getGroupName());

Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes(
this.subscriptionGroupTable.get(config.getGroupName()) == null,
SubscriptionGroupAttributes.ALL,
ImmutableMap.copyOf(currentAttributes),
ImmutableMap.copyOf(newAttributes));

config.setAttributes(finalAttributes);

SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: {} new: {}", old, config);
Expand Down Expand Up @@ -315,4 +331,22 @@ public boolean containsSubscriptionGroup(String group) {

return subscriptionGroupTable.containsKey(group);
}

private Map<String, String> request(SubscriptionGroupConfig subscriptionGroupConfig) {
return subscriptionGroupConfig.getAttributes() == null ? new HashMap<>() : subscriptionGroupConfig.getAttributes();
}

private Map<String, String> current(String groupName) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(groupName);
if (subscriptionGroupConfig == null) {
return new HashMap<>();
} else {
Map<String, String> attributes = subscriptionGroupConfig.getAttributes();
if (attributes == null) {
return new HashMap<>();
} else {
return attributes;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
*/
package org.apache.rocketmq.broker.topic;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -38,6 +36,7 @@
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.attribute.AttributeUtil;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
Expand Down Expand Up @@ -466,8 +465,9 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
Map<String, String> newAttributes = request(topicConfig);
Map<String, String> currentAttributes = current(topicConfig.getTopicName());

Map<String, String> finalAttributes = alterCurrentAttributes(
Map<String, String> finalAttributes = AttributeUtil.alterCurrentAttributes(
this.topicConfigTable.get(topicConfig.getTopicName()) == null,
TopicAttributes.ALL,
ImmutableMap.copyOf(currentAttributes),
ImmutableMap.copyOf(newAttributes));

Expand Down Expand Up @@ -628,106 +628,6 @@ private Map<String, String> current(String topic) {
}
}

private Map<String, String> alterCurrentAttributes(boolean create, ImmutableMap<String, String> currentAttributes,
ImmutableMap<String, String> newAttributes) {
Map<String, String> init = new HashMap<>();
Map<String, String> add = new HashMap<>();
Map<String, String> update = new HashMap<>();
Map<String, String> delete = new HashMap<>();
Set<String> keys = new HashSet<>();

for (Entry<String, String> attribute : newAttributes.entrySet()) {
String key = attribute.getKey();
String realKey = realKey(key);
String value = attribute.getValue();

validate(realKey);
duplicationCheck(keys, realKey);

if (create) {
if (key.startsWith("+")) {
init.put(realKey, value);
} else {
throw new RuntimeException("only add attribute is supported while creating topic. key: " + realKey);
}
} else {
if (key.startsWith("+")) {
if (!currentAttributes.containsKey(realKey)) {
add.put(realKey, value);
} else {
update.put(realKey, value);
}
} else if (key.startsWith("-")) {
if (!currentAttributes.containsKey(realKey)) {
throw new RuntimeException("attempt to delete a nonexistent key: " + realKey);
}
delete.put(realKey, value);
} else {
throw new RuntimeException("wrong format key: " + realKey);
}
}
}

validateAlter(init, true, false);
validateAlter(add, false, false);
validateAlter(update, false, false);
validateAlter(delete, false, true);

log.info("add: {}, update: {}, delete: {}", add, update, delete);
HashMap<String, String> finalAttributes = new HashMap<>(currentAttributes);
finalAttributes.putAll(init);
finalAttributes.putAll(add);
finalAttributes.putAll(update);
for (String s : delete.keySet()) {
finalAttributes.remove(s);
}
return finalAttributes;
}

private void duplicationCheck(Set<String> keys, String key) {
boolean notExist = keys.add(key);
if (!notExist) {
throw new RuntimeException("alter duplication key. key: " + key);
}
}

private void validate(String kvAttribute) {
if (Strings.isNullOrEmpty(kvAttribute)) {
throw new RuntimeException("kv string format wrong.");
}

if (kvAttribute.contains("+")) {
throw new RuntimeException("kv string format wrong.");
}

if (kvAttribute.contains("-")) {
throw new RuntimeException("kv string format wrong.");
}
}

private void validateAlter(Map<String, String> alter, boolean init, boolean delete) {
for (Entry<String, String> entry : alter.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();

Attribute attribute = allAttributes().get(key);
if (attribute == null) {
throw new RuntimeException("unsupported key: " + key);
}
if (!init && !attribute.isChangeable()) {
throw new RuntimeException("attempt to update an unchangeable attribute. key: " + key);
}

if (!delete) {
attribute.verify(value);
}
}
}

private String realKey(String key) {
return key.substring(1);
}

public boolean containsTopic(String topic) {
return topicConfigTable.containsKey(topic);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.broker.subscription;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.SubscriptionGroupAttributes;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SubscriptionGroupManagerTest {
private String group = "group";
@Mock
private BrokerController brokerControllerMock;
private SubscriptionGroupManager subscriptionGroupManager;

@Before
public void before() {
SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
"test",
false,
false
));
subscriptionGroupManager = spy(new SubscriptionGroupManager(brokerControllerMock));
when(brokerControllerMock.getMessageStore()).thenReturn(null);
doNothing().when(subscriptionGroupManager).persist();
}

@Test
public void updateSubscriptionGroupConfig() {
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
Map<String, String> attr = ImmutableMap.of("+test", "true");
subscriptionGroupConfig.setAttributes(attr);
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
SubscriptionGroupConfig result = subscriptionGroupManager.getSubscriptionGroupTable().get(group);
assertThat(result).isNotNull();
assertThat(result.getGroupName()).isEqualTo(group);
assertThat(result.getAttributes().get("test")).isEqualTo("true");


SubscriptionGroupConfig subscriptionGroupConfig1 = new SubscriptionGroupConfig();
subscriptionGroupConfig1.setGroupName(group);
Map<String, String> attrRemove = ImmutableMap.of("-test", "");
subscriptionGroupConfig1.setAttributes(attrRemove);
assertThatThrownBy(() -> subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1))
.isInstanceOf(RuntimeException.class).hasMessage("attempt to update an unchangeable attribute. key: test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,31 @@
*/
package org.apache.rocketmq.broker.topic;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.EnumAttribute;
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.common.attribute.CQType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.Sets.newHashSet;
import static java.util.Arrays.asList;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -318,7 +316,6 @@ private void supportAttributes(List<Attribute> supportAttributes) {
supportedAttributes.put(supportAttribute.getName(), supportAttribute);
}

topicConfigManager = spy(topicConfigManager);
when(topicConfigManager.allAttributes()).thenReturn(supportedAttributes);
TopicAttributes.ALL.putAll(supportedAttributes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common;

import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.attribute.Attribute;

public class SubscriptionGroupAttributes {
public static final Map<String, Attribute> ALL;

static {
ALL = new HashMap<>();
}
}
Loading

0 comments on commit aaa4a4b

Please sign in to comment.