### Introduction

The Fork/Join framework solves problems using a concurrent divide-and-conquer approach. It was introduced to complement the existing concurrency API. Before its introduction, the existing ExecutorService implementations were the popular choice for running asynchronous tasks, but they work best when the tasks are homogeneous and independent. Running dependent tasks and combining their results with those implementations was not easy. The Fork/Join framework was introduced to address this shortcoming. In this post, we will take a brief look at the API and solve a couple of simple problems to understand how it works.

Let's jump directly into the code and create a task that returns the sum of all elements of a List. The following steps represent our algorithm in pseudo-code:
1. Find the middle index of the list
2. Divide the list in the middle
3. Recursively create a new task which will compute the sum of the left part
4. Recursively create a new task which will compute the sum of the right part
5. Add the result of the left sum, the middle element, and the right sum
Here is the code -
```java
@Slf4j
public class ListSummer extends RecursiveTask<Integer> {
  private final List<Integer> listToSum;

  ListSummer(List<Integer> listToSum) {
    this.listToSum = listToSum;
  }

  @Override
  protected Integer compute() {
    if (listToSum.isEmpty()) {
      log.info("Found empty list, sum is 0");
      return 0;
    }

    int middleIndex = listToSum.size() / 2;
    log.info("List {}, middle Index: {}", listToSum, middleIndex);

    List<Integer> leftSublist = listToSum.subList(0, middleIndex);
    List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());

    ListSummer leftSummer = new ListSummer(leftSublist);
    ListSummer rightSummer = new ListSummer(rightSublist);

    leftSummer.fork();
    rightSummer.fork();

    Integer leftSum = leftSummer.join();
    Integer rightSum = rightSummer.join();
    int total = leftSum + listToSum.get(middleIndex) + rightSum;
    log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);

    return total;
  }
}
```
Firstly, we extend RecursiveTask, a subtype of ForkJoinTask. This is the type to extend when we expect our concurrent task to return a result. When a task does not return a result but only performs an effect, we extend the RecursiveAction subtype instead. For most of the practical problems that we solve, these two subtypes are sufficient.
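To illustrate the RecursiveAction side, here is a small sketch (my own example, not from the post): a task that performs an effect, adding list elements into a shared LongAdder, so its compute() returns nothing. The class name and the use of invokeAll are my choices, not part of the original code.

```java
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical example: a RecursiveAction performs an effect (here, adding
// into a shared LongAdder) instead of returning a result, so compute() is void.
public class ListAccumulator extends RecursiveAction {
    private final List<Integer> list;
    private final LongAdder target;

    ListAccumulator(List<Integer> list, LongAdder target) {
        this.list = list;
        this.target = target;
    }

    @Override
    protected void compute() {
        if (list.size() <= 1) {
            list.forEach(target::add);
            return;
        }
        int middle = list.size() / 2;
        // invokeAll forks one subtask and runs the other in the current thread
        invokeAll(new ListAccumulator(list.subList(0, middle), target),
                  new ListAccumulator(list.subList(middle, list.size()), target));
    }

    public static long accumulate(List<Integer> list) {
        LongAdder adder = new LongAdder();
        new ListAccumulator(list, adder).invoke();  // runs on the common pool
        return adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(ListAccumulator.accumulate(List.of(1, 2, 3, 4, 5)));
    }
}
```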

Secondly, both RecursiveTask and RecursiveAction define an abstract compute method. This is where we put our computation.

Thirdly, inside our compute method, we check the size of the list that is passed through the constructor. If it is empty, we already know the result of the sum, which is zero, and we return immediately. Otherwise, we divide our list into two sublists and create two instances of our ListSummer type. We then call the fork() method (defined in ForkJoinTask) on these two instances -
```java
leftSummer.fork();
rightSummer.fork();
```
This causes these tasks to be scheduled for asynchronous execution; the exact mechanism used for this purpose will be explained later in this post.

After that, we invoke the join() method (also defined in ForkJoinTask) to wait for the result of these two parts -
```java
Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();
```
These are then summed with the middle element of the list to get the final result.
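As a side note, a common refinement of this fork/fork/join/join pattern is to fork only one half and compute the other half directly in the current thread, saving one task submission per split. Here is a sketch of that variation (my own, not the post's code; the class name ListSummer2 is made up):

```java
import java.util.List;
import java.util.concurrent.RecursiveTask;

// Variation on the post's ListSummer: fork the right half, compute the left
// half in the current worker thread, then join the forked half.
public class ListSummer2 extends RecursiveTask<Integer> {
    private final List<Integer> listToSum;

    ListSummer2(List<Integer> listToSum) {
        this.listToSum = listToSum;
    }

    @Override
    protected Integer compute() {
        if (listToSum.isEmpty()) {
            return 0;
        }
        int middleIndex = listToSum.size() / 2;
        ListSummer2 left = new ListSummer2(listToSum.subList(0, middleIndex));
        ListSummer2 right = new ListSummer2(listToSum.subList(middleIndex + 1, listToSum.size()));

        right.fork();                  // schedule the right half asynchronously
        int leftSum = left.compute();  // compute the left half in this thread
        int rightSum = right.join();   // wait for the forked right half
        return leftSum + listToSum.get(middleIndex) + rightSum;
    }

    public static void main(String[] args) {
        System.out.println(new ListSummer2(List.of(1, 2, 3, 4, 5)).invoke());
    }
}
```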

Plenty of log messages have been added to make the example easier to understand. However, when we process a list containing thousands of entries, it might not be a good idea to have this detailed logging, especially logging the entire list.
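For lists with thousands of entries, splitting all the way down to single elements also creates a large number of tiny tasks. The usual remedy is a sequential threshold: below a cutoff size, sum sequentially instead of splitting further. A sketch of that idea (my own variation; the class name and the cutoff of 16 are arbitrary choices):

```java
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical variation: below THRESHOLD elements we sum sequentially,
// which keeps task-creation overhead in check for large inputs.
public class ThresholdSummer extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 16;  // arbitrary cutoff for this sketch
    private final List<Integer> listToSum;

    ThresholdSummer(List<Integer> listToSum) {
        this.listToSum = listToSum;
    }

    @Override
    protected Integer compute() {
        if (listToSum.size() <= THRESHOLD) {
            // small enough: plain sequential sum, no further forking
            return listToSum.stream().mapToInt(Integer::intValue).sum();
        }
        int middle = listToSum.size() / 2;
        ThresholdSummer left = new ThresholdSummer(listToSum.subList(0, middle));
        ThresholdSummer right = new ThresholdSummer(listToSum.subList(middle, listToSum.size()));
        left.fork();
        right.fork();
        return left.join() + right.join();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
            .boxed()
            .collect(Collectors.toList());
        System.out.println(new ThresholdSummer(numbers).invoke());
    }
}
```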

That's pretty much it. Let's create a test class now for a test run -
```java
public class ListSummerTest {

  @Test
  public void shouldSumEmptyList() {
    ListSummer summer = new ListSummer(List.of());
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isZero();
  }

  @Test
  public void shouldSumListWithOneElement() {
    ListSummer summer = new ListSummer(List.of(5));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isEqualTo(5);
  }

  @Test
  public void shouldSumListWithMultipleElements() {
    ListSummer summer = new ListSummer(List.of(
        1, 2, 3, 4, 5, 6, 7, 8, 9
    ));
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(summer);

    int result = summer.join();

    assertThat(result).isEqualTo(45);
  }
}
```

Earlier, when we invoked fork() on our leftSummer and rightSummer task instances, they were pushed into the work queue of the executing thread, after which they could be "stolen" by other active threads in the pool that did not have anything else to do at that point.
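We can observe this work-stealing behavior with a small sketch (my own example, not from the post): each worker thread owns a deque of tasks, fork() pushes onto the current worker's deque, and idle workers steal from other workers' deques. Here we simply record which worker threads end up running the leaf subtasks; the class names are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch: return the names of the worker threads that executed the leaves of
// the task tree. With several workers, different names typically show up,
// evidence that forked tasks were picked up (stolen) by other threads.
public class WorkStealingDemo {
    static class NamedTask extends RecursiveTask<String> {
        private final int depth;

        NamedTask(int depth) {
            this.depth = depth;
        }

        @Override
        protected String compute() {
            if (depth == 0) {
                return Thread.currentThread().getName();
            }
            NamedTask left = new NamedTask(depth - 1);
            NamedTask right = new NamedTask(depth - 1);
            left.fork();   // pushed onto this worker's deque, may be stolen
            right.fork();
            return left.join() + ", " + right.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);  // four worker threads
        System.out.println(pool.invoke(new NamedTask(3)));
        pool.shutdown();
    }
}
```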

Pretty cool, right?

The problem we solved just now is non-blocking in nature. If we want to solve a problem that performs blocking operations, then we will need to change our strategy to get better throughput.
Let's examine this with another example. Let's say we want to create a very simple web crawler. This crawler will receive a list of HTTP links, execute GET requests to fetch the response bodies, and then calculate the response length. Here is the code -
```java
@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {
  private final List<String> links;

  ResponseLengthCalculator(List<String> links) {
    this.links = links;
  }

  @Override
  protected Map<String, Integer> compute() {
    if (links.isEmpty()) {
      log.info("No more links to fetch");
      return Collections.emptyMap();
    }

    int middle = links.size() / 2;
    ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));
    ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));

    log.info("Forking left partition");
    leftPartition.fork();
    log.info("Left partition forked, now forking right partition");
    rightPartition.fork();
    log.info("Right partition forked");

    String middleLink = links.get(middle);
    HttpRequester httpRequester = new HttpRequester(middleLink);
    String response;
    try {
      ForkJoinPool.managedBlock(httpRequester);
      response = httpRequester.response;
    } catch (InterruptedException ex) {
      log.error("Error occurred while trying to implement blocking link fetcher", ex);
      response = "";
    }

    Map<String, Integer> leftLinks = leftPartition.join();
    Map<String, Integer> rightLinks = rightPartition.join();

    Map<String, Integer> responseMap = new HashMap<>(links.size());
    responseMap.putAll(leftLinks);
    responseMap.putAll(rightLinks);
    responseMap.put(middleLink, response.length());

    log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);

    return responseMap;
  }

  @RequiredArgsConstructor
  private static class HttpRequester implements ForkJoinPool.ManagedBlocker {
    private final String link;
    private String response;

    @Override
    public boolean block() {
      HttpGet headRequest = new HttpGet(link);
      CloseableHttpClient client = HttpClientBuilder
          .create()
          .disableRedirectHandling()
          .build();

      log.info("Executing blocking request for {}", link);

      try (client; CloseableHttpResponse response = client.execute(headRequest)) {
        this.response = EntityUtils.toString(response.getEntity());
      } catch (IOException e) {
        log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());
        this.response = "";
      }
      return true;
    }

    @Override
    public boolean isReleasable() {
      return false;
    }
  }
}
```
We create an implementation of the ForkJoinPool.ManagedBlocker where we put the blocking HTTP call. This interface defines two methods - block() and isReleasable(). The block() method is where we put our blocking call. After we are done with our blocking operation, we return true, indicating that no further blocking is necessary. We return false from the isReleasable() implementation to indicate to a fork-join worker thread that the block() method implementation is potentially blocking in nature. A fork-join worker thread invokes isReleasable() first, before it invokes the block() method. Finally, we pass our HttpRequester instance to the static ForkJoinPool.managedBlock() method, after which our blocking task starts executing. When it blocks on the HTTP request, ForkJoinPool.managedBlock() also arranges for a spare thread to be activated if necessary to ensure sufficient parallelism.
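The same ManagedBlocker pattern applies to any blocking call, not just HTTP. Here is a minimal, self-contained sketch (my own generic example, closely following the pattern shown in the ForkJoinPool javadoc, not the post's HttpRequester) that wraps BlockingQueue.take():

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;

// Generic ManagedBlocker sketch: wraps BlockingQueue.take() so the pool can
// compensate with a spare thread while the current worker is parked.
public class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
    private final BlockingQueue<E> queue;
    private volatile E item;

    public QueueTaker(BlockingQueue<E> queue) {
        this.queue = queue;
    }

    @Override
    public boolean block() throws InterruptedException {
        if (item == null) {
            item = queue.take();   // the potentially blocking call
        }
        return true;               // done, no further blocking needed
    }

    @Override
    public boolean isReleasable() {
        // try a non-blocking poll first; if it succeeds, block() is skipped
        return item != null || (item = queue.poll()) != null;
    }

    public E takeItem() {
        try {
            ForkJoinPool.managedBlock(this);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return item;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("hello");
        System.out.println(new QueueTaker<>(queue).takeItem());
    }
}
```

Unlike our HttpRequester, this version also implements isReleasable() with a non-blocking poll, so a worker that finds an item already available never needs to block at all.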
Let's take this implementation for a test drive then! Here's the code -
```java
public class ResponseLengthCalculatorTest {

  @Test
  public void shouldReturnEmptyMapForEmptyList() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result).isEmpty();
  }

  @Test
  public void shouldHandle200Ok() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200"
    ));
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(1)
        .containsKeys("http://httpstat.us/200")
        .containsValue(0);
  }

  @Test
  public void shouldFetchResponseForDifferentResponseStatus() {
    ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
        "http://httpstat.us/200",
        "http://httpstat.us/302",
        "http://httpstat.us/404",
        "http://httpstat.us/502"
    ));
    ForkJoinPool pool = new ForkJoinPool();

    pool.submit(responseLengthCalculator);

    Map<String, Integer> result = responseLengthCalculator.join();
    assertThat(result)
        .hasSize(4);
  }
}
```
That's it for today, folks! As always, any feedback/improvement suggestions/comments are highly appreciated!

All the examples discussed here can be found on GitHub.

A big shout-out to the awesome http://httpstat.us service; it was quite helpful for developing the simple tests.