Quarkus Reactive Messaging with Kafka

In this article, we will learn how to create a Quarkus reactive application which uses the SmallRye Reactive Messaging and Mutiny projects to stream data to and from a Kafka cluster.

Reactive messaging in a nutshell

The architecture of enterprise applications has been changing in recent years. Besides the standard client-server approach, we now have microservices, reactive applications, and even serverless applications.

Quarkus is a reactive framework. You can deliver reactive applications using two core integrations: the Eclipse MicroProfile Reactive Messaging specification and SmallRye Mutiny.

MicroProfile Reactive Messaging is a specification that uses CDI beans to drive the flow of messages toward specific channels.

You can use the following annotations to indicate whether a method is a producer or a consumer of messages:

  • org.eclipse.microprofile.reactive.messaging.Incoming – Used to signify a subscriber to incoming messages
  • org.eclipse.microprofile.reactive.messaging.Outgoing – Used to signify a publisher of outgoing messages

With regard to event sources, Quarkus uses the Mutiny framework as its primary model. With Mutiny, you observe events, react to them, and create elegant and readable processing pipelines.

SmallRye Mutiny offers two types that are both event-driven and lazy (a minimal sketch follows the list):

  • A Uni emits a single event (an item or a failure). Unis are convenient to represent asynchronous actions that return 0 or 1 result. A good example is the result of sending a message to a message broker queue.
  • A Multi emits multiple events (n items, 1 failure or 1 completion). Multis can represent streams of items, potentially unbounded. A good example is receiving messages from a message broker queue.
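
To illustrate the two types, here is a minimal, self-contained sketch (not part of the article's project) that creates and subscribes to a Uni and a Multi:

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class MutinyTypesDemo {
    public static void main(String[] args) {
        // Uni: at most one item (or a failure), emitted lazily on subscription
        Uni.createFrom().item("hello")
                .onItem().transform(String::toUpperCase)
                .subscribe().with(System.out::println);   // prints HELLO

        // Multi: a stream of items, potentially unbounded
        Multi.createFrom().items(1, 2, 3)
                .onItem().transform(n -> n * 2)
                .subscribe().with(System.out::println);   // prints 2, 4, 6
    }
}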

As an example of combining MicroProfile Reactive Messaging with Mutiny, consider the following:

@Outgoing("source")
public Multi<String> generate() {
    return Multi.createFrom()
            .iterable(Arrays.asList("some", "iterable", "data"));
}

@Incoming("source")
@Outgoing("destination")
public String process(String in) {
    return in.toUpperCase();
}

@Incoming("destination")
public void consume(String processed) {
    System.out.println(processed);
}

Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain. Therefore, the following chain will be generated:

generate --> [ source ] --> process --> [ destination ] --> consume

Let’s see, with a practical example, how to create a Quarkus application which uses reactive messages to stream Stock Exchange Quotes. In order to publish, subscribe to, and process streams in real time, we will be using Apache Kafka as a distributed streaming platform.

Creating the Quarkus application

Firstly, we will kick-start our application “kafka-demo”. You can use any available tool, such as the Quarkus CLI:

$ quarkus create app kafka-demo

Next, navigate into the kafka-demo folder and add the following extensions:

$ quarkus ext add smallrye-reactive-messaging-kafka resteasy-jackson
Looking for the newly published extensions in registry.quarkus.io
[SUCCESS] ✅  Extension io.quarkus:quarkus-smallrye-reactive-messaging-kafka has been installed
[SUCCESS] ✅  Extension io.quarkus:quarkus-resteasy-jackson has been installed

Our application will stream real-time Stock Quotes. The simplest way to do that is to include the Yahoo Finance library in your project:

<dependency>
	<groupId>com.yahoofinance-api</groupId>
	<artifactId>YahooFinanceAPI</artifactId>
	<version>3.15.0</version>
</dependency>

Setting up Producers and Consumers

We are done with the project skeleton. Next, we will code the CDI beans that handle incoming and outgoing messages, and a REST endpoint that publishes quotes as Server-Sent Events.

Firstly, we will add a QuoteGenerator. This CDI bean produces Stock Quotes in the generate method, which is declared as a publisher by the @Outgoing("stock-quote") annotation:

@ApplicationScoped
public class QuoteGenerator {

	int counter = 0;

	@ConfigProperty(name = "stock.tickers")
	List<String> stocks;

	@Outgoing("stock-quote")
	public Multi<Quote> generate() {
		return Multi.createFrom().ticks().every(Duration.ofSeconds(1)).map(n -> generateQuote());
	}

	private Quote generateQuote() {
		String ticker = stocks.get(getCounter());
		Stock stock;
		try {
			// Fetch the current quote from Yahoo Finance, using the ticker as key
			stock = YahooFinance.get(ticker);
		} catch (IOException e) {
			// Fail fast: carrying on would cause a NullPointerException below
			throw new UncheckedIOException(e);
		}
		BigDecimal price = stock.getQuote().getPrice();
		BigDecimal priceChange = stock.getQuote().getChange();

		Quote q = new Quote();
		q.setCompany(ticker);
		q.setValue(price.doubleValue());
		q.setChange(priceChange.doubleValue());

		return q;
	}

	private int getCounter() {
		// Rotate over the list of tickers, wrapping around at the end
		int index = counter;
		counter = (counter + 1) % stocks.size();
		return index;
	}

}

In this example, we use the generateQuote method to fetch the Stock price and change from Yahoo Finance, using the Stock Ticker as key (e.g. AAPL stands for Apple). The getCounter method is a helper that rotates over the List of Stocks.

Next, the outgoing messages, which travel through a Kafka topic (configured later in application.properties), are consumed by the following QuoteConverter class:

@ApplicationScoped
public class QuoteConverter {

    // 24-hour clock; SimpleDateFormat is not thread-safe, but messages on a
    // channel are processed one at a time by default
    DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    @Incoming("stocks")
    @Outgoing("in-memory-stream")
    @Broadcast
    public Quote newQuote(Quote quote) {
        // Stamp the Quote with the current time before forwarding it
        Date date = Calendar.getInstance().getTime();
        String strDate = dateFormat.format(date);
        quote.setTime(strDate);
        return quote;
    }

}

In stream-based applications, a converter performs any kind of change or filter on the messages. In our example, it is not doing anything complicated: it just sets the timestamp on the current Quote.

Add Intelligence to your Stock Ticker application

The converter bean is the ideal place to keep your business logic distinct from the logic which fetches your data. For example, in the newQuote method you might include business intelligence related to the Stock trend: within it, you could add to your Quote a signal to buy or sell the Stock, as in the sketch below.
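
For instance, here is a hypothetical variant of newQuote (not part of the article's source code) that attaches a naive buy/sell signal; it assumes a signal property, with getter and setter, has been added to the Quote class:

@Incoming("stocks")
@Outgoing("in-memory-stream")
@Broadcast
public Quote newQuote(Quote quote) {
    quote.setTime(dateFormat.format(Calendar.getInstance().getTime()));
    // Naive rule, for illustration only: a positive change suggests "BUY",
    // a negative one "SELL" (assumes Quote has a 'signal' property)
    quote.setSignal(quote.getChange() >= 0 ? "BUY" : "SELL");
    return quote;
}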

As you can see, messages are now on the “in-memory-stream” channel. The last stop will be the REST endpoint which consumes from this channel and publishes data as Server-Sent Events:

@Path("/quotes")
public class QuoteEndpoint {

	@Channel("in-memory-stream")
	Publisher<Quote> quotes;

	@ConfigProperty(name = "stock.tickers")
	List<String> stocks;

	@GET
	@Path("/stream")
	@Produces(MediaType.SERVER_SENT_EVENTS)
	@SseElementType(MediaType.APPLICATION_JSON)
	public Publisher<Quote> stream() {
		return quotes;
	}

	@GET
	@Path("/init")
	public List<Quote> getList() {
		List<Quote> list = new ArrayList<>();
		for (String ticker : stocks) {
			list.add(new Quote(ticker));
		}
		return list;
	}

}

  • The method stream publishes data (in JSON format) as Server-Sent Events; a sample event payload is shown after this list
  • The method getList returns the list of Stocks as items of a JSON array. The sole purpose of this method is to create the front-end grid dynamically at boot.
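
For reference, each SSE event carries a JSON payload with the Quote fields, shaped like this (illustrative values):

data: {"company":"AAPL","value":172.5,"change":-0.42,"time":"10:15:30"}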

Our application is almost ready. The basic Quote Class completes the Java code:

public class Quote {

    String company;
    Double value;
    Double change;
    String time;

    // No-arg constructor, used when deserializing Quotes from JSON
    public Quote() {
    }

    // Used by QuoteEndpoint.getList to build the initial front-end grid
    public Quote(String company) {
        this.company = company;
    }

    // Getters and setters omitted for brevity
}

The application configuration

The file application.properties contains some important settings:

#Some Nasdaq Stocks
stock.tickers=AAPL,MSFT,AMZN,GOOG,TSLA,NVDA,FB,AVGO,COST,CSCO

#Some Italian Stocks
#stock.tickers=CRDI.MI,ENEL.MI,ENI.MI,ACE.MI,HER.MI,ISP.MI,PST.MI,STLA.MI

# Kafka destination
mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks

# Kafka source (we read from it)
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks

quarkus.reactive-messaging.kafka.serializer-generation.enabled=true

Firstly, the list of Stocks is a comma-separated list of Strings, injected into the application as a List<String>. By default we use some US Stocks from the Nasdaq. If you want to see the numbers bouncing during EMEA trading hours, just use some European stock tickers instead (e.g. the commented-out stocks from Italy’s FTSE MIB).

Then, in order to decouple message producers and consumers, we use the smallrye-kafka connector from SmallRye Reactive Messaging. Producers and consumers are configured in two blocks:

  • In the first block, we configure the Kafka connector and the destination topic, to which we send messages as Stock Quotes in JSON format.
  • In the second block, we configure the Kafka connector and the source topic, from which we read messages as Stock Quotes in JSON format.

Finally, we enable the automatic generation of serializers and deserializers (although this is already true by default): with it, Quarkus derives JSON (de)serializers for the Quote payload, which otherwise travels through Kafka as raw bytes.
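
Had we disabled that generation, we would need to write and register a deserializer by hand. Here is a minimal sketch of what that could look like (the class name QuoteDeserializer is hypothetical; ObjectMapperDeserializer ships with the Quarkus Kafka client):

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

// Register it with:
// mp.messaging.incoming.stocks.value.deserializer=<fully qualified name of QuoteDeserializer>
public class QuoteDeserializer extends ObjectMapperDeserializer<Quote> {
    public QuoteDeserializer() {
        super(Quote.class);
    }
}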

This picture summarizes the whole flow of messages from and to Apache Kafka:

[Image: schema of the message flow between the Quarkus application and Apache Kafka]

The front-end layer

Finally, to complete our application, all we need to do is add a client application that captures the SSE and displays the data in a readable format. For the sake of brevity, we will include here just the core JavaScript that subscribes to the SSE events (the jQuery include is used by the rest of the page):

<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
 
<script>
    // Subscribe to the Server-Sent Events published by the REST endpoint
    var source = new EventSource("/quotes/stream");
    source.onmessage = function (event) {

        var data = JSON.parse(event.data);

        var company = data['company'];
        var value = data['value'];
        var change = data['change'];
        var timestamp = data['time'];

        // Update the grid cells matching this company
        var companyChange = company + "-change";
        if (document.getElementById(company) != null) {
            document.getElementById(company).innerHTML = value;
            document.getElementById(companyChange).innerHTML = change;
            // Red for a negative change, green otherwise
            if (change < 0) {
                document.getElementById(companyChange).style.color = "red";
            } else {
                document.getElementById(companyChange).style.color = "green";
            }
            document.getElementById("timestamp").innerHTML = timestamp;
        }
    }
</script>

You will find the full source code at the end of this article, including my titanic effort to build a dynamic set of divs using jQuery and JavaScript 🙂

Running the application

Before we start the Quarkus application, we need a Kafka and Zookeeper cluster up and running. The quickest solution is to start them as Docker containers through the docker-compose tool.

The following docker-compose.yaml file starts up a ZooKeeper server and an Apache Kafka broker using the container images of the open source Strimzi project:

version: '3.5'

services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: quay.io/strimzi/kafka:0.23.0-kafka-2.8.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
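
With this file in place, starting both containers takes a single command (run it from the directory containing docker-compose.yaml):

$ docker-compose up -d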

You can now build and start the Quarkus application as well:

$ mvn install quarkus:dev

The application is available at localhost:8080:

[Image: the kafka-demo front end showing the Stock Exchange grid]

When you click the “Connect to Stock Exchange” button, the application starts displaying and updating data in real time.
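
If you prefer the command line, you can also watch the raw SSE stream with curl (the -N flag disables response buffering):

$ curl -N http://localhost:8080/quotes/stream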

Source code

You can download the source code from here: https://github.com/fmarchioni/mastertheboss/tree/master/quarkus/kafka-demo
