How to use the ManagedExecutorService to submit tasks

The Jakarta EE Concurrency API outlines a standard way of executing tasks in parallel in a Jakarta EE container using a set of managed resources. In this excerpt from Practical Enterprise Application Development, we give a short introduction to the Concurrency Utilities and show how to run asynchronous tasks using the ManagedExecutorService.

Overview of Concurrency Utilities

Executing concurrent tasks within an enterprise container has long been considered a dangerous practice, and was sometimes prohibited outright, as in this restriction on enterprise beans:

“The enterprise bean must not attempt to manage threads. The enterprise bean must not attempt to start, stop, suspend, or resume a thread, or to change a thread’s priority or name. The enterprise bean must not attempt to manage thread groups”

Indeed, if you create your own unmanaged threads in a Java EE container using the Java SE API, there is no guarantee that the context of the container is propagated to the thread executing the task.

The only available patterns for executing a task asynchronously were asynchronous EJBs and message-driven beans. This was usually enough for simple fire-and-forget scenarios, yet control of the threads still lay in the hands of the container.

With the Jakarta EE Concurrency API you can use extensions to the java.util.concurrent API as managed resources, that is, resources managed by the container. The only difference from standard Java SE programming is that you retrieve your managed resources from the JNDI tree of the container. You still work with Runnable and with the types of the java.util.concurrent package, such as Future or ScheduledFuture.

In the next section, we will start from the simplest example, which is executing an asynchronous task using the ManagedExecutorService.

Using the ManagedExecutorService to submit tasks

To create our first asynchronous execution, we will use the ManagedExecutorService, which extends the Java SE ExecutorService with methods for submitting tasks for execution in a Jakarta EE environment. With this managed service, the context of the container is propagated to the thread executing the task. The ManagedExecutorService is defined in the ee subsystem of the application server configuration:

<subsystem xmlns="urn:jboss:domain:ee:2.0">
 
   <concurrent>
 
        <managed-executor-services>
             <managed-executor-service name="default"
                   jndi-name="java:jboss/ee/concurrency/executor/default"
                   context-service="default" hung-task-threshold="60000"
                   core-threads="5" max-threads="25" keepalive-time="5000"/>
        </managed-executor-services>
 
   </concurrent>
</subsystem>

For our first example, we obtain the ManagedExecutorService from the container through resource injection, as follows:

@Resource(name = "DefaultManagedExecutorService")
ManagedExecutorService executor;

With the ManagedExecutorService instance you can submit tasks that implement either the java.lang.Runnable interface or the java.util.concurrent.Callable interface. Instead of a run() method, the Callable interface offers a call() method, which can return a value of any generic type.
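The difference between the two task types can be sketched outside the container. The example below is only an illustration: it assumes a plain Java SE executor created with Executors.newSingleThreadExecutor() as a stand-in for the injected ManagedExecutorService, and the class name RunnableVsCallable is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RunnableVsCallable {

    // Submits one Runnable and one Callable and returns the Callable's result.
    // ASSUMPTION: a plain Java SE executor stands in for the container-managed one.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Runnable: run() returns nothing, so the Future only signals completion.
            Runnable runnable = () -> System.out.println("Runnable executed");
            Future<?> completion = executor.submit(runnable);
            completion.get(); // yields null once the task has finished

            // Callable: call() returns a typed value, delivered through the Future.
            Callable<String> callable = () -> "value from call()";
            Future<String> result = executor.submit(callable);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            // Only for this standalone sketch: application code does not manage
            // the life cycle of a container-managed executor.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a deployed application the submit() calls would look the same; only the way the executor is obtained differs.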

Coding a simple Asynchronous Task

So let’s see a simple Servlet example which fires an asynchronous task using the ManagedExecutorService:

@WebServlet("/ExecutorServlet")
public class ExecutorServlet extends HttpServlet {

    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;

        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
                PrintWriter writer = response.getWriter();
                executor.execute(new SimpleTask());
                writer.write("Task SimpleTask executed! check logs");
        }
}

The SimpleTask class in our example implements the Runnable interface, whose run() method contains the code to be executed concurrently.

public class SimpleTask implements Runnable {
        @Override
        public void run() {
                System.out.println("Thread started.");
        }
}

Retrieving the result from the Asynchronous Task

The above Task is a good option for simple scenarios; however, as you might have noticed, there is no way to obtain a return value from the Task. In addition, when using Runnable you are constrained to unchecked exceptions (if run() threw a checked exception, who would catch it? There is no way for you to enclose that run() call in a handler, since you don’t write the code that invokes it).

If you want to overcome these limitations, you can implement the java.util.concurrent.Callable interface instead, submit it to the ExecutorService, and wait for the result using the Future returned by ExecutorService.submit(), for example by checking Future.isDone().

Let’s see a new version of our Servlet, which captures the result of a Task named CallableTask:

@WebServlet("/CallableExecutorServlet")
public class CallableExecutorServlet extends HttpServlet {

    @Resource(name = "DefaultManagedExecutorService")
    ManagedExecutorService executor;

        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
                PrintWriter writer = response.getWriter();

                Future<Long> futureResult = executor.submit(new CallableTask(5));
                while (!futureResult.isDone()) {
                        // Wait
                        try {
                                Thread.sleep(100);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }

                try {
                        writer.write("Callable Task returned " +futureResult.get());

                } catch ( Exception e) {
                        e.printStackTrace();
                }

        }

}

As you can see from the code, we poll for task completion using the isDone() method. When the task is completed, we can call the get() method of the Future to obtain the return value.
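Polling is not the only option: Future.get() also has an overload that blocks until the result is available or a timeout expires. The following is a minimal sketch of that alternative, assuming a plain Java SE executor in place of the ManagedExecutorService; the class name TimedGet and the method sumWithTimeout are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGet {

    // Sums 1..n on a worker thread and waits at most timeoutMillis for the result.
    // ASSUMPTION: a plain Java SE executor stands in for the container-managed one.
    static long sumWithTimeout(int n, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Long> future = executor.submit(() -> {
                long summation = 0;
                for (int i = 1; i <= n; i++) {
                    summation += i;
                }
                return summation;
            });
            // Blocks until the task completes or the timeout elapses; no sleep loop needed.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Task failed or timed out", e);
        } finally {
            executor.shutdownNow(); // only for this standalone sketch
        }
    }

    public static void main(String[] args) {
        System.out.println("Callable Task returned " + sumWithTimeout(5, 2000));
    }
}
```

A bounded wait also keeps the request thread from spinning indefinitely if the task never finishes.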

Now let’s see our CallableTask implementation which, in our example, returns the value of the summation of a number:

public class CallableTask implements Callable<Long> {
        private int id;

        public CallableTask(int id) {
                this.id = id;
        }

        public Long call() {
                long summation = 0;
                for (int i = 1; i <= id; i++) {
                        summation += i;
                }
                return Long.valueOf(summation);
        }

}

In our example, all we had to do was implement the call() method, which returns the Long that is eventually collected via the get() method of the Future interface.

If your Callable task throws an exception, then Future.get() throws a java.util.concurrent.ExecutionException, and the original exception can be accessed using ExecutionException.getCause().
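The way a task failure surfaces through the Future can be sketched as follows. This is an illustration under stated assumptions: a plain Java SE executor replaces the ManagedExecutorService, and the class FailingTaskDemo and its "bad input" message are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FailingTaskDemo {

    // Submits a task that always fails and reports the original exception.
    // ASSUMPTION: a plain Java SE executor stands in for the container-managed one.
    static String causeMessage() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> failing = () -> {
                throw new IllegalArgumentException("bad input");
            };
            Future<Long> future = executor.submit(failing);
            future.get(); // rethrows the failure wrapped in an ExecutionException
            return "no failure";
        } catch (ExecutionException e) {
            // getCause() returns the exception thrown inside call()
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown(); // only for this standalone sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessage());
    }
}
```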

Monitoring the state of a Future Task

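In the above example, we are checking the status of the task using the Future.isDone() method. If you need finer control over the task life-cycle, you can implement the javax.enterprise.concurrent.ManagedTaskListener interface in order to receive life-cycle event notifications. Note that the executor has to be told about the listener: the Concurrency API provides ManagedExecutors.managedTask(task, listener) for this purpose, or the task can implement the ManagedTask interface and return the listener from getManagedTaskListener().

Here’s our enhanced Task, which implements the taskSubmitted, taskStarting, taskDone and taskAborted methods: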

public class CallableListenerTask implements Callable<Long>,ManagedTaskListener {
        private int id;

        public CallableListenerTask(int id) {
                this.id = id;
        }
        public Long call() {
                long summation = 0;
                for (int i = 1; i <= id; i++) {
                        summation += i;
                }

                return Long.valueOf(summation);
        }
        public void taskSubmitted(Future<?> f, ManagedExecutorService es,
                        Object obj) {
                System.out.println("Task Submitted! "+f);
        }
        public void taskDone(Future<?> f, ManagedExecutorService es, Object obj,
                        Throwable exc) {
                System.out.println("Task DONE! "+f);
        }
        public void taskStarting(Future<?> f, ManagedExecutorService es,
                        Object obj) {
                System.out.println("Task Starting! "+f);
        }
        public void taskAborted(Future<?> f, ManagedExecutorService es,
                        Object obj, Throwable exc) {
                System.out.println("Task Aborted! "+f);
        }
}

The life-cycle notifications are invoked as follows:

  • taskSubmitted: on Task submission to the Executor
  • taskStarting: before the actual Task startup
  • taskAborted: triggered when the Task is cancelled, for example when the user invokes futureResult.cancel(); it is followed by taskDone
  • taskDone: triggered on Task completion, whether the Task succeeded, failed or was cancelled
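The cancellation that leads to taskAborted can be tried out with the plain Future API. The sketch below only shows the caller's side of cancel(); it assumes a Java SE executor instead of the ManagedExecutorService, so no listener callbacks are involved, and the class name CancelDemo is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelDemo {

    // Cancels a long-running task and returns, in order:
    // cancel() result, isCancelled(), isDone(), whether get() threw CancellationException.
    // ASSUMPTION: a plain Java SE executor stands in for the container-managed one.
    static boolean[] cancelLongTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> slow = () -> {
                Thread.sleep(10_000); // simulates long-running work
                return 1L;
            };
            Future<Long> future = executor.submit(slow);

            // true asks the executor to interrupt the worker thread if the task is running
            boolean cancelled = future.cancel(true);

            boolean sawCancellation = false;
            try {
                future.get();
            } catch (CancellationException e) {
                sawCancellation = true; // get() on a cancelled Future fails this way
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // not expected here: the task is cancelled, not failed
            }
            return new boolean[] {cancelled, future.isCancelled(), future.isDone(), sawCancellation};
        } finally {
            executor.shutdownNow(); // only for this standalone sketch
        }
    }

    public static void main(String[] args) {
        boolean[] r = cancelLongTask();
        System.out.println("cancelled=" + r[0] + " isCancelled=" + r[1]
                + " isDone=" + r[2] + " CancellationException=" + r[3]);
    }
}
```

Note that a cancelled Future also reports isDone() as true, so code that polls isDone() should check isCancelled() before calling get().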

You can check the source code for this tutorial at: https://github.com/fmarchioni/practical-enterprise-development/tree/master/code/concurrency/ee-managedexecutor

Managed tasks got stuck?

Since WildFly 23, the managed executors in the EE subsystem are capable of identifying tasks that have been running for an unexpectedly long time, also known as hung tasks.

The optional hung-task-threshold defines a runtime threshold value, in milliseconds, after which a task is considered hung by the executor. With a value of 0, tasks are never considered hung. When a task is evaluated as hung, you can use one of the following strategies:

Evict tasks manually from the CLI:


/subsystem=ee/managed-executor-service=default:terminate-hung-tasks()

Evict tasks automatically: This in turn requires setting the hung-task-termination-period, which is the period, in milliseconds, for automatic termination of hung tasks. A value of 0, which is the default, deactivates the feature.

/subsystem=ee/managed-executor-service=default:write-attribute(name=hung-task-termination-period,value=5000)