Structured concurrency is a preview feature in Java 21 that simplifies concurrent programming by treating groups of related tasks running in different threads as a single unit of work. This approach streamlines error handling and cancellation and improves observability, making it easier to build maintainable, reliable, and efficient concurrent applications.
Key Benefits of Structured Concurrency
- Simplified concurrent programming: Structured concurrency allows developers to focus on the business logic instead of orchestrating threads.
- Improved error handling and cancellation: Because related tasks are treated as a single unit, a failure in one task can cancel its siblings, and cancelling the unit cancels every task in it.
- Enhanced observability: Because each subtask belongs to the scope that forked it, tools such as thread dumps can show the task hierarchy, which helps developers locate bottlenecks in their concurrent applications.
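To make the error-handling benefit concrete, it can help to look at what happens without a scope. The following is a minimal sketch that uses only the long-standing ExecutorService API; the class name, the method name, and the simulated tasks are hypothetical and chosen for illustration. It shows that when one task fails, nothing cancels its sibling unless the developer does so by hand.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class UnstructuredDemo {

    // Submits two unrelated futures and reports whether the slow task was
    // still running at the moment the other task's failure was observed.
    static boolean siblingStillRunningAfterFailure() throws InterruptedException {
        Callable<String> slowName = () -> {
            Thread.sleep(2_000); // stands in for a slow remote call
            return "Tiago";
        };
        Callable<Integer> failingAge = () -> {
            throw new IllegalStateException("age lookup failed");
        };

        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future<String> name = executor.submit(slowName);
            Future<Integer> age = executor.submit(failingAge);
            try {
                age.get();
                return false; // not reached: the age task always fails
            } catch (ExecutionException e) {
                // Nothing links the two futures, so the name task keeps
                // running even though its result is no longer needed.
                return !name.isDone();
            }
        } finally {
            executor.shutdownNow(); // cleanup has to be written by hand
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sibling still running after failure: "
                + siblingStillRunningAfterFailure());
    }
}
```

With structured concurrency, the relationship between the two tasks is expressed by the scope itself, so this bookkeeping no longer falls to the caller.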
Getting Started with Structured Concurrency
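To use the StructuredTaskScope API in Java 21, you need to enable preview features, either in your IDE or on the command line by passing --enable-preview to both javac (together with --release 21) and java.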
Example of Structured Concurrency
Here is an example of using the StructuredTaskScope API (incidentally, it also uses an unnamed class, another preview feature in Java 21):
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

void main() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String> name = scope.fork(fetchName());
        Supplier<Integer> age = scope.fork(fetchAge());

        scope
            .join()           // Join both subtasks
            .throwIfFailed(); // ... and propagate errors

        // Here, both subtasks have succeeded, so compose their results
        System.out.println("Name is " + name.get() + ". Age is " + age.get());
    }
}

private Callable<String> fetchName() {
    return () -> "Tiago";
}

private Callable<Integer> fetchAge() {
    return () -> 29;
}
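In this example, a StructuredTaskScope.ShutdownOnFailure instance is used to fork two subtasks, the tasks returned by fetchName() and fetchAge(), which run concurrently. The main task waits in join() for both subtasks to complete before continuing. If a subtask fails, the scope is shut down and any subtask that is still running is cancelled. Closing the scope at the end of the try-with-resources block ensures that no subtask thread outlives it, so resources are properly cleaned up.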
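The failure path described above can be observed directly. The following is a minimal sketch, assuming JDK 21 with --enable-preview; the class name, the method name, and the simulated tasks are hypothetical. One subtask fails immediately while the other sleeps, and the sleeping subtask records whether it was interrupted.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;

class FailureDemo {

    // Forks one slow subtask and one failing subtask, then reports whether
    // the slow subtask was interrupted when the scope shut down.
    static boolean slowSubtaskWasCancelled() throws InterruptedException {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            scope.fork(() -> {
                try {
                    Thread.sleep(5_000); // stands in for a slow lookup
                    return "Tiago";
                } catch (InterruptedException e) {
                    interrupted.set(true); // cancellation arrives as an interrupt
                    throw e;
                }
            });
            scope.fork(() -> {
                throw new IllegalStateException("age lookup failed");
            });

            scope.join(); // returns once the failure has shut the scope down
            try {
                scope.throwIfFailed();
            } catch (ExecutionException e) {
                // The subtask's own exception is available as the cause
                System.out.println("Subtask failed: " + e.getCause().getMessage());
            }
        }
        // close() has waited for the slow subtask's thread to finish, so the
        // flag is safe to read here.
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Slow subtask cancelled: " + slowSubtaskWasCancelled());
    }
}
```

The flag is read only after the try-with-resources block has closed, because join() may return as soon as the scope shuts down, before the interrupted subtask has finished unwinding.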
The use of StructuredTaskScope ensures several valuable properties in concurrent programming:
- Error handling with short-circuiting: If either the fetchName() or fetchAge() subtask fails, the other is promptly cancelled if it has not yet completed. This behavior is defined by the ShutdownOnFailure policy, and other shutdown policies are available.
- Cancellation propagation: If the thread running main() is interrupted before or during the call to join(), both subtasks are automatically cancelled when the thread exits the scope.
- Clarity: The structure of the code is evident: set up the subtasks, wait for them to complete or be cancelled, and then decide what to do based on the results.
- Observability: A thread dump clearly shows the task hierarchy, with the threads executing fetchName() and fetchAge() displayed as children of the scope, which makes the concurrent execution flow easier to follow.
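As an illustration of a different shutdown policy, the JDK 21 preview API also provides StructuredTaskScope.ShutdownOnSuccess, which shuts the scope down as soon as any subtask succeeds. The following is a minimal sketch, assuming JDK 21 with --enable-preview; the class name, the method name, and the two simulated sources are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

class FirstResultDemo {

    // Returns the result of whichever subtask succeeds first. The scope then
    // shuts down and cancels the subtask that is still running.
    static String firstResult() throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            scope.fork(() -> {
                Thread.sleep(5_000); // hypothetical slow source
                return "slow source";
            });
            scope.fork(() -> "fast source"); // hypothetical fast source

            // result() throws ExecutionException if no subtask succeeded
            return scope.join().result();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstResult());
    }
}
```

This policy suits cases where several subtasks can answer the same question and only the first answer is needed.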
Extending the StructuredTaskScope Class
Developers can also create their own subclasses of StructuredTaskScope to customize its behavior. For example, a subclass can override the handleComplete method to define specific actions when a subtask completes. The following is an example of a StructuredTaskScope subclass that collects the results of subtasks that complete successfully. It defines the method results() to be used by the main task to retrieve the results.
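import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

class MyScope<T> extends StructuredTaskScope<T> {

    // Subtasks complete on different threads, so the queue must be thread-safe
    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    // Unnamed scope whose subtasks run in virtual threads
    MyScope() { super(null, Thread.ofVirtual().factory()); }

    // Called for each subtask as it completes; keep only successful results
    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS)
            results.add(subtask.get());
    }

    // Narrow the return type so that calls can be chained on MyScope
    @Override
    public MyScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    // Returns a stream of results from the subtasks that completed successfully
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }
}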
This custom policy can be used like so:
<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = new MyScope<T>()) {
        for (var task : tasks) scope.fork(task);
        return scope.join()
                    .results().toList();
    }
}
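To try the custom policy end to end, the pieces above can be combined into a single file. The following is a minimal sketch, assuming JDK 21 with --enable-preview; the wrapper class AllSuccessfulDemo and the three sample tasks are hypothetical, and MyScope is repeated as a nested class only so that the sketch compiles on its own.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

class AllSuccessfulDemo {

    // Same policy as MyScope above, nested here to keep the file self-contained
    static class MyScope<T> extends StructuredTaskScope<T> {
        private final Queue<T> results = new ConcurrentLinkedQueue<>();

        MyScope() { super(null, Thread.ofVirtual().factory()); }

        @Override
        protected void handleComplete(Subtask<? extends T> subtask) {
            if (subtask.state() == Subtask.State.SUCCESS)
                results.add(subtask.get());
        }

        @Override
        public MyScope<T> join() throws InterruptedException {
            super.join();
            return this;
        }

        public Stream<T> results() {
            super.ensureOwnerAndJoined();
            return results.stream();
        }
    }

    static <T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
        try (var scope = new MyScope<T>()) {
            for (var task : tasks) scope.fork(task);
            return scope.join().results().toList();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical tasks: two succeed and one fails
        List<Callable<String>> tasks = List.of(
                () -> "alpha",
                () -> { throw new IllegalStateException("boom"); },
                () -> "beta");

        // The failed task is skipped; results arrive in completion order,
        // so the order of the two strings is not guaranteed.
        System.out.println(allSuccessful(tasks));
    }
}
```

Unlike ShutdownOnFailure, this policy never shuts the scope down early, so a failing subtask does not cancel its siblings.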
Conclusion
Structured concurrency is a promising addition to the Java standard API, simplifying concurrent programming and making it easier to build maintainable, reliable, and efficient applications. As this feature progresses from preview to stable, it is poised to unlock new possibilities in concurrent application development.