Multi-consumers binding different parameters #879

Closed
lzz666 opened this issue Jan 2, 2019 · 6 comments

@lzz666

lzz666 commented Jan 2, 2019

version:
spring-boot 2.0.4.RELEASE

When a message is routed to multiple consumers, one consumer needs the deserialized payload while another receives the raw message directly. This causes an exception such as:

Caused by: org.springframework.amqp.AmqpException: No method found for class com.lzz.MessageBody
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
	... 8 common frames omitted

SendMessage: amqpTemplate.convertAndSend("exchange", "T.D", message);
ReceiveMessage1: @RabbitHandler public void messageReceiver(MessageBody message) { log.info("time is {}", message.getMessage()); }
ReceiveMessage2: @RabbitHandler public void messageReceiver(Message message) { log.info("date is {}", message); }

Expected: both styles can be used at the same time.

@garyrussell
Contributor

Your question is not clear. What is MessageBody? What is message in the send()?

You need to show your configuration.

@lzz666
Author

lzz666 commented Jan 14, 2019

message is an instance of MessageBody; MessageBody is a class that contains timestamp, query, and other fields.

My configuration is as follows:
RabbitConfig:

package com.lzz.config;

import com.lzz.config.messageconverter.CustomJsonMessageConverter;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitConfig {

    @Bean
    public Queue time(){
        return new Queue("time");
    }

    @Bean
    public Queue date(){
        return new Queue("date");
    }

    @Bean
    public Queue chars(){
        return new Queue("chars");
    }
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("exchange");
    }

    @Bean
    public Binding bindingTime(Queue time,TopicExchange exchange){
        return BindingBuilder.bind(time).to(exchange).with("#.T.#");
    }

    @Bean
    public Binding bindingDate(Queue date, TopicExchange exchange){
        return BindingBuilder.bind(date).to(exchange).with("#.D.#");
    }

    @Bean
    public Binding bindingChar(Queue chars, TopicExchange exchange){
        return BindingBuilder.bind(chars).to(exchange).with("#.C.#");
    }

    @Bean
    public AmqpTemplate amqpTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}
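
A note for readers following the configuration above: the routing key "T.D" used in convertAndSend matches both "#.T.#" and "#.D.#", which would explain why the same message reaches both the time and date queues. The sketch below is a simplified, standalone model of topic matching ('#' for zero or more words, '*' for exactly one word). The class name TopicMatchSketch and its methods are hypothetical and are not part of Spring AMQP or the RabbitMQ client.

```java
public class TopicMatchSketch {

    // Returns true if the dot-separated routing key matches the binding pattern.
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The three binding patterns from RabbitConfig against the key "T.D".
        for (String pattern : new String[] {"#.T.#", "#.D.#", "#.C.#"}) {
            System.out.println(pattern + " matches T.D: " + matches(pattern, "T.D"));
        }
    }
}
```

Under this model the time and date bindings match and the chars binding does not, so both listeners in this thread receive a copy of the message.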

ReceiveMessage:

@Service
@Slf4j
@RabbitListener(queues = "time")
public class TimeStatisticService {

    /**
     * @param message
     */
    @RabbitHandler
    public void messageReceiver(MessageBody message){
        log.info("time is {}",message.getMessage());
    }
}

MessageBody:

@Data
@AllArgsConstructor
public class MessageBody{
    private String timestamp;
    private String query;
}

@garyrussell
Contributor

garyrussell commented Jan 14, 2019

You also need to show the @RabbitListener that receives Message.

Also please read about GitHub markup.

@lzz666
Author

lzz666 commented Jan 17, 2019

Sorry, the listener is as follows:

package com.lzz.service.statistic.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
@Slf4j
@RabbitListener(queues = "date")
public class DateStatisticService {

    @RabbitHandler
    public void messageReceiver(Message message, Channel channel) throws IOException {
        try{
            log.info("date is {}",message);
            log.info("tag is {}",message.getMessageProperties().getDeliveryTag());
        }catch (Exception e){
            log.error("error is {}",e);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }

    }
}

In addition, MessageBody should be:

@Data
@AllArgsConstructor
public class MessageBody{
    private String timestamp;
    private String query;
    private String message;
}

@garyrussell
Contributor

garyrussell commented Jan 17, 2019

Again; please use proper markup - see my edits.

@RabbitHandler is intended for multi-method listeners:

@RabbitListener(...)
public class Foo {
    
    @RabbitHandler
    public void handleFoo(Foo foo) {...}

    @RabbitHandler
    public void handleBar(Bar bar) {...}

}

So we can determine which method to call after the payload has been converted.
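
To illustrate the idea of choosing a method from the converted payload's type, here is a minimal standalone sketch. It is a simplified model, not Spring AMQP's actual implementation: the class HandlerDispatchSketch, its register and dispatch methods, and the RawMessage stand-in are all hypothetical names made up for this example. It shows why a class-level listener whose only handler takes a raw message has nothing to call when the payload arrives as a MessageBody.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class HandlerDispatchSketch {

    // Hypothetical stand-in for the converted payload type in this thread.
    public static class MessageBody {
        public final String message;
        public MessageBody(String message) { this.message = message; }
    }

    // Hypothetical stand-in for a raw, unconverted message.
    public static class RawMessage {
        public final byte[] body;
        public RawMessage(byte[] body) { this.body = body; }
    }

    // One entry per handler method, keyed by the payload type it accepts.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    public <T> void register(Class<T> type, Function<T, String> handler) {
        handlers.put(type, payload -> handler.apply(type.cast(payload)));
    }

    // Picks the first handler whose parameter type accepts the payload.
    public String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalStateException("No method found for " + payload.getClass());
    }

    public static void main(String[] args) {
        HandlerDispatchSketch timeListener = new HandlerDispatchSketch();
        timeListener.register(MessageBody.class, body -> "time is " + body.message);
        System.out.println(timeListener.dispatch(new MessageBody("noon")));

        // A listener with only a raw-message handler cannot accept a MessageBody.
        HandlerDispatchSketch dateListener = new HandlerDispatchSketch();
        dateListener.register(RawMessage.class, raw -> "date has " + raw.body.length + " bytes");
        try {
            dateListener.dispatch(new MessageBody("noon"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the second listener fails in the same spirit as the "No method found for class com.lzz.MessageBody" error reported at the top of the thread.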

If you want the raw message, put @RabbitListener on the method instead of the class.

@Component
class Foo {

	@RabbitListener(queues = "foo")
	public void handle(Message message) {
		System.out.println(message);
	}

}

We will change the documentation to make that clearer.

garyrussell added a commit to garyrussell/spring-amqp that referenced this issue Jan 17, 2019
garyrussell added a commit to garyrussell/spring-amqp that referenced this issue Jan 17, 2019
artembilan pushed a commit that referenced this issue Jan 17, 2019
@artembilan artembilan added this to the 2.1.4 milestone Jan 18, 2019
@lzz666
Author

lzz666 commented Jan 26, 2019

Thank you for your help
