Integrate with Message Queue (Kafka)
There are several components in the integration of Spring microservices with a message queue:
- Message Broker
- Message Producer
  - Source: The interface that serializes a POJO and publishes it to a channel.
  - Channel: The queue that holds the published messages.
  - Binder: The adapter that integrates your Spring application with different message queue platforms.
- Message Consumer
  - Binder: The adapter that integrates your Spring application with different message queue platforms.
  - Channel: The queue that holds the received messages.
  - Sink: The interface that listens to a channel for incoming messages and deserializes them back into POJOs.
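For reference, the default `Source` and `Sink` channel interfaces that ship with Spring Cloud Stream look roughly like this (a simplified sketch, not the verbatim framework source):

```java
// Simplified sketch of Spring Cloud Stream's built-in channel interfaces.
public interface Source {
    String OUTPUT = "output"; // default name of the output channel

    @Output(Source.OUTPUT)
    MessageChannel output(); // channel used to publish messages
}

public interface Sink {
    String INPUT = "input"; // default name of the input channel

    @Input(Sink.INPUT)
    SubscribableChannel input(); // channel used to receive messages
}
```

The `@EnableBinding` annotations in the code below refer to these two interfaces.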
To integrate microservices with Kafka, you need to do several things:
- Install the Message Broker: ZooKeeper and Kafka
- Set Up the Message Producer
- Set Up the Message Consumer
- Verify the Integration
Kafka is a distributed system and uses ZooKeeper to track the status of Kafka cluster nodes, so you need to install both ZooKeeper and Kafka.
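Both come bundled with the Kafka distribution. With a local Kafka download, starting them looks roughly like this (paths assume you run from the Kafka installation directory, and the exact script names may vary by version):

```shell
# Start ZooKeeper first (Kafka registers its cluster state here)
bin/zookeeper-server-start.sh config/zookeeper.properties

# Then start the Kafka broker in another terminal
bin/kafka-server-start.sh config/server.properties
```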
To set up the message producer, first add the Spring Cloud Stream dependencies to the producer service's build.gradle:
dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-stream', version: '1.3.3.RELEASE'
    compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-stream-kafka', version: '1.3.3.RELEASE' // Tells Spring Cloud Stream to use Kafka as the message broker
}
Then configure the output channel binding in the producer's application.yml:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orgChangeTopic # The message topic name or message queue name
          content-type: application/json # The content type of the messages to be sent and received
      kafka:
        binder:
          zkNodes: localhost # The location of ZooKeeper
          brokers: localhost # The location of the message broker (Kafka)
@SpringBootApplication
@EnableBinding(Source.class) // Binds the application to the message broker;
// this service communicates with the broker through the channels defined in the Source interface
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
@Component
public class SimpleSourceBean {
    private Source source;
    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source) { // Spring Cloud Stream injects the implementation of the Source interface here
        this.source = source;
    }

    public void publish(OrganizationChangeModel change) { // The change object will be serialized into JSON and published
        source.output().send(MessageBuilder.withPayload(change).build()); // source.output() returns the output channel; send() publishes a message to it
    }
}
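To show where `publish()` fits, here is a sketch of how the organization service might call the bean after an update. The `OrganizationService` class, its `updateOrg` method, and the `OrganizationChangeModel` constructor arguments are hypothetical names for illustration, not from the source:

```java
@Service
public class OrganizationService { // hypothetical class, for illustration only

    @Autowired
    private SimpleSourceBean simpleSourceBean;

    public void updateOrg(Organization org) {
        // ... persist the organization update to the database ...

        // Then publish a change event so interested services (e.g. licensing)
        // learn about the update; assumes a constructor taking an action and an id.
        simpleSourceBean.publish(new OrganizationChangeModel("UPDATE", org.getId()));
    }
}
```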
On the consumer side (the licensing service), add the same dependencies to build.gradle:
dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-stream', version: '1.3.3.RELEASE'
    compile group: 'org.springframework.cloud', name: 'spring-cloud-starter-stream-kafka', version: '1.3.3.RELEASE' // Tells Spring Cloud Stream to use Kafka as the message broker
}
Then configure the input channel binding in the consumer's application.yml:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: orgChangeTopic # The message topic name or message queue name
          content-type: application/json # The content type of the messages to be sent and received
          group: licensingGroup # The consumer group name
      kafka:
        binder:
          zkNodes: localhost # The location of ZooKeeper
          brokers: localhost # The location of the message broker (Kafka)
To deserialize the JSON message from the message broker, the POJO class needs a default constructor (a constructor with no parameters). If you implement a constructor with parameters, it suppresses the default constructor that the compiler would otherwise generate automatically, so you need to declare the default constructor explicitly.
public class OrganizationChangeModel {
    public OrganizationChangeModel() {
        super();
    }
    /* fields and their getters and setters omitted */
}
Otherwise you will see an error like:
Can not construct instance of xxx.OrganizationChangeModel: no suitable constructor found, can not deserialize from Object value (missing default constructor or creator, or perhaps need to add/enable type information?)
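This rule about the suppressed default constructor is plain Java, not Spring-specific. The sketch below, with hypothetical classes `WithDefault` and `WithArgs`, uses reflection to check whether a no-arg constructor exists:

```java
import java.lang.reflect.Constructor;

// No constructor declared, so the compiler generates a no-arg one.
class WithDefault {
    private String organizationId;
}

// A parameterized constructor suppresses the implicit no-arg constructor.
class WithArgs {
    private final String organizationId;
    WithArgs(String organizationId) { this.organizationId = organizationId; }
}

public class DefaultConstructorDemo {
    static boolean hasNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getDeclaredConstructor(); // throws if no no-arg constructor exists
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(WithDefault.class)); // true
        System.out.println(hasNoArgConstructor(WithArgs.class));    // false
    }
}
```

Jackson's deserializer performs the same kind of lookup, which is why it reports "missing default constructor or creator" for classes like `WithArgs`.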
OrganizationChangeHandler.java
@EnableBinding(Sink.class) // Binds the class to the message broker through the default Spring Cloud Stream Sink interface
public class OrganizationChangeHandler {
    private static final Logger logger = LoggerFactory.getLogger(OrganizationChangeHandler.class);

    @StreamListener(Sink.INPUT) // Tells Spring Cloud Stream to execute this method every time a message arrives on the input channel
    public void loggerSink(OrganizationChangeModel orgChange) {
        logger.info("Received an event for organization id {}", orgChange.getOrganizationId());
    }
}
In this example, the organization service is the message producer and the licensing service is the message consumer. When a user hits the organization service's API endpoint to update organization data, the organization service publishes a message about that update operation to the message queue. The licensing service polls the message queue for new messages; when one arrives, it grabs the message and logs information about it.
To verify the integration between the message queue, the organization service, and the licensing service, we will send an HTTP request to the organization service to update organization data, then check the licensing service's log to see whether it recorded the update operation.
- URL: http://localhost:8060/v1/organizations/e254f8c-c442-4ebe-a82a-e2fc1d1ff78a
- Method: PUT
- Headers:
Key | Value
---|---
Authorization | Bearer cb0a4eb5-17ff-41e8-bf05-da9866eefeac
Content-Type | application/json
- Body
{
    "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
    "name": "customer-crm-co",
    "contactName": "Mark Balster",
    "contactEmail": "mark.balster@custcrmco.com",
    "contactPhone": "832-555-2222"
}
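The request above can be sent with curl, for example (assuming the organization service is running on localhost:8060 and the access token is still valid):

```shell
curl -X PUT "http://localhost:8060/v1/organizations/e254f8c-c442-4ebe-a82a-e2fc1d1ff78a" \
  -H "Authorization: Bearer cb0a4eb5-17ff-41e8-bf05-da9866eefeac" \
  -H "Content-Type: application/json" \
  -d '{
    "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
    "name": "customer-crm-co",
    "contactName": "Mark Balster",
    "contactEmail": "mark.balster@custcrmco.com",
    "contactPhone": "832-555-2222"
  }'
```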
After sending this request to the organization service, you should see the licensing service log the update operation:
Received an event for organization id e254f8c-c442-4ebe-a82a-e2fc1d1ff78a