The Kafka Producer Acks configuration property often confuses developers. However, it is one of the most important properties for making sure you don’t end up losing messages.
Put simply, the Kafka Producer Acks property controls how many replicas of a partition must receive a record before the producer can consider the write successful.
If you are totally new to Kafka, you can check out our detailed Kafka Introduction.
1 – How Kafka Replication Works
Since we need to work with replication to understand the `acks` property, let us first understand how Kafka replication actually works.
Check out the below illustration that brings a visual perspective to replication in Kafka.
As you can see, a Kafka cluster has multiple brokers. Within the brokers, we can register topics with one or more partitions. Producers write records or messages to a partition that belongs to a particular topic. For example, Topic A contains two partitions (0 and 1).
Basically, from an application point of view, we want to make sure that messages written to a particular partition are not lost. And this is where replication comes into the picture.
Replication in Kafka provides redundancy of messages stored in a partition. When a particular partition is replicated, it is also assigned to another broker. As an example, partition 0 of Topic A is assigned to both Broker 1 and Broker 2.
However, the important point here is that only one of the brokers is the leader of a given partition. In the illustration, Broker 1 is the leader of Topic A, Partition 0. The other brokers are called followers of the partition. The messages received by the leader are also copied over to the followers to achieve replication.
So what’s the difference between leaders and followers?
Producers can only publish messages to the leader broker. Consumers traditionally fetch from the leader as well, although since Kafka 2.4 they can also read from follower replicas (KIP-392). If a leader broker goes down for some reason, an election for the new leader takes place automatically. One of the in-sync followers emerges victorious and assumes the leadership position.
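To see replication from the configuration side, here is a minimal sketch (assuming the standard Java AdminClient; the topic name and broker address are placeholders) that creates a topic with two partitions, each replicated across three brokers:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");

        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, each with a replication factor of 3
            NewTopic topic = new NewTopic("topic-a", 2, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}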
2 – What are Kafka In-Sync Replicas (ISRs)?
Before we get into the Kafka Producer Acks property, there is another important term that needs to be understood – Kafka In-Sync Replicas or ISRs.
Kafka In-Sync Replicas are the replicas of a partition that are currently in sync with the leader. In other words, a replica is considered in-sync if it has fully caught up with the leader partition within a certain amount of time. The time window is controlled by the broker-level property `replica.lag.time.max.ms`.
A leader is always an in-sync replica. However, a follower is an in-sync replica only if it has fully caught up to the partition it’s following. In other words, it can’t be behind on the latest records for a given partition.
So what happens if a follower fails for some reason?
Followers replicate data from the leader to themselves by sending periodic fetch requests. If a follower fails, it will stop sending these requests. And after a set amount of time, it will be removed from the list of in-sync replicas.
See the illustration below, which depicts the concept of in-sync replicas in Kafka.
In the above example, Broker 1 and Broker 2 are in-sync replicas of partition 1. However, for some reason, Broker 3 has not been able to replicate the partition’s data and hence, we cannot consider it an in-sync replica.
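If you want to check the in-sync replicas of your own topics programmatically rather than through the CLI, a minimal sketch with the Java AdminClient (the topic name and broker address are placeholders) looks like this:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Collections;
import java.util.Properties;

public class ListInSyncReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin
                    .describeTopics(Collections.singletonList("topic-a"))
                    .all().get().get("topic-a");
            // Print the leader and the current ISR list for each partition
            description.partitions().forEach(p ->
                    System.out.println("Partition " + p.partition()
                            + " leader=" + p.leader().id()
                            + " isr=" + p.isr()));
        }
    }
}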
As we will see in the next section, the concept of replication, along with in-sync replicas, forms a key part of the Kafka Producer Acks property.
3 – Kafka Producer Acks Configuration Property
As the name suggests, ack or acknowledgment is a signal passed between communicating processes to signify receipt of the message.
The Kafka Producer Acks property controls how many partition replicas must acknowledge the receipt of a record before a producer can consider a particular write operation as successful.
There are three possible values of the Kafka `acks` property:
- `acks=0` – With this value, the producer never waits for a reply from the broker. Once the message is sent, it is gone for good. Even if something goes wrong and the broker does not receive the message, the producer will not try resending. It simply assumes the message was sent successfully. In other words, messages can get lost in this setup. On the upside, since the producer is not waiting for any acknowledgment, it can send messages as fast as the network supports, so `acks=0` is ideal for achieving high throughput.
- `acks=1` – With this value, the producer receives an acknowledgment as soon as the leader replica has received the message. The leader broker writes the record to its partition and sends the acknowledgment without waiting for the followers to replicate the message. Messages can still get lost, but only when the leader fails immediately after acknowledging the message and before the followers have replicated it. To simplify, `acks=1` is the middle ground between latency, throughput and durability.
- `acks=all` – Setting the acks value to `all` means that the producer receives an acknowledgment from the broker only once all in-sync replicas have received the message. The key phrase here is in-sync replicas, not just replicas. This is the safest mode of sending messages, since more than one broker has the message and it survives even if the leader crashes. Of course, there is a trade-off: the latency with `acks=all` is the highest.
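In the Java client, the choice boils down to a single producer property. A minimal fragment, to be placed inside whatever producer setup you already have:

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

Properties kafkaProps = new Properties();
// Pick one of "0", "1" or "all" depending on the durability you need
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");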
Producer Acks Common Questions
Having understood the three possible values of the Kafka Producer Acks config property, let us also answer a few common questions about them:
- What does acks=-1 mean?
`acks=-1` is the same as `acks=all`; the two values can be used interchangeably. However, `acks=all` is far more readable and self-explanatory.
- What acks value should be used if the producer doesn’t want to wait for any acknowledgment?
As we discussed, `acks=0` is the right choice if the producer doesn’t want to wait for any acknowledgment from the replicas. In this case, the producer does not even wait for the leader replica’s acknowledgment.
- What acks value should the producer use to achieve at-most-once delivery semantics?
At-most-once delivery means making sure that a message is never sent more than once. Here too, `acks=0` is the only way to achieve at-most-once semantics, because with `acks=0` the producer does not retry sending a message even when there is a timeout or an error. This guarantees the message cannot be sent more than once; of course, the message may be lost instead.
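To make the at-most-once setup concrete, here is a minimal fire-and-forget sketch (the topic name, key, value and broker address are placeholders). Retries are also disabled explicitly for clarity, even though with `acks=0` the client gets no broker response to retry on:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class FireAndForgetProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "0");  // no acknowledgment at all
        props.put(ProducerConfig.RETRIES_CONFIG, 0); // never resend on failure

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns immediately; we never learn whether the broker got it
            producer.send(new ProducerRecord<>("my-cluster-topic", "key", "at-most-once message"));
        }
    }
}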
4 – How Kafka Producer Acks Impacts Latency
Before we jump into the coding part of the Kafka Acks property, it is important to understand how the `acks` setting impacts latency.
With a lower and less reliable `acks` configuration, the producer can send records faster. In other words, we are trading off reliability for producer latency. However, there is no impact on the end-to-end latency, which is measured from the time a record is produced until it becomes available for the consumer to process. For all `acks` values, the end-to-end latency is identical.
The main reason for this is that Kafka does not allow consumers to read records until they are written to all in-sync replicas.
This makes a very interesting point. If end-to-end latency is all that matters to our application, then we have no trade-off to make even if we choose the most reliable `acks` value.
5 – Setting Kafka Acks Property on the Producer
With the theory part finished, it is time to look at how you can actually use the `acks` property.
Since the `acks` property deals with replication, we need a Kafka cluster with multiple brokers in order to test it.
For our demo purposes, we can start up three different brokers – localhost:9093, localhost:9094 and localhost:9095. Within the cluster, we create a topic with 3 partitions such that each partition is replicated on all three brokers. In other words, for every partition, one of the brokers is the leader while the other two are followers.
Setting up the cluster locally will be covered in another post. However, once we are done with the cluster setup, describing the topic using Kafka CLI shows the below structure.
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9093 --describe --topic my-cluster-topic
Topic: my-cluster-topic TopicId: 3qDCC2LTTACkdUioKJwczQ PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: my-cluster-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: my-cluster-topic Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: my-cluster-topic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Here, the topic name is my-cluster-topic. All three partitions are replicated. For example, in the case of partition 0, broker 2 is the leader and the replicas are 2, 3 and 1 (3 and 1 being the followers). Also, note the Isr column that contains the list of in-sync replicas. The same applies to partition 1 and partition 2.
Next, we create the Producer using a typical Java Spring Boot application. Check out the below code.
package com.progressivecoder.kafkaexample;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;

@SpringBootApplication
public class KafkaExampleApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Producer configuration: connection, serializers and the acks setting
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");

        Producer<String, String> producer = new KafkaProducer<>(kafkaProps);
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-cluster-topic", "msg", "kafka-trial-message-ack-2");

        try {
            // Asynchronous send; the callback fires once the broker responds
            producer.send(record, new DemoProducerCallback());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Flush and close so the in-flight record is actually delivered
            // before the application exits
            producer.close();
        }
    }
}

class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            // The send (or its acknowledgment) failed
            e.printStackTrace();
            return;
        }
        System.out.println("Record metadata");
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());
    }
}
Summarizing the code, we first create a new Producer instance. The Producer instance takes some properties as input:
- `BOOTSTRAP_SERVERS_CONFIG` – The list of host:port pairs of brokers that the producer uses to create the initial connection to the Kafka cluster. We don’t need to specify all the brokers: once a connection is established with one of them, the producer discovers the rest from the cluster metadata.
- `KEY_SERIALIZER_CLASS_CONFIG` – The serializer class for the message key.
- `VALUE_SERIALIZER_CLASS_CONFIG` – The serializer class for the message value.
- `ACKS_CONFIG` – Lastly, we have the `acks` value set to `all`.
Next, we simply create a `ProducerRecord` for my-cluster-topic and use the `producer.send()` method to send the message to Kafka. To handle the acknowledgment, we pass a callback instance of DemoProducerCallback, where we display some metadata of the record.
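Note that `send()` is asynchronous, so acknowledgment failures only surface in the callback. While experimenting with acks, it can be handy to block on the returned Future instead, so that failures are thrown right away. A minimal variation, assuming the same producer and record as above:

// Blocking variant: get() waits for the acknowledgment and re-throws
// any failure wrapped in an ExecutionException
RecordMetadata metadata = producer.send(record).get();
System.out.println("Acknowledged at offset " + metadata.offset());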
To read more about producing messages in Kafka, check out this detailed post on Kafka Producers.
To test our `acks` property, we can now shut down one of the brokers. If we describe the topic after shutting down the broker, we should see details as below:
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9093 --describe --topic my-cluster-topic
Topic: my-cluster-topic TopicId: 3qDCC2LTTACkdUioKJwczQ PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: my-cluster-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: my-cluster-topic Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Topic: my-cluster-topic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2
As you may have noticed, the Isr values have changed. Broker 3 is no longer present in the Isr list even though it is still present in the replicas.
If we run the application now, what do you think will happen? Will there be an error because `acks=all` and one of our replicas is down?
If you thought there would be no error, you are absolutely correct. If you remember, `acks=all` only considers in-sync replicas when determining whether the message was successfully sent. In this case, Broker 3 was already removed from the list of in-sync replicas, and hence Kafka does not care that it did not receive the message. The application runs fine and the message is sent to Kafka successfully.
But then, how do we actually simulate the situation where there is an acknowledgment failure?
We will find out in the next section.
6 – Setting the min.insync.replicas (Minimum In-Sync Replicas) Property
To simulate an acknowledgment failure, we need to make a small change to the topic: we set the `min.insync.replicas` property at the topic level.
To do so, we execute the below command using the Kafka CLI.
$ ./bin/kafka-configs.sh --bootstrap-server localhost:9093 --alter --entity-type topics --entity-name my-cluster-topic --add-config min.insync.replicas=3
Describing the topic again shows the new config:
Topic: my-cluster-topic TopicId: 3qDCC2LTTACkdUioKJwczQ PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=3
Topic: my-cluster-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: my-cluster-topic Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Topic: my-cluster-topic Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2
Basically, we altered the topic configuration by adding a new config item, `min.insync.replicas`, with a value of 3. In other words, we are telling Kafka that for this topic and its partitions, there should always be at least 3 in-sync replicas. However, with Broker 3 still down, the Isr column only contains two replicas for every partition.
If we run the application once more, this time we get an error saying NOT_ENOUGH_REPLICAS. The message acknowledgment failed because there were only 2 in-sync replicas.
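In the Java client, this failure is reported as a NotEnoughReplicasException, delivered to the callback or thrown from the Future if we use the blocking variant from earlier. A minimal sketch of handling it, assuming it runs inside run() (which already throws Exception):

import org.apache.kafka.common.errors.NotEnoughReplicasException;
import java.util.concurrent.ExecutionException;

try {
    producer.send(record).get();
} catch (ExecutionException e) {
    if (e.getCause() instanceof NotEnoughReplicasException) {
        // The broker rejected the write because fewer than
        // min.insync.replicas replicas are currently in sync
        // (the producer retries internally until delivery.timeout.ms expires)
        System.err.println("Not enough in-sync replicas: " + e.getCause().getMessage());
    }
}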
This leads to the final important point of the post.
What is the ideal setting needed to make sure we don’t lose messages?
The key points to note about this requirement are as follows:
- `acks=all` alone is not enough to ensure zero message loss. If `min.insync.replicas` is equal to 1, only the leader has to acknowledge the message. If the leader goes down before the message is replicated to the other replicas, we lose the message permanently.
- To make sure that doesn’t happen, we need to set `min.insync.replicas` to at least 2. In that case, Kafka makes sure the message is replicated to at least one other broker before acknowledging it.
- On the other hand, too large a value for `min.insync.replicas` reduces overall availability. In a 3-node cluster with minimum in-sync replicas set to 3, if even one node goes down, the other two nodes cannot accept any data.
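To make the recommendation concrete, here is a minimal sketch of the combination. The topic-level setting is shown as a comment, since it is applied with kafka-configs.sh as we did earlier, not in producer code:

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

// Producer side: wait for all in-sync replicas before acknowledging
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");

// Topic side (applied once via kafka-configs.sh, as shown earlier):
//   min.insync.replicas=2
// Together, these ensure every acknowledged record exists on at least
// two brokers before the producer considers the write successful.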
Conclusion
The Kafka Producer Acks configuration property is extremely important for protecting your application from message loss.
However, it alone cannot guarantee a safe producer. Another important concept we need to understand is in-sync replicas. Using both `acks` and `min.insync.replicas`, we can build a much safer producer for sending messages.
Have you used replication for your Kafka partitions? What sort of parameters did you go with?
Do share your views in the comments section below.