DEV Community

Pavan K Jadda

Synchronous and Asynchronous Data access using ThreadPoolExecutor and Spring Boot, Spring Data

Overview

A multithreaded Java application asks the operating system to create and destroy threads, and both operations carry a cost. Running independent tasks on multiple threads can make better use of the CPU and often improves application throughput, provided the number of threads stays under control.


This blog post explains how to access data synchronously and asynchronously using ThreadPoolExecutor. The main reason to use a thread pool for data access is to reuse threads efficiently and to cap the number of threads so the CPU is not overloaded. ThreadPoolExecutor takes care of creating and destroying threads, and a BlockingQueue holds the pending tasks. As soon as a pool thread is free, it takes the next task from the queue in FIFO (First In, First Out) order.
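
The FIFO behaviour described above can be seen in a small, framework-free sketch. It assumes a pool with a single worker thread, so queued tasks can only run one at a time. The class and method names (`FifoQueueDemo`, `runInOrder`) are made up for this illustration and are not part of the project code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the post's project.
class FifoQueueDemo {

    // Submits taskCount tasks to a single-threaded pool and returns the order in which they ran.
    static List<Integer> runInOrder(int taskCount) {
        // One worker thread plus a bounded FIFO queue for the tasks that have to wait.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(Math.max(1, taskCount)));
        List<Integer> executionOrder = Collections.synchronizedList(new ArrayList<>());

        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> executionOrder.add(id));
        }

        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(executionOrder);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With more than one worker thread the tasks still leave the queue in FIFO order, but they may finish in a different order.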

Background

ExecutorService is an interface in the JDK's java.util.concurrent package that simplifies running tasks asynchronously. Thread pools address two different problems:

  1. They usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and
  2. They provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.

There are a couple of ways to implement this. The first works through the ExecutorService interface, typically with a pool obtained from the [Executors](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html) factory methods. The second constructs a ThreadPoolExecutor directly. ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads, and it is the approach used in this post.
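
As a rough side-by-side of the two options, the sketch below creates one pool through an Executors factory method and one through the ThreadPoolExecutor constructor, then runs the same task on each. The class name `PoolCreationDemo`, the helper `square`, and the queue capacity of 100 are assumptions chosen for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the post's project.
class PoolCreationDemo {

    // Option 1: let an Executors factory method pick the configuration.
    static ExecutorService viaFactory() {
        return Executors.newFixedThreadPool(2);
    }

    // Option 2: construct ThreadPoolExecutor directly and choose every setting yourself.
    static ThreadPoolExecutor viaConstructor() {
        return new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
    }

    // Runs one task on the given pool, waits for its result, then shuts the pool down.
    static int square(ExecutorService pool, int n) {
        try {
            return pool.submit(() -> n * n).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(square(viaFactory(), 7));     // prints 49
        System.out.println(square(viaConstructor(), 3)); // prints 9
    }
}
```

Both pools are used through the same ExecutorService methods. The constructor route is more verbose, but it exposes the queue and the pool bounds, which is what the rest of this post relies on.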

Technologies:

  1. Java 11
  2. Spring Boot 2.x
  3. Spring Data 2.x
  4. MySQL 8.x

Steps:

  1. Spring Boot and Spring Data are used to develop a simple REST API, and the data access repository calls are routed through ThreadPoolExecutor
  2. Create the Employee, EmployeeController, and EmployeeService classes as shown below

Employee.java

package com.pj.springdatademo.model;

import lombok.Data;

import javax.persistence.*;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.Serializable;

@Entity
@Data
@Table(name = "employee")
public class Employee implements Serializable
{
    private static final long serialVersionUID = -2994315037642107537L;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE)
    @Column(name = "id")
    private Long id;

    @NotNull(message = "First name must not be null")
    @NotEmpty
    @Column(name = "first_name", nullable = false)
    private String firstName;

    @NotNull(message = "Last name must not be null")
    @NotEmpty
    @Column(name = "last_name", nullable = false)
    private String lastName;

    @NotNull(message = "Email must not be null")
    @NotEmpty
    @Column(name = "email", nullable = false)
    private String email;

    @Column(name = "phone")
    private String phone;
}

EmployeeController.java

@RestController
@RequestMapping("/api/v1/employee")
public class EmployeeController
{
    private final EmployeeService employeeService;

    public EmployeeController(EmployeeService employeeService)
    {
        this.employeeService = employeeService;
    }

    @GetMapping(path = "/list")
    public List<Employee> getAllEmployees()
    {
        return employeeService.getAllEmployees();
    }

    @GetMapping(path = "/list/async")
    public List<Employee> getAllEmployeesAsync()
    {
        return employeeService.getAllEmployeesAsync();
    }
}

EmployeeServiceImpl.java

@Service
public class EmployeeServiceImpl implements EmployeeService
{
    private final EmployeeRepository employeeRepository;
    private final ThreadPoolExecutorUtil threadPoolExecutorUtil;

    public EmployeeServiceImpl(EmployeeRepository employeeRepository, ThreadPoolExecutorUtil threadPoolExecutorUtil)
    {
        this.employeeRepository = employeeRepository;
        this.threadPoolExecutorUtil = threadPoolExecutorUtil;
    }

    @Override
    public List<Employee> getAllEmployeesAsync()
    {
        for (int i = 0; i < 10000; i++)
        {
            TaskThread taskThread = new TaskThread(employeeRepository);
            threadPoolExecutorUtil.executeTask(taskThread);
        }
        /*
            The following task exists only so the method has a list of values to return at the end
         */
        TaskThread taskThread = new TaskThread(employeeRepository);
        threadPoolExecutorUtil.executeTask(taskThread);
        return taskThread.employees;
    }

    @Override
    public List<Employee> getAllEmployees()
    {
        return employeeRepository.findAll();
    }
}
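
The TaskThread class used above is not listed in this post. Judging only from how it is called (a constructor that takes the repository, and an `employees` field that is read after the task has run), it might look roughly like the sketch below. Everything here is an assumption: the class name `TaskThreadSketch`, the `Supplier` standing in for `employeeRepository::findAll`, and the use of String in place of Employee so that the example runs without Spring.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for the post's TaskThread, whose real source is not shown.
class TaskThreadSketch implements Runnable {

    // Stand-in for employeeRepository::findAll; Strings replace Employee entities.
    private final Supplier<List<String>> findAll;

    // Written by the pool thread, read by the caller once the task has finished.
    volatile List<String> employees = List.of();

    TaskThreadSketch(Supplier<List<String>> findAll) {
        this.findAll = findAll;
    }

    @Override
    public void run() {
        employees = findAll.get();
    }

    // Runs the task on a pool thread and waits for it before reading the field.
    static List<String> runAndWait(TaskThreadSketch task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(task).get(); // returns only after run() has completed
            return task.employees;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(new TaskThreadSketch(() -> List.of("John", "Jack")))); // prints [John, Jack]
    }
}
```

Reading `employees` is only safe after the task has finished. If the caller does not wait, the field still holds its initial empty list.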

3. The employee service class has 2 methods: getAllEmployees(), which does not use the thread pool mechanism, and getAllEmployeesAsync(), which does

4. Let’s create the ThreadPoolExecutorUtil class, which takes care of multithreading (explained in the next step)

@Component
public class ThreadPoolExecutorUtil
{
    private Logger logger = LoggerFactory.getLogger(ThreadPoolExecutorUtil.class);

    private ThreadPoolExecutor threadPoolExecutor;

    public ThreadPoolExecutorUtil()
    {
        // Queue holds up to 10000 pending tasks
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10000);
        threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);
        threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
        {
            try {
                Thread.sleep(1000);
                logger.error("Exception occurred while adding task, Waiting for some time");
            }
            catch (InterruptedException e)
            {
                logger.error("Thread interrupted", e);
                Thread.currentThread().interrupt();
            }
            threadPoolExecutor.execute(r);
        });
    }

    void executeTask(TaskThread taskThread)
    {
        Future<?> future = threadPoolExecutor.submit(taskThread);
        logger.info("Number of Active Threads: {}", threadPoolExecutor.getActiveCount());

        // Wait for the task to complete, which makes this call synchronous
        while (!future.isDone())
        {
            try {
                future.get();
                logger.info("task.employees: {}", taskThread.employees.toString());
            }
            catch (Exception e)
            {
                logger.error("Exception occurred while executing task", e);
            }
        }
    }
}

5. Let’s go through it line by line. First, in the constructor, we create a BlockingQueue that holds up to 10000 tasks at a time. You can increase or decrease the number, but remember that holding a large number of pending tasks consumes resources such as memory.

// Queue holds up to 10000 pending tasks
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10000);
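
To see what "bounded" means for this queue, here is a tiny sketch with a capacity of 2 instead of 10000. ThreadPoolExecutor hands tasks to its queue with the non-blocking offer() method, so a full queue refuses the task instead of waiting. The class and method names are made up for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the post's project.
class BoundedQueueDemo {

    // Offers three items to a queue with capacity 2 and reports whether each was accepted.
    static boolean[] offerThree() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        boolean first = queue.offer("task-1");  // accepted
        boolean second = queue.offer("task-2"); // accepted, queue is now full
        boolean third = queue.offer("task-3");  // refused, offer() does not wait for space
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        boolean[] accepted = offerThree();
        System.out.println(accepted[0] + " " + accepted[1] + " " + accepted[2]); // prints true true false
    }
}
```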

6. Create the ThreadPoolExecutor object by passing corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit, and the BlockingQueue instance

threadPoolExecutor = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, blockingQueue);
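
One detail worth knowing about these parameters: ThreadPoolExecutor starts threads beyond corePoolSize only when the queue is full, and it rejects a task only when the queue is full and maximumPoolSize has been reached. The sketch below shows this with deliberately small numbers (core 1, max 2, queue capacity 1) rather than the 2, 10, and 10000 used in this post. The class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the post's project.
class PoolGrowthDemo {

    // Returns the pool size seen after each of three submissions, then -1 if a fourth is rejected.
    static List<Integer> observePoolSizes() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

        // Each task parks on the latch so the threads stay busy while we take measurements.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        List<Integer> sizes = new ArrayList<>();
        try {
            pool.execute(blocker);
            sizes.add(pool.getPoolSize()); // 1: the core thread was started
            pool.execute(blocker);
            sizes.add(pool.getPoolSize()); // 1: the task went into the queue
            pool.execute(blocker);
            sizes.add(pool.getPoolSize()); // 2: queue was full, so an extra thread was started
            try {
                pool.execute(blocker);     // queue full and max threads reached
                sizes.add(pool.getPoolSize());
            } catch (RejectedExecutionException e) {
                sizes.add(-1);             // default handler (AbortPolicy) throws
            }
        } finally {
            release.countDown();           // let the parked tasks finish
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(observePoolSizes()); // prints [1, 1, 2, -1]
    }
}
```

Applied to the configuration in this post, the pool stays at 2 threads until all 10000 queue slots are taken, and only then grows toward 10.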

7. Call setRejectedExecutionHandler to handle rejections, which happen when the queue is full and the pool cannot start any more threads. The handler puts the submitting thread to sleep for one second to give the pool threads time to work through the queue, then resubmits the task to ThreadPoolExecutor with the execute() method

threadPoolExecutor.setRejectedExecutionHandler((r, executor) ->
{
    try {
        Thread.sleep(1000);
        logger.error("Exception occurred while adding task, Waiting for some time");
    }
    catch (InterruptedException e)
    {
        logger.error("Thread interrupted", e);
        Thread.currentThread().interrupt();
    }
    threadPoolExecutor.execute(r);
});
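
As an alternative to the sleep-and-retry handler used in this post, the JDK ships a built-in handler, ThreadPoolExecutor.CallerRunsPolicy, which runs a rejected task on the thread that submitted it. This is a different technique from the one above, shown only for comparison. The sketch uses a pool of one thread and a queue of one slot so that the third task is rejected. The class and method names are made up for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the post's project.
class CallerRunsDemo {

    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String threadThatRanOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            // Rejected by the pool, so CallerRunsPolicy runs it right here on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatRanOverflowTask()); // prints the submitting thread's name, "main" here
    }
}
```

Both approaches slow down the submitting thread when the pool is saturated. The custom handler does it by sleeping, while CallerRunsPolicy does it by making the submitter do the work itself.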

8. Create the executeTask(TaskThread taskThread) method, which submits the task to threadPoolExecutor and keeps the returned Future object. If all core threads are busy, the pool places the task in the BlockingQueue. At this point the tasks are executed asynchronously

void executeTask(TaskThread taskThread)
{
    Future<?> future = threadPoolExecutor.submit(taskThread);
    System.out.println("Queue Size: " + threadPoolExecutor.getQueue().size());
    System.out.println("Number of Active Threads: " + threadPoolExecutor.getActiveCount());
}

9. To make the executeTask() method synchronous, add a while loop to the method and use future.isDone() to check the status of the task. As long as the task is not done, call future.get(), which blocks until the result is available.

while (!future.isDone())
{
    try {
        future.get();
        logger.info("task.employees: {}", taskThread.employees.toString());
    }
    catch (Exception e)
    {
        logger.error("Exception occurred while executing task", e);
    }
}
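
The difference between the asynchronous and the synchronous variant comes down to whether the caller waits on the Future. The sketch below, which does not depend on Spring, shows that submit() returns before the task has finished and that get() blocks until the result is there. The class and method names are made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the post's project.
class FutureGetDemo {

    // Returns {done before get(), done after get(), result was as expected}.
    static boolean[] doneBeforeAndAfterGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The task cannot finish until the latch is released.
            Future<String> future = pool.submit(() -> {
                release.await();
                return "done";
            });

            boolean doneBefore = future.isDone(); // false: submit() returned immediately
            release.countDown();
            String value = future.get();          // blocks until the task has completed
            boolean doneAfter = future.isDone();  // true: get() only returns for a finished task

            return new boolean[] {doneBefore, doneAfter, "done".equals(value)};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = doneBeforeAndAfterGet();
        System.out.println(result[0] + " " + result[1] + " " + result[2]); // prints false true true
    }
}
```

Because get() already blocks, a single call is enough to wait for a task. The surrounding isDone() loop in the post only matters when get() is interrupted before the task finishes.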

10. Now create 20000 or more tasks in EmployeeService to test the multithreading

for (int i=0;i<20000;i++)  
{  
    TaskThread taskThread=new TaskThread(employeeRepository);  
    threadPoolExecutorUtil.executeTask(taskThread);  
}

11. Go to the database and insert some employee data

INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('1', 'john.doe@hj.com', 'John', 'Doe', '233323');
INSERT INTO `threadpooldemo`.`employee` (`id`, `email`, `first_name`, `last_name`, `phone`) VALUES ('2', 'jack@hj.com', 'Jack', 'Doe', '09094044');

12. Go to http://localhost:8080/api/v1/employee/list to see the employee list returned by the EmployeeRepository class without thread pooling

13. Go to http://localhost:8080/api/v1/employee/list/async in the browser and check the IntelliJ console. You should see the following messages in the log.

....
Number of Active Threads: 10
Queue Size: 9996
Number of Active Threads: 10
Queue Size: 9997
Number of Active Threads: 10
Queue Size: 9998
Number of Active Threads: 10
Queue Size: 9999
Number of Active Threads: 10
Queue Size: 10000
Number of Active Threads: 10
2019-06-19 01:18:09.983 ERROR 43110 --- [nio-8080-exec-1] c.p.s.service.ThreadPoolExecutorUtil     : Exception occurred while adding task, Waiting for some time
Queue Size: 5066
Number of Active Threads: 10
....

We added 20000 tasks, which is more than the queue can hold, so the RejectedExecutionHandler was invoked. It put the submitting thread to sleep, and the pool threads worked through queued tasks in the meantime. In the log above, the queue size dropped from 10000 to 5066.

The code is uploaded to GitHub for reference. Let me know if you have any questions. Happy Coding :)
