Message counter
In this exercise we implement an application designed to count the number of text messages sent to recipients.
For the input topic, we send messages in the following format: ${timestamp}#${sender}#${receiver}#${message}, for example:
- key: null, value: 1728148429#john#maria#Hello how are you?
- key: null, value: 1728148430#maria#john#I am good, thank you
- key: null, value: 1728148431#maria#john#And how are you?
- key: null, value: 1728148450#john#maria#I am good too
- key: null, value: 1728148450#john#maria#I'd like to talk with you about Apache Kafka training scheduled next week
On the output topic, we would like to have number of messages sent to single receiver, like:
- key: maria, value: 1
- key: john, value: 1
- key: john, value: 2
- key: maria, value: 2
- key: maria, value: 3

Implementation
- Clone repository github.com/pszymczyk/kafka-streams-playground.
- Checkout to the app2 branch.
- In the MessagesCountApp class, there is a main(...) method that you can use to manually test created application. In addition to the main(...) method, in the /src/test/groovy/com.pszymczyk/app2 directory, there are unit tests for checking the solution correctness.
- Open App2.java file.
- Go to buildKafkaStreamsTopology() method. You can find there basic StreamsBuilder setup, we will use it to define our application topology.
- Let's start from consuming messages from app2-source topic, use stream(...) method.
- In the next step we need to change message key from null to message receiver, use selectKey(...) method.
- Now we need to group together messages with the same key, use groupByKey() method.
- When the messages are grouped together, we can count them by evaluating count() method.
- Now our calculations are finished, last thing we need to do is to forward computation results to sink topic. We should utilize methods toStream() and to() now.
- Run tests and check solution correctness.
Refactoring
The purpose of refactoring is to move the serialization and deserialization details from topology definition into specific SerDe implementation.
- Open MessageSerde.java file.
- Implement methods serialize(...) and deserialize(...).
- Open App2.java file.
- Override default SerDe in stream definition, you can do it by passing Consumed.with(Serdes.Void(), MessageSerde.newSerde()) to stream(...) method.
- Adjust groupBy(...) method to new implementation.
- Adjust tests.
- Run tests and check solution correctness.