My Articles

How to Create a Kafka Idempotent Consumer in Spring

Let's go through a meaningful example.
Let's assume you enter an elevator and want to select the floor you want to stop at. You press the button for the 5th floor.
You can see that pressing the 5th-floor button multiple times does not change the outcome – the elevator will still stop at the 5th floor. The operation is idempotent because, no matter how many times the button is pressed, the elevator will always stop at the same floor.
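Translated to Kafka, idempotence means that reprocessing the same record must not change the outcome. Below is a minimal sketch of how that could look with Spring Kafka, assuming the record key uniquely identifies the event; the topic and group names are illustrative, and the in-memory set stands in for whatever persistent store you would really use.

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class IdempotentPaymentListener {

        // In production this would be a persistent store (e.g. a DB table keyed by event id);
        // an in-memory set is used here only to keep the sketch self-contained.
        private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

        @KafkaListener(topics = "payments", groupId = "payment-service") // illustrative names
        public void onMessage(ConsumerRecord<String, String> record) {
            String eventId = record.key(); // assumes the key uniquely identifies the event
            if (!processedIds.add(eventId)) {
                // Already handled: redelivery changes nothing, like pressing the
                // elevator button for the 5th floor a second time.
                return;
            }
            handle(record.value());
        }

        private void handle(String payload) {
            // business logic goes here
        }
    }

The key point is that the duplicate check and the business logic are tied together, so a redelivered message is recognized and skipped instead of being applied twice.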


Kafka Streams - Retrying a message

As I understand Kafka Streams and the overall Confluent Platform architecture, you shouldn't communicate with external resources directly from a Kafka Streams application. One of the basic concepts is that a Kafka Streams application's inputs and outputs are just Kafka topics. Communication with every other external resource should be done through Kafka Connect. There are a lot of different connectors made by Confluent and the community, and you can even write your own implementation if needed.

In this approach you don't...
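To keep retries inside Kafka itself, one option is to route records that cannot be processed yet to a separate retry topic and consume them again later. A rough sketch of that idea, where the topic names and the isProcessable()/process() helpers are placeholders, not anything from the original answer:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class RetryTopology {

        // Sends records we cannot handle yet to a retry topic instead of calling
        // an external system from inside the Streams application.
        public static StreamsBuilder build() {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> orders = builder.stream("orders"); // illustrative topic

            KStream<String, String>[] branches = orders.branch(
                    (key, value) -> isProcessable(value),
                    (key, value) -> true);

            branches[0].mapValues(RetryTopology::process).to("orders-processed");
            branches[1].to("orders-retry"); // re-consumed later, e.g. on a schedule

            return builder;
        }

        private static boolean isProcessable(String value) { return value != null; } // placeholder
        private static String process(String value) { return value; }                // placeholder
    }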

Connection management when using a Kafka producer in a high-traffic environment

I am going to use Kafka in a very high-traffic environment of more than a billion requests per day. Every request will make a connection to the Kafka cluster to send a message, so there will be many connections being made continuously every second. This could cause issues like socket timeouts.
The producer is making all non-persistent connections, so in such a case there could be socket timeout or port exhaustion issues.

Most of our ecosystem is in PHP, so I have to use a PHP library for Kafka. Now how to ef...
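For what it's worth, Kafka producer clients are designed to be created once and reused; a single long-lived producer keeps its broker connections open between sends instead of opening a new connection per request. A minimal sketch of that idea with the Java client (the PHP clients follow the same principle); broker addresses are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public final class SharedProducer {

        // One producer instance per process, created once and reused for every request.
        private static final Producer<String, String> PRODUCER = createProducer();

        private static Producer<String, String> createProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // illustrative
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(props);
        }

        public static void send(String topic, String key, String value) {
            // send() is asynchronous; the same TCP connections are reused across calls.
            PRODUCER.send(new ProducerRecord<>(topic, key, value));
        }
    }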

How to automate Kafka Testing

As I understand it, you want to implement end-to-end tests starting from messages. Some colleagues and I recently researched libraries, tools and frameworks for testing event-driven systems that use Kafka.
We found Zerocode, an automated API testing framework that uses a declarative language such as JSON or YAML. It supports REST, SOAP and, what we are interested in here, messaging. It sends messages to topics, consumes them back and makes assertions at the end, and it is easy to learn and use. Here is the link for more details: Zer...

Building a cache with Kafka Streams

I am trying to get a sense of what is possible and how to think when working with Kafka Streams.
There is a topic called Transactions:
I want to create a cache that will hold all recent transactions (last 10 minutes).
The cache can be queried by a REST client by providing the transaction reference.
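One possible sketch of this, assuming String transaction references as keys, String values and default String serdes: materialize the topic into a windowed state store and query it from the REST layer through interactive queries. All topic and store names below are illustrative.

    import java.time.Duration;
    import java.time.Instant;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    public class TransactionCache {

        // Materialize the last 10 minutes of transactions into a windowed store,
        // keyed by transaction reference.
        public static void buildTopology(StreamsBuilder builder) {
            KStream<String, String> transactions =
                    builder.stream("Transactions", Consumed.with(Serdes.String(), Serdes.String()));
            transactions
                    .groupByKey()
                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
                    .reduce((older, newer) -> newer, Materialized.as("recent-transactions"));
        }

        // Called from the REST layer: look the reference up in the windowed store.
        public static String findRecent(KafkaStreams streams, String transactionRef) {
            ReadOnlyWindowStore<String, String> store = streams.store(
                    StoreQueryParameters.fromNameAndType("recent-transactions",
                            QueryableStoreTypes.windowStore()));
            Instant now = Instant.now();
            try (WindowStoreIterator<String> it =
                         store.fetch(transactionRef, now.minus(Duration.ofMinutes(10)), now)) {
                return it.hasNext() ? it.next().value : null;
            }
        }
    }

Old windows are dropped by the store's retention, so the "cache" forgets transactions on its own after roughly the window size.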


KStreams Grouping by multiple fields to get count

So I have a bunch of records in a topic like the one below. I can create the GroupBy in ksqlDB with no problem, as it is more SQL than anything else. But I have been tasked with moving it over to Java KStreams and am failing miserably.
Can someone guide me on the topology for grouping first by user_id, then by Object_id, then by day? I don't ask this lightly, as I have tried over and over with state stores and so many examples, but I am just chasing my tail. Basically, I would like to know how many times a...
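A rough sketch of one way to express that grouping in the Streams DSL: build a composite key out of user_id, Object_id and the day, then count. The record type, its fields and the Serde passed in are assumptions standing in for the real schema.

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    public class UsageCounts {

        // Minimal stand-in for the real record schema.
        public static class UsageRecord {
            public String user_id;
            public String Object_id;
            public long timestamp; // epoch millis
        }

        public static void buildTopology(StreamsBuilder builder, Serde<UsageRecord> recordSerde) {
            KStream<String, UsageRecord> records =
                    builder.stream("usage-records", Consumed.with(Serdes.String(), recordSerde)); // illustrative topic

            KTable<String, Long> counts = records
                    .groupBy((key, value) -> {
                                String day = LocalDate.ofInstant(
                                        Instant.ofEpochMilli(value.timestamp), ZoneOffset.UTC).toString();
                                // Composite grouping key: user_id | Object_id | day
                                return value.user_id + "|" + value.Object_id + "|" + day;
                            },
                            Grouped.with(Serdes.String(), recordSerde))
                    .count(Materialized.as("counts-by-user-object-day"));
        }
    }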

How to use @EventListener with the Kafka poll() method?

There will be an event coming from Kafka from some other application, and I want to listen to that event using Spring events. The events are transported via Kafka, but how can I listen to them?
The transported event would be of the type Event, but how can I use poll(), which is meant for Kafka consumers?
Example:
@EventListener public void handleEvent(){ .poll() }
I couldn't understand how I should proceed with calling poll() inside an @EventListener method.
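With Spring you normally don't call poll() at all; the framework polls the broker for you. A sketch of a common pattern, where the topic name and the Event payload type are assumptions: a @KafkaListener receives the record and republishes it as a Spring application event, which an @EventListener method then handles.

    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    // Placeholder for the payload type carried on the topic (deserialization config not shown).
    class Event {
        public String id;
        public String payload;
    }

    @Component
    public class KafkaToSpringEventBridge {

        private final ApplicationEventPublisher publisher;

        public KafkaToSpringEventBridge(ApplicationEventPublisher publisher) {
            this.publisher = publisher;
        }

        // Spring Kafka polls the broker for you; this method is invoked per record.
        @KafkaListener(topics = "events", groupId = "event-bridge") // illustrative names
        public void onKafkaMessage(Event event) {
            publisher.publishEvent(event); // re-publish as a Spring application event
        }
    }

    @Component
    class EventHandler {

        @EventListener
        public void handleEvent(Event event) {
            // react to the event here; no poll() needed
        }
    }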


1 million Kafka producer records a second?

Key properties for KafkaProducer throughput tuning are:
Producer configs:
Topic configs:
After studying the linked documentation you will see two coupled groups of properties:
1. Effective batching and compression on the Kafka producer
The Kafka producer has a built-in mechanism for batching and compression. The mechanism gets the best results when the producer groups and then compresses many messages into a single batch, so simply setting a high value for the batch size and enabling compression sounds v...
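To make the batching and compression group concrete, here is a sketch of the producer-side settings usually involved; the values are illustrative starting points, not recommendations.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class HighThroughputProducerConfig {

        public static KafkaProducer<String, String> create() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            // Batching: let the producer group many records into one request.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);      // bytes per partition batch
            props.put(ProducerConfig.LINGER_MS_CONFIG, 20);              // wait a little to fill batches
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);

            // Compression is applied per batch, so it pays off most with large batches.
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

            // acks trades durability against latency/throughput.
            props.put(ProducerConfig.ACKS_CONFIG, "1");

            return new KafkaProducer<>(props);
        }
    }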

Which messaging system for a web dashboard?

I would like to build a web dashboard system and I am facing a problem. I need to get some information that is in the cache of one of the instances of my program. For this I had thought of doing Pub/Sub with Kafka, however I don't know how to publish and then get a response back from one of my subscribers. Do you know a pattern that allows this, and a service that lets me do it?
EDIT: I would like to design an infrastructure that follows this pattern:


Stop Kafka Consumer after consuming the log end offset using reactor kafka

I have a requirement where I will consume all messages from a topic, from the beginning to the latest message offset in the partition, and then stop the consumer from listening to the topic. I have created the following code snippet:
I don't know what approach I should take. I'm thinking of getting the current partition position(), but I'm not sure if I should proceed with it.
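One possible direction, sketched under the assumption of a single-partition, non-empty topic with String records (not the original snippet): look up the end offset with a short-lived plain consumer first, then let the reactive pipeline complete once that offset has been reached.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import reactor.core.publisher.Flux;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;

    public class ReadToEndThenStop {

        // consumerProps must contain bootstrap servers, group id and deserializers.
        // Assumes partition 0 only and that the topic is not empty.
        public static Flux<ReceiverRecord<String, String>> readToEnd(Map<String, Object> consumerProps,
                                                                     String topic) {
            // 1. Find out where the log currently ends, using a short-lived plain consumer.
            TopicPartition partition = new TopicPartition(topic, 0);
            long endOffset;
            try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(consumerProps)) {
                endOffset = probe.endOffsets(Collections.singleton(partition)).get(partition);
            }

            // 2. Consume reactively and complete once the last existing offset is reached.
            ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProps)
                    .subscription(Collections.singleton(topic));

            final long lastOffset = endOffset - 1; // endOffsets() returns the *next* offset to be written
            return KafkaReceiver.create(options)
                    .receive()
                    .doOnNext(record -> record.receiverOffset().acknowledge())
                    .takeUntil(record -> record.offset() >= lastOffset); // completes and cancels the consumer
        }
    }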


KTable & LogAndContinueExceptionHandler

I have a very simple consumer from which I create a materialized view. I have enabled validation on my value object (throwing ConstraintViolationException for invalid JSON data). When I receive a value for which the validation fails, I expect the value to be logged and the consumer to read the next offset, since I have LogAndContinueExceptionHandler enabled.
However, LogAndContinueExceptionHandler is never invoked and consumePojo's state transitions from PENDING_ERROR to ERROR.
Code
Why is it that LogAndContin...
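One thing worth noting: LogAndContinueExceptionHandler is a deserialization exception handler, so it only reacts to failures that happen while the record is being deserialized; exceptions thrown afterwards from your own validation code are not routed through it. A sketch of the configuration it does apply to, with illustrative property values:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

    public class StreamsErrorHandlingConfig {

        public static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-demo");    // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative

            // Only *deserialization* failures are logged and skipped by this handler.
            // Exceptions thrown from validation code after deserialization will still
            // stop the stream thread.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    LogAndContinueExceptionHandler.class);
            return props;
        }
    }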

Kafka Streams calculations app - most efficient way to account for time elapsed in streams app

I have a Kafka Streams API application joining 2 streams, which works OK.
I have another Kafka Streams API application which performs some calculations based on the joined values from the 1st application - it also works OK.
The results of the calculations are used to trigger commands on a PLC server.
Messages are coming in every 25 ms, which is the window for the join in the 1st application.
Now I have a requirement which says that after each trigger, we must wait for xy milliseconds before allowing the next trigger on the PLC server.
So Kafka Str...
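One way to sketch the "wait xy milliseconds before the next trigger" requirement inside the streams app: keep the timestamp of the last forwarded trigger in a state store and drop anything that arrives before the interval has elapsed. Topic, store and type names below are assumptions, and this uses the older transform() API (newer versions offer process()).

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    public class TriggerThrottle {

        private static final String STORE = "last-trigger-store";
        private static final Duration LOCKOUT = Duration.ofMillis(100); // the "xy" ms, illustrative

        public static void buildTopology(StreamsBuilder builder) {
            builder.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.Long()));

            KStream<String, String> triggers = builder.stream("calculated-triggers"); // illustrative
            triggers.transform(ThrottleTransformer::new, STORE)
                    .to("plc-commands"); // illustrative output topic
        }

        // Drops a trigger if the previous one for the same key was forwarded less than LOCKOUT ago.
        static class ThrottleTransformer implements Transformer<String, String, KeyValue<String, String>> {

            private ProcessorContext context;
            private KeyValueStore<String, Long> lastTrigger;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
                this.lastTrigger = (KeyValueStore<String, Long>) context.getStateStore(STORE);
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                long now = context.timestamp(); // timestamp of the current record
                Long last = lastTrigger.get(key);
                if (last != null && now - last < LOCKOUT.toMillis()) {
                    return null; // still in the lockout window: suppress this trigger
                }
                lastTrigger.put(key, now);
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() { }
        }
    }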

When is a Kafka Streams GlobalKTable a good choice as a data store in the microservices world?

I'm new to the Kafka Streams world. I'm wondering when to use a Kafka Streams GlobalKTable (with a compacted topic under the hood) instead of a regular database for persisting data, and what the advantages and disadvantages of both solutions are. I guess both ensure data persistence at the same level.
Let's say there is a simple e-commerce app where users register and update their data. And there are two microservices - the first one (service-users) is responsible for registering users and the second one (s...
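For context, a GlobalKTable gives every instance of a service a full, locally queryable copy of the underlying compacted topic. A minimal sketch, with topic, store and type names as assumptions:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class UsersGlobalStore {

        // Every instance of the service gets a full local copy of the compacted "users" topic.
        public static void buildTopology(StreamsBuilder builder) {
            GlobalKTable<String, String> users =
                    builder.globalTable("users", Materialized.as("users-store")); // illustrative names
        }

        // Local lookup, no extra network hop to a database.
        public static String findUser(KafkaStreams streams, String userId) {
            ReadOnlyKeyValueStore<String, String> store = streams.store(
                    StoreQueryParameters.fromNameAndType("users-store", QueryableStoreTypes.keyValueStore()));
            return store.get(userId);
        }
    }

Each instance holds the full data set locally, which is the main trade-off against a shared database.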

Unit Test MessageListenerContainer using a mock consumer

I have a Spring Boot application with a MessageListenerContainer bean, which I want to unit test. How can I use a mock Kafka consumer to simulate the receipt of a number of records? I've searched and found MockConsumer together with examples using plain Apache Kafka, but I can't figure out how to use it with Spring Kafka's MessageListenerContainer. I use Kotlin and Kotest, but answers using Java/JUnit are also welcome. My unit tests don't use @SpringBootTest: instead, I manually create beans and...

Can a Kafka cluster's non-flushing disk mode take effect for certain topics only?

If I want the Kafka cluster to support quasi real-time message transformation (< 500 ms), do I need to set the Kafka cluster to non-flushing disk mode? And if I do, can non-flushing disk mode take effect only for certain topics rather than all of them? For the other topics I need high reliability for consumers.
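For what it's worth, flushing can be controlled per topic through the flush.messages and flush.ms topic configs, so the broker-wide behaviour does not have to change for every topic. A sketch of setting a per-topic override with the AdminClient (topic name and broker address are illustrative); here the high-reliability topic forces a flush on every message, while latency-sensitive topics keep the default of leaving flushing to the OS:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class TopicFlushOverride {

        public static void forceFlushFor(String topic) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative

            try (Admin admin = Admin.create(props)) {
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                // flush.messages=1 fsyncs after every message on this topic only;
                // all other topics keep the cluster default (flushing left to the OS).
                Map<ConfigResource, Collection<AlterConfigOp>> updates = Collections.singletonMap(
                        resource,
                        Collections.singletonList(new AlterConfigOp(
                                new ConfigEntry("flush.messages", "1"),
                                AlterConfigOp.OpType.SET)));
                admin.incrementalAlterConfigs(updates).all().get();
            }
        }
    }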


Publish multiple events in one Kafka topic

I need to publish multiple messages from the same project, which represent employee journey events, and I need to use only one topic to publish these messages since they represent the same project. In some cases a message may contain extra fields, for example:
All messages share (id, name, type, date), and
some events may have more fields such as (course id, course name). So I intend to use one parent object called "Journey" that contains an "Event" object, and I will create multiple childre...
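One way to sketch that parent/child model in Java is Jackson's polymorphic type handling, so every subtype can be written to and read from the single topic; the class and field names below are guesses based on the description, not the actual model.

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;

    // Fields shared by every journey event; the "type" field doubles as the discriminator.
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
            property = "type", visible = true)
    @JsonSubTypes({
            @JsonSubTypes.Type(value = CourseEvent.class, name = "COURSE"),
            @JsonSubTypes.Type(value = PromotionEvent.class, name = "PROMOTION") // illustrative subtype
    })
    public abstract class JourneyEvent {
        public String id;
        public String name;
        public String type;
        public String date;
    }

    class CourseEvent extends JourneyEvent {
        public String courseId;
        public String courseName;
    }

    class PromotionEvent extends JourneyEvent {
        public String newTitle;
    }

A JSON serializer/deserializer (for example Spring Kafka's JsonSerializer with an ObjectMapper aware of these annotations) can then handle all subtypes on the one topic.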
