Skip to content

Integrate with Message Queue (Kafka)

Wuyi Chen edited this page Jul 3, 2019 · 2 revisions

Overview

There are several components in the structure of the integration of Spring microservices and message queue:

  • Message Broker
  • Message Producer
    • Source: The interface for serializing the object of POJO and publish it to a channel.
    • Channel: The queue to hold the published messages.
    • Binder: The adapter for integrating your Spring application with different message queue platforms.
  • Message Consumer
    • Binder: The adapter for integrating your Spring application with different message queue platforms.
    • Channel: The queue to hold the received messages.
    • Sink: The interface for listening to a channel for incoming messages and deserializing the message back to the object of POJO.

For integrating microservices with Kafka, you need to do several things:


Install Message Broker: ZooKeeper and Kafka

Kafka is a distributed system and uses Zookeeper to track the status of Kafka cluster nodes. So you need to install both ZooKeeper and Kafka:


Set Up Message Producer

Set dependencies

build.gradle

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-stream',               version: '1.3.3.RELEASE'
    compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-stream-kafka', version: '1.3.3.RELEASE'  // This is telling Spring Cloud Stream to use Kafka as message broker
}

Change the application configuration

application.yml

spring:
  cloud:
    stream:
      bindings:
        output:
          destination:  orgChangeTopic    # The message topic name or message queue name
          content-type: application/json  # The type of messages are going to be sent and received
        kafka:
          binder:
             zkNodes: localhost           # The location of ZooKeeper
             brokers: localhost           # The location of message broker (Kafka)

Change the application bootstrap class

Application.java

@SpringBootApplication
@EnableBinding(Source.class)    // Binds the application with the message broker
                                // This service will communicate with the message broker through the channels defined in the Source class
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Implement a class for publishing messages

SimpleSourceBean.java

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source) {                                   // Spring Cloud Stream will inject the implementation of Source interface to here
        this.source = source;
    }

    public void publish(OrganizationChangeModel change){                       // The change object will be serialized into JSON and published
        source.output().send(MessageBuilder.withPayload(change).build());      // The source.output() will return a channel and allow you to call send() for sending a message
    }
}

Set Up Message Consumer

Set dependencies

build.gradle

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-stream',               version: '1.3.3.RELEASE'
    compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-stream-kafka', version: '1.3.3.RELEASE'  // This is telling Spring Cloud Stream to use Kafka as message broker
}

Change the application configuration

application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic     # The message topic name or message queue name
          content-type: application/json  # The type of messages are going to be sent and received
          group: licensingGroup           # The group name of consumers
        binder:
          zkNodes: localhost              # The location of ZooKeeper
          brokers: localhost              # The location of message broker (Kafka)

Create corresponding POJO class for deserializing JSON from the message broker

For deserializing the message in JSON from the message broker, the POJO class need to have a default constructor (the constructor with no input parameter). If you implement a constructor with parameters, it will suppress the default constructor created by the compiler automatically. In that case, you need to create the default constructor explicitly.

OrganizationChangeModel.java

public class OrganizationChangeModel {
    public OrganizationChangeModel() {
    	super();
    }
    /* omit fields and their getters and setter*/
}

Otherwise you will see the error like:

Can not construct instance of xxx.OrganizationChangeModel: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?)

Implement a class for handling the deserialized Java object from the message in JSON

OrganizationChangeHandler.java

@EnableBinding(Sink.class)                  // Binds the class with the message broker by the default Spring Sink interface
public class OrganizationChangeHandler {
    private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);

    @StreamListener(Sink.INPUT)             // Tells Spring Cloud Stream to execute this method every time a message is received off the input channel
    public void loggerSink(OrganizationChangeModel orgChange) {
    	logger.info("Received an event for organization id {}", orgChange.getOrganizationId());
    }
}

Verification

In this example, we are using the organization service as the message producer and use the licensing service as the message consumer. When a user hit the API endpoint of the organization service for updating the organization data, the organization service will publish a new message to the message queue about that update operation. The licensing service will poll the message queue to check there is any new message or not. If there is a new message, the licensing service will grab the new message and print a piece of log information about that message.

To verify the integration between the message queue with the organization service and the licensing service, we will send an HTTP request to the organization service for updating the organization data. After that, we will check the log of the licensing service to see we can the information about the organization update operation or not.

Key Value
Authorization [{"key":"Authorization","value":"Bearer cb0a4eb5-17ff-41e8-bf05-da9866eefeac"}]
Content-Type application/json
  • Body
{
  "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
  "name": "customer-crm-co",
  "contactName": "Mark Balster",
  "contactEmail": "[email protected]",
  "contactPhone": "832-555-2222"
}

After sending this request to the organization service, you should see the log information for this update operation:

Received an event for organization id e254f8c-c442-4ebe-a82a-e2fc1d1ff78a
Clone this wiki locally