Writing Reactive Applications with WildFly

In this updated tutorial we will learn how to design, configure and deploy a Reactive Messaging application on WildFly, using smallrye-reactive-messaging and Apache Kafka as the distributed data streaming platform for our demo application.

Overview of MicroProfile Reactive Applications

Reactive Streams aims to provide a standard for exchanging data streams across an asynchronous boundary.

To become familiar with MicroProfile Reactive Messaging, we need to understand a few key concepts. First of all, MicroProfile Reactive Messaging is a specification which uses CDI Beans to drive the flow of messages through specific channels.

A Message is the basic interface that wraps a payload to distribute. The Message interface is parameterized with the type of payload it contains (for example, Message<String>). Additionally, a message carries attributes and metadata which are specific to the broker used for message exchange (e.g. Kafka).

A Channel, on the other hand, is a String indicating the Source or Destination of messages.

As MicroProfile Reactive Messaging relies on the CDI model, two core annotations are used to indicate if a method is a producer or consumer of messages:

@Incoming: This annotation indicates that a method consumes messages from the specified channel. The name of the channel is given as the annotation’s value. Here is an example:

@Incoming("channel")
public void consume(Message<String> s) {
// Consume message here:
}

When you place this annotation on a method, the method will be called each time a message is sent to that channel. However, you may decide to make it clear that the method consumes a specific kind of Message, such as a KafkaMessage (which inherits from Message). Here is an example:

@Incoming("channel")
public void consume(KafkaMessage<String> s) {
// Consume message here:
}

@Outgoing: This annotation indicates that a method publishes messages to a channel. In much the same way, the name of the channel is given as the annotation’s value:

@Outgoing("channel")
public Message<String> produce() {
// Produce and return a Message implementation
}

Within the method annotated with @Outgoing, we return a concrete implementation of the Message interface.

You can also annotate a method with both @Incoming and @Outgoing so that it behaves like a Message Processor, which transforms the content of the message data:

@Incoming("from")
@Outgoing("to")
public String translate(String text) {
return MyTranslator.translate(text);
}
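To make the role of channels more concrete, here is a minimal plain-Java sketch of the idea, with no MicroProfile dependency. The class ChannelSketch and its methods are hypothetical and only model the concept: a channel is a name, consumers subscribe to that name, and a processor is a consumer of one channel that publishes its result to another. The real runtime wires the annotated methods together for you and does so with Reactive Streams, not with the synchronous calls used here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-memory model of named channels (not the MicroProfile API)
class ChannelSketch {

    // Channel name -> consumers subscribed to that channel
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Plays the role of an @Incoming method
    void subscribe(String channel, Consumer<String> consumer) {
        subscribers.computeIfAbsent(channel, name -> new ArrayList<>()).add(consumer);
    }

    // Plays the role of an @Outgoing method
    void publish(String channel, String payload) {
        subscribers.getOrDefault(channel, List.of()).forEach(c -> c.accept(payload));
    }

    // Plays the role of a method with both @Incoming(from) and @Outgoing(to)
    void process(String from, String to, Function<String, String> transformation) {
        subscribe(from, payload -> publish(to, transformation.apply(payload)));
    }

    static List<String> demo() {
        ChannelSketch channels = new ChannelSketch();
        List<String> received = new ArrayList<>();
        channels.process("from", "to", String::toUpperCase);
        channels.subscribe("to", received::add);
        channels.publish("from", "hello");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [HELLO]
    }
}
```

The payload published on "from" reaches the final consumer only after passing through the processor, which is the same shape as the translate method above.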

Streaming Messages with WildFly

Firstly, we need to add MicroProfile Reactive Messaging to the list of available extensions in WildFly. To do that, start WildFly using the standalone-microprofile.xml configuration:

$ ./standalone.sh -c standalone-microprofile.xml

Then, when WildFly is up and running, connect with the CLI and execute the following short script to enable the reactive-messaging-smallrye and reactive-streams-operators-smallrye extensions and their subsystems:

batch
/extension=org.wildfly.extension.microprofile.reactive-messaging-smallrye:add
/extension=org.wildfly.extension.microprofile.reactive-streams-operators-smallrye:add
/subsystem=microprofile-reactive-streams-operators-smallrye:add
/subsystem=microprofile-reactive-messaging-smallrye:add
run-batch

reload

Bootstrapping Kafka

Apache Kafka is a distributed data streaming platform that can be used to publish, subscribe to, store, and process streams of data from multiple sources in real time and with high throughput.

Apache Kafka can be plugged into streaming data pipelines that distribute data between systems, as well as into the systems and applications that consume that data. Since Apache Kafka reduces the need for point-to-point integrations for data sharing, it is a good fit for use cases where high throughput and scalability are vital.

To manage the Kafka environment, you need a service named Zookeeper, which manages naming and configuration data in order to provide flexible and robust synchronization within distributed systems. Zookeeper tracks the status of the Kafka cluster nodes and also keeps track of Kafka topics and partitions.

There are several ways to start a Kafka cluster. The simplest one is to use the docker-compose tool to orchestrate both the Kafka and Zookeeper container images with a single file. The docker-compose.yaml file is available at the root of our example project (you can take a look at it here: https://github.com/fmarchioni/mastertheboss/blob/master/kafka/microprofile-kafka-demo/docker-compose.yaml ).

First, make sure that docker is up and running:

$ service docker start

Then, start Apache Kafka and Zookeeper with:

$ docker-compose up

Creating a Reactive Application

In order to demonstrate the powerful combination of Apache Kafka and MicroProfile Reactive Messaging on WildFly, we will design a simple application which generates some random Weather Data. This data flows internally from the Producer to a Logger, then to a Filter. Then the data flows to a Sender and a Receiver that communicate with Apache Kafka.

Here is a high-level view of our flow:

[Figure: WildFly reactive messaging]

Firstly, let’s define our Model object:

public record WeatherData(String city, Integer temperature) {
}

In order to move the Weather Data from and to Kafka, we will need to define a Serializer and a Deserializer for it. Let’s begin with the WeatherDataSerializer which will serialize our data into bytes:

public class WeatherDataSerializer implements Serializer<WeatherData> {
    @Override
    public byte[] serialize(String topic, WeatherData data) {
        if (data == null) {
            return null;
        }

        try {
            ByteArrayOutputStream bout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bout);
            out.writeUTF(data.city());
            out.writeInt(data.temperature());
            out.close();
            return bout.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

    }
}

Then, here’s the WeatherDataDeserializer Class:

public class WeatherDataDeserializer implements Deserializer<WeatherData> {

    @Override
    public WeatherData deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))){
            String city = in.readUTF();
            int temperature = in.readInt();
            return new WeatherData(city, temperature);
        } catch (IOException e){
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
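A quick way to check that a serializer and a deserializer agree on the wire format is a round trip: serialize a value, deserialize the bytes, and compare the result with the original. The sketch below reproduces the same encoding steps as the two classes above in plain static methods, so it runs without the Kafka client library. The class name RoundTripSketch and its nested copy of the record are assumptions made only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-alone check of the encoding used by the (de)serializer
class RoundTripSketch {

    record WeatherData(String city, Integer temperature) {}

    static byte[] serialize(WeatherData data) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeUTF(data.city());
            out.writeInt(data.temperature());
        } // closing the stream flushes buffered bytes into bout
        return bout.toByteArray();
    }

    static WeatherData deserialize(byte[] bytes) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // Fields must be read in the same order they were written
            String city = in.readUTF();
            int temperature = in.readInt();
            return new WeatherData(city, temperature);
        }
    }

    static WeatherData roundTrip(WeatherData data) {
        try {
            return deserialize(serialize(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WeatherData original = new WeatherData("Paris", 12);
        System.out.println(original.equals(roundTrip(original))); // prints true
    }
}
```

Note that the byte array is read only after the ObjectOutputStream has been closed; reading it earlier could miss bytes that are still buffered.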

Coding the In-VM Producers and Consumers

To begin sending WeatherData, we will create a TemperatureGenerator Class. Each call to getWeatherData() schedules a task which, after 2 seconds, completes a CompletionStage with a new random WeatherData. CompletionStage is a core JDK interface for handling asynchronous computations and composing asynchronous tasks.

@ApplicationScoped
public class TemperatureGenerator {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final List<String> cities = List.of("Paris", "New York", "London", "Tokyo", "Sydney");
    private final Random random = new Random();

    public CompletionStage<WeatherData> getWeatherData() {
        CompletableFuture<WeatherData> future = new CompletableFuture<>();
        executor.schedule(() -> {
            int temperature = random.nextInt(10) + 10;
            WeatherData weatherData = new WeatherData(getRandomCity(), temperature);
            future.complete(weatherData);
        }, 2, TimeUnit.SECONDS); // Schedule the task to run after 2 seconds

        return future;
    }
    private String getRandomCity() {
        return cities.get(random.nextInt(cities.size()));
    }

}
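If CompletionStage is new to you, the following stand-alone sketch may help. It is not part of the example project: the class name CompletionStageSketch and the fixed value 15 are assumptions chosen for illustration. It produces a value after a delay, as the generator above does, but uses the JDK's CompletableFuture.delayedExecutor (available since Java 9) instead of a ScheduledExecutorService, and then composes a further step with thenApply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a value that becomes available later
class CompletionStageSketch {

    // Completes with a fixed temperature after the given delay
    static CompletionStage<Integer> delayedTemperature(long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> 15, delayed);
    }

    // Composes a transformation that runs once the value is available
    static String describe(long delayMillis) {
        return delayedTemperature(delayMillis)
                .thenApply(temperature -> "temperature=" + temperature)
                .toCompletableFuture()
                .join(); // blocks only so that the demo can print the result
    }

    public static void main(String[] args) {
        System.out.println(describe(100)); // prints temperature=15
    }
}
```

In the application itself nobody calls join(): the CompletionStage is simply returned and the messaging runtime reacts when it completes.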

Then, let’s code the InVMMessaging Class which produces messages that will eventually flow to the Remote Messaging System:

@ApplicationScoped
public class InVMMessaging {

    @Inject TemperatureGenerator producer;

    @Outgoing("source")
    public CompletionStage<WeatherData> sendInVm() {
        return producer.getWeatherData();
 
    }  

    @Incoming("source")
    @Outgoing("filter")
    public WeatherData logAllMessages(WeatherData message) {
        System.out.println("Got Weather : " + message);
        return message;
    }

    @Incoming("filter")
    @Outgoing("sender")
    public PublisherBuilder<WeatherData> filterMessages(PublisherBuilder<WeatherData> messages) {
        return messages
                .filter(data -> !data.city().equals("Sydney"));
    } 
    
}

Here is a short description of the methods in it:

  1. sendInVm() method:
    • This method is annotated with @Outgoing("source"), which means it publishes messages to a channel named “source”.
    • The method returns a CompletionStage<WeatherData>, indicating that the method will produce a WeatherData object asynchronously.
  2. logAllMessages() method:
    • This method is annotated with @Incoming("source") and @Outgoing("filter"), which means that it consumes messages from the “source” channel and produces messages to the “filter” channel.
    • This method simply logs the received WeatherData object and then returns it.
  3. filterMessages() method:
    • This method is annotated with @Incoming("filter") and @Outgoing("sender"), which means it consumes messages from the “filter” channel and then produces messages to the “sender” channel.
    • The method uses the filter operator on the PublisherBuilder to filter out WeatherData objects with a city name of “Sydney”.
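The effect of the log and filter stages described above can be approximated with a plain java.util.stream pipeline. This is only a sketch of the semantics: the class FilterSketch is hypothetical, and a Java Stream is synchronous and finite, whereas the PublisherBuilder used in the application handles an open-ended asynchronous stream.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-alone model of the "log, then filter" stages
class FilterSketch {

    record WeatherData(String city, Integer temperature) {}

    static List<WeatherData> pipeline(List<WeatherData> source) {
        return source.stream()
                // Same role as logAllMessages: observe every element, pass it on unchanged
                .peek(data -> System.out.println("Got Weather : " + data))
                // Same predicate as filterMessages: drop readings for Sydney
                .filter(data -> !data.city().equals("Sydney"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WeatherData> forwarded = pipeline(List.of(
                new WeatherData("London", 18),
                new WeatherData("Sydney", 17),
                new WeatherData("Tokyo", 19)));
        System.out.println("Forwarded: " + forwarded.size()); // prints Forwarded: 2
    }
}
```

All three readings are logged, but only two are forwarded, because the filter runs after the logging stage.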

Coding Remote Producers and Consumers

Finally, we will add the RemoteMessaging Class which manages sending Messages to, and receiving Messages from, a Remote System.

@ApplicationScoped
public class RemoteMessaging { 
  
    @Incoming("sender")
    @Outgoing("to-kafka")
    public WeatherData producWeatherData(WeatherData message) {
        System.out.println("Sent WeatherData to Remote System : " + message);
        return message;
    }

    @Incoming("from-kafka")
    public CompletionStage<Void> consumeWeatherData(Message<WeatherData> message) {       
        System.out.println("Consumed WeatherData from Remote System: " + message.getPayload());
        return message.ack();
            
    }
}

Here is a short description of the methods:

  1. producWeatherData method:
    • This method acts as a producer that sends WeatherData messages to a Kafka topic. The method publishes the Message to the outgoing “to-kafka” Channel. In our configuration, we will map this outgoing Channel to the “weather” Topic.
  2. consumeWeatherData method:
    • This method acts as a consumer that receives WeatherData messages from a Kafka topic. The method consumes the Message from the incoming “from-kafka” Channel. In our configuration, we will map this incoming Channel to the “weather” Topic.
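The consumer returns message.ack(), a CompletionStage<Void> that signals when the acknowledgement has completed. The sketch below models that contract in plain Java. SimpleMessage is a hypothetical stand-in for the real Message interface, and its acknowledgement simply flips a flag, whereas a real connector decides what acknowledging means for its broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical model of "consume the payload, then acknowledge the message"
class AckSketch {

    // Stand-in for Message<T>: a payload plus an acknowledgement action
    record SimpleMessage<T>(T payload, Supplier<CompletionStage<Void>> ackAction) {
        CompletionStage<Void> ack() {
            return ackAction.get();
        }
    }

    // Same shape as consumeWeatherData: use the payload, then return the ack stage
    static CompletionStage<Void> consume(SimpleMessage<String> message) {
        System.out.println("Consumed: " + message.payload());
        return message.ack();
    }

    static boolean demo() {
        AtomicBoolean acknowledged = new AtomicBoolean(false);
        SimpleMessage<String> message = new SimpleMessage<>("London", () -> {
            acknowledged.set(true);
            return CompletableFuture.<Void>completedFuture(null);
        });
        consume(message).toCompletableFuture().join();
        return acknowledged.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```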

In order to configure the connection to the remote Messaging system, we will add the following microprofile-config.properties in the resources/META-INF folder of our application:

mp.messaging.connector.smallrye-kafka.bootstrap.servers=0.0.0.0:9092

mp.messaging.outgoing.to-kafka.connector=smallrye-kafka
mp.messaging.outgoing.to-kafka.topic=weather
mp.messaging.outgoing.to-kafka.value.serializer=com.mastertheboss.mp.reactive.messaging.model.WeatherDataSerializer
mp.messaging.outgoing.to-kafka.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.from-kafka.connector=smallrye-kafka
mp.messaging.incoming.from-kafka.topic=weather
mp.messaging.incoming.from-kafka.value.deserializer=com.mastertheboss.mp.reactive.messaging.model.WeatherDataDeserializer
mp.messaging.incoming.from-kafka.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

# Configure the Kafka group.id to prevent a warning message - if not set, a default value is generated automatically.
mp.messaging.connector.smallrye-kafka.group.id=microprofile-reactive-messaging-kafka-group-id

# Needed as per https://github.com/smallrye/smallrye-reactive-messaging/issues/845 since the consumer
# joins after the messages are sent
mp.messaging.incoming.from-kafka.auto.offset.reset=earliest

Note that both the incoming Channel “from-kafka” and the outgoing Channel “to-kafka” are mapped to the same Kafka topic, named weather.
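The channel properties in this file follow the pattern mp.messaging.[incoming|outgoing].<channel-name>.<attribute>. As a rough illustration of how such a key breaks down, here is a small hypothetical parser. It is not part of any MicroProfile API, it assumes channel names that contain no dots, and it deliberately rejects the mp.messaging.connector.* keys, which configure a connector, not a channel.

```java
// Hypothetical helper that splits a channel property key into its parts
class ChannelKeySketch {

    record ChannelKey(String direction, String channel, String attribute) {}

    static ChannelKey parse(String key) {
        // Limit of 5 keeps dotted attributes such as "auto.offset.reset" in one piece
        String[] parts = key.split("\\.", 5);
        boolean valid = parts.length == 5
                && parts[0].equals("mp")
                && parts[1].equals("messaging")
                && (parts[2].equals("incoming") || parts[2].equals("outgoing"));
        if (!valid) {
            throw new IllegalArgumentException("Not a channel property: " + key);
        }
        return new ChannelKey(parts[2], parts[3], parts[4]);
    }

    public static void main(String[] args) {
        ChannelKey key = parse("mp.messaging.incoming.from-kafka.auto.offset.reset");
        System.out.println(key.direction()); // prints incoming
        System.out.println(key.channel());   // prints from-kafka
        System.out.println(key.attribute()); // prints auto.offset.reset
    }
}
```

The channel segment is what ties a property to the name used in the @Incoming or @Outgoing annotation, while the attribute is handed to the connector.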

Running the Reactive Application

Since our project configuration includes the WildFly Maven plugin, we can simply build and deploy the application as follows:

mvn install wildfly:deploy

You should see WeatherData in your WildFly console every 2 seconds. Note that readings for Sydney are logged but, because of the filter, are not sent to Kafka:

11:04:25,873 INFO  [stdout] (pool-15-thread-1) Got Weather : WeatherData[city=London, temperature=18]
11:04:25,874 INFO  [stdout] (pool-15-thread-1) Sent WeatherData to Remote System : WeatherData[city=London, temperature=18]
11:04:25,888 INFO  [stdout] (vert.x-eventloop-thread-0) Consumed WeatherData from Remote System: WeatherData[city=London, temperature=18]
11:04:27,876 INFO  [stdout] (pool-15-thread-1) Got Weather : WeatherData[city=Sydney, temperature=17]
11:04:29,594 INFO  [org.wildfly.extension.undertow] (ServerService Thread Pool -- 77) WFLYUT0022: Unregistered web context: '/microprofile-reactive-messaging-kafka' from server 'default-server'
11:04:29,878 INFO  [stdout] (pool-15-thread-1) Got Weather : WeatherData[city=Tokyo, temperature=19]
11:04:29,878 INFO  [stdout] (pool-15-thread-1) Sent WeatherData to Remote System : WeatherData[city=Tokyo, temperature=19]
11:04:29,893 INFO  [stdout] (vert.x-eventloop-thread-0) Consumed WeatherData from Remote System: WeatherData[city=Tokyo, temperature=19]

Enriching Remote Consumers and Producers with Kafka API

Finally, we will show here how to enrich our Kafka communication with the Classes OutgoingKafkaRecordMetadata and IncomingKafkaRecordMetadata:

    @Incoming("sender")
    @Outgoing("to-kafka")
    public Message<WeatherData> sendToKafka(WeatherData msg) {
         
        try {
          
            System.out.println("Sending to Kafka " +msg);
            Message<WeatherData> m = Message.of(msg);
            // Just use the hash as the Kafka key for this example
            int key = msg.city().hashCode();

            // Create Metadata containing the Kafka key
            OutgoingKafkaRecordMetadata<Integer> md = OutgoingKafkaRecordMetadata
                    .<Integer>builder()
                    .withKey(key)
                    .build();

            // The returned message will have the metadata added
            return KafkaMetadataUtil.writeOutgoingKafkaMetadata(m, md);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        
    }

    @Incoming("from-kafka")
    public CompletionStage<Void> receiveFromKafka(Message<WeatherData> message) {
        try {
            WeatherData payload = message.getPayload();

            IncomingKafkaRecordMetadata<Integer, WeatherData> md = KafkaMetadataUtil.readIncomingKafkaMetadata(message).get();
            String msg =
                    "Got Message with MetaData from from Kafka\n" +
                    "\t%s\n" +
                    "\tkey: %d; partition: %d, topic: %s";
            msg = String.format(msg, payload, md.getKey(), md.getPartition(), md.getTopic());
            System.out.println(msg);
          
        
            return message.ack();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

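OutgoingKafkaRecordMetadata and IncomingKafkaRecordMetadata belong to the SmallRye Kafka connector API. OutgoingKafkaRecordMetadata allows you to attach additional metadata to outgoing Kafka messages, while IncomingKafkaRecordMetadata lets you read the metadata of the messages you receive. You can use metadata for various purposes, including: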

1. Routing and Partitioning: You can include routing information within the metadata to direct messages to specific partitions or topics based on specific criteria. This enables flexible message routing and targeted delivery within your Kafka infrastructure.

2. Message Correlation: By adding correlation IDs or other identifiers to the metadata, you can correlate related messages even if they are sent through different channels or processed asynchronously. This helps maintain context and traceability throughout your messaging pipelines.

3. Custom Message Properties: You can leverage the metadata to store additional information relevant to the message content or processing requirements. This allows you to enrich the message with custom properties that might be useful for downstream consumers or specific processing logic.

By adding the new method implementations you should be able to read this information on the Kafka consumer side:

11:45:52,272 INFO  [stdout] (pool-17-thread-1) Got Weather : WeatherData[city=London, temperature=16]
11:45:52,273 INFO  [stdout] (pool-17-thread-1) Sending to Kafka WeatherData[city=London, temperature=16]
11:45:52,287 INFO  [stdout] (vert.x-eventloop-thread-0) Got Message with MetaData from from Kafka
11:45:52,287 INFO  [stdout] (vert.x-eventloop-thread-0) 	WeatherData[city=London, temperature=16]
11:45:52,287 INFO  [stdout] (vert.x-eventloop-thread-0) 	key: -2013264328; partition: 0, topic: weather
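The key shown in the log above is simply the value of String.hashCode() for the city name, as computed in sendToKafka. The following plain-Java sketch (the class name KeySketch is hypothetical and not part of the example project) shows that the key is deterministic, so records for the same city always carry the same key. With Kafka's default partitioning, records that share a key are sent to the same partition, as long as the number of partitions does not change.

```java
// Hypothetical stand-alone check of the key derivation used in sendToKafka
class KeySketch {

    // Same derivation as in sendToKafka: the city's hash code is the record key
    static int keyFor(String city) {
        return city.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(keyFor("London")); // prints -2013264328, as in the log above
        System.out.println(keyFor("London") == keyFor("London")); // prints true
    }
}
```

A hash code is convenient for a demo, but different cities could in principle collide on the same key, so a real application may prefer a key with business meaning.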

Conclusion

This article was a walkthrough of how to configure Reactive Messaging on WildFly and how to deploy a Reactive application on top of it. We have discussed how to produce Messages in memory and how to relay them to a Streaming Platform such as Apache Kafka.

Source code for this tutorial: https://github.com/fmarchioni/mastertheboss/tree/master/kafka/microprofile-kafka-demo