Partitioner and Producer Interceptor

In this task, we implement a custom Partitioner and a Producer Interceptor for a Kafka Producer. The goal is to control how messages are distributed across partitions and to enrich messages before they are sent.

Objectives:

  1. Custom Partitioner:
    • Develop a custom Partitioner that determines how messages are distributed across Kafka partitions. It can base the decision on the message key, the message value, or other relevant criteria in order to balance load and simplify message processing.
  2. Producer Interceptor:
    • Implement a Producer Interceptor that allows for pre-processing and post-processing of messages sent by the producer. This could include tasks such as logging, metrics collection, or modifying the message before it is sent to Kafka.

Implementation

  1. Clone the repository https://github.com/pszymczyk/kafka-native-java-playground
  2. Switch to the step3 branch
  3. In the MetadataEnrichmentInterceptor class, implement your own interceptor that adds two headers to the message:
    1. local_time, with the value of the current time in the Canada/Yukon zone
    2. source, with the name of the microservice producing the message, i.e. Step3_microservice
  4. In the PositiveNegativePartitioner class, implement your own partitioning mechanism that follows these rules:
    1. If the value of the sent message is 0, send the message to partition number 0
    2. If the value of the sent message is a number less than zero, send the message to partition number 1
    3. If the value of the sent message is a number greater than zero, send the message to partition number 2
  5. In the PublishRunner class, register the interceptor and the partitioner
  6. Run PublishRunner. Every 100 ms it sends messages with values from the range [-100, 100] to the step3 topic
  7. Run the Kafka console consumer and verify that the partitioner and the interceptor work as expected:
    1. ./kafka-console-consumer.sh --topic step3 --bootstrap-server localhost:9092 --property print.partition=true --property print.headers=true
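
The rules from steps 3 and 4 can be sketched in plain Java, without the Kafka client on the classpath. This is only a sketch under stated assumptions: the class and method names (Step3Sketch, partitionFor, localTimeHeader, sourceHeader) are hypothetical, the message value is assumed to be available as an int, and the ISO-8601 format for local_time is an assumption, since the task does not prescribe one. In the real classes, the partition choice would be returned from Partitioner.partition(...), and the header bytes would be added in ProducerInterceptor.onSend(...) via record.headers().add(name, bytes).

```java
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class; not part of the repository.
final class Step3Sketch {

    // Time zone required for the local_time header.
    static final ZoneId YUKON = ZoneId.of("Canada/Yukon");

    // Step 4 rules: zero -> partition 0, negative -> partition 1, positive -> partition 2.
    static int partitionFor(int value) {
        if (value == 0) {
            return 0;
        }
        return value < 0 ? 1 : 2;
    }

    // Value of the local_time header: the clock's current instant rendered in the Yukon zone.
    // The ISO-8601 offset format is an assumption.
    static byte[] localTimeHeader(Clock clock) {
        String now = ZonedDateTime.now(clock.withZone(YUKON))
                .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        return now.getBytes(StandardCharsets.UTF_8);
    }

    // Value of the source header, as named in step 3.
    static byte[] sourceHeader() {
        return "Step3_microservice".getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (int value : new int[] {-5, 0, 7}) {
            System.out.println(value + " -> partition " + partitionFor(value));
        }
        System.out.println("local_time=" + new String(
                localTimeHeader(Clock.systemUTC()), StandardCharsets.UTF_8));
        System.out.println("source=" + new String(sourceHeader(), StandardCharsets.UTF_8));
    }
}
```

Passing a Clock instead of calling the system time directly keeps the header logic testable with a fixed instant. Note that the partitioner rules assume the step3 topic has at least three partitions.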
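
Registering both classes (step 5) comes down to two producer configuration keys: partitioner.class and interceptor.classes. The sketch below builds such a configuration with java.util.Properties only. The package name com.example is a placeholder, since the actual package of the classes in the repository is not stated here, and the class name Step3ProducerConfigSketch is hypothetical. Serializer settings are omitted because the task does not specify the key and value types.

```java
import java.util.Properties;

// Hypothetical helper class; not part of the repository.
final class Step3ProducerConfigSketch {

    static Properties producerProperties() {
        Properties props = new Properties();
        // Broker address taken from the console consumer command in step 7.
        props.put("bootstrap.servers", "localhost:9092");
        // Fully qualified class names; "com.example" is a placeholder package.
        props.put("partitioner.class", "com.example.PositiveNegativePartitioner");
        // interceptor.classes accepts a comma-separated list; only one is needed here.
        props.put("interceptor.classes", "com.example.MetadataEnrichmentInterceptor");
        return props;
    }

    public static void main(String[] args) {
        producerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In PublishRunner, properties like these would be passed to the KafkaProducer constructor together with the serializer settings the runner already uses. The same keys are also available as the constants ProducerConfig.PARTITIONER_CLASS_CONFIG and ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.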