This tutorial will teach you how to install a Resource Adapter for Apache Kafka on WildFly, so that you can produce and consume streams of messages on your favourite application server!
First of all, some basics: what is Apache Kafka? Apache Kafka is a streaming platform which provides three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Apache Kafka is generally used for two types of applications (a minimal client sketch follows this list):
- Applications which build real-time streaming data pipelines that reliably move data between systems or applications
- Applications which transform or react to streams of data
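Before bringing WildFly into the picture, here is a minimal sketch of the publish/subscribe model using the plain kafka-clients API. The class name, group id and topic are illustrative, and it assumes a broker listening on localhost:9092, which is exactly what the Docker Compose file below provides:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainClientSketch {

    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Publish one record to the "my-topic" stream (close() flushes the producer)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("my-topic", "Hello world"));
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "demo-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");

        // Subscribe to the topic and poll once for incoming records
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received: " + record.value());
            }
        }
    }
}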
The simplest way to start Kafka is by means of a Docker Compose file that starts both the Kafka container image and Zookeeper, the service Kafka needs for cluster management. Here is a sample docker-compose.yaml file:
version: '2'
services:
  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs
  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
From the same directory where you have saved the docker-compose.yaml file execute:
docker-compose up
Check from the Console that Kafka started successfully:
kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
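The examples in this tutorial use a topic named my-topic. Kafka auto-creates topics by default, but if you prefer to create the topic explicitly you can do so from inside the Kafka container (syntax shown is for Kafka 2.1, which still uses the --zookeeper option; newer releases use --bootstrap-server instead):

bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic my-topic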
Installing and Configuring the Kafka Resource Adapter on WildFly
A Kafka Resource Adapter is available in this GitHub project: https://github.com/payara/Cloud-Connectors/tree/master/Kafka/KafkaRAR
Download the project and build it:
cd Kafka/KafkaRAR
mvn install
ls target/
kafka-rar-0.6.0-SNAPSHOT  kafka-rar-0.6.0-SNAPSHOT.rar
Now deploy the kafka-rar-0.6.0-SNAPSHOT.rar into WildFly:
cp target/kafka-rar-0.6.0-SNAPSHOT.rar $JBOSS_HOME/standalone/deployments
Next, add the RAR configuration to the resource-adapters subsystem of standalone-full.xml:
<subsystem xmlns="urn:jboss:domain:resource-adapters:5.0">
    <resource-adapters>
        <resource-adapter id="kafka">
            <archive>
                kafka-rar-0.6.0-SNAPSHOT.rar
            </archive>
            <transaction-support>XATransaction</transaction-support>
            <connection-definitions>
                <connection-definition class-name="fish.payara.cloud.connectors.kafka.outbound.KafkaManagedConnectionFactory" jndi-name="java:/KafkaConnectionFactory" enabled="true" pool-name="ConnectionFactory">
                    <xa-pool>
                        <min-pool-size>1</min-pool-size>
                        <max-pool-size>20</max-pool-size>
                        <prefill>false</prefill>
                        <is-same-rm-override>false</is-same-rm-override>
                    </xa-pool>
                </connection-definition>
            </connection-definitions>
        </resource-adapter>
    </resource-adapters>
</subsystem>
Now start WildFly and check in the server logs that the connection factory was registered and the Kafka Resource Adapter started:
15:28:37,765 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka version: 2.3.1
15:28:37,766 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka commitId: 18a913733fb71c01
15:28:37,766 INFO  [org.apache.kafka.common.utils.AppInfoParser] (MSC service thread 1-1) Kafka startTimeMs: 1572877717755
15:28:37,767 INFO  [org.jboss.as.connector.deployment] (MSC service thread 1-1) WFLYJCA0007: Registered connection factory java:/KafkaConnectionFactory
15:28:37,882 INFO  [fish.payara.cloud.connectors.kafka.inbound.KafkaResourceAdapter] (MSC service thread 1-1) Kafka Resource Adapter Started..
Great, let’s deploy an example application.
Creating a Kafka Producer
We will now deploy a REST application which produces Kafka messages. Later on, we will show how to consume these messages. As with every basic JAX-RS application, we start by adding a JAX-RS activator class that sets the REST context path:
package com.mastertheboss.kafka;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("/rest")
public class JaxRsActivator extends Application {
}
Then, let’s add a basic Producer Endpoint that will send messages to the “my-topic” topic, using the KafkaConnectionFactory:
package com.mastertheboss.kafka;

import fish.payara.cloud.connectors.kafka.api.KafkaConnection;
import fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@Path("/kafka")
@ApplicationScoped
public class KafkaQueueResource {

    @Resource(lookup = "java:/KafkaConnectionFactory")
    KafkaConnectionFactory factory;

    public KafkaQueueResource() {
    }

    @GET
    public Response hello() {
        // Borrow a pooled connection from the resource adapter and send one record
        try (KafkaConnection conn = factory.createConnection()) {
            conn.send(new ProducerRecord("my-topic", "Hello world"));
            return Response
                    .status(Response.Status.OK)
                    .entity("Message sent!")
                    .build();
        } catch (Exception ex) {
            ex.printStackTrace();
            return Response.serverError().entity(ex.getMessage()).build();
        }
    }
}
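If you want to publish arbitrary payloads rather than a fixed string, a hypothetical variant of the endpoint (not part of the original example) could read the message text from a query parameter:

// Hypothetical variant: send the value of the "msg" query parameter.
// Requires: import javax.ws.rs.QueryParam;
@GET
@Path("/send")
public Response send(@QueryParam("msg") String msg) {
    try (KafkaConnection conn = factory.createConnection()) {
        conn.send(new ProducerRecord("my-topic", msg));
        return Response.ok("Message sent: " + msg).build();
    } catch (Exception ex) {
        return Response.serverError().entity(ex.getMessage()).build();
    }
}

You could then invoke it with, for example, curl "http://localhost:8080/kafka-example/rest/kafka/send?msg=test".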
In order to be able to run this application, you will need to link it with the Resource Adapter dependencies in jboss-deployment-structure.xml as follows:
<jboss-deployment-structure>
    <deployment>
        <dependencies>
            <module name="deployment.kafka-rar-0.6.0-SNAPSHOT.rar" export="true"/>
        </dependencies>
    </deployment>
</jboss-deployment-structure>
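As an alternative to jboss-deployment-structure.xml, the same module dependency can be declared through the Dependencies entry of the WAR's MANIFEST.MF, since WildFly exposes deployed archives as modules named deployment.<archive-name>:

Dependencies: deployment.kafka-rar-0.6.0-SNAPSHOT.rar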
Now build the application and deploy it on WildFly:
$ mvn clean install wildfly:deploy
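The wildfly:deploy goal assumes that the wildfly-maven-plugin is declared in the project's pom.xml, for example (the version shown is just an example):

<plugin>
    <groupId>org.wildfly.plugins</groupId>
    <artifactId>wildfly-maven-plugin</artifactId>
    <version>2.0.1.Final</version>
</plugin>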
Starting a Kafka Consumer
First, we will start a topic consumer by executing the kafka-console-consumer.sh script, which is available in Kafka's bin folder. Let's find the Kafka container so we can open a shell inside it:
$ docker ps
CONTAINER ID   IMAGE                              COMMAND                  PORTS                                             NAMES
bd352522dd35   strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/kafka-..."   9091/tcp, 9404/tcp, 0.0.0.0:9092->9092/tcp        kafka-demo_kafka_1
f1352dc7783b   strimzi/kafka:0.11.3-kafka-2.1.0   "sh -c 'bin/zookee..."   9091-9092/tcp, 0.0.0.0:2181->2181/tcp, 9404/tcp   kafka-demo_zookeeper_1

[francesco@fedora kafka-demo]$ docker exec -it bd352522dd35 bash
Now execute the kafka-console-consumer.sh:
[kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic
Back on our host machine, let's push one message by requesting the following URL:
curl http://localhost:8080/kafka-example/rest/kafka
Message sent!
If you check the kafka-console-consumer logs, you should see the incoming message:
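Hello world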
Developing a Kafka Consumer as a Message Driven Bean
Next, we will add a Message Driven Bean class which implements the KafkaListener interface to receive callbacks when a message arrives. Methods annotated with @OnRecord receive the records of a given topic:
package com.mastertheboss.kafka;

import fish.payara.cloud.connectors.kafka.api.KafkaListener;
import fish.payara.cloud.connectors.kafka.api.OnRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jboss.ejb3.annotation.ResourceAdapter;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "clientId", propertyValue = "KafkaJCAClient"),
        @ActivationConfigProperty(propertyName = "groupIdConfig", propertyValue = "myGroup"),
        @ActivationConfigProperty(propertyName = "topics", propertyValue = "my-topic"),
        @ActivationConfigProperty(propertyName = "bootstrapServersConfig", propertyValue = "localhost:9092"),
        @ActivationConfigProperty(propertyName = "retryBackoff", propertyValue = "1000"),
        @ActivationConfigProperty(propertyName = "autoCommitInterval", propertyValue = "100"),
        @ActivationConfigProperty(propertyName = "keyDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
        @ActivationConfigProperty(propertyName = "valueDeserializer", propertyValue = "org.apache.kafka.common.serialization.StringDeserializer"),
        @ActivationConfigProperty(propertyName = "pollInterval", propertyValue = "3000"),
        @ActivationConfigProperty(propertyName = "commitEachPoll", propertyValue = "true"),
        @ActivationConfigProperty(propertyName = "useSynchMode", propertyValue = "true")
})
@ResourceAdapter(value = "kafka")
public class KafkaMDB implements KafkaListener {

    public KafkaMDB() {
        System.out.println("Bean instance created");
    }

    // Callback invoked by the resource adapter for each record polled from "my-topic"
    @OnRecord(topics = {"my-topic"})
    public void getMessage(ConsumerRecord record) {
        System.out.println("> Got record on topic my-topic " + record);
    }
}
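If you only need specific fields of the record, note that ConsumerRecord exposes accessors for the topic, partition, offset and value; an illustrative alternative callback (not part of the original example) could look like this:

// Illustrative alternative: print individual record fields instead of toString()
@OnRecord(topics = {"my-topic"})
public void getMessage(ConsumerRecord record) {
    System.out.println("topic=" + record.topic()
            + " partition=" + record.partition()
            + " offset=" + record.offset()
            + " value=" + record.value());
}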
Let's also package the MDB in our application. As we send new messages, they will now be consumed by the MDB, as you can see from WildFly's console, where the output will look something like this (abridged):
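Bean instance created
> Got record on topic my-topic ConsumerRecord(topic = my-topic, partition = 0, offset = 0, ..., value = Hello world)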
Congratulations! You have just produced and consumed Kafka messages through a Resource Adapter on WildFly.
You can find the full source code of this example at: https://github.com/fmarchioni/mastertheboss/tree/master/kafka/kafka-wildfly
Spring Boot users: interested in seeing Apache Kafka in action with Spring Boot? Then check out the tutorial Apache Kafka and Spring Boot quickstart.