Skip to content

Commit

Permalink
AMQP-801-2: Introduce ConsumerDecorator
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-801

To properly assign the queue to the `ConsumeOkEvent`, we need perform
such a logic in the `Consumer.handleConsumeOk()`.

* Introduce `BlockingQueueConsumer.ConsumerDecorator` to be created on
each `channel.basicConsume()` for wrapping the target `InternalConsumer`
per queue
* Add getters to the `ConsumeOkEvent` for better interoperability
* Assert assigned queue names for the `ConsumeOkEvent`s in the
`SimpleMessageListenerContainerIntegration2Tests`

**Cherry-pick to 1.7.x**

* Add `ConsumerDecorator.consumerTag` property
* Add `ConsumerDecorator.toString()`
* Add JavaDocs for the `ConsumeOkEvent`
  • Loading branch information
artembilan authored and garyrussell committed Mar 8, 2018
1 parent 3dba703 commit e50cb89
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
Expand Down Expand Up @@ -237,10 +238,11 @@ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
* @param queues The queues.
*/
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {

this(connectionFactory, messagePropertiesConverter, activeObjectCounter, acknowledgeMode, transactional,
prefetchCount, defaultRequeueRejected, consumerArgs, false, exclusive, queues);
}
Expand Down Expand Up @@ -483,7 +485,9 @@ private Message handle(Delivery delivery) throws InterruptedException {
* @throws ShutdownSignalException if the connection is shut down while waiting
*/
public Message nextMessage() throws InterruptedException, ShutdownSignalException {
logger.trace("Retrieving delivery for " + this);
if (logger.isTraceEnabled()) {
logger.trace("Retrieving delivery for " + this);
}
return handle(this.queue.take());
}

Expand Down Expand Up @@ -673,8 +677,10 @@ private void addRecoveryListener() {

private void consumeFromQueue(String queue) throws IOException {
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal, this.exclusive,
this.consumerArgs, this.consumer);
(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
this.exclusive, this.consumerArgs,
new ConsumerDecorator(queue, this.consumer, this.applicationEventPublisher));

if (consumerTag != null) {
this.consumerTags.put(consumerTag, queue);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -817,7 +823,7 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
*/
boolean isLocallyTransacted = locallyTransacted
|| (this.transactional
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
&& TransactionSynchronizationManager.getResource(this.connectionFactory) == null);
try {

boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
Expand Down Expand Up @@ -884,11 +890,6 @@ public void handleConsumeOk(String consumerTag) {
if (logger.isDebugEnabled()) {
logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
}
if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
String queueName = BlockingQueueConsumer.this.consumerTags.get(consumerTag);
BlockingQueueConsumer.this.applicationEventPublisher
.publishEvent(new ConsumeOkEvent(this, queueName, consumerTag));
}
}

@Override
Expand Down Expand Up @@ -962,6 +963,68 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp

}

private static final class ConsumerDecorator implements Consumer {

private final String queue;

private final Consumer delegate;

private final ApplicationEventPublisher applicationEventPublisher;

private String consumerTag;

ConsumerDecorator(String queue, Consumer delegate, ApplicationEventPublisher applicationEventPublisher) {
this.queue = queue;
this.delegate = delegate;
this.applicationEventPublisher = applicationEventPublisher;
}


@Override
public void handleConsumeOk(String consumerTag) {
this.consumerTag = consumerTag;
this.delegate.handleConsumeOk(consumerTag);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this.delegate, this.queue, consumerTag));
}
}

@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
this.delegate.handleShutdownSignal(consumerTag, sig);
}

@Override
public void handleCancel(String consumerTag) throws IOException {
this.delegate.handleCancel(consumerTag);
}

@Override
public void handleCancelOk(String consumerTag) {
this.delegate.handleCancelOk(consumerTag);
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {

this.delegate.handleDelivery(consumerTag, envelope, properties, body);
}

@Override
public void handleRecoverOk(String consumerTag) {
this.delegate.handleRecoverOk(consumerTag);
}

@Override
public String toString() {
return "ConsumerDecorator{" + "queue='" + this.queue + '\'' +
", consumerTag='" + this.consumerTag + '\'' +
'}';
}

}

@SuppressWarnings("serial")
private static final class DeclarationException extends AmqpException {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,12 @@
import org.springframework.amqp.event.AmqpEvent;

/**
* An {@link AmqpEvent} emitted by the listener container
* when consumer is subscribed to the queue.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.7.5
*
*/
Expand All @@ -30,12 +35,37 @@ public class ConsumeOkEvent extends AmqpEvent {

private final String consumerTag;

/**
* Instantiate a {@link ConsumeOkEvent} based on the provided
* consumer, queue and consumer tag.
* @param source the consumer subscribed to the queue
* @param queue the queue to consume
* @param consumerTag the tag indicate a consumer subscription
*/
public ConsumeOkEvent(Object source, String queue, String consumerTag) {
super(source);
this.queue = queue;
this.consumerTag = consumerTag;
}

/**
* Obtain the queue name a consumer has been subscribed.
* @return the queue name a consumer subscribed.
* @since 1.7.7
*/
public String getQueue() {
return this.queue;
}

/**
* Obtain the consumer tag assigned to the consumer.
* @return the consumer tag for subscription.
* @since 1.7.7
*/
public String getConsumerTag() {
return this.consumerTag;
}

@Override
public String toString() {
return "ConsumeOkEvent [queue=" + this.queue + ", consumerTag=" + this.consumerTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isOneOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -90,6 +91,7 @@
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.3
*
*/
Expand Down Expand Up @@ -283,7 +285,11 @@ public void publishEvent(ApplicationEvent event) {
assertThat(events.size(), equalTo(8));
assertThat(events.get(0), instanceOf(AsyncConsumerStartedEvent.class));
assertThat(events.get(1), instanceOf(ConsumeOkEvent.class));
ConsumeOkEvent consumeOkEvent = (ConsumeOkEvent) events.get(1);
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
assertThat(events.get(2), instanceOf(ConsumeOkEvent.class));
consumeOkEvent = (ConsumeOkEvent) events.get(2);
assertThat(consumeOkEvent.getQueue(), isOneOf(this.queue.getName(), this.queue1.getName()));
assertSame(events.get(3), eventRef.get());
assertThat(events.get(4), instanceOf(AsyncConsumerRestartedEvent.class));
assertThat(events.get(5), instanceOf(ConsumeOkEvent.class));
Expand Down

0 comments on commit e50cb89

Please sign in to comment.