How to send JMS messages across Java EE components

This short tutorial shows how to send JMS messages between Java EE components such as Servlets, EJBs and MDBs.

JMS objects such as connections, sessions, consumers and producers are designed to be reused. In most implementations, connections and sessions are fairly heavyweight to set up, and creating a consumer usually requires a network round trip. Producers are often more lightweight, although creating one still carries some overhead.

Since JMS 2.0 you can use the JMSContext interface, which combines the functionality of two separate objects from the JMS 1.1 API: a Connection and a Session. For example:

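import java.io.IOException;
import java.io.PrintWriter;

import javax.annotation.Resource;
import javax.inject.Inject;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/HelloWorldMDBServletClient")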
public class HelloWorldMDBServletClient extends HttpServlet {

    private static final long serialVersionUID = -8314035702649252239L;

    private static final int MSG_COUNT = 5;

    @Inject
    private JMSContext context;

    @Resource(lookup = "java:/queue/HELLOWORLDMDBQueue")
    private Queue queue;

    @Resource(lookup = "java:/topic/HELLOWORLDMDBTopic")
    private Topic topic;

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        resp.setContentType("text/html");
        PrintWriter out = resp.getWriter();
        out.write("<h1>Quickstart: Example demonstrates the use of <strong>JMS 2.0</strong> and <strong>EJB 3.2 Message-Driven Bean</strong> in JBoss EAP.</h1>");
        try {
            boolean useTopic = req.getParameterMap().keySet().contains("topic");
            final Destination destination = useTopic ? topic : queue;

            out.write("<p>Sending messages to <em>" + destination + "</em></p>");
            out.write("<h2>The following messages will be sent to the destination:</h2>");
            for (int i = 0; i < MSG_COUNT; i++) {
                String text = "This is message " + (i + 1);
                context.createProducer().send(destination, text);
                out.write("Message (" + i + "): " + text + "<br/>");
            }
            out.write("<p><i>Go to your JBoss EAP server console or server log to see the result of messages processing.</i></p>");
        } finally {
            if (out != null) {
                out.close();
            }
        }
    }

    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doGet(req, resp);
    }
}

Messages which we produce through the above Servlet can be consumed through a Message Driven Bean. For example, to consume the Queue messages for “queue/HELLOWORLDMDBQueue” we can add the following MDB:

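import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name = "HelloWorldQueueMDB", activationConfig = {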
        @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "queue/HELLOWORLDMDBQueue"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")})
public class HelloWorldQueueMDB implements MessageListener {

    // Use the class name (not Class.toString(), which adds a "class " prefix) as the logger name
    private static final Logger LOGGER = Logger.getLogger(HelloWorldQueueMDB.class.getName());

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message rcvMessage) {
        TextMessage msg = null;
        try {
            if (rcvMessage instanceof TextMessage) {
                msg = (TextMessage) rcvMessage;
                LOGGER.info("Received Message from queue: " + msg.getText());
            } else {
                LOGGER.warning("Message of wrong type: " + rcvMessage.getClass().getName());
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}

Using a Custom Connection Factory

The JMSContext hides much of the complexity of the connection to the JMS server. You can also choose which connection factory an injected JMSContext uses by adding the @JMSConnectionFactory annotation. For example:

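import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.Queue;

@Stateless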
public class InvoiceManagerEJB {

    @Inject
    @JMSConnectionFactory("java:/JmsXA")
    private JMSContext jmsContext;

    @Resource(lookup = "java:/queue/CMTQueue")
    private Queue queue;

    @TransactionAttribute(TransactionAttributeType.MANDATORY)
    public void createInvoice(String name) {
        jmsContext.createProducer()
                .send(queue, "Created invoice for customer named: " + name);
    }
}

In the above example, we are referring to the pooled connection factory available, by default, in the full configuration:

<pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" transaction="xa"/>

The pooled connection factory is special in that it leverages the outbound adapter of the Artemis JCA Resource Adapter and therefore provides better performance than the standard JMS connection factory. By default it is available to local clients, although it can be configured to work with a remote server.

As the name suggests, resources acquired from the pooled connection factory are automatically enlisted in any ongoing JTA transaction. If you want to send a message from an EJB using CMT, this is likely the connection factory you want to use, so that the send operation is committed atomically along with the rest of the EJB’s transaction operations.

Delivering JMS messages within an XA transaction

The last example shows how to inject an XAConnectionFactory to deliver messages within an XA Transaction.

@Inject
private UserTransaction userTransaction;

@Resource(mappedName = "java:/JmsXA")
private XAConnectionFactory xaConnectionFactory; // we want to deliver JMS messages within an XA transaction

@Resource(mappedName = "java:/queue/jta-crash-rec-quickstart")
private Queue queue;

private void notifyUpdate(Queue queue, String msg) throws Exception {
    XAConnection connection = null;

    try {
        connection = xaConnectionFactory.createXAConnection();
        XASession session = connection.createXASession();
        MessageProducer messageProducer = session.createProducer(queue);

        connection.start();
        TextMessage message = session.createTextMessage();
        message.setText(msg);

        messageProducer.send(message);
        messageProducer.close();
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                LOGGER.info("Error closing JMS connection: " + e.getMessage());
            }
        }
    }
}

To publish messages to a destination in an XA transaction, you need to create a MessageProducer in the XA Session to establish a producer flow.

As you can see from the above code, to create a message producer, call XASession.createProducer(Destination destination) with the obtained XA Session. Then, to publish messages in the XA Session, call MessageProducer.send(…).

Messages published after XAResource.start(…) has been called belong to the transaction branch: the event broker stages the outgoing messages and assigns them to that branch. However, the messages are not delivered to their destinations on the event broker until XAResource.commit(…) is called. A successful commit delivers the messages to their destinations and removes the staging information.

On the other hand, messages published in an XA Session before XAResource.start(…) is called are handled in the same way as messages sent in a regular Session. That is, they are not “transacted” messages.
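
The staging behaviour described above can be illustrated with a small in-memory model. This is only a sketch for building intuition: ToyBroker and its methods are hypothetical names invented for this example, it uses no JMS or JTA classes, and it is not how Artemis or any other broker is implemented. It also skips the end and prepare phases of a real XA branch and models a single destination.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of XA staging semantics; NOT a real JMS provider. */
public class XaStagingSketch {

    /** Hypothetical stand-in for a broker with one destination. */
    public static final class ToyBroker {
        private final List<String> delivered = new ArrayList<>();
        private final List<String> staged = new ArrayList<>();
        private boolean branchActive;

        /** Models XAResource.start(...): later sends join the branch. */
        public void start() {
            if (branchActive) {
                throw new IllegalStateException("branch already active");
            }
            branchActive = true;
        }

        /** Models MessageProducer.send(...). */
        public void send(String text) {
            if (branchActive) {
                staged.add(text);      // held back until commit
            } else {
                delivered.add(text);   // not transacted: delivered at once
            }
        }

        /** Models XAResource.commit(...): staged messages are delivered. */
        public void commit() {
            if (!branchActive) {
                throw new IllegalStateException("no active branch");
            }
            delivered.addAll(staged);
            staged.clear();
            branchActive = false;
        }

        /** Models XAResource.rollback(...): staged messages are discarded. */
        public void rollback() {
            staged.clear();
            branchActive = false;
        }

        /** Read-only snapshot of what has reached the destination. */
        public List<String> delivered() {
            return List.copyOf(delivered);
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("outside");                  // sent before start()
        broker.start();
        broker.send("inside");                   // staged in the branch
        System.out.println(broker.delivered());  // [outside]
        broker.commit();
        System.out.println(broker.delivered());  // [outside, inside]
    }
}
```

In a container you would normally not call these XAResource methods yourself: the transaction manager invokes them on the enlisted resource when the surrounding JTA transaction begins, commits or rolls back.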

To learn more about the difference between JTA and XA Transactions check this article: Demystifying Datasource JTA and XA settings on JBoss-WildFly
