Using MQTT protocol with ArtemisMQ

In this tutorial we will discuss about using MQTT connectivity protocol which is an extremely lightweight publish/subscribe messaging transport. For this reason it is useful for connections with remote locations where a small code footprint is required and/or network bandwidth such as pocket devices (e.g. mobile).

ArtemisMQ supports the MQTT protocol and will automatically map between JMS/NMS and MQTT clients. Out of the box ArtemisMQ contains a connector which enables MQTT through the following URL:

<transportConnectors>
   <transportConnector name="mqtt" uri="mqtt://localhost:1883"/>
</transportConnectors>

Now let’s code an MQTT client. For this purpose we will create a simple Vert-x client using JBoss Forge.

So first of all start JBoss Forge and install the forge add-on:

addon-install --coordinate me.escoffier.forge:vertx-forge-addon

Next, the vert.x project type will be enabled so you will be able to create your first vert-x project:

project-new --named demo-mqtt --type vert.x --vertx-version 3.3.3

vertx-add-verticle --name Client.java --type java  

Now open with your IDE the Verticle you have just created and edit it as follows:

package org.demo.mqtt;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.*;


import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
 
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;

 
public class Client extends AbstractVerticle {

  private static final String MQTT_QUEUE = "ExampleQueue";
  private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
  private static final String BROKER_HOST = "localhost";
  private static final int BROKER_PORT = 1883;

 

  @Override
  public void start() throws Exception {
    MqttClientOptions options = new MqttClientOptions()
      .setPort(BROKER_PORT)
      .setHost(BROKER_HOST);

    MqttClient mqttClient = MqttClient.create(vertx, options);

    mqttClient.connect(ch -> {
      if (ch.succeeded()) {
        System.out.println("Connected to a server");

        mqttClient.publish(
          MQTT_QUEUE,
          Buffer.buffer(MQTT_MESSAGE),
          MqttQoS.AT_MOST_ONCE,
          false,
          false,
          s -> mqttClient.disconnect(d -> System.out.println("Disconnected from server")));
      } else {
        System.out.println("Failed to connect to a server");
        System.out.println(ch.cause());
      }
    });
  }
}

Finally, include the vertx-mqtt dependency to compile the project. Ensure that also the vertx-core uses the same library version:

<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mqtt</artifactId>
      <version>3.5.0.Beta1</version>
</dependency>

<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>3.5.0.Beta1</version>
</dependency>

That’s all. In order to run it, you can either execute a:

$ java -jar demo-mqtt.jar 

Or, if you have configured in your vert-x maven plugin:

mvn vertx:run

You will see a short log message which informs you:

INFO: Succeeded in deploying verticle
Aug 21, 2017 11:31:46 AM io.vertx.mqtt.impl.MqttClientImpl
INFO: Connection with localhost:1883 established successfully
Connected to a server
Disconnected from server

Let’s have a look inside the ArtremisMQ server through JConsole. As you can see from the following picture, a message has been received by the server

ArtemisMQ mqtt tutorial

For the sake of completeness, we will mention how to code a simple MQTT standalone server using a Verticle. That’s pretty simple:

package org.demo.mqtt;

import io.vertx.core.AbstractVerticle;

import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;

/**
 * An example of using the MQTT server as a verticle
 */
public class Server extends AbstractVerticle {
 


  @Override
  public void start() throws Exception {

    MqttServerOptions options = new MqttServerOptions()
      .setPort(1883)
      .setHost("0.0.0.0");

    MqttServer server = MqttServer.create(vertx, options);

    server.endpointHandler(endpoint -> {

      System.out.println("connected client " + endpoint.clientIdentifier());

      endpoint.publishHandler(message -> {

        System.out.println("Just received message on [" + message.topicName() + "] payload [" +
          message.payload() + "] with QoS [" +
          message.qosLevel() + "]");
      });

      endpoint.accept(false);
    });

    server.listen(ar -> {
      if (ar.succeeded()) {
        System.out.println("MQTT server started and listening on port " + server.actualPort());
      } else {
        System.err.println("MQTT server error on start" + ar.cause().getMessage());
      }
    });
  }
}