DEV Community

Tiago Martinho
Tiago Martinho

Posted on

A summary on Structured Concurrency in Java

Structured concurrency is currently a preview feature in Java 21 that simplifies concurrent programming by treating groups of related tasks running in different threads as a single unit of work. This approach streamlines error handling, cancellation, and enhances observability, making it easier to build maintainable, reliable, and efficient concurrent applications.

Key Benefits of Structured Concurrency

  1. Simplified concurrent programming: Structured concurrency allows developers to focus on the business at hand instead of orchestrating threading.
  2. Improved error handling and cancellation: By treating tasks as a single unit, structured concurrency makes it easier to handle errors and cancel tasks when necessary.
  3. Enhanced observability: Structured concurrency helps developers better understand the performance impacts and possible bottlenecks in their concurrent applications.

Getting Started with Structured Concurrency

To use the StructuredTaskScope API in Java, you need to enable preview features in your IDE.

Example of Structured Concurrency

Here is an example of using the StructuredTaskScope API (by the way, using an unnamed class, also a preview feature in Java 21):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

     void main() throws ExecutionException, InterruptedException {
         try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
             Supplier<String> name  = scope.fork(fetchName());
             Supplier<Integer> age = scope.fork(fetchAge());

             scope
                     .join()            // Join both subtasks
                     .throwIfFailed();  // ... and propagate errors

             // Here, both subtasks have succeeded, so compose their results
             System.out.println("Name is " + name.get() + ". Age is " + age.get());
         }
     }

     private Callable<String> fetchName() {
         return () -> "Tiago";
     }

    private Callable<Integer> fetchAge() {
        return () -> 29;
    }
Enter fullscreen mode Exit fullscreen mode

In this example, the StructuredTaskScope.ShutdownOnFailure() instance is used to fork two tasks, fetchName and fetchAge, which run concurrently. The main task waits for both subtasks to complete before continuing. If a subtask fails, the entire task scope is terminated, ensuring that resources are properly cleaned up.
The use of StructuredTaskScope ensures several valuable properties in concurrent programming. Firstly, it implements error handling with short-circuiting, meaning that if either the fetchName() or fetchAge() subtasks fail, the other is promptly cancelled if it has not completed yet, with the ability to customize this behavior using different shutdown policies. Additionally, cancellation propagation is seamlessly managed; if the thread running main() is interrupted before or during the call to join(), both subtasks are automatically cancelled when the thread exits the scope. This contributes to code clarity, as the structure is evident: setting up subtasks, awaiting their completion or cancellation, and making a decision based on the results. Lastly, for observability, a thread dump clearly illustrates the task hierarchy, depicting the threads executing fetchName() and fetchAge() as children of the scope. This enhances transparency and aids in understanding the concurrent execution flow.

Extending the StructuredTaskScope Class

Developers can also create their own subclasses of StructuredTaskScope to customize its behavior. For example, a subclass can override the handleComplete method to define specific actions when a task is completed. The following is an example of a StructuredTaskScope subclass that collects the results of subtasks that complete successfully. It defines the method results() to be used by the main task to retrieve the results.

class MyScope<T> extends StructuredTaskScope<T> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    MyScope() { super(null, Thread.ofVirtual().factory()); }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS)
            results.add(subtask.get());
    }

    @Override
    public MyScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    // Returns a stream of results from the subtasks that completed successfully
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }

}
Enter fullscreen mode Exit fullscreen mode

This custom policy can be used like so:

<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = new MyScope<T>()) {
        for (var task : tasks) scope.fork(task);
        return scope.join()
                    .results().toList();
    }
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

Structured concurrency is a promising addition to the Java standard API, simplifying concurrent programming and making it easier to build maintainable, reliable, and efficient applications. As this feature progresses from preview to stable, it is poised to unlock new possibilities in concurrent application development.

References

https://openjdk.org/jeps/453

Top comments (0)