Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve condition #2874

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,6 +55,7 @@
*
* @author Gary Russell
* @author Christian Tzolov
* @author Ngoc Nhan
* @since 2.4
*
*/
Expand Down Expand Up @@ -107,29 +108,31 @@ public RabbitStreamTemplate(Environment environment, String streamName) {


private Producer createOrGetProducer() {
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
if (this.producer == null) {
artembilan marked this conversation as resolved.
Show resolved Hide resolved
this.lock.lock();
try {
if (this.producer == null) {
ProducerBuilder builder = this.environment.producerBuilder();
artembilan marked this conversation as resolved.
Show resolved Hide resolved
if (this.superStreamRouting == null) {
builder.stream(this.streamName);
}
else {
builder.superStream(this.streamName)
.routing(this.superStreamRouting);
}
this.producerCustomizer.accept(this.beanName, builder);
this.producer = builder.build();
if (!this.streamConverterSet) {
((DefaultStreamMessageConverter) this.streamConverter).setBuilderSupplier(
() -> this.producer.messageBuilder());
}
}
}
return this.producer;
}
finally {
this.lock.unlock();
finally {
this.lock.unlock();
}
}
return this.producer;
}

@Override
Expand Down Expand Up @@ -305,24 +308,13 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
}
else {
int code = confStatus.getCode();
String errorMessage;
switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED:
errorMessage = "Message Enqueueing Failed";
break;
case Constants.CODE_PRODUCER_CLOSED:
errorMessage = "Producer Closed";
break;
case Constants.CODE_PRODUCER_NOT_AVAILABLE:
errorMessage = "Producer Not Available";
break;
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT:
errorMessage = "Publish Confirm Timeout";
break;
default:
errorMessage = "Unknown code: " + code;
break;
}
String errorMessage = switch (code) {
case Constants.CODE_MESSAGE_ENQUEUEING_FAILED -> "Message Enqueueing Failed";
case Constants.CODE_PRODUCER_CLOSED -> "Producer Closed";
case Constants.CODE_PRODUCER_NOT_AVAILABLE -> "Producer Not Available";
case Constants.CODE_PUBLISH_CONFIRM_TIMEOUT -> "Publish Confirm Timeout";
default -> "Unknown code: " + code;
};
StreamSendException ex = new StreamSendException(errorMessage, code);
observation.error(ex);
observation.stop();
Expand All @@ -339,15 +331,17 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
*/
@Override
public void close() {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
this.producer = null;
if (this.producer != null) {
this.lock.lock();
try {
if (this.producer != null) {
this.producer.close();
artembilan marked this conversation as resolved.
Show resolved Hide resolved
this.producer = null;
}
}
finally {
this.lock.unlock();
}
}
finally {
this.lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.MergedAnnotation;
import org.springframework.core.annotation.MergedAnnotations;
import org.springframework.core.annotation.MergedAnnotations.SearchStrategy;
import org.springframework.core.convert.ConversionService;
Expand Down Expand Up @@ -357,7 +358,7 @@ else if (source instanceof Method method) {
}
return !name.contains("$MockitoMock$");
})
.map(ann -> ann.synthesize())
.map(MergedAnnotation::synthesize)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -893,7 +894,7 @@ private void addToMap(Map<String, Object> map, String key, Object value, Class<?
}
}
else {
if (value instanceof String && !StringUtils.hasText((String) value)) {
if (value instanceof String string && !StringUtils.hasText(string)) {
putEmpty(map, key);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,12 @@ public Date nextRelease() {
if (this.messages.isEmpty() || this.timeout <= 0) {
return null;
}
else if (this.currentSize >= this.bufferLimit) {
if (this.currentSize >= this.bufferLimit) {
// release immediately, we're already over the limit
return new Date();
}
else {
return new Date(System.currentTimeMillis() + this.timeout);
}

return new Date(System.currentTimeMillis() + this.timeout);
}

@Override
Expand All @@ -122,9 +121,8 @@ public Collection<MessageBatch> releaseBatches() {
if (batch == null) {
return Collections.emptyList();
}
else {
return Collections.singletonList(batch);
}

return Collections.singletonList(batch);
}

private MessageBatch doReleaseBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,13 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
}
else {
DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
JavaUtils.INSTANCE
.acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
.acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
return container;
}

DirectMessageListenerContainer container = new DirectMessageListenerContainer(this.connectionFactory);
JavaUtils.INSTANCE
.acceptIfNotNull(this.consumersPerQueue, container::setConsumersPerQueue)
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
.acceptIfNotNull(this.monitorInterval, container::setMonitorInterval);
return container;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public BeanDefinition parse(Element element, ParserContext parserContext) {
}

List<Element> childElements = DomUtils.getChildElementsByTagName(element, LISTENER_ELEMENT);
for (int i = 0; i < childElements.size(); i++) {
parseListener(childElements.get(i), element, parserContext, containerList);
for (Element childElement : childElements) {
parseListener(childElement, element, parserContext, containerList);
}

parserContext.popAndRegisterContainingComponent();
Expand Down Expand Up @@ -190,8 +190,8 @@ private void parseListener(Element listenerEle, Element containerEle, ParserCont
else {
String[] names = StringUtils.commaDelimitedListToStringArray(queues);
List<RuntimeBeanReference> values = new ManagedList<>();
for (int i = 0; i < names.length; i++) {
values.add(new RuntimeBeanReference(names[i].trim()));
for (String name : names) {
values.add(new RuntimeBeanReference(name.trim()));
}
containerDef.getPropertyValues().add("queues", values);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,7 @@
*
* @author Dave Syer
* @author Gary Russell
* @author Ngoc Nhan
*
* @see RetryOperations#execute(org.springframework.retry.RetryCallback, org.springframework.retry.RecoveryCallback,
* org.springframework.retry.RetryState)
Expand Down Expand Up @@ -90,9 +91,8 @@ private NewMethodArgumentsIdentifier createNewItemIdentifier() {
if (StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier == null) {
return !message.getMessageProperties().isRedelivered();
}
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
}

return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
};
}

Expand Down Expand Up @@ -127,23 +127,19 @@ private MethodArgumentsKeyGenerator createKeyGenerator() {
}
return messageId;
}
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
}
return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
};
}

@SuppressWarnings("unchecked")
private Message argToMessage(Object[] args) {
Object arg = args[1];
Message message = null;
if (arg instanceof Message msg) {
message = msg;
return msg;
}
else if (arg instanceof List) {
message = ((List<Message>) arg).get(0);
if (arg instanceof List<?> list) {
return (Message) list.get(0);
}
return message;
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
Assert.noNullElements(targetConnectionFactories.values().toArray(),
"'targetConnectionFactories' cannot have null values.");
this.targetConnectionFactories.putAll(targetConnectionFactories);
targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf));
targetConnectionFactories.values().forEach(this::checkConfirmsAndReturns);
}

/**
Expand Down Expand Up @@ -293,7 +293,7 @@ public void destroy() {

@Override
public void resetConnection() {
this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection());
this.targetConnectionFactories.values().forEach(ConnectionFactory::resetConnection);
this.defaultTargetConnectionFactory.resetConnection();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ public static void unRegisterConsumerChannel() {
@Nullable
public static Channel getConsumerChannel() {
ChannelHolder channelHolder = consumerChannel.get();
Channel channel = null;
if (channelHolder != null) {
channel = channelHolder.getChannel();
}
return channel;
return channelHolder != null
? channelHolder.getChannel()
: null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
* expired. It also holds {@link CorrelationData} for
* the client to correlate a confirm with a sent message.
* @author Gary Russell
* @author Ngoc Nhan
* @since 1.0.1
*
*/
Expand Down Expand Up @@ -115,7 +116,7 @@ public void setReturned(boolean isReturned) {
* @since 2.2.10
*/
public boolean waitForReturnIfNeeded() throws InterruptedException {
return this.returned ? this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS) : true;
return !this.returned || this.latch.await(RETURN_CALLBACK_TIMEOUT, TimeUnit.SECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* @author Gary Russell
* @author Leonardo Ferreira
* @author Christian Tzolov
* @author Ngoc Nhan
* @since 2.3
*
*/
Expand Down Expand Up @@ -255,23 +256,21 @@ private Channel createProxy(Channel channel, boolean transacted) {
Advice advice =
(MethodInterceptor) invocation -> {
String method = invocation.getMethod().getName();
switch (method) {
case "close":
handleClose(channel, transacted, proxy);
return null;
case "getTargetChannel":
return channel;
case "isTransactional":
return transacted;
case "confirmSelect":
confirmSelected.set(true);
return channel.confirmSelect();
case "isConfirmSelected":
return confirmSelected.get();
case "isPublisherConfirms":
return false;
}
return null;
return switch (method) {
case "close" -> {
handleClose(channel, transacted, proxy);
yield null;
}
case "getTargetChannel" -> channel;
case "isTransactional" -> transacted;
case "confirmSelect" -> {
confirmSelected.set(true);
yield channel.confirmSelect();
}
case "isConfirmSelected" -> confirmSelected.get();
case "isPublisherConfirms" -> false;
default -> null;
};
};
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
advisor.addMethodName("close");
Expand Down
Loading