DEV Community

Cover image for Mastering Java Concurrency: CompletableFuture, Fork/Join, and Beyond
Aarav Joshi
Aarav Joshi

Posted on

Mastering Java Concurrency: CompletableFuture, Fork/Join, and Beyond

As a Java developer with years of experience, I've seen the language evolve dramatically, especially in the realm of concurrency. The introduction of new features has revolutionized how we approach multi-threaded programming, making it easier to build high-performance applications that can fully utilize modern hardware.

CompletableFuture is one of the most significant additions to Java's concurrency toolkit. It provides a powerful way to work with asynchronous computations, allowing us to chain operations, combine results, and handle errors with ease. I've found it particularly useful in scenarios where I need to orchestrate multiple independent tasks.

Here's a practical example of how CompletableFuture can be used:

CompletableFuture<String> usernameFuture = CompletableFuture.supplyAsync(() -> fetchUsername());
CompletableFuture<String> emailFuture = CompletableFuture.supplyAsync(() -> fetchEmail());

CompletableFuture<User> userFuture = usernameFuture.thenCombine(emailFuture, (username, email) -> {
    return new User(username, email);
});

userFuture.thenAccept(user -> System.out.println("User created: " + user));
Enter fullscreen mode Exit fullscreen mode

In this code, we're fetching a username and email asynchronously, then combining the results to create a User object. The beauty of CompletableFuture is that it allows us to express this complex asynchronous logic in a clear, readable manner.

The Fork/Join Framework is another powerful tool in our concurrency arsenal. It's designed for recursive divide-and-conquer problems, making it ideal for tasks like sorting large arrays or processing tree-like data structures. The framework uses a work-stealing algorithm, which allows idle threads to "steal" tasks from busy ones, ensuring efficient use of system resources.

Here's an example of using the Fork/Join Framework to sum the elements of a large array:

public class SumTask extends RecursiveTask<Long> {
    private final long[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 10000;

    public SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask left = new SumTask(array, start, mid);
            SumTask right = new SumTask(array, mid, end);
            left.fork();
            long rightResult = right.compute();
            long leftResult = left.join();
            return leftResult + rightResult;
        }
    }
}

// Usage
ForkJoinPool pool = ForkJoinPool.commonPool();
long[] array = new long[1000000];
// ... fill array with values
SumTask task = new SumTask(array, 0, array.length);
long sum = pool.invoke(task);
Enter fullscreen mode Exit fullscreen mode

This example demonstrates how we can use the Fork/Join Framework to parallelize a computationally intensive task. The framework takes care of distributing the work across available threads, allowing us to focus on the logic of our algorithm.

Parallel Streams are a higher-level abstraction built on top of the Fork/Join Framework. They provide a simple way to parallelize operations on collections, making it easy to leverage multi-core processors for data processing tasks. I've found parallel streams to be particularly useful when working with large datasets where operations can be performed independently on each element.

Here's an example of using parallel streams to process a large list of numbers:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
long sum = numbers.parallelStream()
                  .filter(n -> n % 2 == 0)
                  .mapToLong(n -> n * n)
                  .sum();
System.out.println("Sum of squares of even numbers: " + sum);
Enter fullscreen mode Exit fullscreen mode

In this code, we're using a parallel stream to filter even numbers, square them, and sum the results. The parallelStream() method automatically uses the Fork/Join pool to distribute the work across multiple threads.

The StampedLock is a more recent addition to Java's concurrency utilities. It's designed to provide better performance than ReentrantReadWriteLock in situations where reads are much more frequent than writes. StampedLock allows for three modes of locking: writing, reading, and optimistic reading.

Here's an example of how to use StampedLock:

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
Enter fullscreen mode Exit fullscreen mode

In this example, the move method uses a write lock to ensure exclusive access when updating the coordinates. The distanceFromOrigin method uses an optimistic read, which doesn't actually lock the state. If the optimistic read fails (i.e., if a write occurred during the read), it falls back to a regular read lock.

The Flow API, introduced in Java 9, brings reactive programming to the standard library. It provides interfaces for implementing the Reactive Streams specification, allowing for asynchronous stream processing with non-blocking back pressure. While the Flow API itself is quite low-level, it serves as a foundation for higher-level reactive libraries.

Here's a simple example of using the Flow API:

public class SimplePublisher implements Flow.Publisher<Integer> {
    private final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;

            @Override
            public void request(long n) {
                for (int i = 0; i < n && index < data.size(); i++) {
                    subscriber.onNext(data.get(index++));
                }
                if (index >= data.size()) {
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                index = data.size();
            }
        });
    }
}

public class SimpleSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Received: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Complete");
    }
}

// Usage
SimplePublisher publisher = new SimplePublisher();
SimpleSubscriber subscriber = new SimpleSubscriber();
publisher.subscribe(subscriber);
Enter fullscreen mode Exit fullscreen mode

This example demonstrates a simple publisher-subscriber interaction using the Flow API. The publisher emits a sequence of integers, and the subscriber processes them one at a time.

These concurrency features have significantly improved my ability to write efficient, scalable Java applications. CompletableFuture has made asynchronous programming much more intuitive, allowing me to express complex workflows clearly. The Fork/Join Framework has been invaluable for parallelizing recursive algorithms, while parallel streams have simplified bulk data processing.

StampedLock has proven useful in scenarios where I need fine-grained control over concurrent access, especially in read-heavy situations. And while I haven't used the Flow API directly as much, it's exciting to see reactive programming principles becoming part of the Java standard library.

One of the key lessons I've learned while working with these features is the importance of choosing the right tool for the job. While parallel streams are convenient, they're not always the best choice for every concurrent task. Sometimes, a more low-level approach using CompletableFuture or the Fork/Join Framework can provide better performance and control.

I've also found that these features often work best in combination. For example, I might use CompletableFuture to coordinate multiple asynchronous tasks, each of which internally uses parallel streams for data processing. Or I might use StampedLock to protect shared state in a reactive application built with the Flow API.

It's important to note that while these features make concurrent programming easier, they don't eliminate the need for careful design and testing. Concurrency bugs can still occur, and they're often subtle and hard to reproduce. I always make sure to thoroughly test my concurrent code, using tools like jcstress when appropriate.

In conclusion, Java's concurrency enhancements have provided developers with a powerful set of tools for building high-performance applications. By leveraging CompletableFuture, the Fork/Join Framework, parallel streams, StampedLock, and the Flow API, we can create responsive, efficient, and scalable systems that make full use of modern multi-core processors. As Java continues to evolve, I'm excited to see what new concurrency features will be introduced in future versions of the language.


Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)