FJava: Fork Join for Java

By: Ankit Agarwal and Iosef Kaver


Final Report

Summary

FJava is a high level fork join framework for the Java programming language. Our framework outperforms the native Java 8 Fork Join framework under some workloads, and gets competitive results for others. With our implementation, we demonstrate that private deques are an effective work stealing algorithm for a Fork Join framework. FJava also includes an instrumentation feature that provides metrics to help you fine tune your program for your machine.


Background

In the foreseeable future, the primary way to obtain higher performance is through increased parallelism and hardware parallelization[1].

However, as we have learned from 15418, writing efficient parallel code is hard. We also learned that it is possible to build systems which can efficiently parallelize code if the problem structure is constrained. In this project, we consider the parallelization of problems which fit into the fork join model of computation. Specifically, we have implemented a framework for efficiently parallelizing fork join computations in Java.

As studied in class, fork join frameworks usually implement one of two approaches: child stealing or continuation stealing. For this framework, we only considered child stealing, since continuation stealing is very difficult to implement without compiler support.

One important detail about this framework is that it uses private deques instead of concurrent deques. In class, we studied concurrent deques: any worker can steal work from another worker’s deque at any point in time. Private deques work differently: busy workers only respond to work stealing requests from idle workers once they are done processing their current task.

Private deques might sound inefficient at first. However, for most applications they perform competitively with (or better than) concurrent deques. They can be implemented with fewer atomic operations than concurrent deques, and they are easier to maintain, since the communication between deques is very localized and can be described in two or three operations.

With this project, we demonstrate that private deques are an effective implementation for a Fork Join framework. They perform competitively with concurrent deques, even when larger sequential thresholds are used.

For this project, we consider two implementations of private deques: receiver initiated private deques and sender initiated private deques.

For receiver initiated deques, it is the responsibility of idle deques to request work from busy deques. Conversely, for sender initiated deques, it is the responsibility of busy deques to find idle deques to hand work to.

What are the key data structures?

There are four main components in the FJava framework:

  • A Fork Join Pool, called FJavaPool
  • Task Runner
  • Work Stealing Deques
  • FJavaTasks

The next figure gives a brief overview of the components of FJava’s system:

A Fork Join Pool simply manages a group of Task Runners. The Fork Join Pool is the client’s entry point to our system. The client submits a task to the fork join pool, the pool hands the task to one of the Task Runners, and then waits for all of the task runners to finish.

A Task Runner is an entity that runs tasks. Each Task Runner owns a work stealing deque. The Task Runner simply gets work from the deque, and inserts new work back into the deque. The way the task runner actually does this is slightly more involved, but we will continue this discussion later on.

As we mentioned, each Task Runner has its own work stealing deque. The deque’s responsibility is to provide work to the Task Runner whenever the Task Runner requests it. It might be the case that the deque has no work when the Task Runner makes the request. In this case, the Task Runner Deque communicates with other Task Runner Deques to get work. We call this action work stealing. Details about the orchestration of the transfer are given in subsequent sections.

Now, what exactly are these ‘tasks’ that the client submits? We call them FJavaTasks. Basically, an FJavaTask is an object that encapsulates computation: some piece of work that needs to be done. An FJavaTask can potentially generate child tasks, which are added to the Task Runner’s deque to be computed later.

What are the key operations on these data structures?

  • FJavaPool

    • The only operation that the FJavaPool supports is run(FJavaTask). It takes an FJavaTask and computes the work represented by it and by all of the child FJavaTasks it creates.
  • TaskRunner

    • run( ): This is the entry point for the Task Runner. The task runner will run in a loop, fetching a task from its TaskRunnerDeque and executing it, until the FJavaPool indicates that there are no more tasks left.
    • addTask(FJavaTask): This is essentially a pass-through for the Task Runner: it simply forwards the call to its deque. FJavaTasks call this operation to make sure that child tasks get executed.
    • sync(FJavaTask): This is one of the more involved operations of the Task Runner. The Task Runner keeps completing tasks fetched from its own deque until all of the parent task’s children have completed.
  • TaskRunnerDeque

    • addTask(FJavaTask): Add a task to the back of the deque. It is only called by the Task Runner that owns the deque.
    • getTask( ): Gets a task for the Task Runner. If the deque is empty, it steals a task from the front of another Task Runner Deque.
    • tryLoadBalance( ): If we have too much work, try giving work to some other deque that has no work.
  • FJavaTask: Each FJavaTask supports the following four operations:

    • compute( ): This function encapsulates the computation which needs to be performed by the client.
    • runAsync( ): This operation is called on child tasks created by an FJavaTask. It means that this child task should be run asynchronously, without interrupting execution of the parent task or any other child task.
    • runSync( ): This operation is called on child tasks created by an FJavaTask. It means that the child task should be run synchronously, i.e. the parent task blocks until the child task completes.
    • sync( ): This operation blocks until all of the child tasks created by the FJavaTask complete. Usually, it is called by the client at the end of every compute method (see the example task sketched below).
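
To make the API concrete, here is a hypothetical Fibonacci task. This is a minimal sketch: only run, compute, runAsync, runSync, and sync come from the API described above; the constructor, the result field, and the way the FJavaPool is constructed are assumptions.

public class FibTask extends FJavaTask {
  private final int n;
  public long result; // assumption: results are exposed as plain fields

  public FibTask(int n) { this.n = n; }

  @Override
  public void compute() {
    if (n <= 1) { result = n; return; }
    FibTask left = new FibTask(n - 1);
    FibTask right = new FibTask(n - 2);
    left.runAsync(this);  // may be executed by another Task Runner
    right.runSync(this);  // executed by this Task Runner; blocks until done
    sync();               // wait for all children (here, the async left task)
    result = left.result + right.result;
  }
}

// Hypothetical usage; how the pool is constructed is an assumption:
// FJavaPool pool = new FJavaPool();
// FibTask root = new FibTask(30);
// pool.run(root);
// System.out.println(root.result);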

What are the algorithm’s inputs and outputs?

The input to the FJava framework is an FJavaTask. Each FJavaTask simply expresses some piece of work that needs to be done.

The output of the system is the end-to-end execution of the work expressed by the input FJavaTask and all of the child tasks created during its computation.

What is the part that is computationally expensive and could benefit from parallelization?

Certain types of problems, particularly those that follow the divide and conquer paradigm, can easily be expressed in the fork join model. These kinds of problems can usually be expressed as a combination of multiple independent subproblems. Since the subproblems are independent, they can be executed in parallel. Thus, speedup is achieved by parallelizing the computation of each of the subtasks.

Break down the workload. Where are the dependencies in the program? How much parallelism is there? Is it data-parallel? Where is the locality? Is it amenable to SIMD execution?

Each FJavaTask can create multiple child tasks. Each of these tasks can be executed independent of one another; possibly across multiple cores in the same machine. This is a form of task parallelism.

The parent task needs to accumulate results from all the child tasks it created and may perform some computation based on those results. To achieve this, the parent task calls sync(), which “blocks” until all children have finished their computation. Therefore, sync() in some sense acts like a barrier. Note, however, that the thread running the parent task cannot simply block and do nothing while the child tasks are executing; that implementation would be terribly inefficient. Instead of doing nothing, the thread should look for more independent work to do while the child tasks execute, and once the child tasks are done, perform the sync and continue executing the parent task. We go into more detail about how we implemented this in the approach section.

We observed in our framework that most of the steals occur either at the beginning or at the end of the computation. Therefore, we expect most tasks to be executed by the task runner that generated them. We hypothesize that this leads to spatial and temporal locality; this is the case for most of the divide and conquer algorithms we implemented.


Approach

Describe the technologies used. What language/APIs? What machines did you target?

We implemented our framework using Java 8. We did not target any specific machines; we expect our framework to perform well on any multi-core machine. However, most of the performance tests were done on the CMU Unix machines.

Describe how you mapped the problem to your target parallel machine(s).

There isn’t much complexity here in our framework. Each Task Runner executes on its own independent thread. The deques don’t have their own threads; they run in the same thread as their Task Runner.

Sadly, Java has no concept of thread affinity, so we must trust Java and the OS to make the appropriate mapping of threads to cores.

Explain the approach of your framework, how is it implemented.

In this section we explain the implementation details of all of the components of our system.

Task Runner

As mentioned before, all of the task runners run in a tight loop: they fetch a task from their deque and execute it, and when they are done with the task, they repeat the loop. However, there is considerable complexity involved in executing a task. Executing a task may involve executing its child tasks as well, and possibly one (or multiple) sync operations.

Implementing sync wasn’t trivial. We implemented it with another tight loop, whose exit condition is that all of the parent task’s children have completed. Inside the loop, we fetch a new task from the deque and execute it. Note that this may lead to a very deep stack trace; however, due to the nature of the work stealing deques (insert and pop from the back, steal from the front), the stack trace usually isn’t too deep.

Additionally, this implementation of sync involves additional complexity in our deque implementations. Most deque implementations assume that they can loop indefinitely until they get a task. This doesn’t work with our implementation, since quite possibly no new tasks can be created until the parent task is done with the sync, which would starve all of the task runners. To fix this, we added logic to our deques to make sure they abort the getTask operation whenever the parent task is done with the sync.
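
Below is a minimal sketch of this loop, under stated assumptions: areAllChildrenDone() and execute() are hypothetical names, and the getTask signature is the one from the deque interface shown later.

// Sketch of the Task Runner's sync loop described above.
void sync(FJavaTask parent) {
  while (!parent.areAllChildrenDone()) {
    // getTask receives the syncing task so the deque can abort a blocking
    // steal attempt once the parent's children have all completed.
    FJavaTask task = deque.getTask(parent);
    if (task != null) {
      task.execute(this); // may recursively call sync, deepening the stack
    }
  }
}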

FJavaTask

Clients interact with our framework in two ways. First, through the FJavaPool, which they use to submit the initial task. Second, through FJavaTasks, where they define how the work is to be done, and what can be executed asynchronously and what cannot.

As mentioned before, the API of the FJavaTask is quite simple. Now we will describe it with more detail:

  • compute(): This operation is the computation that needs to be done. The client will typically include some logic for small tasks, i.e. the computation that should be done sequentially. This method will also contain the code that creates child tasks and specifies which of them should run asynchronously or synchronously.
  • runAsync(): This operation is called on child tasks created by an FJavaTask. It means that this child task should be run asynchronously, without interrupting execution of the parent task or any other child task. This was quite simple to implement: the call is forwarded to the task runner of the parent task, and the child task is added to the back of the parent task’s deque.
  • runSync(): This operation is called on child tasks created by an FJavaTask. It means that the child task should be run synchronously, i.e. the parent task blocks until the child task completes. This was quite simple to implement as well: we simply set the task runner of the child task and run the task synchronously.
  • sync(): This operation blocks until all of the child tasks created by the FJavaTask complete. Usually, it is called by the client at the end of every compute method. This was complex to implement; the implementation is described in the TaskRunner section above. The call to sync is simply forwarded to the Task Runner.

Deques

Arguably, implementing the deques was one of the most complex parts of our project. We implemented several deques:

  • Private deques:
    • Receiver initiated work stealing deque (RID). We will explain this in detail in the next section.
    • Sender initiated work stealing deque (SID). We will explain this in detail in the next section.
  • Concurrent deques:
    • LinkedListConcurrentDeque. This was easy to implement since we piggybacked on Java’s ConcurrentLinkedDeque. Its performance isn’t great, though; we implemented it only as a reference solution.
    • ConcurrentArrayDeque. We implemented our own concurrent array deque, based on a paper by Chase and Lev[2]. However, the performance is not as good as Java’s Fork Join or our own RID and SID deques. We decided not to optimize this further, and focus on the RID and SID deques.
    • SharedConcurrentDeque. We implemented this as a baseline solution. The idea is: what happens if we only have a single deque, instead of N? The answer is that the framework performs really badly. All sense of locality is lost, and the stack depth of sync and compute calls is far too high.

We defined a common interface for these deques, so we were able to test them interchangeably quite easily.

public interface TaskRunnerDeque {
  // Called by the owning Task Runner to add a newly created task.
  public void addTask(FJavaTask task);
  // Returns a task to run, stealing from another deque if this one is empty;
  // the argument lets the deque abort the steal once that task's sync completes.
  public FJavaTask getTask(FJavaTask task);
  // Offers pending work to idle deques, servicing outstanding steal requests.
  public void tryLoadBalance();
}

We will explain the Receiver Initiated and Sender Initiated deques in more detail, since we spent a considerable amount of time fine-tuning these deques, and we managed to get good performance relative to the Java 8 Fork Join framework.

Receiver Initiated Deque

High level idea

Idle workers ask busy workers for work. A busy worker accepts only one steal request from a single idle worker at a time; therefore, the “request work” operation must be atomic. Whenever a busy worker is done with a task, it checks if anybody has made a steal request. If it finds a steal request, it responds by sending work to the idle worker’s response cell. Note that this response needs no concurrency control at all.

Implementation details

Receiver initiated deques were originally designed by Acar et al. [3]

As explained before, in receiver initiated deques, the idle deques with no work request work from the busy deques.

All of the deques have a reference to the following shared arrays. The i-th entry of each array represents the state of the i-th deque.

  • Status array: status[i] is 1 if, and only if, deque i has work to offer to other deques.
  • Request cells array: requestCells[i] = j if, and only if, deque j is waiting for work from deque i; a value of -1 means deque i has no pending steal request.
  • Response cells array: responseCells[i] holds the response to the steal request made by deque i.

Whenever getTask fails because the deque is empty, the deque executes the acquire function. Acquire runs indefinitely until it steals a task, or until the sync operation that called getTask terminates.

In acquire, we pick a random victim deque from which we will try to steal work. If the victim deque has work to offer, we try to atomically set our id in the victim deque’s requestCells entry. If we succeed, we loop until we are given work; otherwise, we retry with a new victim deque. Note that the atomic write guarantees that a deque that has work to offer receives at most one steal request at a time.

Now, whenever a deque receives an addTask or getTask request, it checks if it has any pending steal request (i.e., requestCells[i] is not -1); if it does, it adds the stolen task to the requesting deque’s responseCells entry. It may be the case that when a deque made the steal request, the victim deque had work to offer, but by the time the victim deque replied, it no longer had any work to offer. In this case, it indicates NULL in its reply, and the requesting deque must try searching for work again.

Another possible problem is that we might receive a steal request while we are executing acquire. For this reason, we must also check for incoming steal requests while we are waiting for a steal response.
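
The following is a minimal sketch of acquire, under stated assumptions: requestCells as an AtomicIntegerArray, responseCells as an AtomicReferenceArray, the NO_REQUEST (-1) and NULL_TASK sentinels, and the answerStealRequests() helper are all hypothetical names; the abort-while-waiting path is elided.

// Sketch of the RID acquire loop described above.
private FJavaTask acquire(FJavaTask parent) {
  while (!parent.areAllChildrenDone()) {      // abort once the sync finishes
    int victim = random.nextInt(numDeques);
    if (victim == id || status.get(victim) == 0) continue; // nothing to steal
    // Atomically claim the victim's request cell: at most one thief at a time.
    if (!requestCells.compareAndSet(victim, NO_REQUEST, id)) continue;
    FJavaTask stolen;
    while ((stolen = responseCells.get(id)) == null) {
      answerStealRequests(); // we can receive steal requests while waiting
    }
    responseCells.set(id, null);
    if (stolen != NULL_TASK) return stolen;   // victim may answer "no work"
  }
  return null;
}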

Sender Initiated Deque

High level idea

In the sender initiated deque, a worker that is idle indicates that it is looking for work by setting its state to WAITING_TO_RECEIVE_TASK. Another worker that has work to give chooses a random victim and examines its state. If the worker finds that the victim is waiting to receive additional work, it hands over a piece of work to the victim via a CAS operation.

Implementation details

Sender initiated deques were originally designed by Acar et al. [3]

Communication cells is a shared array used for passing work from one worker to another. All deques have a reference to the communication cells of every worker. The i-th communication cell corresponds to the i-th worker.

Deal time is a thread local variable which indicates the time after which a deque should attempt to delegate its task to another deque. The process of attempting to delegate a task from one deque to another is called a deal.

When a task runner (say the i-th) finishes the execution of a piece of work, it requests a new piece of work from its deque. There are two scenarios:

  • If the deque is not empty, it simply removes the task that was most recently added and returns it to the task runner.
  • If the deque is empty, it sets the i-th communication cell to a sentinel task; in our code this sentinel is called WAITING_TO_RECEIVE_TASK, which, as the name suggests, indicates that the deque is waiting to receive a task. The deque busy-spins on the i-th communication cell until it finds that the sentinel has been replaced with a real task.

While the i-th task runner is executing a task, any work generated by that task is added to the runner’s own deque. While adding a new piece of work, the deque tries to delegate the piece of work that has been lying around for the longest amount of time, i.e., we try to delegate the work at the front of the deque.

How does the deque delegate work? The deque picks a random victim (say the j-th) and looks at the victim’s communication cell. If the deque finds that the j-th communication cell is waiting to receive work (indicated via WAITING_TO_RECEIVE_TASK), it attempts a CAS operation to assign a task to the victim.

Since the process of delegation is expensive, we do not attempt it until a certain amount of time has passed; the next deal time is calculated using the formula:

NOW - DELTA * Math.log(randomBetween(0.2, 0.9))

Since Math.log of a value in (0.2, 0.9) is negative, this works out to NOW plus a randomized multiple of DELTA.

The problem with Sender Initiated Deques is that there is an extra parameter that needs to be tuned. The value of DELTA can seriously affect performance: a high value of DELTA can lead to work imbalance, because very few deals will take place, while a low value of DELTA can lead to many deals, causing too much overhead.
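
Below is a minimal sketch of the deal attempt. The tasks deque, the communicationCells array, the nextDealTime field, and the time source are hypothetical names. Note that this sketch already incorporates the race fix described in the next section: we CAS the actual task, not a sentinel, and remove it from the deque only if the CAS succeeds.

// Sketch of the SID deal attempt described above.
private void tryDealTask() {
  long now = System.nanoTime();
  if (now < nextDealTime || tasks.isEmpty()) return;
  int victim = random.nextInt(numDeques);
  FJavaTask oldest = tasks.peekFirst(); // the task that has waited the longest
  if (victim != id
      && communicationCells.compareAndSet(victim, WAITING_TO_RECEIVE_TASK, oldest)) {
    tasks.pollFirst(); // the deal succeeded; the victim now owns the task
  }
  // Schedule the next deal attempt (log of a value in (0.2, 0.9) is negative,
  // so this is now plus a randomized multiple of DELTA).
  nextDealTime = now - (long) (DELTA * Math.log(0.2 + 0.7 * random.nextDouble()));
}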

What did you try that did not work? How did you arrive at your solution?

This section describes the challenges we faced while optimizing the framework. Specifically, we look at certain issues which we encountered and how we went about fixing them:

Bug on SID pseudocode

The paper [3] we used as a reference for implementing Sender Initiated Private Deques had a subtle race condition in its suggested pseudocode.

Specifically, the paper assumes a sentinel incoming task that is set using a CAS instruction on a concurrent shared cell. Since the other worker is busy-waiting on that shared cell, it may dequeue the sentinel task and attempt to execute it.

The next figure shows the code for the acquire function, called by idle deques that need tasks from other deques.

The next figure shows pseudocode for dealing a task. The buggy portion of the code has been marked with a red rectangle.

To solve it, we CAS using the reference of the topmost task from the deque. If the CAS succeeds, we simply remove the task from the deque.

Stale references are bad for your frameworks’ health

In the first iteration of our API, each FJavaTask kept references to its child tasks. This was required in order to implement the sync() operation. While testing our code against the benchmarks, we noticed a large number of GC calls and several GBs of additional memory being used. The increased GC time, coupled with the huge amount of additional memory, was causing our code to run painstakingly slow! The fix was surprisingly trivial: we cleared the references to the child tasks as soon as they completed.

Subsequently, we reduced the memory overhead of our framework by keeping an atomic counter that is incremented when a child is created and decremented when the child finishes execution. This leads to minimal overhead; the references to child tasks can be garbage collected as soon as the tasks finish execution.
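
A minimal sketch of how FJavaTask could track its children this way; the field and method names are assumptions.

import java.util.concurrent.atomic.AtomicInteger;

public abstract class FJavaTask {
  private final AtomicInteger pendingChildren = new AtomicInteger(0);

  protected void onChildCreated()  { pendingChildren.incrementAndGet(); }
  protected void onChildFinished() { pendingChildren.decrementAndGet(); }

  // sync() can poll this instead of holding references to the children,
  // so completed child tasks become garbage collectable immediately.
  public boolean areAllChildrenDone() { return pendingChildren.get() == 0; }
}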

Another memory leak, ouch!

We decided to rely on Apache’s log4j framework for our logging needs. Unfortunately, we noticed that our framework was taking up too much memory, which was hurting our performance. We initially suspected this to be due to a large number of FJavaTasks being created, so we instrumented our code using NetBeans’ built-in memory profiler. On further investigation, we found out that log4j was leaking memory; we later found this bug reported on Apache’s website.

Cache friendly RID and SID

As mentioned previously, both sender and receiver initiated deques use shared data structures to communicate information; in practice, we used an array of atomic integer references. We realized that using an array of integers (which are modified atomically) could be leading to false sharing, so we padded each integer with 15 extra integers. We must point out that Java’s memory management abstracts away how objects are actually laid out in memory; even so, we did see performance improvements after adding the padding to the communication variables.
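
A minimal sketch of the padding; the class name is an assumption. With 4-byte ints, the value plus 15 padding ints span 64 bytes, so adjacent cells are likely to land on different cache lines.

import java.util.concurrent.atomic.AtomicInteger;

final class PaddedAtomicInteger extends AtomicInteger {
  // Padding only; these fields are never read or written.
  volatile int p01, p02, p03, p04, p05, p06, p07, p08;
  volatile int p09, p10, p11, p12, p13, p14, p15;
}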

Experiments with @Contended

Java 8 introduces an annotation, @Contended, to mark fields that are expected to be modified by multiple threads. This annotation acts as a hint to the JVM to pad the annotated field with additional bytes in order to prevent false sharing. However, we noticed that performance actually decreased slightly when using @Contended instead of our padded structures. We believe the reason is that @Contended adds too much padding to ensure that fields are on different cache lines: it goes to the extreme of making sure there are 128 bytes between contended fields. We believe the decrease in performance occurred because the excessive padding was wasting too much valuable cache space.

sun.misc.Unsafe optimizations

While researching the obscure dark corners of Java’s Fork Join framework, we came across the sun.misc.Unsafe class. Optimizations using the sun.misc.Unsafe class seemed to promise better performance for CAS operations compared to Java’s atomic integers.

Solved workload imbalance: tryLoadBalance() method

Since FJava uses private deques, it performs better when the sequential threshold used is small.

The sequential threshold indicates how small the problem needs to be before we solve it directly instead of creating more child tasks. With a smaller sequential threshold, the problems solved sequentially are smaller. In some scenarios, a smaller sequential threshold is desirable, since more work units are generated, which leads to better work balance. In other cases, a larger sequential threshold is better, since the overhead of generating a large number of child tasks exceeds the time it takes to solve a problem sequentially. Thus, there is a tradeoff in choosing the right value of the sequential threshold.

The reason private deques perform better with a smaller sequential threshold is that idle workers must wait for busy workers to stop working on a task before the busy worker can answer steal requests. The larger the sequential threshold, the more time busy workers spend working on tasks instead of answering steal requests. Therefore, large sequential thresholds can lead to work imbalance when private deques are used.

To solve this problem, we added a tryLoadBalance call to our API. The user can call tryLoadBalance anywhere inside the compute method. When called, the deque associated with the task runner running the task tries to service any steal requests it has pending.

When used appropriately, FJava shows performance competitive with Java 8 Fork Join even when the sequential threshold is large. However, it is the user’s responsibility to call tryLoadBalance periodically during large computations, as in the sketch below. Note that this could be done automatically with compiler support.
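
A hypothetical long-running leaf task that periodically services steal requests; the loop body, THRESHOLD, processElement, and the call frequency are all assumptions of this sketch.

public void compute() {
  if (size <= THRESHOLD) {
    for (int i = 0; i < size; i++) {
      processElement(i);
      if ((i & 1023) == 0) {
        tryLoadBalance(); // give pending steal requests a chance to be served
      }
    }
    return;
  }
  // ...otherwise split into child tasks as usual...
}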

We will show how tryLoadBalance impacts performance in the results section.

Implemented stealing K tasks at a time

We tried implementing stealing K tasks at a time, at least for Receiver Initiated Deques. The idea initially made sense, since we want to amortize the cost of a steal as much as possible.

However, this did not make any performance difference; in some cases, it made our algorithm slower. This makes sense: usually, a worker only needs one task. In most cases, that task will generate many child tasks, filling up the deque of the previously idle worker. The additional logic to steal K tasks at a time made steals more costly, thereby decreasing performance.


Results

How did you measure performance?

We measured performance as the speedup of the framework over the sequential version of the code. We also used the native Java 8 Fork Join implementation as a benchmark.

Description of the experimental setup

We implemented several Divide and Conquer problems to measure the performance of our framework.

Matrix Multiplication: Multiply two 2048x2048 matrices recursively.

The recursive formulation of this problem isn’t too complex. The next figure describes it:

We recursively divide each matrix into 4 blocks. Note that each step involves 8 recursive computations, and some of them need to be executed sequentially. In the figure above, AE and BG must be run sequentially, but they have no dependencies on any of the other subproblems. Basically, we end up with 4 subproblems that can run completely in parallel, each consisting of 2 sub-subproblems that must be executed sequentially. Here is an example implementation:

  public void compute() {
    if(size <= Definitions.MATRIX_MULT_SEQ_THRESHOLD) {
      multiplySequential();
      return;
    }
    int mid = size/2;
    FJavaSeq seq1 = new FJavaSeq(
        new FJavaMatrixMultiplication(A, B, C, mid, aRow, aCol,     bRow,     bCol, cRow, cCol),
        new FJavaMatrixMultiplication(A, B, C, mid, aRow, aCol+mid, bRow+mid, bCol, cRow, cCol)
        );
    FJavaSeq seq2 = new FJavaSeq(
        new FJavaMatrixMultiplication(A, B, C, mid, aRow, aCol,     bRow,     bCol+mid, cRow, cCol+mid),
        new FJavaMatrixMultiplication(A, B, C, mid, aRow, aCol+mid, bRow+mid, bCol+mid, cRow, cCol+mid)
        );
    FJavaSeq seq3 = new FJavaSeq(
        new FJavaMatrixMultiplication(A, B, C, mid, aRow+mid, aCol,     bRow,     bCol,     cRow+mid, cCol),
        new FJavaMatrixMultiplication(A, B, C, mid, aRow+mid, aCol+mid, bRow+mid, bCol, cRow+mid, cCol)
        );
    FJavaSeq seq4 = new FJavaSeq(
        new FJavaMatrixMultiplication(A, B, C, mid, aRow+mid, aCol,     bRow,     bCol+mid, cRow+mid, cCol+mid),
        new FJavaMatrixMultiplication(A, B, C, mid, aRow+mid, aCol+mid, bRow+mid, bCol+mid, cRow+mid, cCol+mid)
        );
    seq1.runAsync(this);
    seq2.runAsync(this);
    seq3.runAsync(this);
    seq4.runSync(this);
    sync();
  }

LU Decomposition: Decompose a 4096x4096 matrix.

This was one of the most complex problems we tested our framework on. Background material on LU decomposition can be found here and here.

As a disclaimer, our code for LU is heavily based on the code developed in this paper. [4]

We won’t post the entire code for LU decomposition here, but you can see it on GitHub. As a sample, we post one of the 4 recursive tasks involved in the computation. Schur, Lower, and Upper each involve the creation of 4-8 additional child tasks!

@Override
public void compute() {
  if (size == BLOCK_SIZE) {
    lu();
  }
  else {
    int h = size / 2;

    Block M00 = new Block(M.m, M.loRow,   M.loCol);
    Block M01 = new Block(M.m, M.loRow,   M.loCol+h);
    Block M10 = new Block(M.m, M.loRow+h, M.loCol);
    Block M11 = new Block(M.m, M.loRow+h, M.loCol+h);

    new FJavaSeq(
        //Compute A00 = L00 * U00 (we get L00 and U00 from this step)
        //After this step, M00 will be LU decomposed.
        //(Top right half of M00 is U00, bottom left is L00)
        new LowerUpper(h, M00),
        //Now, compute in parallel:
        //L10 = A10 * U00-1 and U01 = L00-1 * A01
        //Store L10 in M01, and U01 in M10.
        //After this step, M01 and M10 will be LU decomposed.
        new FJavaConcurrent(
            new Lower(h, M00, M01),
            new Upper(h, M00, M10)
        ),
        //Now, calculate Schur's complement 
        //S = A11 - L10 * U01, store in M11
        new Schur(h, M10, M01, M11),
        //Now, LU decompose S = A11 - L10 * U01
        //M11 = L11 * U11
        //Store in M11.
        //After this step, M11 will be LU decomposed.
        new LowerUpper(h, M11)
    ).runSync(this);
  }
}

The results given by LU are the most interesting, since this problem causes the highest number of steals (around 15%). We discuss them in a later section.

Primes: Call isPrime for an array of 5,000,000 numbers.

This is basically a map function that maps each element of the array to a boolean: prime or not.

We implemented the map function similarly to how it was shown in lecture; a hedged sketch of such a task is shown below.
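
This sketch follows the divide and conquer map pattern described above; the class name, fields, THRESHOLD value, and isPrime implementation are all assumptions.

public class PrimesTask extends FJavaTask {
  private static final int THRESHOLD = 2048; // assumption: tuned per machine

  private final long[] input;
  private final boolean[] result;
  private final int lo, hi;

  public PrimesTask(long[] input, boolean[] result, int lo, int hi) {
    this.input = input; this.result = result; this.lo = lo; this.hi = hi;
  }

  @Override
  public void compute() {
    if (hi - lo <= THRESHOLD) {
      for (int i = lo; i < hi; i++) result[i] = isPrime(input[i]);
      return;
    }
    int mid = (lo + hi) / 2;
    new PrimesTask(input, result, lo, mid).runAsync(this); // left half
    new PrimesTask(input, result, mid, hi).runSync(this);  // right half
    sync(); // wait for the asynchronous half to complete
  }

  private static boolean isPrime(long n) {
    if (n < 2) return false;
    for (long d = 2; d * d <= n; d++)
      if (n % d == 0) return false;
    return true;
  }
}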

Fibonacci: Solve fibonacci(50) recursively.

Not much to be said here, it is just the basic F(n) = F(n-1) + F(n-2).

QuickSort: Sort 10,000,000 longs using QuickSort.

Again, not much to be said here. It is just a basic QuickSort implementation.

Provide graphs of speedup or execute time

The next figure shows the speedup of FJava and Java 8 native Fork Join relative to the sequential version on a 10 core machine, for the input sizes described before. As a review, here are the input sizes used:

  • Primes: Call isPrime for an array of 5,000,000 numbers.
  • Matrix Multiplication: Multiply two 2048x2048 matrices recursively.
  • Fibonacci: Solve fibonacci(50) recursively.
  • QuickSort: Sort 10,000,000 longs using QuickSort.
  • LU Decomposition: Decompose a 4096x4096 matrix.

For the next figure, the RID algorithm was used in FJava.

The next figure shows the speedup achieved by FJava and Java Fork Join relative to the sequential version of the code for the Matrix Multiplication problem, as we increase the number of cores. We can see the speedup grows linearly until we reach 12 cores for both frameworks.

An important thing to notice is that the number of steals varies according to the algorithm used and, more importantly, it varies greatly according to the problem. The next figure shows the number of steals for each of the problems we tested with our framework, when run on a 12 core machine. Note that the Y axis is in log scale.

We must note that if we decreased the value of Delta, the number of steals for SID would increase. Recall that Delta controls how often we make deal attempts in the SID algorithm (i.e., how often busy deques check whether idle deques need work).

The next figure shows how the value of Delta can greatly influence the number of steals, and more importantly the total runtime of the algorithm. In the figure, we show the results for the LU decomposition problem on a 12 core machine.

The speedup we gain with our framework is largely influenced by the problem size. If the problem size is too small, the overhead of spawning threads, in addition to the communication overhead of the framework, makes the sequential version of the code run slightly faster. The next figure describes this behavior.

The speedup reported is relative to the sequential version of the code.

Note that SID might perform better with a different value of Delta. However, from the experiments we have run, we have noticed that SID performs better when the number of steals isn’t too large.

The next figure shows the speedup achieved by each of our deque implementations, and Java 8 native Fork Join implementation, relative to the sequential version of the code on a 12 core machine.

What limited your speedup?

FJava achieves good workload balance even with a large number of cores. The next figure shows how much time each worker spends doing actual work versus waiting for tasks when the Receiver Initiated algorithm is used on the LU decomposition task for a 4096x4096 matrix. Note that this task creates about 2 million tasks, and about 15% of them are stolen.

As we can see, the overhead of getTask isn’t too large (less than 5% of the total time).

The results for the Sender Initiated algorithm vary greatly depending on the value of Delta chosen. The next figure shows how much time each worker spends doing actual work versus waiting for tasks when Delta = 10^10.

As we can see, the workload distribution is highly skewed: basically, core 0 does all of the work. However, when a good value of Delta is used, the Sender Initiated algorithm also achieves good workload balance. The next graph shows the distribution for Delta = 5000.

However, for tasks with a smaller number of steals, the overhead of getTask is almost non-existent. The next graph shows the time distribution for the Receiver Initiated algorithm on the Primes task.

The next figure shows the time distribution for the Sender Initiated algorithm in the Primes task.

So, what is limiting speedup? In the LU task, we could improve our algorithms by decreasing the runtime of getTask. Each core spends about 5% of its time in getTask, so making steals more efficient might lead to a lower total runtime.

However, from the graphs we can infer that an important factor limiting performance is that a non-trivial fraction of the problem happens to be sequential. For instance, in the LU problem, we must decompose several 16x16 matrices without any parallelism. It would be interesting to try using SIMD for this sequential part of the problem, but we didn’t get around to doing this.

The obvious next question to ask is: well, if the sequential aspect of the task is limiting performance, then why not simply decrease the sequential threshold? This way, the tasks would be smaller and the inherently sequential part of the code would be minimized.

The next figure shows what happens if we decrease the sequential threshold of the LU decomposition task from 16x16 to 4x4.

Interestingly, both the compute time and the getTask time increased. This is not too hard to understand: decreasing the sequential threshold incurs the overhead of creating more tasks. This involves both a memory overhead and a framework overhead, since we must store a larger portion of the recursion tree and more work units are created.

Therefore, we could go in two different directions to improve our speedup. The first is making the sequential part of the code run faster, as mentioned before, using techniques such as SIMD. The other is to minimize the memory footprint and the framework overhead of creating a large number of tasks; this may involve writing even more efficient work stealing strategies.

Did you achieve your goals?

We strongly believe that we achieved all of the goals we set at the beginning of the project. We developed a Fork Join framework from scratch that achieves performance competitive with the native Java 8 Fork Join framework.

Additionally, we developed an instrumentation framework that allowed us to gather statistics and easily tune our framework to achieve higher speedups in the workloads that we tested. Without this instrumentation framework, we wouldn’t have been able to discover many of the bugs and bottlenecks that we found halfway through our project.

Lastly, we believe that we developed an API that is friendly to use. The idea of using runAsync, runSync, and sync to express parallel work is intuitive, and the mapping from the sequential version of an algorithm to our framework is fairly straightforward. Also, the instrumentation framework allows programmers to identify bottlenecks that limit performance.

Work distribution

Equal work was done by both members of the team.


References


[1] Fatahalian, K. (2015, April 29). Slide View : Parallel Computer Architecture and Programming : 15-418/618 Spring 2015. http://15418.courses.cs.cmu.edu/spring2015/lecture/wrapup/slide_037

[2] David Chase and Yossi Lev. 2005. Dynamic circular work-stealing deque. In Proceedings of the seventeenth annual ACM symposium on Parallelism in algorithms and architectures (SPAA ‘05). ACM, New York, NY, USA, 21-28. DOI=10.1145/1073970.1073974 http://doi.acm.org/10.1145/1073970.1073974

[3] Umut A. Acar, Arthur Chargueraud, and Mike Rainey. 2013. Scheduling parallel programs by work stealing with private deques. SIGPLAN Not. 48, 8 (February 2013), 219-228. DOI=10.1145/2517327.2442538 http://doi.acm.org/10.1145/2517327.2442538

[4] Doug Lea. 2000. A Java fork/join framework. In Proceedings of the ACM 2000 conference on Java Grande (JAVA ‘00). ACM, New York, NY, USA, 36-43. DOI=10.1145/337449.337465 http://doi.acm.org/10.1145/337449.337465