DEV Community

Ruby Valappil
Originally published at Medium

Exploring What’s Inside java.util.concurrent Package (Part 1)

Just the stuff you need for your day-to-day programming tasks

The java.util.concurrent package contains interfaces and classes that support concurrent programming in Java.

What is Concurrent Programming?


When more than one process in a program appears to be running at the same time, we have concurrency. Why "appears" rather than "is"? Because if two processes are actually running at the same instant, for example on separate CPU cores, we have parallelism.

We will not go deep into the concept of concurrency here; instead, we will explore the contents of the Java package that makes concurrency possible.

NB: It is not possible to cover everything listed in the Content section in a single article, so I will cover it in multiple parts (hopefully two will be enough).

Content

  1. java.util.concurrent
     a) Executors & Future
     b) Concurrent Collections
  2. java.util.concurrent.locks
  3. java.util.concurrent.atomic

java.util.concurrent

The main components of this package are Executors, Queues, Timing, Synchronizers, and Concurrent Collections.

In this article, we will explore the two components that are heavily used in web-based applications: Executors and Concurrent Collections.

The classes and interfaces of the executor framework, along with their hierarchy, are shown in the image below.

[Image: Executors. Interface and class hierarchy in the java.util.concurrent package]

The difference between the ThreadPoolExecutor and ScheduledThreadPoolExecutor classes is that the latter can also schedule task execution, for example to run a task after a delay or repeatedly at a fixed rate.
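To make the scheduling difference concrete, here is a minimal sketch that runs a task after a delay using the schedule() method of ScheduledThreadPoolExecutor. The class name DelayedTaskSketch, the runAfterDelay helper, and the 100 ms delay are assumptions made for illustration; they do not come from the original samples.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: the class and method names are hypothetical.
class DelayedTaskSketch {

    // Schedules a task to run after delayMillis and returns how long it actually waited.
    static long runAfterDelay(long delayMillis) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        try {
            long start = System.nanoTime();
            // schedule(Callable, delay, unit) is not available on a plain ThreadPoolExecutor
            ScheduledFuture<Long> elapsed = scheduler.schedule(
                    () -> System.nanoTime() - start, delayMillis, TimeUnit.MILLISECONDS);
            return TimeUnit.NANOSECONDS.toMillis(elapsed.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after about " + runAfterDelay(100) + " ms");
    }
}
```

The measured wait should be at least the requested delay; how much longer it takes depends on the machine and scheduler load.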

Another important class to remember in this package is Executors. It provides factory methods for the most common executor configurations.

We can either use the provided classes or implement the interfaces ourselves.

Let's use these interfaces to create a thread pool and execute a few tasks on threads taken from that pool. We will use the factory methods of the Executors class to create the thread pool.
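As a rough illustration of the second option, the sketch below implements the Executor interface directly with an executor that starts one new thread per task. The names ThreadPerTaskExecutor and CustomExecutorSketch, and the thread name "per-task-thread", are hypothetical; a real application would usually prefer a pooled executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

// Illustrative sketch: all class and thread names here are hypothetical.
class CustomExecutorSketch {

    // Runs one task on the custom executor and returns the name of the thread that ran it.
    static String runOnCustomExecutor() {
        Executor executor = new ThreadPerTaskExecutor();
        String[] threadName = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(); // wait until the task has recorded its thread name
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        System.out.println("Task ran on: " + runOnCustomExecutor());
    }
}

// A minimal Executor implementation: one new thread for every submitted task.
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command, "per-task-thread").start();
    }
}
```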

package com.example.concurrent.config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());
        // submit(Runnable task)
        executor.submit(() -> {
            System.out.println("Implemented Runnable Interface" + Thread.currentThread().getName());
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // submit(Runnable task, T result)
        executor.submit(() -> {
            System.out.println("Implemented Runnable Interface" + Thread.currentThread().getName());
        }, 1);
        // submit(Callable<T> task)
        executor.submit(() -> {
            System.out.println("Implemented Callable Interface" + Thread.currentThread().getName());
            return 1;
        });
        executor.shutdown();
    }
}

// Implement ThreadFactory to give each thread a name
class MyThreadFactory implements ThreadFactory {
    static int i = 1;

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "" + i);
        i++;
        return t;
    }
}

In the above sample, we created a thread pool with a core pool size of 2. We also provided our own implementation of ThreadFactory, which is optional; if we skip it, the executor uses the default ThreadFactory. We implement our own factory only to give each new thread a name, so that we can identify which thread is running our tasks.

Next, we submitted three tasks using the submit() methods of ExecutorService. Note that all three have different signatures: a task can be submitted as a Runnable, as a Runnable with a fixed result, or as a Callable.

The output of the above code is:

Implemented Runnable Interface1

Implemented Runnable Interface2

Implemented Callable Interface2

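In this run, the first task was run by thread 1 and the next two were run by thread 2. The exact assignment can vary between runs, because the third task is picked up by whichever thread becomes free first.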

Future, Runnable, and Callable

Runnable and Callable perform a similar job. Both represent a task that can be executed by another thread, but a Callable can return a result and can throw a checked exception.
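The checked-exception difference can be sketched as follows: a Callable throws an IOException, and the caller sees it wrapped in an ExecutionException when calling Future.get(). The class name CallableVsRunnableSketch, the failureMessage helper, and the message text are assumptions made for this illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: the class and method names are hypothetical.
class CallableVsRunnableSketch {

    // Submits a Callable that throws a checked exception and returns the
    // message of the cause reported by Future.get().
    static String failureMessage() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // A Runnable's run() could not throw IOException without catching it itself
            Callable<Integer> failing = () -> {
                throw new IOException("disk not available");
            };
            Future<Integer> future = executor.submit(failing);
            future.get();
            return "no failure";
        } catch (ExecutionException e) {
            // The exception thrown inside the task is available as the cause
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task failed with: " + failureMessage());
    }
}
```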

Future is an interface declared as public interface Future<V>. It represents the result of an asynchronous computation. In the above code sample, each submit() method of the executor returns a Future.

Several classes in the concurrent package implement this interface. Let's take a look at one of the most commonly used implementations of the Future interface: CompletableFuture.

CompletableFuture has the following signature:
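As a small sketch of what those returned Future objects hold, the example below submits one task with each submit() signature and reads the result with get(). The class name FutureResultSketch and the collectResults helper are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch: the class and method names are hypothetical.
class FutureResultSketch {

    // Submits one task per submit() variant and collects what each Future returns.
    static List<Object> collectResults() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<?> fromRunnable = executor.submit(() -> { });                   // submit(Runnable)
            Future<Integer> fromRunnableWithResult = executor.submit(() -> { }, 1); // submit(Runnable, T)
            Future<Integer> fromCallable = executor.submit(() -> 2);               // submit(Callable<T>)

            List<Object> results = new ArrayList<>();
            results.add(fromRunnable.get());           // a plain Runnable yields null
            results.add(fromRunnableWithResult.get()); // the fixed result passed to submit
            results.add(fromCallable.get());           // the value returned by the Callable
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectResults()); // prints [null, 1, 2]
    }
}
```

Note that get() blocks the calling thread until the task has finished.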

public class CompletableFuture<T>
extends Object
implements Future<T>, CompletionStage<T>

Let’s check some of the methods that belong to this class.

  1. runAsync(): Takes a Runnable and returns a new CompletableFuture. Use this method if you want to run a code block and do not expect a return value.
  2. supplyAsync(): Takes a Supplier and returns a new CompletableFuture that completes with the value obtained by calling the given Supplier. Use this method if you want to run a code block and expect a return value.

Note that both of these methods have an overload that accepts an executor instance as a parameter. If we do not supply an executor to these methods, the tasks normally run on threads from ForkJoinPool.commonPool().

We have already created our own thread pool and control the number of threads it creates, so I prefer to pass that executor instance as a parameter.
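A quick way to observe the default behavior is to ask an asynchronous task which thread it is running on. The sketch below does that with supplyAsync() and no executor argument; the class name DefaultExecutorSketch and the asyncThread helper are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: the class and method names are hypothetical.
class DefaultExecutorSketch {

    // Returns the thread that ran a supplyAsync() task when no executor is passed.
    static Thread asyncThread() {
        // join() waits for the result like get(), but throws only unchecked exceptions
        return CompletableFuture.supplyAsync(Thread::currentThread).join();
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("Async thread:  " + asyncThread().getName());
    }
}
```

On a typical multi-core machine the second name starts with ForkJoinPool.commonPool-worker, but the exact name depends on the JVM and on the common pool's configured parallelism.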

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());
        // no return value
        CompletableFuture.runAsync(() -> {
            System.out.println("Inside Runnable " + Thread.currentThread().getName());
        }, executor);
        // with return value
        CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("Inside Supplier " + Thread.currentThread().getName());
            return true;
        }, executor);
        // get() blocks the calling thread and waits for the future to complete
        try {
            System.out.println("Result from supplier " + cf.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // shutdown executor manually
        executor.shutdown();
    }
}

// Implement ThreadFactory to give each thread a name
class MyThreadFactory implements ThreadFactory {
    static int i = 1;

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "" + i);
        i++;
        return t;
    }
}

  3. thenAccept(): Takes the result of a completed CompletableFuture and passes it as an argument to a Consumer. Use this method if you need to run a task based on the result of a future and do not need a result back from that task.

  4. thenApply(): Takes the result of a completed CompletableFuture and passes it as an argument to a Function. Use this method if you need to run a task based on the result of a future and need a result back from that task.

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());
        // no return value
        CompletableFuture.runAsync(() -> {
            System.out.println("Inside Runnable " + Thread.currentThread().getName());
        }, executor);
        // with return value
        CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
            System.out.println("Inside Supplier " + Thread.currentThread().getName());
            return true;
        }, executor);
        // pass the result to a Consumer once the future is completed
        cf.thenAccept(flag -> {
            if (flag) {
                System.out.println("Inside thenAccept() ");
            }
        });
        // apply a Function once the future is completed; the Function returns a boolean
        CompletableFuture<Boolean> applyResult = cf.thenApply(flag -> {
            System.out.println("Inside thenApply() ");
            return false;
        });
        // shutdown executor manually
        executor.shutdown();
    }
}

// Implement ThreadFactory to give each thread a name
class MyThreadFactory implements ThreadFactory {
    static int i = 1;

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "" + i);
        i++;
        return t;
    }
}

  5. thenCompose(): Use this method if you want to chain two future executions where the second task depends on the output of the first.

package com.example.concurrent.config;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ExecutorServiceSample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newScheduledThreadPool(2, new MyThreadFactory());
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
            return 1;
        }, executor);
        // result of cf is passed to another future that invokes the add method
        CompletableFuture<Integer> result = cf.thenCompose(x -> {
            return CompletableFuture.supplyAsync(() -> {
                return add(x, 2);
            }, executor);
        });
        System.out.println("Result " + result.get());
        // shutdown executor manually
        executor.shutdown();
    }

    public static int add(int x, int y) {
        return x + y;
    }
}

// Implement ThreadFactory to give each thread a name
class MyThreadFactory implements ThreadFactory {
    static int i = 1;

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "" + i);
        i++;
        return t;
    }
}

  6. thenCombine(): Use this method if you want to combine the output of two independent future tasks.

Replace the two futures in the above code with the following snippet and it yields the same result.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
    return 1;
}, executor);
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
    return 2;
}, executor);
// once both cf and cf1 complete, their results are combined by the add method
CompletableFuture<Integer> result = cf.thenCombine(cf1, (x, y) -> {
    return add(x, y);
});
System.out.println("Result " + result.get());

Concurrent Collections

Collection implementations that are designed for multi-threaded contexts include ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, and CopyOnWriteArraySet.

Note that the behavior of these classes is different from the synchronized classes.

For example, Collections.synchronizedList(new ArrayList<>()) returns a synchronized wrapper around an ArrayList. Thread safety on a synchronized collection is achieved by locking the entire object for every operation.
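The sketch below shows this single-lock behavior under some assumptions: two threads append to the same synchronized list, and the traversal afterwards takes the list's lock explicitly, which is what the Collections.synchronizedList documentation asks callers to do when iterating. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: the class and method names are hypothetical.
class SynchronizedListSketch {

    // Sums the list while holding its lock, so no other thread can modify it mid-traversal.
    static int sum(List<Integer> syncList) {
        int total = 0;
        synchronized (syncList) {
            for (int value : syncList) {
                total += value;
            }
        }
        return total;
    }

    // Two threads each add perThread ones; every add() locks the whole list.
    static int fillFromTwoThreads(int perThread) {
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        Runnable writer = () -> {
            for (int i = 0; i < perThread; i++) {
                list.add(1);
            }
        };
        Thread first = new Thread(writer);
        Thread second = new Thread(writer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum(list);
    }

    public static void main(String[] args) {
        System.out.println("Total: " + fillFromTwoThreads(1000)); // prints Total: 2000
    }
}
```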

CopyOnWriteArrayList, the concurrent alternative to the synchronized list, achieves thread safety differently: multiple threads can read from the collection without locking the instance, and a lock is taken only for updates, each of which works on a fresh copy of the underlying array.

Concurrent collections are generally preferred over synchronized collections for better performance and scalability, although copy-on-write collections suit read-heavy workloads best, since every write copies the array.

Note: We will explore the other two subpackages in the next article.
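One visible consequence of this design is that an iterator works on a snapshot of the list. The following sketch, with hypothetical class and method names, adds elements while iterating; the same loop over a plain ArrayList would throw a ConcurrentModificationException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative sketch: the class and method names are hypothetical.
class CopyOnWriteSketch {

    // Iterates over the list while adding to it.
    // Returns {number of elements the iterator saw, final size of the list}.
    static int[] iterateWhileAdding() {
        List<String> names = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        int seen = 0;
        for (String name : names) {
            // Each add() copies the backing array; the running iterator keeps its old snapshot
            names.add(name + "-copy");
            seen++;
        }
        return new int[] {seen, names.size()};
    }

    public static void main(String[] args) {
        int[] result = iterateWhileAdding();
        System.out.println("Seen " + result[0] + ", final size " + result[1]); // prints Seen 3, final size 6
    }
}
```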
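As one example of how a concurrent collection is used, the sketch below counts hits from several threads with ConcurrentHashMap.merge(), which performs the read-modify-write atomically without the caller locking the whole map. The class name, the countHits helper, and the "home" key are assumptions for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: the class, method, and key names are hypothetical.
class ConcurrentCounterSketch {

    // Each of the given threads increments the same key hitsPerThread times.
    static int countHits(int threads, int hitsPerThread) {
        ConcurrentHashMap<String, Integer> hits = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                for (int i = 0; i < hitsPerThread; i++) {
                    hits.merge("home", 1, Integer::sum); // atomic increment for this key
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return hits.getOrDefault("home", 0);
    }

    public static void main(String[] args) {
        System.out.println("Hits: " + countHits(4, 1000)); // prints Hits: 4000
    }
}
```

A plain HashMap with a separate get() followed by put() would lose updates here, because two threads could read the same old value before either writes.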

