In this post, we will learn how to integrate RabbitMQ into a Spring Boot application and mimic a real-world environment. If you are not comfortable with RabbitMQ or its local installation process, take a quick look at the rabbitmq-tutorial first.
RabbitMQ is message-queueing software, also known as a message broker, that allows asynchronous communication between two systems. RabbitMQ stores messages and then delivers them to applications using one of the protocols it supports, such as AMQP, STOMP, MQTT and HTTP. It was originally built to implement the Advanced Message Queuing Protocol (AMQP).
Direct Exchange
There are different ways to send a message to a RabbitMQ server, and they are made possible by the various exchange types that RabbitMQ provides. In this example, we will work with the popular direct exchange type.
In a direct exchange, messages are routed to queues using the message's routing key. The routing key is a message attribute that the producer sets when it publishes a message to the exchange; it acts as an address that the exchange uses to decide where the message goes.
The exchange compares the routing key with the binding key of every queue bound to it. For a message to be delivered to a queue, that queue's binding key must exactly match the routing key.
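To make the exact-match rule concrete, here is a small in-memory sketch in plain Java. It is not RabbitMQ code and not how the broker is implemented; it only models the lookup a direct exchange performs. The class name DirectExchangeSketch is made up for illustration, and the queue and key names are borrowed from the configuration used later in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a direct exchange (illustration only).
final class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Binds a queue to this exchange under the given binding key.
    public void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    // An empty list stands for a message that matches no binding.
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("delivery_queue", "delivery");
        exchange.bind("email_queue", "email");

        System.out.println(exchange.route("delivery")); // [delivery_queue]
        System.out.println(exchange.route("Delivery")); // [] because the match is case-sensitive
    }
}
```

The second lookup returns nothing because "Delivery" and "delivery" are different strings; a direct exchange does no pattern or case-insensitive matching.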
Publish Message in RabbitMQ
Setup project for RabbitMQ
Let’s use the Spring Starter tool to generate a project with one click. I will be using IntelliJ for development, but you can use any development tool you like, for example Spring Tool Suite or Eclipse.
This is how your pom.xml file should look:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fullstack.coder</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot RabbitMq</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The main dependency required for RabbitMQ integration is spring-boot-starter-amqp. I have also added the Lombok dependency, which keeps the model classes free of boilerplate, as you will see later. Lombok is optional; whether to use it is up to you.
Configure RabbitMQ in Spring Boot Application
Adding the dependencies is not enough; we also need to add the connection settings to the application.properties file. Before that, I hope you have an instance of RabbitMQ up and running; if you do not, check out this article on RabbitMQ-Installation. Once RabbitMQ is up and running, update the application.properties file as necessary.
spring.rabbitmq.host = localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
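- spring.rabbitmq.host, as the name suggests, is the host name of the RabbitMQ server. Since I am running RabbitMQ locally, it is localhost.
- spring.rabbitmq.port is the port the broker listens on for AMQP connections; the default is 5672.
- spring.rabbitmq.username and spring.rabbitmq.password are the credentials used to connect. guest/guest is the default account of a local installation, and by default RabbitMQ only lets the guest user connect from localhost. You can change the username and password too.
- fullstack.exchange, fullstack.queue and fullstack.routingkey are custom properties that could hold the exchange name, queue name and routing key the producer needs to create a message and send it to the exchange. They are not part of the listing above; in this project these names are defined directly in the configuration class shown later.
For this project I am going to use the direct exchange type.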
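The following sketch shows how these key=value lines are read and what happens when a key is missing. It uses java.util.Properties as a stand-in for Spring Boot's own loader, which accepts the same file format, and the fallback values mirror what I understand to be Spring Boot's defaults for host and port (localhost and 5672). The class and method names are made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper that reads the RabbitMQ connection settings (illustration only).
final class RabbitSettingsSketch {

    // Parses text in the application.properties format.
    public static Properties parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Falls back to "localhost" when the key is absent (assumed default).
    public static String host(Properties props) {
        return props.getProperty("spring.rabbitmq.host", "localhost");
    }

    // Falls back to 5672 when the key is absent (assumed default).
    public static int port(Properties props) {
        return Integer.parseInt(props.getProperty("spring.rabbitmq.port", "5672"));
    }

    public static void main(String[] args) {
        // Whitespace around '=' is allowed, as in the host line of the listing above.
        Properties props = parse("spring.rabbitmq.host = localhost\nspring.rabbitmq.port=5672\n");
        System.out.println(host(props) + ":" + port(props)); // localhost:5672
    }
}
```

In other words, the four connection lines in this post spell out values that a local setup would most likely get anyway; they become important once the broker runs on another machine or with another account.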
Model object for RabbitMQ
In the model package, let’s define the Message and User classes.
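// Message.java (package declaration omitted)
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String routingKey;
    private User user;
}

// User.java (package declaration omitted)
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor // lets Jackson instantiate User when it reads the request body
@Getter
@Setter
@ToString
@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = User.class)
public class User implements Serializable {
    private Integer id;
    private String name;
}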
The Message class holds a routingKey attribute, and based on it the User attribute is routed to a queue. The User class contains two properties, id and name. The getters, setters, toString and constructors are generated by the Lombok annotations on top of each class. @JsonIdentityInfo is an annotation Jackson provides to deal with circular references during serialization; it gives each serialized object an identity, here stored in the @id property.
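If you prefer not to use Lombok, the model can be written by hand. The sketch below is roughly what the Lombok annotations save you from typing for a class with the same two fields. It is named PlainUser only to keep it apart from the User class in this post, and the Jackson annotation is left out so that it compiles with the JDK alone.

```java
import java.io.Serializable;

// Hand-written equivalent of a Lombok-annotated model class (illustration only).
final class PlainUser implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String name;

    // Counterpart of @NoArgsConstructor
    public PlainUser() { }

    // Counterpart of @AllArgsConstructor
    public PlainUser(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    // Counterparts of @Getter and @Setter
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    // Counterpart of @ToString, using the same ClassName(field=value) layout
    @Override
    public String toString() {
        return "PlainUser(id=" + id + ", name=" + name + ")";
    }

    public static void main(String[] args) {
        PlainUser user = new PlainUser(1, "Alice"); // sample values
        user.setName("Bob");
        System.out.println(user); // PlainUser(id=1, name=Bob)
    }
}
```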
Service class
We now have a model object ready to be sent to RabbitMQ. Next, we will create a service class that sends the message to RabbitMQ.
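import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
// plus an import for Message from your model package

@Service
public class RabbitMqSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Name of the exchange declared in RabbitMqConfig
    private final String exchange = "cart_exchange";

    // Publishes the user to the exchange; the routing key decides the target queue
    public void send(Message message) {
        amqpTemplate.convertAndSend(exchange, message.getRoutingKey(), message.getUser());
        System.out.println("Sent msg = " + message.getUser());
    }
}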
The AmqpTemplate is configured in the config class shown below. It is used to send messages to an exchange in RabbitMQ. A producer does not send messages directly to a queue; instead, the exchange acts as a mediator. The exchange is responsible for routing messages to queues based on attributes such as headers, bindings and routing keys. The send method uses the AmqpTemplate convertAndSend() method to publish the User to the exchange with the routing key taken from the message, and the exchange then routes it to the matching queue.
Config for RabbitMQ
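import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    // Queue, exchange and routing key names used by the beans below
    String deliveryQueue = "delivery_queue";
    String emailQueue = "email_queue";
    String cartExchange = "cart_exchange";
    String deliveryRoutingKey = "delivery";
    String emailRoutingKey = "email";

    // The second constructor argument is the durable flag
    @Bean
    Queue deliveryQueue() {
        return new Queue(deliveryQueue, false);
    }

    @Bean
    Queue emailQueue() {
        return new Queue(emailQueue, false);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(cartExchange);
    }

    // Each binding ties one queue to the exchange under one routing key
    @Bean
    Binding deliveryBinding(Queue deliveryQueue, DirectExchange exchange) {
        return BindingBuilder.bind(deliveryQueue).to(exchange).with(deliveryRoutingKey);
    }

    @Bean
    Binding emailBinding(Queue emailQueue, DirectExchange exchange) {
        return BindingBuilder.bind(emailQueue).to(exchange).with(emailRoutingKey);
    }

    // Serializes message payloads as JSON
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    // Not needed here: Spring Boot creates a ConnectionFactory from the spring.rabbitmq.* properties.
    // @Bean
    // ConnectionFactory connectionFactory() {
    //     CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
    //     cachingConnectionFactory.setUsername();
    //     cachingConnectionFactory.setPassword();
    //     return cachingConnectionFactory;
    // }
}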
RabbitMQ does not work just by being installed. It comes with a few concepts, such as queues, exchanges and routing keys, that need to be configured. In this project their names are set as fields at the top of the configuration class.
Each Queue bean is created with durability set to false. A non-durable queue, together with any messages in it, is removed when the RabbitMQ broker stops or restarts; pass true instead if the queue has to survive a broker restart.
The exchange is a DirectExchange. It routes a message to a queue when the message's routing key exactly matches the queue's binding key.
A Binding bean is the “link” that ties a queue to an exchange under a given routing key.
ConnectionFactory creates the connection to the RabbitMQ broker. Spring Boot builds one from the spring.rabbitmq.* properties, which is why the explicit bean is commented out here.
MessageConverter converts the Java object into a message body and back; Jackson2JsonMessageConverter does this using JSON.
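The durable flag on the queues is easy to overlook, so here is a toy model of what it controls. In RabbitMQ a non-durable queue disappears when the broker restarts, while a durable queue is kept. The code below only imitates that rule with a map; it does not talk to a broker, and both the class name and the audit_queue name are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of queue durability across a broker restart (illustration only).
final class QueueDurabilitySketch {
    // queue name -> durable flag, standing in for the broker's queue registry
    private final Map<String, Boolean> queues = new LinkedHashMap<>();

    public void declareQueue(String name, boolean durable) {
        queues.put(name, durable);
    }

    // Simulates a restart: every queue that is not durable is dropped.
    public void restartBroker() {
        queues.values().removeIf(durable -> !durable);
    }

    public boolean exists(String name) {
        return queues.containsKey(name);
    }

    public static void main(String[] args) {
        QueueDurabilitySketch broker = new QueueDurabilitySketch();
        broker.declareQueue("delivery_queue", false); // same flag as in the config class
        broker.declareQueue("audit_queue", true);     // hypothetical durable queue

        broker.restartBroker();
        System.out.println(broker.exists("delivery_queue")); // false
        System.out.println(broker.exists("audit_queue"));    // true
    }
}
```

For a demo the non-durable setting is convenient because nothing is left behind on the broker; for anything that must not lose work, a durable queue is the safer starting point.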
Producer Controller
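import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
// plus imports for Message and RabbitMqSender from your own packages

@RestController
@RequestMapping("/api")
public class ProducerController {

    @Autowired
    private RabbitMqSender rabbitMqSender;

    // POST /api/message/send with a JSON body that maps to Message
    @PostMapping("/message/send")
    public String publishUser(@RequestBody Message message) {
        rabbitMqSender.send(message);
        return "Message sent successfully";
    }
}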
The producer controller is pretty straightforward. It is a regular REST endpoint with a @PostMapping annotation. RabbitMqSender is autowired, and its send method is called with the Message deserialized from the JSON request body.
Project Structure
Testing using Postman
By default, the project runs on port 8080. Now, let’s test it with Postman. We have one endpoint, POST /api/message/send, which sends a message to the RabbitMQ server.
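If you would rather test from code than from Postman, the request can also be sent with the HTTP client that ships with Java 11. This is only a sketch: the JSON shape is my assumption based on the fields of Message and User, the sample user is made up, and the body is built by plain string concatenation without any escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical test client for the producer endpoint (illustration only).
final class SendMessageRequestSketch {

    // Builds the request body; assumes Message{routingKey, user} and User{id, name}.
    public static String jsonBody(String routingKey, int userId, String userName) {
        return "{\"routingKey\":\"" + routingKey + "\","
                + "\"user\":{\"id\":" + userId + ",\"name\":\"" + userName + "\"}}";
    }

    // Builds a POST request for the /api/message/send endpoint.
    public static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/message/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the producer application to be running on port 8080.
        HttpRequest request = buildRequest("http://localhost:8080", jsonBody("delivery", 1, "Alice"));
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Using "delivery" as the routing key sends the user to delivery_queue and "email" sends it to email_queue; the same JSON body can be pasted into Postman.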
Consume Message in RabbitMQ
In this part, we will consume the messages published by the producer. Let’s get started:
Setup project for RabbitMQ Consumer
This is how your pom.xml file should look:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.fullstack.coder</groupId>
    <artifactId>rabbitmq.consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq.consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
As discussed, the spring-boot-starter-amqp dependency is required to consume messages.
Configure RabbitMQ in Spring Boot Application
I hope RabbitMQ is up and running; if not, check out RabbitMQ-Installation. Now update the application.properties file as necessary.
spring.rabbitmq.host = localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
server.port=8090
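I am running it on a different port than the producer so that both applications can run side by side while we test the consumer. Note that server.port only has an effect when the application starts an embedded web server, which requires a web starter such as spring-boot-starter-web.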
Model object for RabbitMQ
Now let’s define a User class in the model package. This class is required because the message sent from the producer is of type User.
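import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
//@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id", scope = User.class)
public class User implements Serializable {
    private Integer id;
    private String name;
}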
The class contains two properties, id and name. The getters, setters, toString and constructors are generated by the Lombok annotations on top of the class. The @JsonIdentityInfo annotation, which Jackson provides to deal with circular references during serialization, is commented out on the consumer side.
Service class
This is the class where the consumer listens for messages sent by the publisher.
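import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// plus an import for User from your model package

@Component
public class RabbitMqService {

    // Invoked for every message that arrives on delivery_queue
    @RabbitListener(queues = "delivery_queue")
    public void consumeMessage(User user) {
        System.out.println("Message received: " + user);
    }
}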
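The @RabbitListener annotation marks a method that consumes messages the producer has sent to the RabbitMQ server. We specify a queue, and any message delivered to that queue is relayed to our listener method. In this example, the listener receives the messages delivered to “delivery_queue”. Because the producer publishes the User as JSON, the consumer most likely needs a JSON-capable MessageConverter as well (for example a Jackson2JsonMessageConverter bean like the one in the producer's configuration, with Jackson on the classpath) so that the payload can be converted back into a User.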
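To picture what happens behind a listener, here is an in-memory stand-in written with plain Java concurrency classes. A background thread blocks on a queue and hands every message it receives to a handler, which is roughly the role Spring's listener container plays for a @RabbitListener method. This is a conceptual sketch, not Spring or RabbitMQ code, and the class and method names are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for a queue listener (illustration only).
final class QueueListenerSketch {

    // Publishes the messages to an in-memory queue and returns what the listener thread handled.
    public static List<String> publishAndConsume(List<String> messages) {
        BlockingQueue<String> deliveryQueue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(messages.size());

        // The "listener": waits for messages and handles them one by one.
        Thread listener = new Thread(() -> {
            try {
                while (true) {
                    String message = deliveryQueue.take(); // blocks until a message arrives
                    received.add(message);                 // stands in for consumeMessage(...)
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when the caller interrupts us
            }
        });
        listener.setDaemon(true);
        listener.start();

        // The "producer": publishes without waiting for the consumer.
        messages.forEach(deliveryQueue::add);

        try {
            done.await(2, TimeUnit.SECONDS); // give the listener time to drain the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            listener.interrupt();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        List<String> handled = publishAndConsume(List.of("User(id=1, name=Alice)", "User(id=2, name=Bob)"));
        System.out.println("Message received: " + handled);
    }
}
```

The producer side never calls the handler directly; it only puts messages on the queue, which is the decoupling a message broker provides between two applications.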
Project Structure
Conclusion
RabbitMQ is message-queueing software, and in this tutorial we looked at how to build a producer (publisher) that publishes messages to a RabbitMQ server and a consumer application that consumes the messages sent by the producer. I hope this post gives you a quick start for diving deeper into RabbitMQ.