DirectMessageListenerContainer fails to recover consuming after node fail #1034

Closed
bendyna opened this issue Jun 25, 2019 · 5 comments · Fixed by #1039 or #1042

bendyna commented Jun 25, 2019

Version 2.1.7.RELEASE

If a node in a RabbitMQ cluster fails, DirectMessageListenerContainer does not reconnect to another node. The Docker setup I used to reproduce it is attached.
docker.zip

pom.xml

    ...
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
    </dependencies>
    ...

Application.java

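import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication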
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory) {
        return args -> {
            String queueName = "queue";

            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.declareQueue(new Queue(queueName));

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            for (int i = 0; i < 10000; i++) {
                rabbitTemplate.send(queueName, new Message(Integer.toString(i).getBytes(), new MessageProperties()));
            }
            logger.info("Message were sent");

            try {
                Thread.sleep(10000); // wait for queue mirroring
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
//            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames(queueName);
            container.setPrefetchCount(1);
            container.setMessageListener(message -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            container.start();
        };
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("localhost:5672,localhost:5673,localhost:5674");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("test");
        return connectionFactory;
    }

}

Steps to reproduce (with Docker):

  1. Start the cluster: "docker-compose -f rabbit-ha.yml up -d"
  2. Wait until the RabbitMQ cluster has started and the management UI is available on localhost:15673
  3. Run the Spring Boot application
  4. Wait until consuming has started
  5. The client usually connects to the first address, localhost:5672, so stop the first node: "docker-compose -f rabbit-ha.yml stop rabbitmq1"

I tried this 10 times, and in 7 of the 10 runs the DMLC was unable to resume consuming. Log:

2019-06-25 17:58:51.894 ERROR 4951 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-25 17:58:51.895  INFO 4951 --- [ool-1-thread-12] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-25 17:58:51.896  WARN 4951 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured (Exception message: Connection reset)
2019-06-25 17:58:51.899  INFO 4951 --- [ool-1-thread-12] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#5ef5c734:1/SimpleConnection@17a9645f [delegate=amqp://[email protected]:5673/test, localPort= 43392]
2019-06-25 17:58:51.902 ERROR 4951 --- [ool-1-thread-12] o.s.a.r.l.DirectMessageListenerContainer : Error acking

java.lang.IllegalStateException: Channel closed; cannot ack/nack
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1131) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.sun.proxy.$Proxy47.basicAck(Unknown Source) ~[na:na]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleAck(DirectMessageListenerContainer.java:1063) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:997) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]

In the other 3 runs the DMLC reconnected and resumed consuming. Log:

2019-06-25 17:55:39.224 ERROR 921 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-25 17:55:39.347 ERROR 921 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : Consumer canceled - channel closed SimpleConsumer [queue=queue, consumerTag=amq.ctag-PRL6yM5060BqWCnVpmMU2g identity=66a4c32a]
2019-06-25 17:55:44.347  INFO 921 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-25 17:55:44.355  INFO 921 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#2c78d320:1/SimpleConnection@1a526f2d [delegate=amqp://[email protected]:5673/test, localPort= 43252]
2019-06-25 17:55:44.370  INFO 921 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-_FDFe2xQt82rn5lPbaN-Gg identity=3d46acf5] started

I also tried the same steps 10 times with SimpleMessageListenerContainer. The SMLC resumed consuming all 10 times.

@garyrussell
Contributor

@bendyna Is there anything in the log after the Error acking?

I can't reproduce it (using a real cluster on hardware, not docker).

I see the consumer monitor thread detecting the dead consumer and starting a new one. The Error acking is just noise.

With trace level logging, I see this...

2019-06-27 13:26:33.530  INFO 14943 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a415aa9:0/SimpleConnection@4f20a5e0 [delegate=amqp://[email protected]:5672/, localPort= 54899]
2019-06-27 13:26:33.578 DEBUG 14943 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2019-06-27 13:26:33.583 DEBUG 14943 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Nothing to declare
2019-06-27 13:26:33.591  INFO 14943 --- [           main] o.s.a.r.l.DirectMessageListenerContainer : Container initialized for queues: [foo]
2019-06-27 13:26:33.602 DEBUG 14943 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,1)
2019-06-27 13:26:33.610 TRACE 14943 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.basicQos([250])
2019-06-27 13:26:33.639 TRACE 14943 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.queueDeclarePassive([foo])
2019-06-27 13:26:33.660 TRACE 14943 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.basicConsume([foo, false, , false, false, {}, SimpleConsumer [queue=foo, consumerTag=null identity=179562b6]])
2019-06-27 13:26:33.720  INFO 14943 --- [ntContainer#0-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6] started
2019-06-27 13:26:33.720 DEBUG 14943 --- [pool-1-thread-3] o.s.a.r.l.DirectMessageListenerContainer : New SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6] consumeOk
2019-06-27 13:26:33.722 DEBUG 14943 --- [pool-1-thread-4] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6] received (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=foo, deliveryTag=1, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw, consumerQueue=foo])
foo
2019-06-27 13:26:38.594 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.isOpen()
2019-06-27 13:26:43.589 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.isOpen()
2019-06-27 13:26:44.362 ERROR 14943 --- [ 10.0.0.13:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-27 13:26:44.366  WARN 14943 --- [ 10.0.0.13:5672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured (Exception message: Connection reset)
2019-06-27 13:26:48.592 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.isOpen()
2019-06-27 13:26:48.593 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.isOpen()
2019-06-27 13:26:48.593 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.getTargetChannel()
2019-06-27 13:26:48.595 ERROR 14943 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : Consumer canceled - channel closed SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6]
2019-06-27 13:26:48.596 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : Consumers to restart now: [SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6]]
2019-06-27 13:26:48.596 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.close()
2019-06-27 13:26:48.596 DEBUG 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://[email protected]:5672/,1)
2019-06-27 13:26:48.602 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) is already closed

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
	at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396) ~[amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292) ~[amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607) ~[amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541) ~[amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534) ~[amqp-client-5.4.3.jar:5.4.3]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.physicalClose(CachingConnectionFactory.java:1294) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1101) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.sun.proxy.$Proxy50.close(Unknown Source) [na:na]
	at org.springframework.amqp.rabbit.connection.RabbitUtils.closeChannel(RabbitUtils.java:111) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.finalizeConsumer(DirectMessageListenerContainer.java:1159) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.cancelConsumer(DirectMessageListenerContainer.java:1154) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.lambda$checkConsumers$4(DirectMessageListenerContainer.java:500) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at java.util.ArrayList.forEach(ArrayList.java:1257) ~[na:1.8.0_212]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.checkConsumers(DirectMessageListenerContainer.java:489) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.lambda$startMonitor$2(DirectMessageListenerContainer.java:427) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_212]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]

2019-06-27 13:26:48.603 DEBUG 14943 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : Attempting to restart consumer SimpleConsumer [queue=foo, consumerTag=amq.ctag-cssndWJmAzjH9BrveyyBdw identity=179562b6]
2019-06-27 13:26:48.603  INFO 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [10.0.0.13:5672, 10.0.0.3:5672]
2019-06-27 13:26:48.610  INFO 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a415aa9:1/SimpleConnection@f3a7f58 [delegate=amqp://[email protected]:5672/, localPort= 54915]
2019-06-27 13:26:48.610 DEBUG 14943 --- [nsumerMonitor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2019-06-27 13:26:48.610 DEBUG 14943 --- [nsumerMonitor-1] o.s.amqp.rabbit.core.RabbitAdmin         : Nothing to declare
2019-06-27 13:26:48.612 DEBUG 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,1)
2019-06-27 13:26:48.612 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.basicQos([250])
2019-06-27 13:26:48.613 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.queueDeclarePassive([foo])
2019-06-27 13:26:48.623 TRACE 14943 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/,1) channel.basicConsume([foo, false, , false, false, {}, SimpleConsumer [queue=foo, consumerTag=null identity=42122e99]])
2019-06-27 13:26:48.635  INFO 14943 --- [nsumerMonitor-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=foo, consumerTag=amq.ctag-CmwRD_DW4Z5JtajFjJeyFA identity=42122e99] started
2019-06-27 13:26:48.636 DEBUG 14943 --- [ool-1-thread-10] o.s.a.r.l.DirectMessageListenerContainer : New SimpleConsumer [queue=foo, consumerTag=amq.ctag-CmwRD_DW4Z5JtajFjJeyFA identity=42122e99] consumeOk
2019-06-27 13:26:48.636 DEBUG 14943 --- [ool-1-thread-10] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=foo, consumerTag=amq.ctag-CmwRD_DW4Z5JtajFjJeyFA identity=42122e99] received (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=foo, deliveryTag=1, consumerTag=amq.ctag-CmwRD_DW4Z5JtajFjJeyFA, consumerQueue=foo])

bendyna commented Jun 28, 2019

Trace logs:

2019-06-28 10:00:23.096 DEBUG 32612 --- [ool-1-thread-12] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q identity=69bd62d8] received (Body:'[B@103bb243(byte[4])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1113, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q, consumerQueue=queue])
2019-06-28 10:00:23.101 TRACE 32612 --- [ool-1-thread-12] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicAck([1113, false])
2019-06-28 10:00:23.103 DEBUG 32612 --- [ool-1-thread-13] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q identity=69bd62d8] received (Body:'[B@707fea5(byte[4])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1114, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q, consumerQueue=queue])
2019-06-28 10:00:23.108 TRACE 32612 --- [ool-1-thread-13] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicAck([1114, false])
2019-06-28 10:00:23.109 DEBUG 32612 --- [ool-1-thread-14] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q identity=69bd62d8] received (Body:'[B@37d0c88f(byte[4])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1115, consumerTag=amq.ctag-Jp7F5_Tfu_T4hehG5EHR7Q, consumerQueue=queue])
2019-06-28 10:00:23.111 ERROR 32612 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-28 10:00:23.114 TRACE 32612 --- [ool-1-thread-14] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicAck([1115, false])
2019-06-28 10:00:23.114 DEBUG 32612 --- [ool-1-thread-14] o.s.a.r.c.CachingConnectionFactory       : Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://[email protected]:5672/test,1)
2019-06-28 10:00:23.114  INFO 32612 --- [ool-1-thread-14] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-28 10:00:23.115  WARN 32612 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler     : An unexpected connection driver error occured (Exception message: Connection reset)
2019-06-28 10:00:23.119  INFO 32612 --- [ool-1-thread-14] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#76f4b65:1/SimpleConnection@26f9e638 [delegate=amqp://[email protected]:5673/test, localPort= 45332]
2019-06-28 10:00:23.119 TRACE 32612 --- [ool-1-thread-14] o.s.retry.support.RetryTemplate          : RetryContext retrieved: [RetryContext: count=0, lastException=null, exhausted=false]
2019-06-28 10:00:23.119 DEBUG 32612 --- [ool-1-thread-14] o.s.retry.support.RetryTemplate          : Retry: count=0
2019-06-28 10:00:23.119 DEBUG 32612 --- [ool-1-thread-14] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2019-06-28 10:00:23.119 DEBUG 32612 --- [ool-1-thread-14] o.s.amqp.rabbit.core.RabbitAdmin         : Nothing to declare
2019-06-28 10:00:23.121 ERROR 32612 --- [ool-1-thread-14] o.s.a.r.l.DirectMessageListenerContainer : Error acking

java.lang.IllegalStateException: Channel closed; cannot ack/nack
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1131) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.sun.proxy.$Proxy47.basicAck(Unknown Source) ~[na:na]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleAck(DirectMessageListenerContainer.java:1063) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:997) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]

2019-06-28 10:00:26.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-28 10:00:31.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-28 10:00:36.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-28 10:00:41.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-28 10:00:46.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-28 10:00:51.305 TRACE 32612 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()

The last line ("... channel.isOpen()") is then logged indefinitely, every 5 seconds.

@garyrussell
Contributor

That's bizarre - it implies the channel is reporting that it is open; not sure how that can happen but I will add some code to use the failure to nack as an alternative signal that the consumer is dead.
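
A minimal sketch of that idea, for illustration only. It is not the actual Spring AMQP change: `SketchChannel`, `SketchConsumer`, `needsRestart()` and `AckFailureMonitorSketch` are hypothetical names. The consumer records a failed ack, and the periodic check treats that flag as a reason to restart even when the channel still reports itself as open.

```java
// Hypothetical stand-ins throughout; this is not Spring AMQP code.
class AckFailureMonitorSketch {

    /** Builds a channel that reports open but rejects every ack, as in the trace above. */
    static SketchChannel openButBroken() {
        return new SketchChannel() {
            public boolean isOpen() {
                return true;
            }

            public void basicAck(long deliveryTag) {
                throw new IllegalStateException("Channel closed; cannot ack/nack");
            }
        };
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer(openButBroken());
        System.out.println(consumer.needsRestart()); // prints "false": nothing has failed yet
        consumer.handleAck(1115L);
        System.out.println(consumer.needsRestart()); // prints "true": the failed ack was recorded
    }
}

/** The two channel operations the sketch needs. */
interface SketchChannel {
    boolean isOpen();

    void basicAck(long deliveryTag);
}

/** A consumer that remembers whether an ack has failed. */
class SketchConsumer {
    private final SketchChannel channel;
    private volatile boolean ackFailed;

    SketchConsumer(SketchChannel channel) {
        this.channel = channel;
    }

    /** Acks a delivery; a failure is recorded instead of only being logged. */
    void handleAck(long deliveryTag) {
        try {
            channel.basicAck(deliveryTag);
        } catch (IllegalStateException e) {
            ackFailed = true;
        }
    }

    /** The check a monitor task could run on each interval. */
    boolean needsRestart() {
        return ackFailed || !channel.isOpen();
    }
}
```

With `isOpen()` as the only signal, the second check would also return false, which matches the endless `channel.isOpen()` trace lines above.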

@garyrussell garyrussell added this to the 2.1.8 milestone Jun 28, 2019
garyrussell added a commit to garyrussell/spring-amqp that referenced this issue Jun 28, 2019
Resolves spring-projects#1034

The monitor task now cancels the consumer after a failed ack/nack,
whether or not the channel `isOpen()` returns true.

Test with a mock channel that stays open after a failed ack.

**cherry-pick to 2.1.x, 2.0.x**
artembilan pushed a commit that referenced this issue Jun 28, 2019
artembilan pushed a commit that referenced this issue Jun 28, 2019
Resolves #1034

The monitor task now cancels the consumer after a failed ack/nack,
whether or not the channel `isOpen()` returns true.

Test with a mock channel that stays open after a failed ack.

**cherry-pick to 2.1.x, 2.0.x**

* Fix tests removing AssertJ dependency
garyrussell added a commit that referenced this issue Jun 28, 2019

bendyna commented Jun 30, 2019

@garyrussell That fixes the issue with ack, but the problem is more general.

As I understand it, the DMLC creates its channel through CachingConnectionFactory. That is not a real channel but a proxy that wraps the real channel (field "target"). The proxy can replace its target without the DMLC knowing. The DMLC still "thinks" it is the same channel and checks whether it is open in checkConsumers(). The new target channel is open, but basicConsume was never sent on it.
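
The sequence described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `RawChannel`, `ProxyChannel` and `StaleProxySketch` are hypothetical stand-ins, not the real CachingConnectionFactory proxy, and the swap-on-failed-ack behaviour is modelled on the "Detected closed channel on exception. Re-initializing" trace line.

```java
// Hypothetical stand-ins; this does not use any Spring AMQP or RabbitMQ client classes.
class StaleProxySketch {

    /** Runs the failure sequence and returns the proxy in its final state. */
    static ProxyChannel run() {
        ProxyChannel proxy = new ProxyChannel();
        proxy.basicConsume();   // the container starts consuming
        proxy.killTarget();     // the broker node goes down
        try {
            proxy.basicAck();   // the ack fails and the proxy swaps its target
        } catch (IllegalStateException expected) {
            // the container only logs "Error acking"
        }
        return proxy;
    }

    public static void main(String[] args) {
        ProxyChannel proxy = run();
        // prints "isOpen=true consuming=false"
        System.out.println("isOpen=" + proxy.isOpen() + " consuming=" + proxy.isConsuming());
    }
}

/** Models the real channel: it can be open, and it may have a consumer registered. */
class RawChannel {
    private boolean open = true;
    private boolean consuming;

    boolean isOpen() { return open; }
    boolean isConsuming() { return open && consuming; }
    void basicConsume() { consuming = true; }
    void shutdown() { open = false; }
}

/** Models the proxy: it replaces its target when a call finds the target closed. */
class ProxyChannel {
    private RawChannel target = new RawChannel();

    void basicConsume() { target.basicConsume(); }

    /** Fails on a closed target, but re-initializes the target first. */
    void basicAck() {
        if (!target.isOpen()) {
            target = new RawChannel(); // new target: open, but no consumer registered
            throw new IllegalStateException("Channel closed; cannot ack/nack");
        }
    }

    boolean isOpen() { return target.isOpen(); }
    boolean isConsuming() { return target.isConsuming(); }
    void killTarget() { target.shutdown(); }
}
```

In this model a monitor that only asks the proxy `isOpen()` sees a healthy channel, although nothing is consuming from the queue.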

You fixed the case with ack, but it can also be reproduced with nack.

@SpringBootApplication
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory) {
        return args -> {
            String queueName = "queue";

            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            rabbitAdmin.declareQueue(new Queue(queueName));

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.send(queueName, new Message("1".getBytes(), new MessageProperties()));
            logger.info("Message were sent");

            try {
                Thread.sleep(10000); // wait for queue mirroring
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
            //            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueueNames(queueName);
            container.setPrefetchCount(1);
            container.setMessageListener(message -> {
                try {
                    Thread.sleep(20000);
                    throw new RuntimeException("exception in onMessage");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            container.setMonitorInterval(50000);
            container.setFailedDeclarationRetryInterval(50000);
            container.setIdleEventInterval(100000);

            container.start();
        };
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("localhost:5672,localhost:5673,localhost:5674");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("test");
        return connectionFactory;
    }

}

I set longer container intervals to make the problem easier to reproduce.

Log:

2019-06-30 10:44:46.251  INFO 5465 --- [           main] com.test.Application                     : Message were sent
2019-06-30 10:44:56.285  INFO 5465 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2019-06-30 10:44:56.285 DEBUG 5465 --- [           main] o.s.a.r.l.DirectMessageListenerContainer : Starting Rabbit listener container.
2019-06-30 10:44:56.288  INFO 5465 --- [           main] o.s.a.r.l.DirectMessageListenerContainer : Container initialized for queues: [queue]
2019-06-30 10:44:56.290 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/test,1), conn: Proxy@32811494 Shared Rabbit Connection: SimpleConnection@4c4748bf [delegate=amqp://[email protected]:5672/test, localPort= 34200] retrieved from cache
2019-06-30 10:44:56.290 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.isOpen()
2019-06-30 10:44:56.290 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Found cached Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/test,1), conn: Proxy@32811494 Shared Rabbit Connection: SimpleConnection@4c4748bf [delegate=amqp://[email protected]:5672/test, localPort= 34200]
2019-06-30 10:44:56.290 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicQos([1])
2019-06-30 10:44:56.291 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.queueDeclarePassive([queue])
2019-06-30 10:44:56.292 TRACE 5465 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicConsume([queue, false, , false, false, {}, SimpleConsumer [queue=queue, consumerTag=null identity=429b7052]])
2019-06-30 10:44:56.294 DEBUG 5465 --- [pool-1-thread-3] o.s.a.r.l.DirectMessageListenerContainer : New SimpleConsumer [queue=queue, consumerTag=amq.ctag-zWdlzvG5p-fdQOHJc3K6Ug identity=429b7052] consumeOk
2019-06-30 10:44:56.294  INFO 5465 --- [cTaskExecutor-1] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-zWdlzvG5p-fdQOHJc3K6Ug identity=429b7052] started
2019-06-30 10:44:56.295 DEBUG 5465 --- [pool-1-thread-4] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=queue, consumerTag=amq.ctag-zWdlzvG5p-fdQOHJc3K6Ug identity=429b7052] received (Body:'[B@4f4e6b01(byte[1])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1, consumerTag=amq.ctag-zWdlzvG5p-fdQOHJc3K6Ug, consumerQueue=queue])
2019-06-30 10:45:04.594 ERROR 5465 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
2019-06-30 10:45:16.302  WARN 5465 --- [pool-1-thread-4] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1603) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: exception in onMessage
	at com.test.Application.lambda$null$0(Application.java:55) ~[classes/:na]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1600) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	... 11 common frames omitted

2019-06-30 10:45:16.305 ERROR 5465 --- [pool-1-thread-4] o.s.a.r.l.DirectMessageListenerContainer : Failed to invoke listener

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1651) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1603) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:996) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:956) [spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: exception in onMessage
	at com.test.Application.lambda$null$0(Application.java:55) ~[classes/:na]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1600) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
	... 11 common frames omitted

2019-06-30 10:45:16.306 DEBUG 5465 --- [pool-1-thread-4] o.s.a.r.l.DirectMessageListenerContainer : Rejecting messages (requeue=true)
2019-06-30 10:45:16.306 TRACE 5465 --- [pool-1-thread-4] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5672/test,1) channel.basicNack([1, true, true])
2019-06-30 10:45:16.307 DEBUG 5465 --- [pool-1-thread-4] o.s.a.r.c.CachingConnectionFactory       : Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://[email protected]:5672/test,1)
2019-06-30 10:45:16.307  INFO 5465 --- [pool-1-thread-4] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672, localhost:5673, localhost:5674]
2019-06-30 10:45:16.317  INFO 5465 --- [pool-1-thread-4] o.s.a.r.c.CachingConnectionFactory       : Created new connection: connectionFactory#38aa816f:1/SimpleConnection@95ee525 [delegate=amqp://[email protected]:5673/test, localPort= 52246]
2019-06-30 10:45:16.318 DEBUG 5465 --- [pool-1-thread-4] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2019-06-30 10:45:16.318 DEBUG 5465 --- [pool-1-thread-4] o.s.amqp.rabbit.core.RabbitAdmin         : Nothing to declare
2019-06-30 10:45:46.288 TRACE 5465 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()
2019-06-30 10:46:36.288 TRACE 5465 --- [nsumerMonitor-1] o.s.a.r.c.CachingConnectionFactory       : AMQChannel(amqp://[email protected]:5673/test,1) channel.isOpen()

I think the DMLC should somehow monitor whether the target channel was recreated in the proxy, and restart the consumer in that case.
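
To illustrate the idea, here is a minimal sketch with no Spring or RabbitMQ dependencies. All names (`FakeChannel`, `ChannelProxy`, `Consumer`, `checkAndRestart`) are hypothetical stand-ins and not the actual spring-amqp API or the fix that was applied. It only shows why checking `isOpen()` on the proxy is not enough: after the proxy swaps its target, the new channel is open but has no consumer, so the monitor would need to compare the current target with the one the consumer was registered on.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelRefreshSketch {

    /** Hypothetical stand-in for a broker channel; not the RabbitMQ client API. */
    static final class FakeChannel {
        final String node;
        final List<String> consumers = new ArrayList<>();
        boolean open = true;

        FakeChannel(String node) {
            this.node = node;
        }
    }

    /** Hypothetical proxy that silently replaces its target once the target is closed. */
    static final class ChannelProxy {
        private final String[] nodes;
        private int next;
        private FakeChannel target;

        ChannelProxy(String... nodes) {
            this.nodes = nodes;
            this.target = new FakeChannel(nodes[next++]);
        }

        FakeChannel getTarget() {
            if (!target.open) {
                // Fail over to the next node; existing consumers are NOT carried over.
                target = new FakeChannel(nodes[next++ % nodes.length]);
            }
            return target;
        }

        boolean isOpen() {
            return getTarget().open;
        }
    }

    /** Hypothetical consumer that remembers which target channel it was registered on. */
    static final class Consumer {
        final String tag;
        FakeChannel registeredOn;

        Consumer(String tag) {
            this.tag = tag;
        }
    }

    /**
     * One pass of a monitor task. Returns true if the consumer had to be
     * (re)registered because the proxy's target is not the channel it was
     * registered on.
     */
    static boolean checkAndRestart(ChannelProxy proxy, Consumer consumer) {
        FakeChannel current = proxy.getTarget();
        if (current != consumer.registeredOn) {
            current.consumers.add(consumer.tag);
            consumer.registeredOn = current;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ChannelProxy proxy = new ChannelProxy("localhost:5672", "localhost:5673");
        Consumer consumer = new Consumer("ctag-1");
        checkAndRestart(proxy, consumer); // initial registration

        proxy.getTarget().open = false;   // simulate the node going down

        // An isOpen() check alone looks healthy, because the proxy already failed over.
        System.out.println("proxy open: " + proxy.isOpen());
        System.out.println("consumers on new target: " + proxy.getTarget().consumers);

        // Comparing target identity detects the swap and restores the consumer.
        System.out.println("restarted: " + checkAndRestart(proxy, consumer));
        System.out.println("consumers on new target: " + proxy.getTarget().consumers);
    }
}
```

This matches what the log above shows: the monitor thread only calls `channel.isOpen()` on the new channel at 5673, which succeeds, so nothing triggers a new `basicConsume`.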

@garyrussell garyrussell reopened this Jul 1, 2019
@garyrussell
Contributor

Good point.

garyrussell added a commit to garyrussell/spring-amqp that referenced this issue Jul 1, 2019
Fixes spring-projects#1034

If the connection factory refreshed the target connection, the DMLC
is not made aware of it and so we never consume from the new channel.

**cherry-pick to all 2.x**
artembilan pushed a commit that referenced this issue Jul 1, 2019
Fixes #1034

If the connection factory refreshed the target connection, the DMLC
is not made aware of it and so we never consume from the new channel.

**cherry-pick to all 2.x**
artembilan pushed a commit that referenced this issue Jul 1, 2019
Fixes #1034

If the connection factory refreshed the target connection, the DMLC
is not made aware of it and so we never consume from the new channel.

**cherry-pick to all 2.x**

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java
artembilan pushed a commit that referenced this issue Jul 1, 2019
Fixes #1034

If the connection factory refreshed the target connection, the DMLC
is not made aware of it and so we never consume from the new channel.

**cherry-pick to all 2.x**

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java