From 366f9daef286ca045d4961adbaf6206854d33928 Mon Sep 17 00:00:00 2001 From: Tran Ngoc Nhan Date: Sat, 19 Oct 2024 16:58:29 +0700 Subject: [PATCH 1/3] Improve condition --- .../stream/producer/RabbitStreamTemplate.java | 86 ++++++++----------- 1 file changed, 38 insertions(+), 48 deletions(-) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index 0751e7dbd3..8b6f62cb79 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * * @author Gary Russell * @author Christian Tzolov + * @author Ngoc Nhan * @since 2.4 * */ @@ -107,29 +108,29 @@ public RabbitStreamTemplate(Environment environment, String streamName) { private Producer createOrGetProducer() { - this.lock.lock(); - try { - if (this.producer == null) { - ProducerBuilder builder = this.environment.producerBuilder(); - if (this.superStreamRouting == null) { - builder.stream(this.streamName); - } - else { - builder.superStream(this.streamName) - .routing(this.superStreamRouting); - } - this.producerCustomizer.accept(this.beanName, builder); - this.producer = builder.build(); - if (!this.streamConverterSet) { - ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( - () -> this.producer.messageBuilder()); - } + if (this.producer == null) { + this.lock.lock(); + try { + ProducerBuilder builder = this.environment.producerBuilder(); + if (this.superStreamRouting == null) { + builder.stream(this.streamName); + } + else { + builder.superStream(this.streamName) + .routing(this.superStreamRouting); + } + this.producerCustomizer.accept(this.beanName, builder); + this.producer = builder.build(); + if (!this.streamConverterSet) { + ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( + () -> this.producer.messageBuilder()); + } + } + finally { + this.lock.unlock(); } - return this.producer; - } - finally { - this.lock.unlock(); } + return this.producer; } @Override @@ -305,24 +306,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs } else { int code = confStatus.getCode(); - String errorMessage; - switch (code) { - case Constants.CODE_MESSAGE_ENQUEUEING_FAILED: - errorMessage = "Message Enqueueing Failed"; - break; - case Constants.CODE_PRODUCER_CLOSED: - errorMessage = "Producer Closed"; - break; - case Constants.CODE_PRODUCER_NOT_AVAILABLE: - errorMessage = "Producer Not Available"; - break; - case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT: - errorMessage = "Publish Confirm Timeout"; - break; - default: - errorMessage = "Unknown code: " + code; - break; - } + String errorMessage = switch (code) { + case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed"; + case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed"; + case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available"; + case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout"; + default -> "Unknown code: " + code; + }; StreamSendException ex = new StreamSendException(errorMessage, code); observation.error(ex); observation.stop(); @@ -339,15 +329,15 @@ private ConfirmationHandler handleConfirm(CompletableFuture future, Obs */ @Override public void close() { - this.lock.lock(); - try { - if (this.producer != null) { - this.producer.close(); - this.producer = null; + if (this.producer != null) { + this.lock.lock(); + try { + this.producer.close(); + this.producer = null; + } + finally { + this.lock.unlock(); } - } - finally { - this.lock.unlock(); } } From add77447c9d60aead5274024f3abf2e0ba7086e6 Mon Sep 17 00:00:00 2001 From: Tran Ngoc Nhan Date: Sat, 19 Oct 2024 18:51:50 +0700 Subject: [PATCH 2/3] Reduce `else` condition and modernize switch pattern --- ...itListenerAnnotationBeanPostProcessor.java | 5 +- .../rabbit/batch/SimpleBatchingStrategy.java | 12 ++--- .../config/ListenerContainerFactoryBean.java | 15 +++--- .../config/ListenerContainerParser.java | 8 +-- ...RetryOperationsInterceptorFactoryBean.java | 22 ++++---- .../AbstractRoutingConnectionFactory.java | 4 +- .../connection/ConsumerChannelRegistry.java | 8 ++- .../rabbit/connection/PendingConfirm.java | 5 +- .../PooledChannelConnectionFactory.java | 33 ++++++------ .../PublisherCallbackChannelImpl.java | 33 ++++++------ .../RabbitConnectionFactoryBean.java | 17 +++--- .../amqp/rabbit/connection/RabbitUtils.java | 52 +++++++------------ .../ThreadChannelConnectionFactory.java | 33 ++++++------ .../rabbit/connection/WebFluxNodeLocator.java | 6 +-- .../rabbit/listener/MicrometerHolder.java | 5 +- 15 files changed, 117 insertions(+), 141 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java index 2c1ce24447..e22e1b3860 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java @@ -76,6 +76,7 @@ import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.Ordered; import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.core.annotation.MergedAnnotation; import org.springframework.core.annotation.MergedAnnotations; import org.springframework.core.annotation.MergedAnnotations.SearchStrategy; import org.springframework.core.convert.ConversionService; @@ -357,7 +358,7 @@ else if (source instanceof Method method) { } return !name.contains("$MockitoMock$"); }) - .map(ann -> ann.synthesize()) + .map(MergedAnnotation::synthesize) .collect(Collectors.toList()); } @@ -893,7 +894,7 @@ private void addToMap(Map map, String key, Object value, Class= this.bufferLimit) { + if (this.currentSize >= this.bufferLimit) { // release immediately, we're already over the limit return new Date(); } - else { - return new Date(System.currentTimeMillis() + this.timeout); - } + + return new Date(System.currentTimeMillis() + this.timeout); } @Override @@ -122,9 +121,8 @@ public Collection releaseBatches() { if (batch == null) { return Collections.emptyList(); } - else { - return Collections.singletonList(batch); - } + + return Collections.singletonList(batch); } private MessageBatch doReleaseBatch() { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java index c919105ea1..b2bc26fb8a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java @@ -575,14 +575,13 @@ private AbstractMessageListenerContainer createContainer() { .acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval); return container; } - else { - DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory); - JavaUtils.INSTANCE - .acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue) - .acceptIfNotNull(this.taskScheduler, container::setTaskScheduler) - .acceptIfNotNull(this.monitorInterval, container::setMonitorInterval); - return container; - } + + DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory); + JavaUtils.INSTANCE + .acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue) + .acceptIfNotNull(this.taskScheduler, container::setTaskScheduler) + .acceptIfNotNull(this.monitorInterval, container::setMonitorInterval); + return container; } @Override diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java index fce31f9935..71acf60981 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java @@ -101,8 +101,8 @@ public BeanDefinition parse(Element element, ParserContext parserContext) { } List childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT); - for (int i = 0; i < childElements.size(); i++) { - parseListener(childElements.get(i), element, parserContext, containerList); + for (Element childElement : childElements) { + parseListener(childElement, element, parserContext, containerList); } parserContext.popAndRegisterContainingComponent(); @@ -190,8 +190,8 @@ private void parseListener(Element listenerEle, Element containerEle, ParserCont else { String[] names = StringUtils.commaDelimitedListToStringArray(queues); List values = new ManagedList<>(); - for (int i = 0; i < names.length; i++) { - values.add(new RuntimeBeanReference(names[i].trim())); + for (String name : names) { + values.add(new RuntimeBeanReference(name.trim())); } containerDef.getPropertyValues().add("queues", values); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java index 3aa40f94ef..a7f113d471 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/StatefulRetryOperationsInterceptorFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ * * @author Dave Syer * @author Gary Russell + * @author Ngoc Nhan * * @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback, * org.springframework.retry.RetryState) @@ -90,9 +91,8 @@ private NewMethodArgumentsIdentifier createNewItemIdentifier() { if (StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier == null) { return !message.getMessageProperties().isRedelivered(); } - else { - return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message); - } + + return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message); }; } @@ -127,23 +127,19 @@ private MethodArgumentsKeyGenerator createKeyGenerator() { } return messageId; } - else { - return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message); - } + return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message); }; } - @SuppressWarnings("unchecked") private Message argToMessage(Object[] args) { Object arg = args[1]; - Message message = null; if (arg instanceof Message msg) { - message = msg; + return msg; } - else if (arg instanceof List) { - message = ((List) arg).get(0); + if (arg instanceof List list) { + return (Message) list.get(0); } - return message; + return null; } @Override diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java index b4fed509e0..be70f77cc5 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java @@ -69,7 +69,7 @@ public void setTargetConnectionFactories(Map targetCo Assert.noNullElements(targetConnectionFactories.values().toArray(), "'targetConnectionFactories' cannot have null values."); this.targetConnectionFactories.putAll(targetConnectionFactories); - targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf)); + targetConnectionFactories.values().forEach(this::checkConfirmsAndReturns); } /** @@ -293,7 +293,7 @@ public void destroy() { @Override public void resetConnection() { - this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection()); + this.targetConnectionFactories.values().forEach(ConnectionFactory::resetConnection); this.defaultTargetConnectionFactory.resetConnection(); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java index d1a82f4d87..e72c338b04 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConsumerChannelRegistry.java @@ -84,11 +84,9 @@ public static void unRegisterConsumerChannel() { @Nullable public static Channel getConsumerChannel() { ChannelHolder channelHolder = consumerChannel.get(); - Channel channel = null; - if (channelHolder != null) { - channel = channelHolder.getChannel(); - } - return channel; + return channelHolder != null + ? channelHolder.getChannel() + : null; } /** diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java index 95c71fe744..5aa67760c9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PendingConfirm.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ * expired. It also holds {@link CorrelationData} for * the client to correlate a confirm with a sent message. * @author Gary Russell + * @author Ngoc Nhan * @since 1.0.1 * */ @@ -115,7 +116,7 @@ public void setReturned(boolean isReturned) { * @since 2.2.10 */ public boolean waitForReturnIfNeeded() throws InterruptedException { - return this.returned ? this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS) : true; + return !this.returned || this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS); } /** diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java index a458be72ab..321c8925ab 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java @@ -55,6 +55,7 @@ * @author Gary Russell * @author Leonardo Ferreira * @author Christian Tzolov + * @author Ngoc Nhan * @since 2.3 * */ @@ -255,23 +256,21 @@ private Channel createProxy(Channel channel, boolean transacted) { Advice advice = (MethodInterceptor) invocation -> { String method = invocation.getMethod().getName(); - switch (method) { - case "close": - handleClose(channel, transacted, proxy); - return null; - case "getTargetChannel": - return channel; - case "isTransactional": - return transacted; - case "confirmSelect": - confirmSelected.set(true); - return channel.confirmSelect(); - case "isConfirmSelected": - return confirmSelected.get(); - case "isPublisherConfirms": - return false; - } - return null; + return switch (method) { + case "close" -> { + handleClose(channel, transacted, proxy); + yield null; + } + case "getTargetChannel" -> channel; + case "isTransactional" -> transacted; + case "confirmSelect" -> { + confirmSelected.set(true); + yield channel.confirmSelect(); + } + case "isConfirmSelected" -> confirmSelected.get(); + case "isPublisherConfirms" -> false; + default -> null; + }; }; NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice); advisor.addMethodName("close"); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java index b8ee5e29dd..74fb85bc95 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.java @@ -922,27 +922,26 @@ public Collection expire(Listener listener, long cutoffTime) { try { SortedMap pendingConfirmsForListener = this.pendingConfirms.get(listener); if (pendingConfirmsForListener == null) { - return Collections.emptyList(); + return Collections.emptyList(); } - else { - List expired = new ArrayList<>(); - Iterator> iterator = pendingConfirmsForListener.entrySet().iterator(); - while (iterator.hasNext()) { - PendingConfirm pendingConfirm = iterator.next().getValue(); - if (pendingConfirm.getTimestamp() < cutoffTime) { - expired.add(pendingConfirm); - iterator.remove(); - CorrelationData correlationData = pendingConfirm.getCorrelationData(); - if (correlationData != null && StringUtils.hasText(correlationData.getId())) { - this.pendingReturns.remove(correlationData.getId()); // NOSONAR never null - } - } - else { - break; + + List expired = new ArrayList<>(); + Iterator> iterator = pendingConfirmsForListener.entrySet().iterator(); + while (iterator.hasNext()) { + PendingConfirm pendingConfirm = iterator.next().getValue(); + if (pendingConfirm.getTimestamp() < cutoffTime) { + expired.add(pendingConfirm); + iterator.remove(); + CorrelationData correlationData = pendingConfirm.getCorrelationData(); + if (correlationData != null && StringUtils.hasText(correlationData.getId())) { + this.pendingReturns.remove(correlationData.getId()); // NOSONAR never null } } - return expired; + else { + break; + } } + return expired; } finally { this.lock.unlock(); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java index d12361ee1d..779bdef7c9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,7 @@ * @author Hareendran * @author Dominique Villard * @author Zachary DeLuca + * @author Ngoc Nhan * * @since 1.4 */ @@ -360,12 +361,11 @@ protected String getKeyStoreType() { if (this.keyStoreType == null && this.sslProperties.getProperty(KEY_STORE_TYPE) == null) { return KEY_STORE_DEFAULT_TYPE; } - else if (this.keyStoreType != null) { + if (this.keyStoreType != null) { return this.keyStoreType; } - else { - return this.sslProperties.getProperty(KEY_STORE_TYPE); - } + + return this.sslProperties.getProperty(KEY_STORE_TYPE); } /** @@ -389,12 +389,11 @@ protected String getTrustStoreType() { if (this.trustStoreType == null && this.sslProperties.getProperty(TRUST_STORE_TYPE) == null) { return TRUST_STORE_DEFAULT_TYPE; } - else if (this.trustStoreType != null) { + if (this.trustStoreType != null) { return this.trustStoreType; } - else { - return this.sslProperties.getProperty(TRUST_STORE_TYPE); - } + + return this.sslProperties.getProperty(TRUST_STORE_TYPE); } /** diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java index 1d0c1609f8..e135d365a2 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java @@ -228,12 +228,7 @@ public static void setPhysicalCloseRequired(Channel channel, boolean b) { */ public static boolean isPhysicalCloseRequired() { Boolean mustClose = physicalCloseRequired.get(); - if (mustClose == null) { - return false; - } - else { - return mustClose; - } + return mustClose != null && mustClose; } /** @@ -322,13 +317,12 @@ public static boolean isMismatchedQueueArgs(Exception e) { if (sig == null) { return false; } - else { - Method shutdownReason = sig.getReason(); - return shutdownReason instanceof AMQP.Channel.Close closeReason - && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode() - && closeReason.getClassId() == QUEUE_CLASS_ID_50 - && closeReason.getMethodId() == DECLARE_METHOD_ID_10; - } + + Method shutdownReason = sig.getReason(); + return shutdownReason instanceof AMQP.Channel.Close closeReason + && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode() + && closeReason.getClassId() == QUEUE_CLASS_ID_50 + && closeReason.getMethodId() == DECLARE_METHOD_ID_10; } /** @@ -352,13 +346,12 @@ public static boolean isExchangeDeclarationFailure(Exception e) { if (sig == null) { return false; } - else { - Method shutdownReason = sig.getReason(); - return shutdownReason instanceof AMQP.Channel.Close closeReason - && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode() - && closeReason.getClassId() == EXCHANGE_CLASS_ID_40 - && closeReason.getMethodId() == DECLARE_METHOD_ID_10; - } + + Method shutdownReason = sig.getReason(); + return shutdownReason instanceof AMQP.Channel.Close closeReason + && AMQP.PRECONDITION_FAILED == closeReason.getReplyCode() + && closeReason.getClassId() == EXCHANGE_CLASS_ID_40 + && closeReason.getMethodId() == DECLARE_METHOD_ID_10; } /** @@ -395,18 +388,13 @@ public static int getMaxFrame(ConnectionFactory connectionFactory) { public static SaslConfig stringToSaslConfig(String saslConfig, com.rabbitmq.client.ConnectionFactory connectionFactory) { - switch (saslConfig) { - case "DefaultSaslConfig.PLAIN": - return DefaultSaslConfig.PLAIN; - case "DefaultSaslConfig.EXTERNAL": - return DefaultSaslConfig.EXTERNAL; - case "JDKSaslConfig": - return new JDKSaslConfig(connectionFactory); - case "CRDemoSaslConfig": - return new CRDemoMechanism.CRDemoSaslConfig(); - default: - throw new IllegalStateException("Unrecognized SaslConfig: " + saslConfig); - } + return switch (saslConfig) { + case "DefaultSaslConfig.PLAIN" -> DefaultSaslConfig.PLAIN; + case "DefaultSaslConfig.EXTERNAL" -> DefaultSaslConfig.EXTERNAL; + case "JDKSaslConfig" -> new JDKSaslConfig(connectionFactory); + case "CRDemoSaslConfig" -> new CRDemoMechanism.CRDemoSaslConfig(); + default -> throw new IllegalStateException("Unrecognized SaslConfig: " + saslConfig); + }; } /** diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java index b9d371e683..8097eed059 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.aopalliance.aop.Advice; import org.aopalliance.intercept.MethodInterceptor; @@ -196,8 +195,8 @@ public void destroy() { this.logger.warn("Unclaimed context switches from threads:" + this.switchesInProgress.values() .stream() - .map(t -> t.getName()) - .collect(Collectors.toList())); + .map(Thread::getName) + .toList()); } this.contextSwitches.clear(); this.switchesInProgress.clear(); @@ -319,23 +318,21 @@ private Channel createProxy(Channel channel, boolean transactional) { Advice advice = (MethodInterceptor) invocation -> { String method = invocation.getMethod().getName(); - switch (method) { - case "close": + return switch (method) { + case "close" -> { handleClose(channel, transactional); - return null; - case "getTargetChannel": - return channel; - case "isTransactional": - return transactional; - case "confirmSelect": + yield null; + } + case "getTargetChannel" -> channel; + case "isTransactional" -> transactional; + case "confirmSelect" -> { confirmSelected.set(true); - return channel.confirmSelect(); - case "isConfirmSelected": - return confirmSelected.get(); - case "isPublisherConfirms": - return false; - } - return null; + yield channel.confirmSelect(); + } + case "isConfirmSelected" -> confirmSelected.get(); + case "isPublisherConfirms" -> false; + default -> null; + }; }; NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice); advisor.addMethodName("close"); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java index 6c10164474..75acd71e81 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ * A {@link NodeLocator} using the Spring WebFlux {@link WebClient}. * * @author Gary Russell + * @author Ngoc Nhan * @since 2.4.8 * */ @@ -46,14 +47,13 @@ public Map restCall(WebClient client, String baseUri, String vho URI uri = new URI(baseUri) .resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue); - HashMap queueInfo = client.get() + return client.get() .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(new ParameterizedTypeReference>() { }) .block(Duration.ofSeconds(10)); // NOSONAR magic# - return queueInfo != null ? queueInfo : null; } /** diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MicrometerHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MicrometerHolder.java index 72d06e140f..5cd536329f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MicrometerHolder.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/MicrometerHolder.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ * Abstraction to avoid hard reference to Micrometer. * * @author Gary Russell + * @author Ngoc Nhan * @since 2.4.6 * */ @@ -95,7 +96,7 @@ private Timer buildTimer(String aListenerId, String result, String queue, String .tag("result", result) .tag("exception", exception); if (this.tags != null && !this.tags.isEmpty()) { - this.tags.forEach((key, value) -> builder.tag(key, value)); + this.tags.forEach(builder::tag); } Timer registeredTimer = builder.register(this.registry); this.timers.put(queue + exception, registeredTimer); From 2408622d3770cccb5610193d4bf6051608c62d31 Mon Sep 17 00:00:00 2001 From: Tran Ngoc Nhan Date: Mon, 21 Oct 2024 00:28:24 +0700 Subject: [PATCH 3/3] Check condition after calling lock method --- .../rabbit/stream/producer/RabbitStreamTemplate.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java index 8b6f62cb79..8a42b52101 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java @@ -111,6 +111,7 @@ private Producer createOrGetProducer() { if (this.producer == null) { this.lock.lock(); try { + if (this.producer == null) { ProducerBuilder builder = this.environment.producerBuilder(); if (this.superStreamRouting == null) { builder.stream(this.streamName); @@ -125,6 +126,7 @@ private Producer createOrGetProducer() { ((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier( () -> this.producer.messageBuilder()); } + } } finally { this.lock.unlock(); @@ -332,8 +334,10 @@ public void close() { if (this.producer != null) { this.lock.lock(); try { + if (this.producer != null) { this.producer.close(); this.producer = null; + } } finally { this.lock.unlock();