Kafka Streams Tutorial

Kafka Streams is a powerful and lightweight library provided by Apache Kafka for building real-time streaming applications and microservices. In this tutorial we will walk through a simple Kafka Streams example with Quarkus that shows how to perform stream processing tasks directly within the Kafka ecosystem, leveraging the familiar Kafka infrastructure to process and transform data in real time.

Kafka Streams vs Kafka

Firstly, if you are new to the Kafka ecosystem, it is worth understanding the key differences between Kafka and the Kafka Streams API:

Apache Kafka is a distributed, fault-tolerant messaging system designed for handling large volumes of real-time data streams. It is built to store, publish, and subscribe to streams of records (messages).

Kafka Streams is a client-side Java library built on top of Apache Kafka. It enables developers to build real-time stream processing applications and microservices that consume, process, and produce data streams from Kafka topics.
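
To make the difference concrete, here is a minimal, hypothetical sketch contrasting the two APIs. The topic name and configuration are illustrative, not part of this tutorial's project:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

public class ApiComparison {

    // Plain Kafka consumer API: you manage subscription, polling and iteration yourself
    static void consumeWithPlainKafka(Properties props) {
        try (KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(List.of("payments"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    System.out.println(record.value());
                }
            }
        }
    }

    // Kafka Streams: you declare a processing pipeline; the library handles
    // consuming, threading and offset management for you
    static void consumeWithKafkaStreams(StreamsBuilder builder) {
        builder.stream("payments", Consumed.with(Serdes.String(), Serdes.String()))
               .foreach((key, value) -> System.out.println(value));
    }
}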

Kafka Streams Use cases

Kafka Streams is ideal for scenarios where real-time data processing is required. It’s well-suited for applications that demand immediate processing and analysis of continuous streams of data, such as:

  • Real-time Analytics: Perform continuous computations on incoming data streams to derive insights and generate analytics in real-time.
  • Event-driven Architectures: Implement event-driven microservices where each event triggers specific actions or processing logic.
  • Fraud Detection: Detect fraudulent activities by analyzing patterns and anomalies in real-time data streams.

In addition, Kafka Streams offers stateful stream processing capabilities, allowing you to maintain and update state as data streams through your application (see the sketch after this list). Use cases include:

  • Session Windows: Analyze session-based data (e.g., user sessions on a website) by grouping events within specific time windows.
  • Aggregations: Perform continuous aggregations (e.g., count, sum) over data streams.
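
For instance, here is a minimal, hypothetical sketch of a windowed count aggregation with the Kafka Streams DSL. The topic name and serdes are illustrative, and TimeWindows.ofSizeWithNoGrace requires Kafka Streams 3.0 or newer:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountExample {

    public static void buildWindowedCount(StreamsBuilder builder) {
        // Count events per key over 1-minute tumbling windows; Kafka Streams
        // keeps the running counts in a local, fault-tolerant state store
        builder.stream("user-events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
               .count()
               .toStream()
               .foreach((windowedKey, count) ->
                       System.out.println(windowedKey.key() + " -> " + count + " events in window"));
    }
}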

In the next section of this tutorial we will show a basic Kafka Streams example built with Quarkus that performs real-time stream processing on a flow of incoming messages.

Kafka Streams architecture

Here is a high-level view of our Kafka Streams project architecture:

[Figure: Kafka Streams project architecture]

As you can see, we have a Message Producer which emits Payment message events. In the middle, we have the Kafka Streams topology which contains the logic to process, transform and filter the messages as they move through the stream processing pipeline.

In more detail:

  • We will first inspect each Payment to check whether its associated email address is valid.
  • Then, we will discard messages with an invalid email.
  • Next, we will implement another filter to send Payments over 500 to the largepayment topic. Smaller payments will go to the smallpayment topic instead.
  • On the Kafka side, we will consume Payments with two distinct consumers: one for the largepayment topic and another for the smallpayment topic.

Coding the Kafka Producers and Consumers

Firstly, we will code the Kafka Producer and Consumers. For this purpose, we will create an @ApplicationScoped bean which includes both an @Outgoing method and two @Incoming methods:

import java.time.Duration;
import java.util.Random;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Multi;
// jakarta.* imports assume Quarkus 3.x; use javax.* on Quarkus 2.x
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PaymentGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentGenerator.class);

    private final Random random = new Random();

    // Emits a new random Payment on the "new-payment" channel every second
    @Outgoing("new-payment")
    public Multi<Payment> generatePayments() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .onItem().transform(n -> {
                    Payment payment = new Payment(generateRandomMail(), random.nextInt(1000));
                    LOG.info("Produced payment: " + payment);
                    return payment;
                });
    }

    // Consumes messages that the topology routed to the largepayment topic
    @Incoming("large-payment")
    public void processLargePayment(Payment payment) {
        logBlueText("Received large payment! " + payment);
    }

    // Consumes messages that the topology routed to the smallpayment topic
    @Incoming("small-payment")
    public void processSmallPayment(Payment payment) {
        logWhiteText("Received small payment! " + payment);
    }

    public String generateRandomMail() {
       // generate random mail. Omitted for brevity
    }

    // logBlueText / logWhiteText are simple colored-console logging helpers,
    // omitted for brevity
}

For the sake of brevity, some boilerplate code is omitted; you will find the full source code at the end of this article. Anyway, the code is quite intuitive:

  • @Outgoing("new-payment")produces data to the “new-payment” stream. Uses a periodic timer to emit a Payment with random email addresses and amounts.
  • @Incoming("large-payment") consumes messages from the “large-payment” stream.
  • @Incoming("small-payment") consumes messages from the “small-payment” stream.

Coding the Kafka Streams Topology

Next, we will code the actual Kafka Streams processing logic to process incoming payments and generate outgoing messages.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PaymentStream {
    private static final Logger LOG = LoggerFactory.getLogger(PaymentStream.class);

    @Produces
    public Topology buildTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        ObjectMapperSerde<Payment> valueSerde = new ObjectMapperSerde<>(Payment.class);
        Serde<String> keySerde = Serdes.String();

        // Keep a reference to the filtered stream so that only payments with a
        // valid email are routed to the output topics, as described above
        KStream<String, Payment> validPayments = builder
                .stream("payment", Consumed.with(keySerde, valueSerde))
                .peek((key, payment) -> LOG.info("Received payment: " + payment))
                .filter((key, payment) -> {
                    boolean isValid = isValidEmail(payment.getEmail());
                    if (!isValid) {
                        logRedText("Invalid email encountered. Discarding payment.");
                    }
                    return isValid;
                });

        validPayments
                .mapValues(payment -> payment.getEmail() + " has paid $" + payment.getMoney())
                .foreach((key, value) -> logGreenText("Valid payment: " + value));

        validPayments
                .filter((key, payment) -> payment.getMoney() > 500)
                .to("largepayment", Produced.with(keySerde, valueSerde));

        validPayments
                .filter((key, payment) -> payment.getMoney() <= 500)
                .to("smallpayment", Produced.with(keySerde, valueSerde));

        return builder.build();
    }

    private boolean isValidEmail(String email) {
        // Regular expression to check for a valid email format
        String emailRegex = "^[a-zA-Z0-9_+&*-]+(?:\\.[a-zA-Z0-9_+&*-]+)*@(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,7}$";
        return email.matches(emailRegex);
    }

    // logRedText / logGreenText are colored-console logging helpers, omitted for brevity
}

In more detail, here is how our Kafka Streams topology works:

Stream Processing:

  • Applies various operations on the stream:
    • peek(): Logs each payment as it enters the stream.
    • filter(): Checks if the payment’s email is valid; if not, logs and discards it.
    • mapValues(): Transforms the payment into a string representation.
    • foreach(): Logs valid payment information.

Branching and Output:

  • Filters the validated stream based on payment amount:
    • Sends payments with amount > 500 to the “largepayment” topic.
    • Sends payments with amount <= 500 to the “smallpayment” topic.

Return Topology:

  • Returns the Topology built from the defined stream processing steps.
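
As a side note, on Kafka Streams 2.8 or newer the two amount filters could also be expressed with the split()/Branched API, which writes the routing as a single branching step. Here is a minimal sketch, assuming the same validPayments stream and serdes as in the listing above:

import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Produced;

// Route payments over 500 to largepayment, everything else to smallpayment
validPayments.split()
        .branch((key, payment) -> payment.getMoney() > 500,
                Branched.withConsumer(ks -> ks.to("largepayment", Produced.with(keySerde, valueSerde))))
        .defaultBranch(
                Branched.withConsumer(ks -> ks.to("smallpayment", Produced.with(keySerde, valueSerde))));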

Configuring the Kafka application

Finally, we will add the configuration. Since this is a Quarkus application, within application.properties we map the Kafka Streams topics to the incoming and outgoing channels. We also need to set the serializer for the outgoing messages to the ObjectMapperSerializer:

kafka.bootstrap.servers=localhost:9092
quarkus.kafka.devservices.enabled=false

quarkus.kafka-streams.topics=payment,largepayment,smallpayment
quarkus.kafka-streams.application-id=payment-system


mp.messaging.outgoing.new-payment.connector=smallrye-kafka
mp.messaging.outgoing.new-payment.topic=payment
mp.messaging.outgoing.new-payment.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.large-payment.connector=smallrye-kafka
mp.messaging.incoming.large-payment.topic=largepayment

mp.messaging.incoming.small-payment.connector=smallrye-kafka
mp.messaging.incoming.small-payment.topic=smallpayment
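
One note on the incoming channels: they carry Payment objects, so a JSON deserializer is needed. Recent Quarkus versions can auto-detect it for the channel; if yours does not, a hypothetical PaymentDeserializer (not part of the listings above) would look like this:

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

// Hypothetical helper: register it with
// mp.messaging.incoming.large-payment.value.deserializer=<your.package>.PaymentDeserializer
public class PaymentDeserializer extends ObjectMapperDeserializer<Payment> {
    public PaymentDeserializer() {
        super(Payment.class);
    }
}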

Then, in order to build our application, we will need both the Kafka connector and the Kafka Streams extension in our Quarkus project:

<dependency>
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
     <groupId>io.quarkus</groupId>
     <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
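
As a side note, if you want to unit test the topology without a running broker, Kafka ships the kafka-streams-test-utils artifact. Here is a minimal, hypothetical sketch, assuming that artifact and JUnit 5 are added as test dependencies:

import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class PaymentStreamTest {

    @Test
    public void largePaymentIsRoutedToLargePaymentTopic() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        ObjectMapperSerde<Payment> paymentSerde = new ObjectMapperSerde<>(Payment.class);

        try (TopologyTestDriver driver =
                new TopologyTestDriver(new PaymentStream().buildTopology(), props)) {

            TestInputTopic<String, Payment> input = driver.createInputTopic(
                    "payment", new StringSerializer(), paymentSerde.serializer());
            TestOutputTopic<String, Payment> large = driver.createOutputTopic(
                    "largepayment", new StringDeserializer(), paymentSerde.deserializer());

            // A valid payment over 500 should end up in the largepayment topic
            input.pipeInput("key-1", new Payment("alice@example.com", 750));
            assertEquals(750, large.readValue().getMoney());
        }
    }
}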

Testing the application

Everything is now in place, so let's test our application. Although Quarkus can start its own Kafka broker when you use Dev Services, we are including a sample docker-compose file to start an independent Kafka service:

docker-compose up

Then, launch the Quarkus application:

mvn install quarkus:dev

Our application will connect to Kafka and start to emit and process payments:

[Figure: console output showing produced and processed payments]

Conclusion

In conclusion, Kafka Streams runs alongside your application code, performing stream processing tasks such as filtering, mapping, aggregating, and joining data from Kafka topics. It’s a library that developers use to implement stream processing logic within their Java applications, which then interact with the Kafka brokers to consume and produce messages.

Source code: https://github.com/fmarchioni/mastertheboss/tree/master/quarkus/kafka-streams

Recommended reading:

Quarkus Reactive messaging with Kafka

Getting started with Apache Kafka