The combination of `RecursiveTask` implementations running in a `ForkJoinPool` allows tasks to be defined that may spawn subtasks, which in turn may run asynchronously. The `ForkJoinPool` manages efficient processing of those tasks.

This article presents a simple calculator application that evaluates a formula defined as a `List`, presents a single-threaded recursive solution, and then converts that solution to use `RecursiveTask`s executed in a `ForkJoinPool`.

## Recursive Solution

A formula to be computed is defined as follows:

```
public enum Operator { ADD, MULTIPLY; }

public static List<?> FORMULA =
    List.of(Operator.ADD,
            List.of(Operator.MULTIPLY, 1, 2, 3), 4, 5,
            List.of(Operator.ADD, 6, 7, 8,
                    List.of(Operator.MULTIPLY, 9, 10, 11)));
```

A formula is either a `Number` or a `List` consisting of an `Operator` followed by other formulae. For illustration, the Lisp equivalent of `FORMULA` would be:

```
(+ (* 1 2 3) 4 5
   (+ 6 7 8
      (* 9 10 11)))
```
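The recursive shape of a formula can be made concrete with a small printer that walks the `List` and emits the Lisp form. This is only a sketch: the `FormulaPrinter` class and its `toLisp` method are hypothetical names introduced here for illustration and are not part of the article's code.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FormulaPrinter {
    public enum Operator { ADD, MULTIPLY }

    // Hypothetical helper: renders a formula in Lisp notation.
    // A formula is either a Number or a List of an Operator followed by formulae.
    public static String toLisp(Object formula) {
        if (formula instanceof Number) {
            return formula.toString();
        }

        List<?> list = (List<?>) formula;
        Operator operator = (Operator) list.get(0);
        String symbol = (operator == Operator.ADD) ? "+" : "*";
        // Recurse into every operand after the leading Operator.
        String operands = list.subList(1, list.size()).stream()
                .map(FormulaPrinter::toLisp)
                .collect(Collectors.joining(" "));

        return "(" + symbol + " " + operands + ")";
    }

    public static void main(String[] args) {
        List<?> formula =
            List.of(Operator.ADD,
                    List.of(Operator.MULTIPLY, 1, 2, 3), 4, 5,
                    List.of(Operator.ADD, 6, 7, 8,
                            List.of(Operator.MULTIPLY, 9, 10, 11)));

        System.out.println(toLisp(formula));
    }
}
```

The printer has the same two branches (number or list) that the evaluator needs, which is why the evaluation below is naturally recursive.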

A `Task` class is defined to solve formulae:

```
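// Requires: import java.util.List;
//           import java.util.stream.IntStream;
//           import static java.util.stream.Collectors.toList;
public static class Task {
    private final Object formula;

    public Task(Object formula) { this.formula = formula; }

    public Integer compute() {
        Integer result = null;

        if (formula instanceof Number) {
            // A bare number evaluates to itself.
            result = ((Number) formula).intValue();
        } else {
            // Otherwise the formula is a List: an Operator followed by operands.
            List<?> list = (List<?>) formula;
            Operator operator = (Operator) list.get(0);
            // One Task per operand.
            List<Task> subtasks =
                list.subList(1, list.size())
                    .stream()
                    .map(Task::new)
                    .collect(toList());
            // Lazily maps each subtask to its value; the recursion actually
            // runs in the terminal operation (sum or reduce) below.
            IntStream operands =
                subtasks.stream()
                    .map(Task::compute)
                    .mapToInt(Integer::intValue);

            switch (operator) {
            case ADD:
                result = operands.sum();
                break;

            case MULTIPLY:
                result = operands.reduce(1, (x, y) -> x * y);
                break;
            }
        }

        System.out.println(formula + " -> " + result);

        return result;
    }
}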
```

The `compute()` method evaluates a formula by creating another `Task` instance for each operand and calling its `compute()` method recursively. The actual mechanics are to create a `List` of `Task`s and then map the `Stream` of `Task`s to an `IntStream` by calling `Task.compute()` on each; the `IntStream` is then summed or multiplied, depending on the operator.
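The two reductions can be looked at in isolation. The sketch below is a minimal illustration; the `OperandReduction` class and its method names are assumptions made for this example. It applies the same `sum()` and `reduce(1, ...)` calls to plain `int` operands.

```java
import java.util.stream.IntStream;

public class OperandReduction {
    // ADD: sum() returns 0 for an empty stream, the identity for addition.
    public static int add(int... operands) {
        return IntStream.of(operands).sum();
    }

    // MULTIPLY: reduce() starts from the identity 1 and folds in each operand.
    public static int multiply(int... operands) {
        return IntStream.of(operands).reduce(1, (x, y) -> x * y);
    }

    public static void main(String[] args) {
        System.out.println(add(6, 7, 8, 990));    // 1011
        System.out.println(multiply(9, 10, 11));  // 990
    }
}
```

Both reductions use `int` arithmetic, so a sufficiently large product would overflow silently; the formula in this article stays well within range.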

The static `main(String[])` function simply instantiates a `Task` and calls `compute()`.

```
public static void main(String[] argv) {
    Task task = new Task(FORMULA);

    System.out.println("Result: " + task.compute());
}
```

This generates the following output:

```
1 -> 1
2 -> 2
3 -> 3
[MULTIPLY, 1, 2, 3] -> 6
4 -> 4
5 -> 5
6 -> 6
7 -> 7
8 -> 8
9 -> 9
10 -> 10
11 -> 11
[MULTIPLY, 9, 10, 11] -> 990
[ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
[ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
```

The next section details how to convert this solution to use a `ForkJoinPool` to enable some level of parallel processing.

## ForkJoinPool Solution

The `Task` class is modified to extend `RecursiveTask<Integer>`. When a subtask is instantiated, `RecursiveTask.fork()` is called to execute it asynchronously in the pool in which the current task is running. In the `IntStream`, `RecursiveTask.join()` is called to wait for the subtask to complete (if it hasn't already) and return the result of its `compute()` method.

```
public static class Task extends RecursiveTask<Integer> {
    ...
    @Override
    public Integer compute() {
        Integer result = null;

        if (formula instanceof Number) {
            ...
        } else {
            ...
            List<Task> subtasks =
                list.subList(1, list.size())
                    .stream()
                    .map(Task::new)
                    .peek(RecursiveTask::fork)
                    .collect(toList());
            IntStream operands =
                subtasks.stream()
                    .map(RecursiveTask::join)
                    .mapToInt(Integer::intValue);
            ...
        }

        System.out.println(Thread.currentThread() + "\t"
                           + formula + " -> " + result);

        return result;
    }
}
```
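Collecting the forked subtasks into a `List` before joining matters because a sequential `Stream` pushes each element through the whole pipeline before touching the next one. The sketch below is an illustration using plain integers and recorded strings in place of real `fork()` and `join()` calls; the `StreamOrderDemo` class and its method names are hypothetical. It shows the order in which the two steps would occur with and without the intermediate `List`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class StreamOrderDemo {
    // Two pipelines, as in the Task above: every "fork" is recorded
    // before the first "join".
    public static List<String> collectThenJoin() {
        List<String> events = new ArrayList<>();

        List<Integer> forked = List.of(1, 2, 3).stream()
                .peek(i -> events.add("fork " + i))
                .collect(Collectors.toList());

        forked.stream()
                .peek(i -> events.add("join " + i))
                .mapToInt(Integer::intValue)
                .sum();

        return events;
    }

    // One pipeline: each element is "forked" and immediately "joined",
    // which would leave no opportunity for subtasks to overlap.
    public static List<String> singlePipeline() {
        List<String> events = new ArrayList<>();

        List.of(1, 2, 3).stream()
                .peek(i -> events.add("fork " + i))
                .peek(i -> events.add("join " + i))
                .mapToInt(Integer::intValue)
                .sum();

        return events;
    }

    public static void main(String[] args) {
        System.out.println(collectThenJoin());
        System.out.println(singlePipeline());
    }
}
```

With real tasks, the single-pipeline variant would wait for each subtask before forking the next, so sibling subtasks would effectively run one after another regardless of pool size.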

The executing `Thread` is included in the output of `compute()` to demonstrate the behavior. The `main(String[])` function creates a `ForkJoinPool` and the `Task` to compute `FORMULA`, and uses the pool to invoke the `Task`. `ForkJoinPool.invoke()` itself waits for the computation to complete, so the subsequent `Task.join()` simply returns the already computed result.

```
public static int N = 10;

public static void main(String[] argv) {
    ForkJoinPool pool = new ForkJoinPool(N);
    RecursiveTask<Integer> task = new Task(FORMULA);

    pool.invoke(task);

    System.out.println("Result: " + task.join());
}
```

Output using 10 threads (`N = 10`):

```
Thread[ForkJoinPool-1-worker-31,5,main] 2 -> 2
Thread[ForkJoinPool-1-worker-19,5,main] 8 -> 8
Thread[ForkJoinPool-1-worker-23,5,main] 4 -> 4
Thread[ForkJoinPool-1-worker-17,5,main] 3 -> 3
Thread[ForkJoinPool-1-worker-9,5,main] 1 -> 1
Thread[ForkJoinPool-1-worker-27,5,main] 5 -> 5
Thread[ForkJoinPool-1-worker-3,5,main] 7 -> 7
Thread[ForkJoinPool-1-worker-21,5,main] 6 -> 6
Thread[ForkJoinPool-1-worker-17,5,main] 11 -> 11
Thread[ForkJoinPool-1-worker-23,5,main] 10 -> 10
Thread[ForkJoinPool-1-worker-31,5,main] 9 -> 9
Thread[ForkJoinPool-1-worker-5,5,main] [MULTIPLY, 1, 2, 3] -> 6
Thread[ForkJoinPool-1-worker-31,5,main] [MULTIPLY, 9, 10, 11] -> 990
Thread[ForkJoinPool-1-worker-13,5,main] [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
Thread[ForkJoinPool-1-worker-19,5,main] [ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
```

And with 1 pool thread (`N = 1`):

```
Thread[ForkJoinPool-1-worker-3,5,main] 1 -> 1
Thread[ForkJoinPool-1-worker-3,5,main] 2 -> 2
Thread[ForkJoinPool-1-worker-3,5,main] 3 -> 3
Thread[ForkJoinPool-1-worker-3,5,main] [MULTIPLY, 1, 2, 3] -> 6
Thread[ForkJoinPool-1-worker-3,5,main] 4 -> 4
Thread[ForkJoinPool-1-worker-3,5,main] 5 -> 5
Thread[ForkJoinPool-1-worker-3,5,main] 6 -> 6
Thread[ForkJoinPool-1-worker-3,5,main] 7 -> 7
Thread[ForkJoinPool-1-worker-3,5,main] 8 -> 8
Thread[ForkJoinPool-1-worker-3,5,main] 9 -> 9
Thread[ForkJoinPool-1-worker-3,5,main] 10 -> 10
Thread[ForkJoinPool-1-worker-3,5,main] 11 -> 11
Thread[ForkJoinPool-1-worker-3,5,main] [MULTIPLY, 9, 10, 11] -> 990
Thread[ForkJoinPool-1-worker-3,5,main] [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]] -> 1011
Thread[ForkJoinPool-1-worker-3,5,main] [ADD, [MULTIPLY, 1, 2, 3], 4, 5, [ADD, 6, 7, 8, [MULTIPLY, 9, 10, 11]]] -> 1026
Result: 1026
```

This (unsurprisingly) calculates the formulae in the same order as the recursive solution.
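For reference, the pieces of this section can be assembled into one compact, self-contained sketch. It is not the article's exact code: the `ForkJoinCalculator` class and its `evaluate` helper are hypothetical names, the logging is omitted, the operands are folded in a plain loop instead of an `IntStream`, and the result is taken directly from the return value of `ForkJoinPool.invoke()`.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class ForkJoinCalculator {
    public enum Operator { ADD, MULTIPLY }

    public static class Task extends RecursiveTask<Integer> {
        private final Object formula;

        public Task(Object formula) { this.formula = formula; }

        @Override
        protected Integer compute() {
            if (formula instanceof Number) {
                return ((Number) formula).intValue();
            }

            List<?> list = (List<?>) formula;
            Operator operator = (Operator) list.get(0);
            // Fork every subtask before joining any of them.
            List<Task> subtasks = list.subList(1, list.size()).stream()
                    .map(Task::new)
                    .peek(Task::fork)
                    .collect(Collectors.toList());
            // Start from the identity of the operation and fold in each result.
            int result = (operator == Operator.ADD) ? 0 : 1;

            for (Task subtask : subtasks) {
                int operand = subtask.join();

                result = (operator == Operator.ADD)
                        ? result + operand
                        : result * operand;
            }

            return result;
        }
    }

    // Hypothetical convenience wrapper: evaluates a formula in a dedicated pool.
    public static int evaluate(Object formula, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);

        try {
            // invoke() blocks until the task completes and returns its result.
            return pool.invoke(new Task(formula));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<?> formula =
            List.of(Operator.ADD,
                    List.of(Operator.MULTIPLY, 1, 2, 3), 4, 5,
                    List.of(Operator.ADD, 6, 7, 8,
                            List.of(Operator.MULTIPLY, 9, 10, 11)));

        System.out.println("Result: " + evaluate(formula, 10));
    }
}
```

Shutting the pool down in a `finally` block is a choice made for this sketch so that a dedicated pool does not outlive the single computation it was created for.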

## Summary

Single-threaded recursive solutions may be converted to `RecursiveTask`^{1} implementations and invoked through a `ForkJoinPool` to enable efficient processing of subtasks.

**[1]** An abstract subclass of `ForkJoinTask`.
