Effective strategy to avoid duplicate messages in apache kafka consumer

JavaMessage QueueApache Kafka

Java Problem Overview


I have been studying apache kafka for a month now. I am however, stuck at a point now. My use case is, I have two or more consumer processes running on different machines. I ran a few tests in which I published 10,000 messages in kafka server. Then while processing these messages I killed one of the consumer processes and restarted it. Consumers were writing processed messages in a file. So after consumption finished, file was showing more than 10k messages. So some messages were duplicated.

In consumer process I have disabled auto commit. Consumers manually commit offsets batch wise. So for e.g if 100 messages are written to file, consumer commits offsets. When single consumer process is running and it crashes and recovers duplication is avoided in this manner. But when more than one consumers are running and one of them crashes and recovers, it writes duplicate messages to file.

Is there any effective strategy to avoid these duplicate messages?

Java Solutions


Solution 1 - Java

The short answer is, no.

What you're looking for is exactly-once processing. While it may often seem feasible, it should never be relied upon because there are always caveats.

Even in order to attempt to prevent duplicates you would need to use the simple consumer. How this approach works is for each consumer, when a message is consumed from some partition, write the partition and offset of the consumed message to disk. When the consumer restarts after a failure, read the last consumed offset for each partition from disk.

But even with this pattern the consumer can't guarantee it won't reprocess a message after a failure. What if the consumer consumes a message and then fails before the offset is flushed to disk? If you write to disk before you process the message, what if you write the offset and then fail before actually processing the message? This same problem would exist even if you were to commit offsets to ZooKeeper after every message.

There are some cases, though, where exactly-once processing is more attainable, but only for certain use cases. This simply requires that your offset be stored in the same location as unit application's output. For instance, if you write a consumer that counts messages, by storing the last counted offset with each count you can guarantee that the offset is stored at the same time as the consumer's state. Of course, in order to guarantee exactly-once processing this would require that you consume exactly one message and update the state exactly once for each message, and that's completely impractical for most Kafka consumer applications. By its nature Kafka consumes messages in batches for performance reasons.

Usually your time will be more well spent and your application will be much more reliable if you simply design it to be idempotent.

Solution 2 - Java

This is what Kafka FAQ has to say on the subject of exactly-once:

> #How do I get exactly-once messaging from Kafka? Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

> There are two approaches to getting exactly once semantics during data production:

> * Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded

  • Include a primary key (UUID or something) in the message and deduplicate on the consumer.

> If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.

> I think there are two improvements that would make this a lot easier:

> * Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.

  • The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon

Solution 3 - Java

I agree with RaGe's deduplicate on the consumer side. And we use Redis to deduplicate Kafka message.

Assume the Message class has a member called 'uniqId', which is filled by the producer side and is guaranteed to be unique. We use a 12 length random string. (regexp is '^[A-Za-z0-9]{12}$')

The consumer side use Redis's SETNX to deduplicate and EXPIRE to purge expired keys automatically. Sample code:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

The above code did detect duplicate messages several times when Kafka(version 0.8.x) had situations. With our input/output balance audit log, no message lost or dup happened.

Solution 4 - Java

There's a relatively new 'Transactional API' now in Kafka that can allow you to achieve exactly once processing when processing a stream. With the transactional API, idempotency can be built in, as long as the remainder of your system is designed for idempotency. See https://www.baeldung.com/kafka-exactly-once

Solution 5 - Java

Whatever done on producer side, still the best way we believe to deliver exactly once from kafka is to handle it on consumer side:

  1. Produce msg with a uuid as the Kafka message Key into topic T1
  2. consumer side read the msg from T1, write it on hbase with uuid as rowkey
  3. read back from hbase with the same rowkey and write to another topic T2
  4. have your end consumers actually consume from topic T2

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionShades88View Question on Stackoverflow
Solution 1 - JavakuujoView Answer on Stackoverflow
Solution 2 - JavaRaGeView Answer on Stackoverflow
Solution 3 - JavapeihanView Answer on Stackoverflow
Solution 4 - JavaChris HalcrowView Answer on Stackoverflow
Solution 5 - JavaDean JainView Answer on Stackoverflow