Data Aggregation

In this exercise we build an application that aggregates text messages by recipient. All messages sent to a single user should be stored in one aggregate for that user.

For the input topic, we send messages in the following format: ${timestamp}#${sender}#${receiver}#${message}, for example:

  • key: null, value: 1728148429#john#maria#Hello how are you?
  • key: null, value: 1728148430#maria#john#I am good, thank you
  • key: null, value: 1728148431#maria#john#And how are you?
  • key: null, value: 1728148450#john#maria#I am good too
  • key: null, value: 1728148450#john#maria#I'd like to talk with you about Apache Kafka training scheduled next week
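
The input format can be split into its four fields with plain Java. The following is a minimal sketch, not code from the repository: the MessageParser class and the ParsedMessage record are hypothetical names introduced for illustration. It assumes the first three fields never contain '#', and it limits the split to four parts so that a '#' inside the message body is preserved.

```java
public class MessageParser {

    /** Hypothetical holder for one parsed input value; not a class from the repository. */
    public record ParsedMessage(long timestamp, String sender, String receiver, String text) {}

    /** Splits a raw value of the form timestamp#sender#receiver#message into its fields. */
    public static ParsedMessage parse(String raw) {
        // A limit of 4 keeps any '#' that appears inside the message body intact.
        String[] parts = raw.split("#", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected 4 '#'-separated fields: " + raw);
        }
        return new ParsedMessage(Long.parseLong(parts[0]), parts[1], parts[2], parts[3]);
    }

    public static void main(String[] args) {
        ParsedMessage m = parse("1728148429#john#maria#Hello how are you?");
        // prints "maria <- john: Hello how are you?"
        System.out.println(m.receiver() + " <- " + m.sender() + ": " + m.text());
    }
}
```

The receiver field extracted here is the value that later becomes the message key.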

On the output topic, we would like to have all messages sent to a single receiver collected under that receiver's key, like:

  • key: maria, value: { "messages": [ { "senderTime": 1728148429, "inboxTime": 1728148459, "sender": "john", "message": "Hello how are you?"} ] }
  • key: john, value: { "messages": [ { "senderTime": 1728148430, "inboxTime": 1728148450, "sender": "maria", "message": "I am good, thank you"} ] }

Implementation

  1. Clone the repository github.com/pszymczyk/kafka-streams-playground.
  2. Check out the app3 branch.
  3. In the solution, use the classes provided in the com.pszymczyk.common package: Inbox, InboxMessage, JsonSerde.
  4. Open the InboxApp.java file.
  5. Go to the buildKafkaStreamsTopology() method. It contains a basic StreamsBuilder setup, which we will use to define the application topology.
  6. Start by consuming messages from the app3-source topic using the stream(...) method.
  7. Next, change the message key from null to the message receiver using the selectKey(...) method.
  8. Group messages with the same key using the groupByKey() method.
  9. Once the messages are grouped, aggregate them by calling the aggregate(...) method. Pass exactly three arguments:
    1. new Initializer<Inbox>()
    2. new Aggregator<String, Message, Inbox>()
    3. Materialized.with(Serdes.String(), JsonSerdes.newSerdes(Inbox.class))
  10. The result of the aggregation should be a KTable<String, Inbox>.
  11. Once the aggregation is done, the last step is to forward the results to the sink topic using the toStream() and to() methods.
  12. Run the tests to check that the solution is correct.
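
To make the semantics of steps 7 to 9 concrete, here is a plain-Java model of what selectKey, groupByKey and aggregate compute together. It is only a sketch of the logic over an in-memory list, not Kafka Streams code and not the exercise solution. SketchMessage and SketchInbox are hypothetical stand-ins for the repository's InboxMessage and Inbox classes, whose actual fields and constructors may differ. The inboxTime field is left out because the exercise does not state how it is derived.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InboxAggregationSketch {

    /** Hypothetical stand-in for InboxMessage; the real class may look different. */
    public record SketchMessage(long senderTime, String sender, String message) {}

    /** Hypothetical stand-in for Inbox: one aggregate holding all messages for a receiver. */
    public static class SketchInbox {
        public final List<SketchMessage> messages = new ArrayList<>();
    }

    /** Models selectKey + groupByKey + aggregate over raw values in arrival order. */
    public static Map<String, SketchInbox> aggregate(List<String> rawValues) {
        Map<String, SketchInbox> inboxes = new LinkedHashMap<>();
        for (String raw : rawValues) {
            String[] parts = raw.split("#", 4);
            // Like selectKey: the receiver becomes the key.
            String receiver = parts[2];
            // Like groupByKey plus the initializer: one empty inbox per new key.
            SketchInbox inbox = inboxes.computeIfAbsent(receiver, k -> new SketchInbox());
            // Like the aggregator: fold the new message into the existing aggregate.
            inbox.messages.add(new SketchMessage(Long.parseLong(parts[0]), parts[1], parts[3]));
        }
        return inboxes;
    }

    public static void main(String[] args) {
        Map<String, SketchInbox> result = aggregate(List.of(
                "1728148429#john#maria#Hello how are you?",
                "1728148430#maria#john#I am good, thank you",
                "1728148431#maria#john#And how are you?",
                "1728148450#john#maria#I am good too"));
        // prints "maria: 2" then "john: 2"
        result.forEach((receiver, inbox) ->
                System.out.println(receiver + ": " + inbox.messages.size()));
    }
}
```

In the real topology the map is replaced by the state store behind the KTable, and every update to an inbox is emitted downstream as a new record for that key.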