Using JAX-RS Server Sent Events with WildFly

JAX-RS 2.1 (JSR 370) contains several enhancements like Server Sent Events which will be discussed in this tutorial.

Here are in a nutshell the key enhancements provided by JAX-RS 2.1 :

  • Support for Server Sent Events (server and client)
  • JSON Binding (JSON-B) API integration

We have already discussed about JSON Binding (JSON-B) in this tutorial How to use JSON-B for parsing Java objects to/from JSON  so we will focus on the first item.

JAX-RS Support for Server Sent Events

Server Sent Events standard (SSE) are text/events which are streamed from the Server to the Client. JAX-RS includes both a Server API for publishing the SSE and a Client API for consuming them. Let’s see quickly an example of it:

@GET 
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sendEvent(@Context SseEventSink eventSink, @Context Sse sse) {  
	OutboundSseEvent sseEvent = sse.newEvent("Today is ", new java.util.Date().toString());  
	eventSink.send(sseEvent);  
	eventSink.close();  
}

There are several key elements in the above Rest service:

  • The generic interface javax.ws.rs.sse.SseEvent represents a generic abstraction for a Server Sent Event and it’s implemented by the InboundSseEvent and OutboundSseEvent representing incoming and outgoing events respectively.
  • In the above example we are sending a single event containing a text with the current Date.
  • The actual event streaming is done by the SseEventSink interface which can send either single events or broadcast events to subscribers.

JAX-RS 2.1 (JSR 370)

Coding a full Server-Client SSE example

Let’s create a full example which will deliver some server-events like generated random numbers to the Client. In first instance, we will use a very simple javascript client, then we will move to a Java client.

Here is our server service:

package com.itbuzzpress.chapter11;

import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import javax.annotation.Resource;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;


@Path("/events")
public class SseResource {

 
	Executor executorService = Executors.newSingleThreadExecutor();

	@GET
	@Produces(MediaType.SERVER_SENT_EVENTS)
	public void sendEvents(@Context SseEventSink sseEventSink, @Context Sse sse) {
		Random rnd = new Random();
		IntStream rndStream = IntStream.generate(() -> rnd.nextInt(90));
		List<Integer> lottery = rndStream.limit(5).boxed().collect(Collectors.toList());

		executorService.execute(() -> {
			lottery.forEach(value -> {
				try {
					TimeUnit.SECONDS.sleep(5);
					System.out.println("Sending the following value: " + value);
					final OutboundSseEvent outboundSseEvent = sse.newEventBuilder().name("lottery")
							.data(Integer.class, value).build();
					sseEventSink.send(outboundSseEvent);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}

			});
			sseEventSink.close();
		});

	}
 
}

As you can see, the above code is just slightly more complex as it use the java.util.stream API to generate a set of Random numbers which are placed in a Collection. Then the events are delivered through the sendEvents method which produces MediaType.SERVER_SENT_EVENTS. The Collection is cycled every 5 seconds and the Random number is sent to the Client using the SseEventSink.

Now let’s move to the Client. At first we will use a minimal javascript client which uses the EventSource interface:

 <body onload="getRandomValues()">
  . . .
        <script>
            function getRandomValues() {
                var source = new EventSource("rest/events/");
                source.addEventListener('lottery', function (event) {
                    document.getElementById("number").innerHTML = event.data;
                }, false);
            }
        </script>
 </body>

The EventSource interface is a Javascript interface to server-sent events. An EventSource instance opens a persistent connection to an HTTP server, which sends events in text/event-stream format. The connection remains open until closed by calling EventSource.close().

Unlike WebSockets, server-sent events are unidirectional; that is, data messages are delivered in one direction, from the server to the client. That makes them an excellent choice when there’s no need to send data from the client to the server in message form.

By deploying the application will print on the screen Random numbers delivered by the Server side event

JAX-RS 2.1 (JSR 370)

Coding a Java Client

The following example is a Java client which is embedded in an EJB Timer. As soon as the Singleton EJB is deployed, the WebTarget is linked to the REST Service. Then, Timer object is created to register and read event data from the Server Side Event.

import java.util.ArrayList;
import java.util.Collection;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Schedule;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerService;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;

@Singleton
@Startup
public class SSEClient {
	Client sseClient;
	WebTarget target;
	SseEventSource eventSource;
	@Resource 
	TimerService timerService;

	public ArrayList<String> listUsers;

	@PostConstruct
	public void init() {

		this.sseClient = ClientBuilder.newClient();
		this.target = this.sseClient.target("http://localhost:8080/jaxrs21sse-1.0/rest/events");

		timerService.createSingleActionTimer(5000, new TimerConfig());
		System.out.println("SSE client timer created");

		// Server side event source 
		eventSource = SseEventSource.target(target).build();
		System.out.println("SSE Event source created........");

	}

	public void addUser(String username) {
		listUsers.add(username);
	}

	public void removeUser(String username) {
		listUsers.remove(username);
	}

	public ArrayList<String> getListUsers() {
		return listUsers;
	}

	@Timeout
	public void client() {

		try {
			eventSource.register((sseEvent) -> {
				System.out.println("SSE event recieved ----- " + sseEvent.readData());
			}, (e) -> e.printStackTrace());

			eventSource.open();

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@PreDestroy
	public void close() {
		eventSource.close();
		System.out.println("Closed SSE Event source..");
		sseClient.close();
		System.out.println("Closed JAX-RS client..");
	}

}

Basically once SseEventSource is created and connected to a server, registered event consumer will be invoked when an inbound event arrives. In case of errors, an exception will be passed to a registered consumer so that it can be processed.

SSE Broadcasting

Another API which is worth mentioning is the javax.ws.rs.sse.SseBroadcaster. By using this interface, SSE events can be broadcasted to multiple clients simultaneously. It will iterate over all registered SseEventSinks and send events to all requested SSE Stream. An application can create a SseBroadcaster from an injected context Sse. The broadcast method on a SseBroadcaster is used to send SSE events to all registered clients. The following code snippet is an example on how to create SseBroadcaster, subscribe and broadcast events to all subscribed consumers:

   @GET
   @Path("/subscribe")
   @Produces(MediaType.SERVER_SENT_EVENTS)
   public void subscribe(@Context SseEventSink sseEventSink) throws IOException
   {
      if (sseEventSink == null)
      {
         throw new IllegalStateException("No client connected.");
      }
      if (sseBroadcaster == null)
      {
         sseBroadcaster = sse.newBroadcaster();
      }
      sseBroadcaster.register(sseEventSink);
   }

   @POST
   @Path("/broadcast")
   public void broadcast(String message) throws IOException
   {
      if (sseBroadcaster == null)
      {
         sseBroadcaster = sse.newBroadcaster();
      }
      sseBroadcaster.broadcast(sse.newEvent(message));

   }          

The source code for the JAX-RS Server Sent Event Demo is available on Github at: https://github.com/fmarchioni/practical-javaee7-development-wildfly/tree/javaee8/code/chapter11/jaxrs-sse