In a distributed system, it is crucial to deal with bad messages properly. Failures are bound to happen. RabbitMQ Dead Letter Queue helps in dealing with bad messages by allowing us to catch those messages and do something with them.
With Spring Cloud Stream, it becomes even more easy to manage RabbitMQ Dead Letter Queue. If you are not aware, Spring Cloud Stream is a package that belongs to the Spring Cloud family of projects.
In this post, we will explore this topic in detail. However, if you are new to RabbitMQ and Spring Cloud Stream, I will recommend going through the below post.
Learn how to make your microservices talk to each other.
>>Click to learn about Microservices Communication using Spring Cloud Stream and RabbitMQ
Another post to understand how you can handle multiple subscribers using RabbitMQ using fanout pattern.
Learn how to implement RabbitMQ Fanout Exchange using Spring Cloud Stream.
>>Click to learn how to implement RabbitMQ Fanout Exchange using Spring Cloud Stream
Below will be our plan for this post:
- How Spring Cloud Stream handles message failures?
- What is Dead Letter Queue and why we need one?
- Enabling RabbitMQ Dead Letter Queue with Spring Cloud Stream
- Processing bad messages from Dead Letter Queue.
- Summarize our learning.
How Spring Cloud Stream handles message failure?
A message failure typically occurs when our message listener is not able to process the incoming message and throws an exception.
We can easily simulate this behavior by making some minor changes to our subscriber-application.
@SpringBootApplication @EnableBinding(MessageChannel.class) public class SubscriberApplication { public static void main(String[] args) { SpringApplication.run(SubscriberApplication.class, args); } @StreamListener(MessageChannel.MESSAGES) public void handleMessage(String message) throws Exception{ System.out.println(message); if(message.contains("leave")) throw new Exception("Someone wants a vacation! Reject the leave application"); System.out.println("Subscriber Received Message is: " + message); } }
Note here that we are listening to the incoming message in the handleMessage() method.
Let’s imagine that this subscriber is a heartless manager who hates when employees ask for leave. Therefore, in this application, we check whether the word “leave” is present in the incoming message. If it is present, an exception is thrown.
Now in the publisher-application, we can simply hard-code the message “I want leave”. This publisher is like an employee asking for leave. Little does the employee know that the message is going to trigger an exception.
@RestController @EnableBinding(Source.class) class MessagePublisher{ @Autowired private Source source; @GetMapping(value = "/api/publish") public void sendMessage(){ String message = "I want leave"; source.output().send(MessageBuilder.withPayload(message).build()); } }
If we start up both the applications and trigger the endpoint /api/publish, we should see something like below in the subscriber-application’s logs.
2019-07-08 18:10:37.283 INFO 10155 --- [ main] c.p.d.s.SubscriberApplication : Started SubscriberApplication in 3.346 seconds (JVM running for 12.182)
I want leave
I want leave
I want leave
2019-07-08 18:11:52.855 ERROR 10155 --- [putSubscriber-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException
Caused by: java.lang.Exception: Someone wants a vacation! Reject the leave application
at com.progressivecoder.demo.subscriberapplication.SubscriberApplication.handleMessage(SubscriberApplication.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Basically, the subscriber-application tried processing the message three times without. Hence the three displays of “I want leave” on top. However, each time the exception prevented the leave application from processing successfully. After trying three times, the message was discarded. It was like the leave application fell on deaf ears.
This is the default behavior that comes out of the box with Spring Cloud Stream. However, the problem with this approach is that we never came to know about the bad message itself. The subscriber did try to process it multiple times but then it was lost forever.
Usually, we don’t want this approach. Mostly, we want to do something with the discarded message. That’s when Dead Letter Queue comes into the picture.
What is Dead Letter Queue and why we need it?
Dead Letter Queue is a queue that’s purpose is to store messages that went wrong. What went wrong can be of many types such as:
- Message length limit exceeds the allowed limit
- The message could not be processed due to exception.
- Message reaches a threshold read counter and could not be consumed.
- Queue length limit exceeded.
In our case, we can safely say that our message could not be processed multiple times. And hence, it would be ideal to move it to a Dead Letter Queue (also known as DLQ) for further processing.
This is because in a distributed message we don’t want to simply let the messages vanish. We would like to process failed messages. In the simplest case, we would at least want such messages to be recorded somewhere for analysis. Hence, a DLQ is a necessity.
Enabling RabbitMQ Dead Letter Queue
Thankfully, with Spring Cloud Stream, enabling RabbitMQ Dead Letter Queue becomes a piece of cake.
We just need to add the below property in the application.properties file of our subscriber application.
spring.cloud.stream.rabbit.bindings.messages.consumer.autoBindDlq=true
Here, messages is the name of the channel we are using in our subscriber application. This property automatically binds a Dead Letter Queue or DLQ to our main consumer queue.
Also, we need to add two other properties as below.
spring.cloud.stream.bindings.messages.destination=messages
spring.cloud.stream.bindings.messages.group=messageInputSubscriber
Here, we are binding our application’s message channel to the message exchange on the RabbitMQ server. Also, we are providing a name to the queue by setting the group property to messageInputSubscriber.
If we start the subscriber-application now and then visit our RabbitMQ console, we should see two queues for this consumer.
As evident, a DLQ was automatically created for our messageInputSubscriber queue with the .dlq extension.
Processing Bad Messages from Dead Letter Queue
Creating a Dead Letter Queue is only half the battle won. After all, we still need to do something with those messages. Otherwise, they will just lie on the queue.
For that, we need to create another application that can subscribe to the Dead Letter Queue and if a message appears over there, it can do something with such messages.
Luckily doing so with Spring Boot is pretty easy. Let’s see how.
For this DLQ processor application, we don’t need to use Spring Cloud Stream. We can simply use the Spring RabbitMQ binder. Below are the dependencies we need.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
Below is the main class where we also implement some logic to listen to the Dead Letter Queue.
@SpringBootApplication public class MessageDlqProcessorApplication { private static final String ORIGINAL_SUBSCRIBER_QUEUE_NAME = "messages.messageInputSubscriber"; private static final String SUBSCRIBER_DLQ_NAME = ORIGINAL_SUBSCRIBER_QUEUE_NAME + ".dlq"; @Autowired RabbitTemplate rabbitTemplate; public static void main(String[] args) { SpringApplication.run(MessageDlqProcessorApplication.class, args); } @RabbitListener(queues = {SUBSCRIBER_DLQ_NAME}) public void processFailedMessage(Message failedMessage){ System.out.println("Message cannot be processed. Write to database: " + failedMessage.toString()); } }
Basically, we autowire an instance of the RabbitTemplate class. Then, we define a method processFailedMessage() and annotate it with @RabbitListener.
The @RabbitListener annotation takes the name of the DLQ as a parameter. In fact, we can bind multiple queues to the same listener. Then, we simply print the message to the console. However, in a real world scenario, we can do several things with the failed message. We can try to repair the message according to some rules. Or we can store the message in a database table.
To see the DLQ in action, let’s start our publisher-application as well as the dlq-processor-application and publish a message using the /api/publish end-point.
In this case, we should see the below logs in the console. Basically, after retrying to process the message thrice, it was sent to the DLQ. As soon as the message arrived in the DLQ, our DLQ processor application processed it by printing the error to the console.
2019-07-09 17:36:40.045 INFO 7929 --- [ main] c.p.m.MessageDlqProcessorApplication : Started MessageDlqProcessorApplication in 2.968 seconds (JVM running for 12.003)
Message cannot be processed. Write to database: (Body:'I want leave' MessageProperties [headers={x-first-death-exchange=messages, x-death=[{reason=rejected, count=1, exchange=messages
Conclusion
To summarize what we have learned in this post, we can look at the below high-level points.
- Spring Cloud Stream automatically tries to retry the messages that fail. However, once the retry attempts are exhausted, the message is lost.
- RabbitMQ Dead Letter Queue feature helps us tackle failed messages. We can do something with that message like logging to database or trying to repair it and reprocess.
- Enabling RabbitMQ Dead Letter Queue is quite straightforward and can be done using application.properties.
The code for this post is available on Github for reference.
If you have any comments and queries, sound off in the comments section below.
Happy Learning!
0 Comments