[Native] How to set up the Shared Kafka Consumer, aka Queues for Kafka
Since Kafka 4.1.0 we have an opportunity to test a totally new approach to consuming messages from Kafka:
We can consume data from a single topic partition with multiple Kafka Consumer instances running in the same Consumer Group!
This is a huge change when it comes to dynamically scaling Kafka Consumers. Before 4.1.0 the maximum level of parallel processing was limited by the number of partitions in the topics we were listening on; with that release we can scale our applications dynamically, based on other factors such as the number of messages or the Consumer Group lag.
You can read more about Queues in Apache Kafka in the official Kafka Improvement Proposal documents (KIP-932: Queues for Kafka).
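Before diving into the walkthrough, here is a minimal sketch of what a share consumer loop can look like. It is not code from the repository used below: it assumes the Kafka 4.1 clients library (where KafkaShareConsumer is the share-group counterpart of the classic KafkaConsumer), a broker on localhost:9092, a made-up group id my-share-group, the my-queue topic created later in this walkthrough, and the default (implicit) acknowledgement mode:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class HelloQueues {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // For a share consumer, group.id names the share group.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // KafkaShareConsumer is the share-group counterpart of KafkaConsumer.
        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("my-queue"));
            while (true) {
                // With the default (implicit) acknowledgement mode, records returned
                // by poll() are acknowledged automatically as the loop keeps polling.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

Start two or three copies of this loop with the same group.id and, unlike with a classic consumer group, all of them will receive records from the same single partition.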
So let's play with the Shared Kafka Consumer:
- Download Apache Kafka binaries https://kafka.apache.org/downl...
- Modify the config/server.properties file and add the following configuration properties:
  unstable.api.versions.enable=true
  group.coordinator.rebalance.protocols=classic,consumer,share
  group.share.enable=true
- Start Kafka Broker
- Create a topic:
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-queue
- Clone the GitHub repository https://github.com/pszymczyk/k...
- Open the project in your favourite IDE
- Navigate to the com.pszymczyk.step6.Step6 class
- Navigate to line 50 and see that we are using org.apache.kafka.clients.consumer.ShareConsumer<String, String> instead of the classic Kafka Consumer
- Navigate to line 60 and see that we provide the new Shared Kafka Consumer configuration parameter props.put(SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"), so we can use the totally new Shared Consumer API consumer.acknowledge(record, AcknowledgeType.ACCEPT) (line 78); a hedged sketch of such a consumer loop follows this list
- Run SubscribeRunner.main()
- Analyze the logs; you can see that different members are assigned to the same topic partition
- Check the consumer group state and members using the Kafka CLI; you should also see that we have 3 members assigned to the same topic and partition.
- Finally! Let's produce some messages; you can use com.pszymczyk.step6.PublishRunner for that (a minimal stand-alone producer sketch also follows this list). In the application logs you will see that our application is consuming messages from a single topic partition on multiple Kafka Consumer instances running on different threads (not possible before the 4.1.0 release).
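The repository's SubscribeRunner is the easiest way to run the demo, but for reference here is a rough, stand-alone sketch of a consuming side like the one described above. It is not the repository's code: it assumes the Kafka 4.1 clients library, it assumes that the SHARE_ACKNOWLEDGEMENT_MODE_CONFIG constant used in the repository resolves to the share.acknowledgement.mode property, and the class name, group id and thread count are made up for illustration.

```java
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitAckShareConsumers {

    public static void main(String[] args) {
        // Three members of the same share group; with a classic consumer group
        // only one of them could be assigned the single partition of my-queue.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            String member = "member-" + i;
            pool.submit(() -> consume(member));
        }
    }

    private static void consume(String member) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Assumed property name behind SHARE_ACKNOWLEDGEMENT_MODE_CONFIG in the repository.
        props.put("share.acknowledgement.mode", "explicit");

        try (KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props)) {
            consumer.subscribe(List.of("my-queue"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s: partition=%d offset=%d value=%s%n",
                            member, record.partition(), record.offset(), record.value());
                    // In explicit mode every record must be acknowledged individually;
                    // ACCEPT marks it processed, RELEASE and REJECT are the alternatives.
                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                }
                consumer.commitSync();
            }
        }
    }
}
```

Each thread gets its own KafkaShareConsumer instance, following the usual one-consumer-per-thread convention, yet all three members can receive records from the same partition of my-queue.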
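And if you prefer not to use PublishRunner, producing test messages needs nothing share-group specific; a plain KafkaProducer works. A minimal sketch, assuming the same localhost:9092 broker and the my-queue topic created earlier:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProduceToQueue {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Nothing share-group specific here: a regular producer feeds the queue.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my-queue", "message-" + i));
            }
            producer.flush();
        }
    }
}
```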