Getting started with Apache Kafka and WildFly

This tutorial will teach you how to install a Resource Adapter for Apache Kafka on WildFly so that you can Produce and Consume streams of messages on your favourite application server!

First of all some basics: what is Apache Kafka? Apache Kafka is a Streaming Platform which provides some key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Apache Kafka is generally used for two types of applications:

  • Application which build real-time streaming data pipelines that reliably get data between systems or applications
  • Applications which transform or react to the streams of data

The simplest way to start Kafka is by means of a Docker Compose YAML file, which will take care to start both the Container image of Kafka and Zookeeper, which is needed for the Cluster Management. Here is a sample docker-compose.yaml file:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

From the same directory where you have saved the docker-compose.yaml file execute:

docker-compose up

Check from the Console that Kafka started successfully:

kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Installing and Configuring Kafka Resource adapter on WildFly

A Kafka Resource adapter is available in this GitHub project: https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR

Download the project and build it:

cd Kafka/KafkaRAR

mvn install

ls KafkaRAR/target/
kafka-rar-0.6.0-SNAPSHOT  kafka-rar-0.6.0-SNAPSHOT.rar

Now deploy the kafka-rar-0.6.0-SNAPSHOT.rar into WildFly:

cp target/kafka-rar-0.6.0-SNAPSHOT.rar $JBOSS_HOME/standalone/deployments

Next, we will be adding the RAR configuration in standalone-full.xml:

<subsystem xmlns="urn:jboss:domain:resource-adapters:5.0">
    <resource-adapters>
        <resource-adapter id="kafka">
            <archive>
                kafka-rar-0.6.0-SNAPSHOT.rar
            </archive>
            <transaction-support>XATransaction</transaction-support>
            <connection-definitions>
                <connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                    <xa-pool>
                        <min-pool-size>1</min-pool-size>
                        <max-pool-size>20</max-pool-size>
                        <prefill>false</prefill>
                        <is-same-rm-override>false</is-same-rm-override>
                    </xa-pool>
                </connection-definition>
            </connection-definitions>
        </resource-adapter>
    </resource-adapters>
</subsystem>

Now start WildFly and check that the Server logs registered a successful Kafka Connection:

15:28:37,765 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka version: 2.3.1
15:28:37,766 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka commitId: 18a913733fb71c01
15:28:37,766 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka startTimeMs: 1572877717755
15:28:37,767 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-1) WFLYJCA0007: Registered connection factory java:/KafkaConnectionFactory
15:28:37,882 INFO  [fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] (MSC service thread 1-1) Kafka Resource Adapter Started..

Great, let’s deploy an example application.

Creating a Kafka Producer

We will now deploy a REST application which produces Kafka Messages. Later on we will show how to consume these messages. As for every basic JAX-RS application, we will start by adding a JAX-RS Activator Class with the rest context in it:

package com.mastertheboss.kafka;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
 
@ApplicationPath("/rest")
public class JaxRsActivator extends Application {
  
}

Then, let’s add a basic Producer Endpoint that will send messages to the “my-topic” topic, using the KafkaConnectionFactory:

package com.mastertheboss.kafka;

import fish.payara.cloud.connectors.kafka.api.KafkaConnection;
import fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;


@Path("/kafka")

@ApplicationScoped
public class KafkaQueueResource {
        
    @Resource(lookup="java:/KafkaConnectionFactory")
    KafkaConnectionFactory factory;

    public KafkaQueueResource() {
    }

    @GET
    public Response hello() {

        try (KafkaConnection conn = factory.createConnection()) {
            conn.send(new ProducerRecord("my-topic","Hello world"));
            return Response
                    .status(Response.Status.OK)
                    .entity("Message sent!")
                    .build();
        } catch (Exception ex) {
            ex.printStackTrace();
            return  Response.serverError().entity(ex.getMessage()).build();
        }    

    }
}

In order to be able to run this application, you will need to link it with the Resource Adapter dependencies in jboss-deployment-structure.xml as follows:

<jboss-deployment-structure>
  <deployment>
      <dependencies>
         <module name="deployment.kafka-rar-0.6.0-SNAPSHOT.rar" export="TRUE"/>
      </dependencies>
    </deployment>
</jboss-deployment-structure>

Now build the application and deploy in on WildFly:

$ mvn clean install wildfly:deploy

Starting a Kafka Consumer

In first instance, we will start a Topic consumer, by executing the kafka-console-consumer.sh script which is available in the bin folder of Kafka. Let’s find the Kafka process so we can enter into its bash process:

$ docker ps
CONTAINER ID        IMAGE                              COMMAND                        PORTS                                             NAMES
bd352522dd35        strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/kafka-..."   9091/tcp, 9404/tcp, 0.0.0.0:9092->9092/tcp        kafka-demo_kafka_1
f1352dc7783b        strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/zookee..."   9091-9092/tcp, 0.0.0.0:2181->2181/tcp, 9404/tcp   kafka-demo_zookeeper_1
[francesco@fedora kafka-demo]$ docker exec -it bd352522dd35 bash

Now execute the kafka-console-consumer.sh:

[kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic 

Back to our Host machine, let’s push one message by requesting the following URL:

curl http://localhost:8080/kafka-example/rest/kafka
Message sent!

If you check the kafka-console-consumer logs, you should see the incoming message:

kafka wildfly tutorial kafka wildfly tutorial

Developing a Kafka Consumer as Message Driven Mean

In second instance, we will add a MessageDriven Bean class which implements the KafkaListener interface to receive callbacks when a message arrives. The methods annotated with OnRecord are used to retrieve the message for one topic:

import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "clientId", propertyValue = "KafkaJCAClient"),
        @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "myGroup"),
        @ActivationConfigProperty(propertyName = "topics", propertyValue = "my-topic"),
        @ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "localhost:9092"),
        @ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
        @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
        @ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
        @ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
        @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000"),
        @ActivationConfigProperty(propertyName = "commitEachPoll", propertyValue = "true"),
        @ActivationConfigProperty(propertyName = "useSynchMode", propertyValue = "true")
})
@ResourceAdapter(value="kafka")
public class KafkaMDB implements KafkaListener {


    public KafkaMDB() {
       System.out.println("Bean instance created");
    }

    @OnRecord( topics={"my-topic"})
    public void getMessage(ConsumerRecord record) {
        System.out.println("> Got record on topic test " + record);
    }


}

Let’s wrap also the MDB in our application. As we send new messages, now they will be consumed by the MDB, as you can see from WildFly’s console:

kafka wildfly tutorial kafka wildfly tutorial

Congratulations! You have just produced and consume messages with Kafka and WildFly Resource Adapter.

You can find the full source code of this example at: https://github.com/fmarchioni/mastertheboss/tree/master/kafka/kafka-wildfly

Spring Boot users: Interested to see Apache Kafka in action with Spring boot ? Check this tutorial then Apache Kafka and Spring Boot quickstart