Communication between microservices is the backbone of distributed systems, yet developers often try to avoid it for fear of added complexity. However, a combination of Spring Cloud Stream and RabbitMQ can make it relatively easy to handle communication between microservices.

Spring Cloud Stream is part of the Spring Cloud group of projects. It integrates with various message brokers with minimal configuration. Below is a high-level view of the overall pattern.

[Figure: high-level view of Spring Cloud Stream with RabbitMQ]

Spring Cloud Stream helps in exchanging messages between two applications or microservices. It can work seamlessly with message brokers such as RabbitMQ, Kafka and so on.

In this post, we will implement communication between microservices using Spring Boot and Spring Cloud Stream. Below is our high-level plan.

  • Step 1 – Create a RabbitMQ Server using Docker
  • Step 2 – Create a Spring Boot application using Spring Cloud Stream to listen to messages. We call this application subscriber-application.
  • Step 3 – Publish messages on RabbitMQ so that our subscriber-application can listen to those messages.
  • Step 4 – Create another Spring Boot application using Spring Cloud Stream to publish messages to RabbitMQ. We call this application publisher-application.
  • Step 5 – Publish Messages using publisher-application that will be consumed by the subscriber-application.

So let’s start the process.

Step 1 – Creating RabbitMQ Server using Docker

To demonstrate Spring Cloud Stream, we will use RabbitMQ as the message broker. However, Spring Cloud Stream also works with other message brokers such as Kafka or Amazon Kinesis, so you can use those as well.

For simplicity, however, we will go with RabbitMQ.

RabbitMQ can be easily set up on your machine. One approach is to follow the official download guide and install RabbitMQ for your operating system of choice.

However, another easy way to get RabbitMQ is through Docker. If you are not familiar with Docker, you can follow my detailed post on understanding the basics of Docker. The official Docker image for RabbitMQ is available at this link.

RabbitMQ server can be easily started with the below command.

docker run -d --hostname my-test-rabbit --name test-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

This command pulls the RabbitMQ image from Docker Hub and starts the server on port 5672. It also exposes the RabbitMQ management console on port 15672.

RabbitMQ stores data based on the node name, which defaults to the hostname. In this case, we provide the hostname as my-test-rabbit. We can also check the logs of the RabbitMQ server using the below command.

docker logs test-rabbit

We have also exposed a management console for our RabbitMQ server. It provides a graphical user interface to manage the RabbitMQ server. You can access it at http://localhost:15672.

[Figure: RabbitMQ management console login page]

If you see the above login page, the RabbitMQ setup on your machine has been successful. Now we can move to Step 2.
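As an optional extra check before moving on, the two published ports can be probed from code. The following is a minimal sketch using only the JDK; the class name RabbitPortCheck and the one-second timeout are illustrative choices, and it assumes the container was started with the port mappings shown above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RabbitPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 is the AMQP port, 15672 the management console (as mapped in the docker run command).
        System.out.println("AMQP (5672): " + isReachable("localhost", 5672, 1000));
        System.out.println("Management console (15672): " + isReachable("localhost", 15672, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; the login page and the container logs remain the better confirmation that RabbitMQ itself is healthy.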

Step 2 – Create the Subscriber Application

In this step, we will create an application using Spring Boot and Spring Cloud Stream to listen to messages. If you are not familiar with Spring Boot, I have a detailed guide on Spring Boot Microservices.

Below is our basic plan at this point.

[Figure: RabbitMQ and the Spring Cloud Stream Sink in the subscriber-application]

We will first build our subscriber-application, which connects to the RabbitMQ server. For now, we will publish messages directly through the RabbitMQ web console. Our Spring Boot application will use something known as a Sink to process the incoming messages. We will look at what a Sink is in the next section.

Selecting the Dependencies

We can quickly bootstrap an application using https://start.spring.io. The important thing to note is the pair of dependencies for Spring Cloud Stream and RabbitMQ shown below. Together, these two dependencies form the basis of our integration. At this point, you can also select Kafka or some other option (instead of RabbitMQ) depending on your choice.

[Figure: Spring Cloud Stream and RabbitMQ dependencies selected in Spring Initializr]

After generating the project, open the pom.xml file (in your IDE of choice) to see these dependencies listed.

<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-test-support</artifactId>
			<scope>test</scope>
		</dependency>
</dependencies>

Adding Message Handler

Even at this point, our Spring Boot application can be run. However, it does not have any functionality.

As per our plan, we want this application to listen to messages published to RabbitMQ. To do so, we need to add a listener that can handle incoming messages.

Below is how we will do it.

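import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class) // binds the "input" channel declared by Sink to the broker
public class SubscriberApplication {

	public static void main(String[] args) {
		SpringApplication.run(SubscriberApplication.class, args);
	}

	// Invoked for every message arriving on the "input" channel.
	@StreamListener(Sink.INPUT)
	public void handleMessage(Message message){
		System.out.println("Received Message is: " + message);
	}

	// Payload type; the incoming message body is converted to this class.
	public static class Message{
		private String message;

		public String getMessage() {
			return message;
		}

		public void setMessage(String message) {
			this.message = message;
		}

		@Override
		public String toString() {
			return "Message{" +
					"message='" + message + '\'' +
					'}';
		}
	}

}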

Important things to note here are as follows:

  • We have enabled the Sink binding by using the @EnableBinding annotation. This signals the underlying framework to create the necessary bindings to the messaging middleware. In other words, it will create the destination items such as queues and topics.
  • Also, we have added a handler method annotated with @StreamListener. This method receives incoming messages of type Message. This is one of the most powerful features of Spring Cloud Stream: the framework tries to automatically convert incoming messages to the type Message.
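Note that @EnableBinding and @StreamListener belong to the annotation-based model, which was deprecated in Spring Cloud Stream 3.1 and removed in 4.0. On newer versions, a handler like the one above is typically written as a java.util.function.Consumer instead. The following is a minimal plain-Java sketch of that shape, not the code used in this post. The class name FunctionalSubscriberSketch is hypothetical, and in a real application handleMessage() would be a @Bean method inside a Spring configuration class rather than a static factory.

```java
import java.util.function.Consumer;

public class FunctionalSubscriberSketch {

    // Same payload shape as the Message class in the subscriber listing.
    public static class Message {
        private String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        @Override
        public String toString() {
            return "Message{message='" + message + "'}";
        }
    }

    // Assumption: in a Spring application this method would be annotated with @Bean,
    // and the framework would call the returned Consumer for each incoming message.
    public static Consumer<Message> handleMessage() {
        return message -> System.out.println("Received Message is: " + message);
    }

    public static void main(String[] args) {
        // Calls the handler directly, standing in for a message delivered by the broker.
        Message incoming = new Message();
        incoming.setMessage("Hello World");
        handleMessage().accept(incoming);
    }
}
```

With the functional model, the binding name is derived from the function name, so the destination would be expected to be configured through a property such as spring.cloud.stream.bindings.handleMessage-in-0.destination=input. Treat that property as an assumption to verify against the documentation of the version you use.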

With this we are basically done with the minimal setup required in our application.

Step 3 – Publish Message through RabbitMQ

We are ready to test our subscriber application and see it in action.

You can simply start the application using the below command:

mvn clean package spring-boot:run

The application will automatically try to connect to a RabbitMQ server at localhost:5672 (over AMQP, not HTTP). You should see something like this in the application startup logs.

2019-06-25 17:35:22.304  INFO 8115 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#34688e58:0/SimpleConnection@1a981cf0 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 52000]
2019-06-25 17:35:22.355  INFO 8115 --- [           main] o.s.i.monitor.IntegrationMBeanExporter   : Registering MessageChannel input.anonymous.KWrQWmqZSYKPjOBkx4DotA.errors
2019-06-25 17:35:22.432  INFO 8115 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.input.anonymous.KWrQWmqZSYKPjOBkx4DotA.errors' has 1 subscriber(s).
2019-06-25 17:35:22.432  INFO 8115 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'application.input.anonymous.KWrQWmqZSYKPjOBkx4DotA.errors' has 2 subscriber(s).
2019-06-25 17:35:22.454  INFO 8115 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.KWrQWmqZSYKPjOBkx4DotA
2019-06-25 17:35:22.464  INFO 8115 --- [           main] c.p.d.s.SubscriberApplication            : Started SubscriberApplication in 3.52 seconds (JVM running for 18.305)

To test whether our application is able to listen to messages, we can publish a message through the RabbitMQ management console.

To do so, log in to the console at http://localhost:15672 using the default credentials guest/guest.

At this point, you should see the list of exchanges as below. Note the last exchange in the list, named input. It was created automatically when we started our application.

[Figure: input exchange in the RabbitMQ exchange list]

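Also, a queue bound to the input exchange is created, and we can publish messages to it from the console. A JSON payload such as {"message": "Hello World"} matches the Message class and the output shown further below.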

[Figure: publishing a message from the RabbitMQ management console]

Once you click Publish, you will see the message printed in the subscriber-application logs as below.

2019-06-25 17:35:22.454  INFO 8115 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.KWrQWmqZSYKPjOBkx4DotA
2019-06-25 17:35:22.464  INFO 8115 --- [           main] c.p.d.s.SubscriberApplication            : Started SubscriberApplication in 3.52 seconds (JVM running for 18.305)
Received Message is: Message{message='Hello World'}
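The same test can be scripted instead of clicking through the console. The sketch below posts a message to the management plugin's HTTP API (the /api/exchanges/{vhost}/{exchange}/publish endpoint) using the JDK 11+ HttpClient. It assumes the default guest/guest credentials, the default virtual host / (encoded as %2F), and a JSON payload shaped like the Message class. The class name ManagementApiPublisher and the routing key test are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class ManagementApiPublisher {

    // Escapes backslashes and double quotes so the text can sit inside a JSON string literal.
    public static String escapeJson(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the request body for the management API publish endpoint.
    public static String buildBody(String routingKey, String payload) {
        return "{\"properties\":{},"
                + "\"routing_key\":\"" + escapeJson(routingKey) + "\","
                + "\"payload\":\"" + escapeJson(payload) + "\","
                + "\"payload_encoding\":\"string\"}";
    }

    // Builds a POST request against the default virtual host ("/" encoded as %2F).
    public static HttpRequest buildRequest(String exchange, String routingKey, String payload) {
        String credentials = Base64.getEncoder()
                .encodeToString("guest:guest".getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:15672/api/exchanges/%2F/" + exchange + "/publish"))
                .header("Authorization", "Basic " + credentials)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(routingKey, payload)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the subscriber's queue is bound with a catch-all routing key,
        // so the routing key value used here should not matter.
        HttpRequest request = buildRequest("input", "test", "{\"message\":\"Hello World\"}");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the message reaches a queue, the response body should report that it was routed, and the subscriber-application should print the received message just as it does for a message published from the console.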

With this, we are done with the subscriber part of our application. Now we can move on to the next step.

Step 4 – Create the Publisher Application

In this step, we will create a publisher application that has an endpoint to publish messages to the RabbitMQ server.

Below is our high-level plan for this step.

[Figure: RabbitMQ and the Spring Cloud Stream Source in the publisher-application]

As you can see, we now have a publisher-application instead of the web console. In the publisher application, we have something known as a Source. Think of it as the opposite of the Sink interface we saw earlier in the subscriber-application. We will look at the Source interface in detail in this section.

Selecting the Dependencies

To quickly bootstrap the application, we can use the Spring Initializr like we did for the subscriber-application. This time we will include an extra dependency known as Spring Web Starter.

[Figure: publisher-application dependencies in Spring Initializr]

You can click Generate Project to get the zip file on your local machine.

Adding Message Publishing Logic

To publish messages using Spring Cloud Stream and RabbitMQ, we modify our publisher-application’s main class as below.

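import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
public class PublisherApplication {

	public static void main(String[] args) {
		SpringApplication.run(PublisherApplication.class, args);
	}

}

@RestController
@EnableBinding(Source.class) // binds the "output" channel declared by Source to the broker
class MessagePublisher{

	@Autowired
	private Source source;

	// Each GET on /api/publish sends one message to the output channel.
	@GetMapping(value = "/api/publish")
	public void sendMessage(){
		Message message = new Message("Hello World from Publisher");

		source.output().send(MessageBuilder.withPayload(message).build());

	}
}

// Payload type; serialized to JSON because of the configured content type.
class Message{
	private String message;

	public Message(String message) {
		this.message = message;
	}

	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
}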

The important things to note here are:

  • We create a MessagePublisher class and annotate it with @RestController.
  • Also, we annotate the controller class with @EnableBinding. However, instead of binding to Sink (as we did in the subscriber), we bind this class to Source.class. Source and Sink are binding interfaces provided by Spring Cloud Stream.
  • We auto-wire an instance of the Source interface, and in the /api/publish call we use it to publish a Message object to RabbitMQ.
  • We also define the Message class that carries the payload of the message.

There is one more important setting to add in the application.properties file. We need to define the output binding as below.

spring.cloud.stream.bindings.output.destination=input
spring.cloud.stream.default.contentType=application/json

The below illustration can help explain what these properties do.

[Figure: Source output channel bound to the input exchange]

The Source interface exposes an output channel, and the destination property binds that channel to the input exchange. Publisher and subscriber must share the same destination to be able to exchange messages.
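To see why the destination names must match, here is a toy, in-memory analogy in plain Java. It is not Spring Cloud Stream or RabbitMQ code; the class DestinationBindingSketch and its methods are invented purely to model the idea that a message only reaches handlers subscribed to the same destination name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DestinationBindingSketch {

    // Destination name -> handlers listening on it (a stand-in for a broker exchange).
    private final Map<String, List<Consumer<String>>> destinations = new HashMap<>();

    // Registers a handler for the given destination name.
    public void subscribe(String destination, Consumer<String> handler) {
        destinations.computeIfAbsent(destination, name -> new ArrayList<>()).add(handler);
    }

    // Delivers the payload to every handler on the destination and returns how many received it.
    public int publish(String destination, String payload) {
        List<Consumer<String>> handlers = destinations.getOrDefault(destination, List.of());
        handlers.forEach(handler -> handler.accept(payload));
        return handlers.size();
    }

    public static void main(String[] args) {
        DestinationBindingSketch broker = new DestinationBindingSketch();

        // The subscriber listens on "input", like the Sink binding.
        broker.subscribe("input", payload -> System.out.println("Received Message is: " + payload));

        // Publishing to "input" reaches the subscriber; publishing to "output" reaches nobody.
        System.out.println("Delivered to: " + broker.publish("input", "Hello World from Publisher"));
        System.out.println("Delivered to: " + broker.publish("output", "Hello World from Publisher"));
    }
}
```

In the real setup, leaving out the destination property would make the publisher's output channel default to a destination named after the channel (output), which the subscriber is not listening on.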

Step 5 – Publish Message using publisher-application

Now we can start up the publisher-application as well. Once the application starts up, we need to trigger the endpoint http://localhost:8080/api/publish.
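Opening the URL in a browser is enough, but the call can also be made from code. The following is a small sketch using the JDK 11+ HttpClient. The class name PublishTrigger is hypothetical, and it assumes the publisher-application runs on the default port 8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishTrigger {

    // Builds a GET request for the publish endpoint under the given base URL.
    public static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/publish"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:8080"), HttpResponse.BodyHandlers.ofString());
        // The controller method returns void, so only the status code is of interest here.
        System.out.println("Status: " + response.statusCode());
    }
}
```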

Nothing will be shown in the browser or the client as a response. However, if you check the logs of the subscriber-application, you will see the message printed.

2019-06-26 17:18:50.366  INFO 5138 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.Z0h4KjScTG2guIvw1KxRxQ
2019-06-26 17:18:50.374  INFO 5138 --- [           main] c.p.d.s.SubscriberApplication            : Started SubscriberApplication in 2.939 seconds (JVM running for 11.335)
Received Message is: Message{message='Hello World from Publisher'}

Also, you will see a green spike in the RabbitMQ console for the input queue. This shows the message being delivered to the queue that the subscriber-application is listening to.

[Figure: message rate spike in the RabbitMQ console]

Conclusion

With this, we have successfully developed a small application using Spring Cloud Stream and RabbitMQ to publish and subscribe to messages.

The code for this post is available on GitHub for your reference.

If you have any comments or queries, please write them in the comments section below.

Happy Learning!


Saurabh Dashora

Saurabh is a Software Architect with over 12 years of experience. He has worked on large-scale distributed systems across various domains and organizations. He is also a passionate Technical Writer and loves sharing knowledge in the community.
