Building Reactive Applications with WildFly

In this tutorial we will learn how to design, configure and deploy a reactive application on WildFly 23, using smallrye-reactive-messaging version 3.0.0. We will use Apache Kafka as distributed data streaming platform for our demo application.

Reactive Streams aims to provide a standard for exchanging data streams across an asynchronous boundary. At the same time, it guarantees that the receiving side is not forced to buffer arbitrary amounts of data.

In order to familiarize with MicroProfile Reactive Messaging, we need proper knowledge of some key concepts. First of all, MicroProfile Reactive Messaging is a specification which uses CDI Beans to drive the flow of messages towards some specific channels.

A Message is the basic interface that contains a payload to be streamed. The Message interface is parametrized in order to describe the type of payload it contains. Additionally, a message contains attributes and metadata which are specific to the broker used for message exchange (e.g. Kafka ).

A Channel, on the other hand, is a String indicating which source or destination of messages is used.

As MicroProfile Reactive Messaging is fully governed by the CDI model, two core annotations are used to indicate if a method is a producer or consumer of messages:

@Incoming: This annotation indicates that it consumes messages from the specified channel. The name of the channel is added in the annotation as an attribute. Here is an example:

@Incoming("channel")
public void consume(Message<String> s) {
// Consume message here:
}

When you place this annotation on a method, the method will be called each time a message is sent to that channel. However, you may decide to make it clear that the method consumes a specific kind of Message, such as a KafkaMessage (which inherits from Message). Here is an example:

@Incoming("channel")
public void consume(KafkaMessage<String> s) {
// Consume message here:
}

@Outgoing: This annotation indicates that a method publishes messages to a channel. Much the same way, the name of the channel is stated in the annotation’s attribute:

@Outgoing("channel")
public Message<String> produce() {
// Produce and return a Message implementation
}

Within the method annotated with @Outgoing, we return a concrete implementation of the Message interface.

You can also annotate a method with both @Incoming and @Outgoing so that it behaves like a Message Processor, which transforms the content of the message data:

@Incoming("from")
@Outgoing("to")
public String translate(String text) {
return MyTranslator.translate(text);
}

Streaming Messages with WildFly

WildFly supports, since the version 23, the distributed streaming of messages. The required subsystems (microprofile-reactive-streams-operators-smallrye and microprofile-reactive-messaging-smallrye) are not however included by default in the configuration. In order to add them, start at first WildFly:

$ ./standalone.sh -Djboss.as.reactive.messaging.experimental=true -c standalone-microprofile.xml

Please note the jboss.as.reactive.messaging.experimental is required to use some features (such as the @Channel annotation) which are available in the version 3.0.0 of reactive messaging.

When WildFly is up and running, connect from the CLI and execute the following short script to allow the required extensions:

batch
/extension=org.wildfly.extension.microprofile.reactive-messaging-smallrye:add
/extension=org.wildfly.extension.microprofile.reactive-streams-operators-smallrye:add
/subsystem=microprofile-reactive-streams-operators-smallrye:add
/subsystem=microprofile-reactive-messaging-smallrye:add
run-batch

reload

Boostrapping Kafka

Apache Kafka is a distributed data streaming platform that can be used to publish, subscribe, store, and process streams of data from multiple sources in real-time at amazing speeds.

Apache Kafka can be plugged into streaming data pipelines that distribute data between systems, and as well into the systems and applications that consume that data. Since Apache Kafka reduces the need for point-to-point integrations for data sharing, it is a perfect fit for a range of use cases where high throughput and scalability are vital.

To manage the Kafka environment, you need a software named Zookeeper which manages naming and configuration data so to provide flexible and robust synchronization within distributed systems. Zookeeper controls the status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions plus all the Kafka services you need.

There are several ways to start a Kafkacluster. The simplest one is to use the docker-compose tool so to orchestrate both Kafka and Zookeeper container images with a single file. The docker-compose.yaml file is located at the root of our example project. (Here you can take a look at it: https://github.com/fmarchioni/mastertheboss/blob/master/kafka/microprofile-kafka-demo/docker-compose.yaml )

First, make sure that docker is up and running:

$ service docker start

Then, start Apache Kafka and Zookeeper with:

$ docker-compose up

Creative a Reactive Application

In order to demonstrate Apache Kafka and Microprofle Streaming’s powerful combination on WildFly, we will design a simple application which simulates a Stock Trading ticker, updated in real time by purchases and sales.

We will create the following channels:

1. An Outgoing Producer bound to the “stock-quote” channel where messages containing stock orders will be written into a Topic named “stocks”.

2. An Incoming Consumer bound to the “stocks” channel which read messages available in the “stocks” Topic.

3. An Outgoing Producer bound to the “in-memory-stream” channel which broadcasts internally the new Stock Quote to all available subscribers

4. An Incoming Consumer bound to the “in-memory-stream” channel which reads the new Stock Quote and sends it is as Server Side Event to Clients

The following picture depicts the basic stream of messages used in our example:

The first class we will add is QuoteGenerated which is an ApplicationScoped CDI Bean that produces random quotes for a Company every two seconds. Here is the code of it:

@ApplicationScoped
public class QuoteGenerator {

    @Inject
    private MockExternalAsyncResource externalAsyncResource;

    @Outgoing("stock-quote")
    public CompletionStage<String> generate() {
        return externalAsyncResource.getNextValue();
    }
    
}

This class use an external resource to produce the messages that will be written to Kafka through the channel “stock-quote“.

Here is our MockExternalAsyncResource which produces a Json string with the Stock quote at regular intervals using a ScheduledExecutorService:

@ApplicationScoped
public class MockExternalAsyncResource {
    private static final int TICK = 2000;
    private Random random = new Random();
    String company[] = new String[] {
            "Acme","Globex","Umbrella","Soylent","Initech" };

    private ScheduledExecutorService delayedExecutor = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
    private final AtomicInteger count = new AtomicInteger(0);
    private long last = System.currentTimeMillis();

    @PreDestroy
    public void stop() {
        delayedExecutor.shutdown();
    }

    public CompletionStage<String> getNextValue() {
        synchronized (this) {
            CompletableFuture<String> cf = new CompletableFuture<>();
            long now = System.currentTimeMillis();
            long next = TICK + last;
            long delay = next - now;
            last = next;
            NextQuote nor = new NextQuote(cf);
            delayedExecutor.schedule(nor, delay , TimeUnit.MILLISECONDS);
            return cf;
        }
    }

    private class NextQuote implements Runnable {
        private final CompletableFuture<String> cf;

        public NextQuote(CompletableFuture<String> cf) {
            this.cf = cf;
        }

        @Override
        public void run() {
            String _company = company[random.nextInt(5)];
            int amount = random.nextInt(100);
            int op = random.nextInt(2);
            Jsonb jsonb = JsonbBuilder.create();
            Operation operation = new Operation(op, _company, amount);
            cf.complete(jsonb.toJson(operation));

        }
    }
}

At the end of the day, the getNextValue method will produce a Message which contains a JSON String like in the following example:

{"amount":32,"company":"Soylent","type":0}

Next, is the Operation Class, which is a wrapper to a random stock operation:

public class Operation   {

    public static final int SELL = 0;
    public static final int BUY = 1;
    private int amount;
    private String company;
    private int type;

    // Constructors / getter/setters omitted for brevity

Next, the following QuoteConverter Class will do the job of converting a Stock Order into a new quotation for the Company involved in the transaction:

@ApplicationScoped
public class QuoteConverter {
    HashMap<String,Double> quotes;

    private Random random = new Random();
    @PostConstruct
    public void init() {
        quotes = new HashMap<>();
        String company[] = new String[] {
                "Acme","Globex","Umbrella","Soylent","Initech" };

        for (String c: company)
        quotes.put(c, new Double(random.nextInt(100) + 50));

    }
 
    @Incoming("stocks")
    @Outgoing("in-memory-stream")
    @Broadcast
    public String newQuote(String quoteJson) {
        Jsonb jsonb = JsonbBuilder.create();

        Operation operation = jsonb.fromJson(quoteJson, Operation.class);

        double currentQuote = quotes.get(operation.getCompany());
        double newQuote;
        double change = (operation.getAmount() / 25);

        if (operation.getType() == Operation.BUY) {
              newQuote = currentQuote + change;
        }
        else  {
            newQuote = currentQuote - change;
        }
        if (newQuote < 0) newQuote = 0;

        quotes.replace(operation.getCompany(), newQuote);
        Quote quote = new Quote(operation.getCompany(), newQuote);
        return jsonb.toJson(quote);

    }

}

The init method of this class, simply bootstraps the initial quotation of every Company with some random values.

The newQuote method is the real heart of our transaction system. By reading the Operation data contained in the JSON file, a new quote is generated, using a basic algorithm: for any 25 stocks transacted, there will be one point’s impact on the value of the stock. The returned JSON String, wraps the Quote Class, and it’s broadcasted to all matching subscribers of the”in-memory-stream” channel, by means of the @Broadcast annotation on the top of the

method.

For the sake of completeness, we also include the Quote Java Class, which will be sent as JSON to the Client:

public class Quote {
	String company;
	Double value;
	public Quote(String company, Double value) {
	this.company = company;
	this.value = value;
	}
// Getters Setters method omitted for brebity
}

Within our example, we have the following subscriber for the “in-memory-stream” channel, where the Quote is published:

@Path("/quotes")
public class QuoteEndpoint {

    @Inject
    @Channel("in-memory-stream")
    Publisher<String> quote;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<String> stream() {

        return quote;
    }

}

The QuoteEndpoint is our REST Endpoint. Within this, we are using the @Channel qualifier to inject the Channel “in-memory-stream” into the Bean.

All the above components need a broker where we publish the stock quotes and from where they can be read as well. Here is the META-INF/microprofile-config.properties file which keeps all pieces together:

mp.messaging.connector.smallrye-kafka.bootstrap.servers=localhost:9092
# Kafka sink (we write to it)
mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

The first block is related to the Kafka destination, also known as sink, where we write the Stock Quote produced by the QuoteGenerator .

In the second block, we configure the source topic and connector where we read the Stock Quote as JSON Serialized Stream.

What is left to do, is to add a Client application which is able to capture the Server Side Event and display the text of it in a nicely formatted table of data. For the sake of brevity, we will add here just the core Javascript function that collects the Server Side Event:

<script>
    var source = new EventSource("/reactive/rest/quotes/stream");
    source.onmessage = function (event) {
    var data = JSON.parse(event.data);
    var company = data['company'];
    var value = data['value'];
        document.getElementById(company).innerHTML = value;
    };
</script>

The above code, is in the index.html page you will find in the source code of this example.

You can deploy the application on WildFly with:

$ mvn clean install wildfly:deploy

And here’s our beautiful Stock Ticker demo running on WildFly:

Source code for this tutorial: https://github.com/fmarchioni/mastertheboss/tree/master/kafka/microprofile-kafka-demo

Many thanks to Kabir Khan for his help in fixing a couple of issues I’ve hit during the example set up and for providing a first excellent overview of Reactive Messaging in WildFly in this post: https://www.wildfly.org/news/2021/03/11/WildFly-MicroProfile-Reactive-specifications-feature-pack-2.0/