Kafka producer async send

In this post, we will learn how to send messages asynchronously using the existing APIs along with the working examples of it.

In previous post we have seen the messages are sending synchronously, in case we want to send 100 messages. Let's consider, each message might take 500ms, then 100*500 becomes 50000 in terms of seconds its 50seconds. Considering 50seconds is very high which will cause hell lot of problems in case of the performance factors.

To avoid this kind of performance issues, we have a mechanism in Kafka called as Asynchronous messaging. We send a message to Kafka broker and no need to wait for the result or response from the Kafka continue to do the other stuff.

You might be having a question in mind how we can get the response from the Kafka, this where I am coming. Please look into the below lines of code, you might understand more on the same

Syntax:

producer.send(record,callback);

From the above send method , we have another argument named as a callback. callback(org.apache.kafka.clients.producer.Callback) is an interface of the kafka, which contains only one method, i.e., onCompletion(RecordMetadata,Exception).

onCompletion(RecordMetadata,Exception)

onCompletion method having two arguments one is RecordMetadata and another is Exception. So, if we implement the org.apache.kafka.clients.producer.Callback interface will be enough to get the record metadata and Exception as well.

Exception is useful in case of the message is not properly reached to Kafka due to many reasons. And we can get the recordmetadata only in case of there is no exceptions for the same and vice versa.

Producer Asynchronous send:

CopiedCopy Code
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerExample {
	public static void main(String[] args) {
		ProducerRecord<String , String> record = new ProducerRecord<>("Producer12345","Producer123");
		try {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092");
			props.put("client.id", "KafkaProducer");
			props.put("key.serializer", LongSerializer.class.getName());
			props.put("value.serializer", StringSerializer.class.getName());
			Producer<String , String> producer=new KafkaProducer(props);
			//RecordMetadata fut=producer.send(record).get();
			producer.send(record, new CallbackInterfaceImplementation());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
class CallbackInterfaceImplementation implements org.apache.kafka.clients.producer.Callback
{
	@Override
	public void onCompletion(RecordMetadata recordmetadata, Exception exception) {
		if(exception==null)
			System.out.println(recordmetadata.toString());
		else
			exception.printStackTrace();
	}
}