Java Structured Concurrency in practice

Structured concurrency (JEP 453) is a method of organizing concurrent programming in a way that maintains the relationship between tasks and subtasks. This approach aims to make concurrent code more readable, maintainable, and reliable. In structured concurrency, tasks and their subtasks have well-defined entry and exit points. This creates a clear structure similar to how structured programming works in a single thread.

Purpose of Structured Concurrency

The key idea is that a task can split into concurrent subtasks, each of which works on behalf of the main task. The main task waits for the subtasks’ results while monitoring them for failures.

Because the entry and exit points of a code block are well-defined, the lifetime of a concurrent subtask is confined to the syntactic block of its parent task. This allows sibling subtasks to be reasoned about and managed as a unit. Each task’s lifetime is in turn nested within that of its own parent, creating a hierarchical tree of tasks similar to the call stack of a single-threaded program. This tree structure makes it possible to apply policies, such as deadlines, to entire sub-trees of tasks, and it lets observability tools present subtasks as subordinate to their parent tasks.
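To make the idea of one policy covering a whole group of subtasks more concrete, here is a minimal sketch of a shared deadline. It deliberately uses the long-standing ExecutorService.invokeAll overload with a timeout instead of the preview StructuredTaskScope API (which offers joinUntil for deadlines in JDK 21), so it runs without preview flags. The class name GroupDeadlineSketch, the method cancelledFlags, and the two sample tasks are invented for illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class GroupDeadlineSketch {

    /** Runs two subtasks under one shared deadline and reports which ones were cancelled. */
    static boolean[] cancelledFlags(long timeoutMillis) {
        Callable<String> fast = () -> "fast";
        Callable<String> slow = () -> {
            Thread.sleep(30_000);   // far longer than any deadline used in this sketch
            return "slow";
        };
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // invokeAll returns once every task has finished or the timeout has expired;
            // tasks that have not completed by then are cancelled.
            List<Future<String>> futures =
                    executor.invokeAll(List.of(fast, slow), timeoutMillis, TimeUnit.MILLISECONDS);
            return new boolean[] {futures.get(0).isCancelled(), futures.get(1).isCancelled()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for subtasks", e);
        } finally {
            executor.shutdownNow(); // interrupt any worker that is still running
        }
    }

    public static void main(String[] args) {
        boolean[] cancelled = cancelledFlags(500);
        System.out.println("fast cancelled: " + cancelled[0]);
        System.out.println("slow cancelled: " + cancelled[1]);
    }
}
```

The deadline is stated once, for the group, rather than per task. That is the property the task tree gives you for free with structured concurrency.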

A practical example of Structured Concurrency

The following example shows in practice how to run two concurrent tasks as a single unit:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class StructuredConcurrencyExample {
    public static void main(String[] args) throws Exception {
        var message = new StructuredConcurrencyExample().calculateInParallel("Hello World!");
        System.out.println(message);
    }

    private String calculateInParallel(String str) throws InterruptedException, ExecutionException {
        // try-with-resources closes the scope, so no subtask outlives this block
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // JDK 21: `fork` returns `Subtask` instead of `Future`
            Subtask<String> digest1 = scope.fork(() -> this.getDigest("MD5", str));
            Subtask<String> digest2 = scope.fork(() -> this.getDigest("SHA-256", str));

            scope.join();           // wait for both subtasks, or for the first failure
            scope.throwIfFailed();  // rethrow the first failure as an ExecutionException

            return digest1.get() + digest2.get();
        }
    }

    private String getDigest(String algorithm, String str) throws Exception {
        System.out.printf("Calculating %s digest for string: %s%n", algorithm, str);
        MessageDigest md = MessageDigest.getInstance(algorithm);
        // Hex-encode the digest: the raw bytes are not valid text on their own
        return HexFormat.of().formatHex(md.digest(str.getBytes(StandardCharsets.UTF_8)));
    }
}

StructuredTaskScope defines the fork method to start a thread that executes a subtask, the join method to wait for all forked threads to finish, and the close method to close the task scope. The intended pattern is that code inside the block forks threads for the subtasks with fork, waits for them with join, and then processes the results. In the example above, the try-with-resources statement calls close automatically when the block exits.
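The fork, join, and close life cycle can be easier to follow in a stripped-down imitation built only on stable APIs. The sketch below is a hypothetical teaching aid called MiniScope, not the JDK class: it maps fork to ExecutorService.submit, join to waiting on every Future, and close to shutting the executor down. It does not implement any shutdown policy, and the helper sumOfLengths is made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical imitation of the fork/join/close shape; NOT the JDK's StructuredTaskScope. */
class MiniScope implements AutoCloseable {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final List<Future<?>> forked = new ArrayList<>();

    /** fork: hand the task to a worker thread and remember its Future. */
    <T> Future<T> fork(Callable<T> task) {
        Future<T> future = executor.submit(task);
        forked.add(future);
        return future;
    }

    /** join: block until every forked task has finished, successfully or not. */
    void join() throws InterruptedException {
        for (Future<?> future : forked) {
            try {
                future.get();
            } catch (ExecutionException | CancellationException ignored) {
                // The caller sees the failure when it reads that task's Future.
            }
        }
    }

    /** close: interrupt stragglers and wait (up to a minute) for the workers to exit. */
    @Override
    public void close() {
        executor.shutdownNow();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: forks two trivial subtasks, joins them, then combines their results. */
    static int sumOfLengths(String a, String b) {
        try (MiniScope scope = new MiniScope()) {
            Future<Integer> first = scope.fork(() -> a.length());
            Future<Integer> second = scope.fork(() -> b.length());
            scope.join();
            return first.get() + second.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a subtask failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfLengths("Hello", "World!")); // prints 11
    }
}
```

Because the scope is opened in a try-with-resources statement, close runs on every exit path, so the worker threads cannot outlive the block that forked them.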

You can handle the Subtasks in two ways:

  • ShutdownOnFailure captures the first exception and shuts down the task scope. This is the invoke-all pattern, for use cases where we need the results of all subtasks.
  • ShutdownOnSuccess captures the first successful result and shuts down the task scope, interrupting unfinished threads and waking up the owner. This is the invoke-any pattern, for cases where the result of any one subtask will do.
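The names of these two patterns echo the invokeAll and invokeAny methods of ExecutorService, so a rough sketch with that stable API may help to show the difference in outcome. This is an analogy rather than an equivalent: ExecutorService.invokeAll waits for every task even if one fails early, whereas ShutdownOnFailure shuts the scope down at the first failure. The class InvokePatternsSketch, its methods all and any, and the placeholder task results are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokePatternsSketch {

    /** Invoke-all: every result is needed, so one failed task fails the whole call. */
    static <T> List<T> all(List<Callable<T>> tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<T> results = new ArrayList<>();
            for (Future<T> future : executor.invokeAll(tasks)) {
                results.add(future.get()); // throws ExecutionException if that task failed
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a subtask failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    /** Invoke-any: returns the result of one task that completed successfully. */
    static <T> T any(List<Callable<T>> tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            return executor.invokeAny(tasks); // failed tasks are skipped while another can still succeed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("every subtask failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> digests = List.of(() -> "md5-result", () -> "sha256-result");
        System.out.println(all(digests)); // prints [md5-result, sha256-result]

        List<Callable<String>> mixed = List.of(
                () -> { throw new IllegalStateException("unavailable"); },
                () -> "sha256-result");
        System.out.println(any(mixed));   // prints sha256-result
    }
}
```

With invoke-all the results come back in task order and a single failure is fatal. With invoke-any a failure only matters if no task succeeds.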

In our example, we are using the invoke-all pattern, which we can depict as follows:

[Figure: Java structured concurrency example]

Finally, note that in JDK 21 fork returns a Subtask rather than the Future it returned in the earlier incubator versions of the API (JDK 19 and 20):

// Incubator API (JDK 19 and 20): fork returned a Future
// Future<String> digest1 = scope.fork(() -> this.getDigest("MD5", str));
// Future<String> digest2 = scope.fork(() -> this.getDigest("SHA-256", str));

// Preview API (JDK 21): fork returns a Subtask
Subtask<String> digest1 = scope.fork(() -> this.getDigest("MD5", str));
Subtask<String> digest2 = scope.fork(() -> this.getDigest("SHA-256", str));

Because structured concurrency is still a preview feature in JDK 21, you need to pass the --enable-preview option when launching the example:

java --source 21 --enable-preview StructuredConcurrencyExample.java

Handling Shutdown on Success

Next, we can represent the use case where the successful completion of any one subtask is enough, as follows:

private String calculateInParallel(String str) throws InterruptedException, ExecutionException {
    // The type argument is the result type shared by all subtasks
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        scope.fork(() -> this.getDigest("MD5", str));
        scope.fork(() -> this.getDigest("SHA-256", str));

        scope.join();

        // Result of the first subtask that completed successfully
        return scope.result();
    }
}

In this example, after calling scope.join() we can get the result of the first subtask that completed successfully by calling scope.result(). If every subtask failed, scope.result() throws an ExecutionException instead.

Conclusion

In summary, the Java Development Kit (JDK) includes an API for structured concurrency, with the principal class being StructuredTaskScope in the java.util.concurrent package. This class allows developers to structure a task as a family of concurrent subtasks and coordinate them as a unit. The API provides methods for forking, joining, canceling, handling errors, and composing results within a clear lexical scope, making concurrent code more manageable.
