In this tutorial we will look at MQTT, an extremely lightweight publish/subscribe messaging transport. Because it is so lightweight, it is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium, such as pocket devices (e.g. mobile phones).
ArtemisMQ supports the MQTT protocol and will automatically map between JMS/NMS and MQTT clients. Out of the box ArtemisMQ contains a connector which enables MQTT through the following URL:
<transportConnectors>
    <transportConnector name="mqtt" uri="mqtt://localhost:1883"/>
</transportConnectors>
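The connector URI above carries the two values an MQTT client needs: the host and the port. As a minimal sketch (the class `ConnectorUri` and its methods are hypothetical helpers, not part of the broker or of Vert.x), the standard `java.net.URI` class can pull them out, falling back to 1883, the conventional unencrypted MQTT port, when the URI omits one:

```java
import java.net.URI;

/** Sketch: derive the client's host and port from the connector URI. */
final class ConnectorUri {

    /** Returns the host part of a connector URI such as mqtt://localhost:1883. */
    static String host(String uri) {
        return URI.create(uri).getHost();
    }

    /** Returns the port part, or 1883 (assumed default) when the URI has none. */
    static int port(String uri) {
        int port = URI.create(uri).getPort();   // -1 means "no port in the URI"
        return port == -1 ? 1883 : port;
    }

    public static void main(String[] args) {
        String uri = "mqtt://localhost:1883";
        System.out.println(host(uri) + ":" + port(uri));
    }
}
```

These are the same two values the client code later passes to its connection options.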
Now let’s code an MQTT client. For this purpose we will create a simple Vert.x client using JBoss Forge.
First, start JBoss Forge and install the Vert.x Forge add-on:
addon-install --coordinate me.escoffier.forge:vertx-forge-addon
Once the add-on is installed, the vert.x project type is enabled, so you can create your first Vert.x project and add a verticle to it:
project-new --named demo-mqtt --type vert.x --vertx-version 3.3.3
vertx-add-verticle --name Client.java --type java
Now open the Verticle you have just created in your IDE and edit it as follows:
package org.demo.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;

public class Client extends AbstractVerticle {

    private static final String MQTT_QUEUE = "ExampleQueue";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
    private static final String BROKER_HOST = "localhost";
    private static final int BROKER_PORT = 1883;

    @Override
    public void start() throws Exception {
        // Point the client at the broker's MQTT connector
        MqttClientOptions options = new MqttClientOptions()
            .setPort(BROKER_PORT)
            .setHost(BROKER_HOST);

        MqttClient mqttClient = MqttClient.create(vertx, options);

        mqttClient.connect(ch -> {
            if (ch.succeeded()) {
                System.out.println("Connected to a server");

                // Publish one message (QoS 0, not a duplicate, not retained),
                // then disconnect once the publish call has completed
                mqttClient.publish(
                    MQTT_QUEUE,
                    Buffer.buffer(MQTT_MESSAGE),
                    MqttQoS.AT_MOST_ONCE,
                    false,
                    false,
                    s -> mqttClient.disconnect(d -> System.out.println("Disconnected from server")));
            } else {
                System.out.println("Failed to connect to a server");
                System.out.println(ch.cause());
            }
        });
    }
}
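To get a feel for how lightweight the publish call above is, the following sketch builds by hand the PUBLISH packet that a QoS 0 publish corresponds to. It assumes MQTT 3.1.1 framing; the class `PublishFrame` and its methods are hypothetical and only illustrate the wire format. This is not how the Vert.x client is implemented internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Sketch of the MQTT 3.1.1 wire format for a QoS 0 PUBLISH packet. */
final class PublishFrame {

    /** Encodes the "remaining length" field: 7 data bits per byte, high bit set when more bytes follow. */
    static byte[] encodeRemainingLength(int length) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80;                      // continuation bit
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    /** Builds a QoS 0 PUBLISH packet (no DUP, no RETAIN, hence no packet identifier). */
    static byte[] encodeQos0Publish(String topic, String message) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        // Variable header (2-byte topic length + topic) followed by the payload
        int remaining = 2 + topicBytes.length + payload.length;
        byte[] remainingField = encodeRemainingLength(remaining);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x30);                                        // packet type 3 (PUBLISH), all flags 0
        out.write(remainingField, 0, remainingField.length);
        out.write(topicBytes.length >> 8);                      // topic length, high byte
        out.write(topicBytes.length & 0xFF);                    // topic length, low byte
        out.write(topicBytes, 0, topicBytes.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] frame = encodeQos0Publish("ExampleQueue", "Hello Vert.x MQTT Client");
        System.out.println("PUBLISH packet size: " + frame.length + " bytes");
    }
}
```

For the topic and message used in the client, the whole packet is 40 bytes: 24 bytes of payload, 12 bytes of topic name and only 4 bytes of protocol framing.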
Finally, add the vertx-mqtt dependency so that the project compiles. Make sure that vertx-core uses the same library version:
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-mqtt</artifactId>
    <version>3.5.0.Beta1</version>
</dependency>
<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-core</artifactId>
    <version>3.5.0.Beta1</version>
</dependency>
That’s all. To run it, you can either execute:
$ java -jar demo-mqtt.jar
Or, if you have configured the Vert.x Maven plugin in your project:
mvn vertx:run
You will see a short log confirming that the client connected, published and disconnected:
INFO: Succeeded in deploying verticle
Aug 21, 2017 11:31:46 AM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:1883 established successfully
Connected to a server
Disconnected from server
Let’s have a look inside the ArtemisMQ server through JConsole. As you can see from the following picture, a message has been received by the server.
For the sake of completeness, here is how to code a simple standalone MQTT server using a Verticle. It is pretty simple:
package org.demo.mqtt;

import io.vertx.core.AbstractVerticle;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;

/**
 * An example of using the MQTT server as a verticle
 */
public class Server extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        // Listen on all interfaces, on the standard MQTT port
        MqttServerOptions options = new MqttServerOptions()
            .setPort(1883)
            .setHost("0.0.0.0");

        MqttServer server = MqttServer.create(vertx, options);

        // Called once for every client that connects
        server.endpointHandler(endpoint -> {
            System.out.println("connected client " + endpoint.clientIdentifier());

            // Called for every message the client publishes
            endpoint.publishHandler(message -> {
                System.out.println("Just received message on [" + message.topicName()
                    + "] payload [" + message.payload()
                    + "] with QoS [" + message.qosLevel() + "]");
            });

            // Accept the connection (no session present)
            endpoint.accept(false);
        });

        server.listen(ar -> {
            if (ar.succeeded()) {
                System.out.println("MQTT server started and listening on port " + server.actualPort());
            } else {
                System.err.println("MQTT server error on start: " + ar.cause().getMessage());
            }
        });
    }
}
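The QoS level printed by the publish handler comes from the first byte of each incoming PUBLISH packet. The sketch below shows how that byte could be decoded by hand, assuming the MQTT 3.1.1 flag layout; the class `PublishFlags` is hypothetical and stands in for work that the Vert.x MQTT server already does for you.

```java
/** Sketch: decode the flags in the first byte of an MQTT 3.1.1 PUBLISH packet. */
final class PublishFlags {

    final boolean dup;      // bit 3: this is a re-delivery
    final int qos;          // bits 2-1: 0 = at most once, 1 = at least once, 2 = exactly once
    final boolean retain;   // bit 0: broker should keep the message for new subscribers

    private PublishFlags(boolean dup, int qos, boolean retain) {
        this.dup = dup;
        this.qos = qos;
        this.retain = retain;
    }

    static PublishFlags decode(int firstByte) {
        int packetType = (firstByte >> 4) & 0x0F;
        if (packetType != 3) {
            throw new IllegalArgumentException("Not a PUBLISH packet, type = " + packetType);
        }
        int qos = (firstByte >> 1) & 0x03;
        if (qos == 3) {
            throw new IllegalArgumentException("QoS 3 is not a valid level");
        }
        return new PublishFlags((firstByte & 0x08) != 0, qos, (firstByte & 0x01) != 0);
    }

    public static void main(String[] args) {
        // 0x30 is the first byte of a QoS 0 publish with no DUP and no RETAIN flag
        PublishFlags flags = decode(0x30);
        System.out.println("dup=" + flags.dup + " qos=" + flags.qos + " retain=" + flags.retain);
    }
}
```

A message sent with `MqttQoS.AT_MOST_ONCE`, as in the client earlier in this tutorial, arrives with a QoS value of 0.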