Skip to content

Commit

Permalink
Introducing MultiRabbit Bootstrap (#1303)
Browse files Browse the repository at this point in the history
* GH-1302 Introduced MultiRabbitBootstrapConfiguration to register MultiRabbitBPP

* GH-1302 Injecting admin at RabbitListener so as be resolved for the RabbitListenerEndpoint
  • Loading branch information
rwanderc authored Feb 8, 2021
1 parent faa7dce commit 692713d
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.annotation;

import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.lang.Nullable;

/**
* An {@link ImportBeanDefinitionRegistrar} class that registers
* a {@link MultiRabbitListenerAnnotationBeanPostProcessor} bean, if MultiRabbit
* is enabled.
*
* @author Wander Costa
*
* @since 1.4
*
* @see RabbitListenerAnnotationBeanPostProcessor
* @see MultiRabbitListenerAnnotationBeanPostProcessor
* @see RabbitListenerEndpointRegistry
* @see EnableRabbit
*/
public class MultiRabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar, EnvironmentAware {

private Environment environment;

@Override
public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {

if (isMultiRabbitEnabled() && !registry.containsBeanDefinition(
RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(MultiRabbitListenerAnnotationBeanPostProcessor.class));
}
}

private boolean isMultiRabbitEnabled() {
final String isMultiEnabledStr = this.environment.getProperty(
RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY);
return Boolean.parseBoolean(isMultiEnabledStr);
}

@Override
public void setEnvironment(final Environment environment) {
this.environment = environment;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,20 @@

package org.springframework.amqp.rabbit.annotation;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Collection;

import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.util.StringUtils;

/**
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that associates the
* proper RabbitAdmin to the beans of Exchanges, Queues, and Bindings after they are
* created.
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that indicates the proper
* RabbitAdmin bean to be used when processing to the listeners, and also associates it to the
* declarables (Exchanges, Queues, and Bindings) returned.
* <p>
* This processing restricts the {@link org.springframework.amqp.rabbit.core.RabbitAdmin} according to the related
* configuration, preventing the server from automatic binding non-related structures.
Expand All @@ -36,19 +40,12 @@
*/
public class MultiRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {

public static final String CONNECTION_FACTORY_BEAN_NAME = "multiRabbitConnectionFactory";

public static final String CONNECTION_FACTORY_CREATOR_BEAN_NAME = "rabbitConnectionFactoryCreator";

private static final String DEFAULT_RABBIT_ADMIN_BEAN_NAME = "defaultRabbitAdmin";

private static final String RABBIT_ADMIN_SUFFIX = "-admin";

@Override
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method,
Object bean, String beanName) {
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListener, method, bean, beanName);
final String rabbitAdmin = resolveMultiRabbitAdminName(rabbitListener);
final RabbitListener rabbitListenerRef = proxyIfAdminNotPresent(rabbitListener, rabbitAdmin);
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListenerRef, method, bean, beanName);
for (final Declarable declarable : declarables) {
if (declarable.getDeclaringAdmins().isEmpty()) {
declarable.setAdminsThatShouldDeclare(rabbitAdmin);
Expand All @@ -57,6 +54,15 @@ protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListen
return declarables;
}

private RabbitListener proxyIfAdminNotPresent(final RabbitListener rabbitListener, final String rabbitAdmin) {
if (StringUtils.hasText(rabbitListener.admin())) {
return rabbitListener;
}
return (RabbitListener) Proxy.newProxyInstance(
RabbitListener.class.getClassLoader(), new Class<?>[]{RabbitListener.class},
new RabbitListenerAdminReplacementInvocationHandler(rabbitListener, rabbitAdmin));
}

/**
* Resolves the name of the RabbitAdmin bean based on the RabbitListener, or falls back to
* the default RabbitAdmin name provided by MultiRabbit.
Expand All @@ -66,13 +72,35 @@ protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListen
protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
String admin = super.resolveExpressionAsString(rabbitListener.admin(), "admin");
if (!StringUtils.hasText(admin) && StringUtils.hasText(rabbitListener.containerFactory())) {
admin = rabbitListener.containerFactory()
+ MultiRabbitListenerAnnotationBeanPostProcessor.RABBIT_ADMIN_SUFFIX;
admin = rabbitListener.containerFactory() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
}
if (!StringUtils.hasText(admin)) {
admin = MultiRabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_ADMIN_BEAN_NAME;
admin = RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME;
}
return admin;
}

/**
* An {@link InvocationHandler} to provide a replacing admin() parameter of the listener.
*/
private final class RabbitListenerAdminReplacementInvocationHandler implements InvocationHandler {

private final RabbitListener target;
private final String admin;

private RabbitListenerAdminReplacementInvocationHandler(final RabbitListener target, final String admin) {
this.target = target;
this.admin = admin;
}

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws InvocationTargetException, IllegalAccessException {
if (method.getName().equals("admin")) {
return this.admin;
}
return method.invoke(this.target, args);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,10 @@

/**
* A {@link DeferredImportSelector} implementation with the lowest order to import a
* {@link RabbitBootstrapConfiguration} as late as possible.
* {@link MultiRabbitBootstrapConfiguration} and {@link RabbitBootstrapConfiguration}
* as late as possible.
* {@link MultiRabbitBootstrapConfiguration} has precedence to be able to provide the
* extended BeanPostProcessor, if enabled.
*
* @author Artem Bilan
*
Expand All @@ -33,7 +36,8 @@ public class RabbitListenerConfigurationSelector implements DeferredImportSelect

@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { RabbitBootstrapConfiguration.class.getName() };
return new String[] { MultiRabbitBootstrapConfiguration.class.getName(),
RabbitBootstrapConfiguration.class.getName()};
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,4 +36,29 @@ public abstract class RabbitListenerConfigUtils {
public static final String RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME =
"org.springframework.amqp.rabbit.config.internalRabbitListenerEndpointRegistry";

/**
* The bean name of the default RabbitAdmin.
*/
public static final String RABBIT_ADMIN_BEAN_NAME = "amqpAdmin";

/**
* The bean name of the default ConnectionFactory.
*/
public static final String RABBIT_CONNECTION_FACTORY_BEAN_NAME = "rabbitConnectionFactory";

/**
* The default property to enable/disable MultiRabbit processing.
*/
public static final String MULTI_RABBIT_ENABLED_PROPERTY = "spring.multirabbitmq.enabled";

/**
* The bean name of the ContainerFactory of the default broker for MultiRabbit.
*/
public static final String MULTI_RABBIT_CONTAINER_FACTORY_BEAN_NAME = "multiRabbitContainerFactory";

/**
* The MultiRabbit admins' suffix.
*/
public static final String MULTI_RABBIT_ADMIN_SUFFIX = "-admin";

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.annotation;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
Expand All @@ -31,13 +32,15 @@
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.config.MessageListenerTestContainer;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.RabbitListenerContainerTestFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
Expand Down Expand Up @@ -179,6 +182,28 @@ void testCreationOfConnections() {
context.close(); // Close and stop the listeners
}

@Test
@DisplayName("Test assignment of RabbitAdmin in the endpoint registry")
void testAssignmentOfRabbitAdminInTheEndpointRegistry() {
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(MultiConfig.class,
AutoBindingListenerTestBeans.class);

final RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
final Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();

Assertions.assertThat(listenerContainers).hasSize(3);
listenerContainers.forEach(container -> {
Assertions.assertThat(container).isInstanceOf(MessageListenerTestContainer.class);
final MessageListenerTestContainer refContainer = (MessageListenerTestContainer) container;
final RabbitListenerEndpoint endpoint = refContainer.getEndpoint();
Assertions.assertThat(endpoint).isInstanceOf(MethodRabbitListenerEndpoint.class);
final MethodRabbitListenerEndpoint refEndpoint = (MethodRabbitListenerEndpoint) endpoint;
Assertions.assertThat(refEndpoint.getAdmin()).isNotNull();
});

context.close(); // Close and stop the listeners
}

@Component
static class AutoBindingListenerTestBeans {

Expand Down Expand Up @@ -266,7 +291,7 @@ public RabbitListenerAnnotationBeanPostProcessor postProcessor() {
return postProcessor;
}

@Bean("defaultRabbitAdmin")
@Bean(RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME)
public RabbitAdmin defaultRabbitAdmin() {
return DEFAULT_RABBIT_ADMIN;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2014-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.core.env.Environment;

class MultiRabbitBootstrapConfigurationTest {

@Test
@DisplayName("test if MultiRabbitBPP is registered when enabled")
void testMultiRabbitBPPIsRegistered() throws Exception {
final Environment environment = Mockito.mock(Environment.class);
final ArgumentCaptor<RootBeanDefinition> captor = ArgumentCaptor.forClass(RootBeanDefinition.class);
final BeanDefinitionRegistry registry = Mockito.mock(BeanDefinitionRegistry.class);
final MultiRabbitBootstrapConfiguration bootstrapConfiguration = new MultiRabbitBootstrapConfiguration();
bootstrapConfiguration.setEnvironment(environment);

Mockito.when(environment.getProperty(RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY))
.thenReturn("true");

bootstrapConfiguration.registerBeanDefinitions(null, registry);

Mockito.verify(registry).registerBeanDefinition(
Mockito.eq(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME),
captor.capture());

assertThat(captor.getValue().getBeanClass()).isEqualTo(MultiRabbitListenerAnnotationBeanPostProcessor.class);
}

@Test
@DisplayName("test if MultiRabbitBPP is not registered when disabled")
void testMultiRabbitBPPIsNotRegistered() throws Exception {
final Environment environment = Mockito.mock(Environment.class);
final BeanDefinitionRegistry registry = Mockito.mock(BeanDefinitionRegistry.class);
final MultiRabbitBootstrapConfiguration bootstrapConfiguration = new MultiRabbitBootstrapConfiguration();
bootstrapConfiguration.setEnvironment(environment);

Mockito.when(environment.getProperty(RabbitListenerConfigUtils.MULTI_RABBIT_ENABLED_PROPERTY))
.thenReturn("false");

bootstrapConfiguration.registerBeanDefinitions(null, registry);

Mockito.verify(registry, Mockito.never()).registerBeanDefinition(Mockito.anyString(),
Mockito.any(RootBeanDefinition.class));
}
}
Loading

0 comments on commit 692713d

Please sign in to comment.