# Concurrent Programming

We now take a brief look at concurrent programming, where a program is structured so that several computations can execute concurrently during overlapping time periods. We focus on aspects of concurrency that are explicitly specified by a programmer, rather than the implicit concurrency provided by compiler optimizations or the underlying system hardware.

# Parallel Computing¶

From the 1970s through the mid-2000s, the speed of individual processor cores grew at an exponential rate. Much of this increase in speed was accomplished by increasing the clock frequency, the rate at which a processor performs basic operations. In the mid-2000s, however, this exponential increase came to an abrupt end, due to power and thermal constraints, and the speed of individual processor cores has increased much more slowly since then. Figure 46 is graph from Stanford’s CPU database that illustrates this trend:

Figure 46 Historical data of CPU clock frequencies.

Instead of increasing clock frequency, CPU manufacturers began to place multiple cores in a single processor, enabling more operations to be performed concurrently.

Parallelism is not a new concept. Large-scale parallel machines have been used for decades, primarily for scientific computing and data analysis. Even in personal computers with a single processor core, operating systems and interpreters have provided the abstraction of concurrency. This is done through context switching, or rapidly switching between different tasks without waiting for them to complete. Thus, multiple programs can run on the same machine concurrently, even if it only has a single processing core.

Given the current trend of increasing the number of processor cores, individual applications must now take advantage of parallelism in order to run faster. Within a single program, computation must be arranged so that as much work can be done in parallel as possible. However, parallelism introduces new challenges in writing correct code, particularly in the presence of shared, mutable state.

For problems that can be solved efficiently in the functional model, with no shared mutable state, parallelism poses few problems. Pure functions provide referential transparency, meaning that expressions can be replaced with their values, and vice versa, without affecting the behavior of a program. This enables expressions that do not depend on each other to be evaluated in parallel. The MapReduce framework is one system that allows functional programs to be specified and run in parallel with minimal programmer effort. Several functional languages, including NESL and Clojure, have been designed with parallelism at their core.

Unfortunately, not all problems can be solved efficiently using functional programming. The Berkeley View project has identified thirteen common computational patterns in science and engineering, only one of which is MapReduce. The remaining patterns require shared state.

In the remainder of this section, we will see how mutable shared state can introduce bugs into parallel programs and a number of approaches to prevent such bugs. We will examine these techniques in the context of two applications, a web crawler and a particle simulator.

## Parallelism in Python¶

Before we dive deeper into the details of parallelism, let us first explore Python’s support for parallel computation. Python provides two means of parallel execution: threading and multiprocessing.

In threading, multiple “threads” of execution exist within a single interpreter. Each thread executes code independently from the others, though they share the same data. However, the CPython interpreter, the main implementation of Python, only interprets code in one thread at a time, switching between them in order to provide the illusion of parallelism. On the other hand, operations external to the interpreter, such as writing to a file or accessing the network, may run in parallel.

The threading module contains classes that enable threads to be created and synchronized. The following is a simple example of a multithreaded program:

import threading

other.start()


>>> thread_hello()


The Thread constructor creates a new thread. It requires a target function that the new thread should run, as well as the arguments to that function. Calling start on a Thread object marks it ready to run. The current_thread function returns the Thread object associated with the current thread of execution.

In this example, the prints can happen in any order, since we haven’t synchronized them in any way. The output can even be interleaved on some systems.

### Multiprocessing¶

Python also supports multiprocessing, which allows a program to spawn multiple interpreters, or processes, each of which can run code independently. These processes do not generally share data, so any shared state must be communicated between processes. On the other hand, processes execute in parallel according to the level of parallelism provided by the underlying operating system and hardware. Thus, if the CPU has multiple processor cores, Python processes can truly run concurrently.

The multiprocessing module contains classes for creating and synchronizing processes. The following is the hello example using processes:

import multiprocessing

def process_hello():
other = multiprocessing.Process(target=process_say_hello,
args=())
other.start()
process_say_hello()

def process_say_hello():
print('hello from', multiprocessing.current_process().name)

>>> process_hello()
hello from MainProcess
>>> hello from Process-1


As this example demonstrates, many of the classes and functions in multiprocessing are analogous to those in threading. This example also demonstrates how lack of synchronization affects shared state, as the display can be considered shared state. Here, the interpreter prompt from the interactive process appears before the print output from the other process.

## The Problem with Shared State¶

To further illustrate the problem with shared state, let’s look at a simple example of a counter that is shared between two threads:

import threading
from time import sleep

counter = [0]     # store in a list to avoid global statements

def increment():
count = counter[0]
counter[0] = count + 1

other.start()
increment()
print('count is now: ', counter[0])


In this program, two threads attempt to increment the same counter. The CPython interpreter can switch between threads at almost any time. Only the most basic operations are atomic, meaning that they appear to occur instantly, with no switch possible during their evaluation or execution. Incrementing a counter requires multiple basic operations: read the old value, add one to it, and write the new value. The interpreter can switch threads between any of these operations.

In order to show what happens when the interpreter switches threads at the wrong time, we can attempt to force a switch by sleeping for 0 seconds:

from time import sleep

counter = [0]

def increment():
count = counter[0]
sleep(0) # try to force a switch to the other thread
counter[0] = count + 1


When this code is run, the interpreter often does switch threads at the sleep call. This can result in the following sequence of operations:

Thread 0                    Thread 1
calculate 0 + 1: 1
write 1 -> counter[0]
calculate 0 + 1: 1
write 1 -> counter[0]


The end result is that the counter has a value of 1, even though it was incremented twice! Worse, the interpreter may only switch at the wrong time very rarely, making this difficult to debug. Even with the sleep call, this program sometimes produces a correct count of 2 and sometimes an incorrect count of 1.

This problem arises only in the presence of shared data that may be mutated by one thread while another thread accesses it. Such a conflict is called a race condition, and it is an example of a bug that only exists in the parallel world.

In order to avoid race conditions, shared data that may be mutated and accessed by multiple threads must be protected against concurrent access. For example, if we can ensure that thread 1 only accesses the counter after thread 0 finishes accessing it, or vice versa, we can guarantee that the right result is computed. We say that shared data is synchronized if it is protected from concurrent access. In the next few subsections, we will see multiple mechanisms providing synchronization.

## When No Synchronization is Necessary¶

In some cases, access to shared data need not be synchronized, if concurrent access cannot result in incorrect behavior. The simplest example is read-only data. Since such data is never mutated, all threads will always read the same values regardless when they access the data.

In rare cases, shared data that is mutated may not require synchronization. However, understanding when this is the case requires a deep knowledge of how the interpreter and underlying software and hardware work. Consider the following example:

items = []
flag = []

def consume():
while not flag:
pass
print('items is', items)

def produce():
for i in range(10):
items.append(i)
flag.append('go')

consumer.start()
produce()


Here, the producer thread adds items to items, while the consumer waits until flag is non-empty. When the producer finishes adding items, it adds an element to flag, allowing the consumer to proceed.

In most Python implementations, this example will work correctly. However, a common optimization in other compilers and interpreters, and even the hardware itself, is to reorder operations within a single thread that do not depend on each other for data. In such a system, the statement flag.append('go') may be moved before the loop, since neither depends on the other for data. In general, you should avoid code like this unless you are certain that the underlying system won’t reorder the relevant operations.

## Synchronized Data Structures¶

The simplest means of synchronizing shared data is to use a data structure that provides synchronized operations. The queue module contains a Queue class that provides synchronized first-in, first-out access to data. The put method adds an item to the Queue and the get method retrieves an item. The class itself ensures that these methods are synchronized, so items are not lost no matter how thread operations are interleaved. Here is a producer/consumer example that uses a Queue:

from queue import Queue

queue = Queue()

def synchronized_consume():
while True:
print('got an item:', queue.get())

def synchronized_produce():
for i in range(10):
queue.put(i)
queue.join()

consumer.daemon = True
consumer.start()
synchronized_produce()


There are a few changes to this code, in addition to the Queue and get and put calls. We have marked the consumer thread as a daemon, which means that the program will not wait for that thread to complete before exiting. This allows us to use an infinite loop in the consumer. However, we do need to ensure that the main thread exits, but only after all items have been consumed from the Queue. The consumer calls the task_done method to inform the Queue that it is done processing an item, and the main thread calls the join method, which waits until all items have been processed, ensuring that the program exits only after that is the case.

A more complex example that makes use of a Queue is a parallel web crawler that searches for dead links on a website.

## Locks¶

When a synchronized version of a particular data structure is not available, we have to provide our own synchronization. A lock is a basic mechanism to do so. It can be acquired by at most one thread, after which no other thread may acquire it until it is released by the thread that previously acquired it.

In Python, the threading module contains a Lock class to provide locking. A Lock has acquire and release methods to acquire and release the lock, and the class guarantees that only one thread at a time can acquire it. All other threads that attempt to acquire a lock while it is already being held are forced to wait until it is released.

For a lock to protect a particular set of data, all the threads need to be programmed to follow a rule: no thread will access any of the shared data unless it owns that particular lock. In effect, all the threads need to “wrap” their manipulation of the shared data in acquire and release calls for that lock.

The following is an example of two threads incrementing a counter that is protected by a lock, avoiding a race condition:

from threading import Thread, Lock

counter = [0]
counter_lock = Lock()

def increment():
counter_lock.acquire()
count = counter[0]
counter[0] = count + 1
counter_lock.release()

other.start()
increment()
other.join()
print('count is now', counter[0])


Acquiring the lock prevents another thread from acquiring it and proceeding to increment the counter. When the lock has been acquired, the thread can be assured that no other thread can enter the critical section that is protected by the lock. Once the thread has incremented the counter, it releases the lock so that another thread can access the counter.

In this code, we had to be careful not to return until after we released the lock. In general, we have to ensure that we release a lock when we no longer need it. This can be very error-prone, particularly in the presence of exceptions, so Python locks are context managers that can be used with scope-based resource management:

def increment():
with counter_lock:
count = counter[0]
counter[0] = count + 1


The with statement ensures that counter_lock is acquired before its suite is executed and that it is released when the suite is exited for any reason.

Operations that must be synchronized with each other must use the same lock. However, two disjoint sets of operations that must be synchronized only with operations in the same set should use two different lock objects to avoid over-synchronization.

## Barriers¶

Another way to avoid conflicting access to shared data is to divide a program into phases, ensuring that shared data is mutated in a phase in which no other thread accesses it. A barrier divides a program into phases by requiring all threads to reach it before any of them can proceed. Code that is executed after a barrier cannot be concurrent with code executed before the barrier.

In Python, the threading module provides a barrier in the form of the the wait method of a Barrier instance:

counters = [0, 0]

for i in range(steps):
barrier.wait() # wait for reads to complete
barrier.wait() # wait for writes to complete

other.start()
count(0, steps)
print('counters:', counters)



In this example, reading and writing to shared data take place in different phases, separated by barriers. The writes occur in the same phase, but they are disjoint; this disjointness is necessary to avoid concurrent writes to the same data in the same phase. Since this code is properly synchronized, both counters will always be 10 at the end.

## Message Passing¶

A final mechanism to avoid improper mutation of shared data is to entirely avoid concurrent access to the same data. In Python, using multiprocessing rather than threading naturally results in this, since processes run in separate interpreters with their own data. Any state required by multiple processes can be communicated by passing messages between processes.

The Pipe function in the multiprocessing module constructs a communication channel between processes, returning a pair of connection endpoints. By default, the connection is duplex, meaning a two-way channel, though passing in the argument False results in a one-way channel. The send method on a connection sends an object over the channel, while the recv method receives an object. The latter is blocking, meaning that a process that calls recv will wait until an object is received.

The following is a producer/consumer example using processes and pipes:

def process_consume(in_pipe):
while True:
item = in_pipe.recv()
if item is None:
return
print('got an item:', item)

def process_produce(out_pipe):
for i in range(10):
out_pipe.send(i)
out_pipe.send(None) # done signal

pipe = multiprocessing.Pipe(False)
consumer = multiprocessing.Process(target=process_consume,
args=(pipe[0],))
consumer.start()
process_produce(pipe[1])


The two ends of the pipe are obtained by indexing into the result of Pipe(). Since the pipe is created as a one-way channel, the sender must use the end at index 1 and the receiver the end at index 2.

In this example, we use a None message to signal the end of communication. We also passed in one end of the pipe as an argument to the target function when creating the consumer process. This is necessary, since state must be explicitly shared between processes.

The multiprocessing module provides other synchronization mechanisms for processes, including synchronized queues, locks, and as of Python 3.3, barriers. For example, a lock or a barrier can be used to synchronize printing to the screen, avoiding the improper display output we saw previously.

## Application Examples¶

We now examine two application examples in more detail, exploring how the techniques above can be used to properly synchronize access to shared resources.

### Web Crawler¶

A web crawler is a program that systematically browses the Internet. Such a program may have several uses; one example is a crawler that validates links on a website, recursively checking that all links hosted by the site are to valid webpages. This crawler could be implemented with a work queue of URLs that need to be recursively checked and a set of URLs that have already been encountered by the program. Then for each URL in the work queue, the program would:

2. For each link on the page:

2. If the link has not been seen, then add it to both the seen set and the work queue.

Since Python threading enables network requests to be serviced concurrently, this program can be parallelized by using several threads to process different URLs. However, the shared queue and set data structures must be protected from concurrent access.

The work queue can be represented using the synchronized Queue class, since it ensures that no more than one thread can perform an operation on the Queue at a time. However, Python does not provide a synchronized set, so we must use a lock to protect access to a normal set:

seen = set()

with seen_lock:
if item not in seen:
return False
return True


A lock is necessary here, in order to prevent another thread from adding the URL to the set between this thread checking if it is in the set and adding it to the set. Furthermore, adding to a set is not atomic, so concurrent attempts to add to a set may corrupt its internal data. The already_seen() function adds the given item to the set if it is not already in there, returning whether or not the item was added.

The following then checks if a URL has been seen and adds it to the work queue if not:

work_queue = Queue()

def queue_url(url):
work_queue.put(url)


The call to already_seen() ensures that a given URL has not been seen when it is added to the work queue, so that the URL is only processed once.

### Particle Simulator¶

A particle simulator simulates the interactions between independent particles within a confined space. Each particle interacts with every other particle; for example, molecules may apply a repulsive force to other molecules based on the distance between them, resulting from the electric field of the electrons in each molecule. This interaction can be can be computed over the course of many discrete timesteps. A particle has a position, velocity, and acceleration, and a new acceleration is computed in each timestep based on the positions of the other particles. The velocity of the particle must be updated accordingly, and its position according to its velocity.

A natural way to parallelize a particle simulator is to divide the particles among several threads or processes, as illustrated in Figure 47.

Figure 47 A particle interaction can be parallelized by splitting the particles among the computational units.

Each thread or process is then responsible for computing the forces on its own particles, updating their positions and velocities accordingly. The algorithm for a single timestep on each thread can then be divided into the following phases:

1. Read the current position of every particle.

2. For each of its own particles, compute the force resulting from interactions with every other particle, using their current positions.

3. Update the velocities of its particles based on the forces computed.

4. Update the positions of its particles based on the new velocities.

In this algorithm, the positions of the particles constitute shared data and must be protected from concurrent access. The multithreaded implementation of the simulator uses barriers to separate phases 1 and 4, which access the shared data. Two barriers are required, one to ensure that all threads move together between phase 1 and 4 within a timestep, and another to ensure that they synchronously move between phase 4 in a timestep to phase 1 in the next timestep. The writes in phases 2 and 3 are to separate data on each thread, so they need not be synchronized.

An alternative algorithm is to use message passing to send copies of particle positions to other threads or processes. This is the strategy implemented by the multiprocess version of the particle simulator, with pipes used to communicate particle positions between processes in each timestep. A circular pipeline is set up between processes in order to minimize communication. Each process injects its own particles’ positions into its pipeline stage, which eventually go through a full rotation of the pipeline, as shown in Figure 48.

Figure 48 Copies of each particle can be rotated among the processes. A process computes the interaction between its own particles and the copies it sees in each step of the rotation.

At each step of the rotation, a process applies forces from the positions that are currently in its own pipeline stage on to its own particles, so that after a full rotation, all forces have been applied to its particles.

## Synchronization Pitfalls¶

While synchronization methods are effective for protecting shared state, they can also be used incorrectly, failing to accomplish the proper synchronization, over-synchronizing, or causing the program to hang as a result of deadlock.

### Under-synchronization¶

A common pitfall in parallel computing is to neglect to properly synchronize shared accesses. In the set example for the web crawler, we need to synchronize the membership check and insertion together, so that another thread cannot perform an insertion in between these two operations. Failing to synchronize the two operations together is erroneous, even if they are separately synchronized:

def already_seen(item):
with seen_lock:
present = item in seen
if not present
with seen_lock:
return not present


Here, it is possible for one thread to acquire seen_lock and see that the item is not in the set. But between releasing the lock and requiring it for insertion, another thread can obtain the lock and also see that the item is not in the set. This results in both threads thinking that they inserted the item, potentially resulting in duplicate work.

### Over-synchronization¶

Another common error is to over-synchronize a program, so that non-conflicting operations cannot occur concurrently. As a trivial example, we can avoid all conflicting access to shared data by acquiring a master lock when a thread starts and only releasing it when a thread completes. This serializes our entire code, so that nothing runs in parallel. In some cases, this can even cause our program to hang indefinitely. For example, consider a consumer/producer program in which the consumer obtains the lock and never releases it:

items = []
lock = Lock()

def consume():
with lock:
while not items:
sleep(1)    # wait for a bit
print('got an item:', items.pop())

def synchronized_produce():
with lock:
for i in range(10):
items.append(i)


This prevents the producer from producing any items, which in turn prevents the consumer from doing anything since it has nothing to consume.

While this example is trivial, in practice, programmers often over-synchronize their code to some degree, preventing their code from taking complete advantage of the available parallelism.

Because they cause threads or processes to wait on each other, synchronization mechanisms are vulnerable to deadlock, a situation in which two or more threads or processes are stuck, waiting for each other to finish. We have just seen how neglecting to release a lock can cause a thread to get stuck indefinitely. But even if threads or processes do properly release locks, programs can still reach deadlock.

The source of deadlock is a circular wait, illustrated in Figure 49 with processes. A process cannot continue because it is waiting for other processes, which are in turn waiting for the first process to complete.

Figure 49 Deadlock arises when a set of threads or processes is each waiting on another thread or process.

As an example, we will set up a deadlock with two processes. Suppose they share a duplex pipe and attempt to communicate with each other as follows:

def deadlock(in_pipe, out_pipe):
item = in_pipe.recv()
print('got an item:', item)
out_pipe.send(item + 1)

pipe = multiprocessing.Pipe()
args=(pipe[0], pipe[1]))
other.start()



Both processes attempt to receive data first. Recall that the recv method blocks until an item is available. Since neither process has sent anything, both will wait indefinitely for the other to send it data, resulting in deadlock.

Synchronization operations must be properly aligned to avoid deadlock. This may require sending over a pipe before receiving, acquiring multiple locks in the same order, and ensuring that all threads reach the right barrier at the right time.

## Conclusion¶

As we have seen, parallelism presents new challenges in writing correct and efficient code. As the trend of increasing parallelism at the hardware level will continue for the foreseeable future, parallel computation will become more and more important in application programming. There is a very active body of research on making parallelism easier and less error-prone for programmers. Our discussion here serves only as a basic introduction to this crucial area of computer science.

In parallelizing a computation, one strategy is to explicitly decompose a program over the set of workers, as we did in the previous section. Another option is to divide the work according to the natural granularity of an operation and to rely on the runtime system to schedule the work appropriately. This latter strategy can be accomplished with asynchronous tasks, where an operation is launched to be computed asynchronously, and its result used at some further point.

In C++11, an asynchronous task can be launched with the async() function template, contained in the <future> header. The first argument to async() is the function or function object representing a task, and the remaining arguments are the arguments with which to invoke that function. The following is a basic example:

void foo(int x, int y) {
cout << (x + y) << endl;
}

int main() {
async(foo, 3, 4);
async(foo, 5, 6);
}


The code above launches separate tasks to compute foo(3, 4) and foo(5, 6) asynchronously. The print outputs 7 and 11 can appear in any order, since the two tasks aren’t synchronized with respect to each other, and the outputs can even be interleaved with each other.

The return value of async() is a future object, which is a proxy for the result of the asynchronous task. In particular, the async() calls above return objects of type future<void>, since the return type of foo() is void. We can wait on the result of an asynchronous task by calling the wait() method of the corresponding future object, as in the following:

int main() {
future<void> f1 = async(foo, 3, 4);
f1.wait();
future<void> f2 = async(foo, 5, 6);
f2.wait();
}


Here, we wait for the first task to complete before launching the second. This ensures that the 7 will appear as output before the 11.

In the case of a function that returns a non-void value, we can also obtain the result by calling the get() method of the future object, which waits until the result is available and then returns the result. This is particularly useful if we have some computation that depends on the result of the task, as in the following:

int main() {
future<int> f1 = async([](int x, int y) {
return x + y;
}, 3, 4);
cout << (f1.get() + 5) << endl;
}


This launches a task to asynchronously call a lambda function, waits for the result and adds 5 to it, and prints the sum.

As a more complex example, let’s consider the tree-recursive computation of the Fibonacci sequence. The following is a sequential function to compute a Fibonacci number:

long long fib(int n) {
if (n <= 1)
return n;
return fib(n - 1) + fib(n - 2);
}


We can observe that the two recursive calls do not depend on each other, so we can compute them asynchronously by launching a separate task for one of the calls. The following code does so:

long long async_fib(int n) {
if (n <= 1)
return n;
future<long long> res1 = async(async_fib, n - 1);
long long res2 = async_fib(n - 2);
return res2 + res1.get();
}


This code uses async() to compute one recursive call, while the other call is computed in the existing task. We require the result of the asynchronous task before we can compute the sum and return, so we use get() on its future object in order to obtain its result.

As an aside, we write the two recursive calls in separate statements to ensure that the asynchronous task is launched before the recursive call that takes place in the existing task. Consider the following version that makes both calls in the same statement:

return async(async_fib, n - 1).get() + async_fib(n - 2);


In C++, the order of evaluation of the two operands to + is unspecified, so it would be valid for the compiler to produce code that sequentially computes the right-hand side before launching the asynchronous task to compute the left-hand side. This would turn the whole computation into a sequential one. Thus, we need to use statement sequencing to ensure that the asynchronous task is launched before the sequential recursive call is made.

## Limiting the Number of Tasks¶

As an example, computing async_fib(15) on the author’s quad-core iMac computer takes about 4000 times longer than fib(15), using Clang 8, due to the large number of small tasks that are launched. Instead, we need to rewrite async_fib() to do the remaining computation sequentially when a threshold is reached. The following does so, using the number of tasks launched so far to determine if the threshold has been met:

long long async_fib(int n, int tasks, int max_tasks) {
if (n <= 1)
return n;
future<long long> res1 = async(async_fib, n - 1, 2 * tasks,
return res2 + res1.get();
} else {
return fib(n - 1) + fib(n - 2);
}
}


The function takes in two extra arguments, representing the current number of tasks and the threshold value for the maximum number of tasks. If the threshold has not been reached, then the recursion proceeds as before, launching a new asynchronous task for one of the calls. In making the recursive calls, we double the number of current tasks to account for the fact that each step of the recursion doubles the number of concurrent computations. On the other hand, if the threshold has been reached, then we do the rest of the computation sequentially by calling fib(). Figure 50 is the task graph for computing async_fib(5, 1, 4), limiting the number of tasks to four.

Figure 50 Task graph for computing async_fib(5, 1, 4).

With the ability to limit the number of tasks, we find that fib(42) takes 1.63 seconds on the author’s quad-core iMac, whereas async_fib(42, 1, 512) takes 0.47 seconds, about a 3.5x speedup. The 512-task limit was determined experimentally to be close to the optimal value.

As another example, let’s write quicksort using asynchronous tasks. First, we write the sequential version as follows:

size_t partition(int *A, size_t size) {
int pivot = A[0];
size_t start = 1;
size_t end = size - 1;
while (start <= end) {
if (A[start] >= pivot)
std::swap(A[start], A[end--]);
else {
std::swap(A[start - 1], A[start]);
start++;
}
}
return start - 1;
}

void quicksort(int *A, size_t size) {
if (size <= CUTOFF) {
std::sort(A, A + size);
return;
}
int pivot = A[size/2];
std::swap(A[0], A[size/2]);
size_t pivot_index = partition(A, size);
quicksort(A, pivot_index);
quicksort(A + pivot_index + 1, size - pivot_index - 1);
}


This implements an in-place quicksort, partitioning the input array by swapping elements to the appropriate side of the pivot. We cut off the quicksort itself once we reach a small number of elements, since at that point other sorts such as insertion sort are more efficient. For simplicity, we use std::sort() when we reach the cutoff point, which will be 10 elements in our examples.

As with the Fibonacci sequence, we can launch a separate task to compute one of the recursive calls, limiting ourselves to a maximum number of tasks:

void async_quicksort(int *A, size_t size, int thread_count,
if (size <= CUTOFF) {
std::sort(A, A + size);
return;
}
int pivot = A[size/2];
std::swap(A[0], A[size/2]);
size_t pivot_index = partition(A, size);
future<void> rec1 = async(async_quicksort, A, pivot_index,
async_quicksort(A + pivot_index + 1, size - pivot_index - 1,
rec1.wait();
} else {
quicksort(A, pivot_index);
quicksort(A + pivot_index + 1, size - pivot_index - 1);
}
}


In order to ensure that the asynchronous recursive call completes before returning, we call wait() on its associated future object. Sorting ten million elements with sequential quicksort() takes 0.93 seconds on the author’s iMac, while sorting with async_quicksort() takes 0.35 seconds with the task limit at 128.

## Launch Policy¶

By default, launching an asynchronous task does not require it to be immediately run in another thread. Rather, it merely allows the task to be run concurrently. Equally valid semantically is to defer execution of the task until the wait() or get() method is called on the associated future object, obtaining lazy evaluation of the task.

We can explicitly specify whether the task should be run in a different thread or deferred until its completion is required. We do so by specifying std::launch::async or std::launch::deferred as the first argument to async(), before the function to be run:

async(std::launch::async, async_fib, n - 1, 2 * tasks, max_tasks)


Without the policy specifier, the implementation is free to follow either launch policy.

We can use the std::launch::async policy to partition work over a fixed set of computational resources, as in multithreading. As an example, the we can estimate the value of $$pi$$ by choosing random points in the range $$[(0, 0), (1, 1)]$$ and determining whether the point lies in the upper-right quadrant of the unit circle, as illustrated by the shaded area in Figure 51.

Figure 51 The value of $$pi$$ can be estimated by generating random points in $$[(0, 0), (1, 1)]$$ and counting how many lie within the upper-right quadrant of the unit circle.

The ratio of samples within the circle to total samples approximates $$\frac{\pi}{4}$$, the ratio of the area of a quadrant of the unit circle to the area of a unit square. The following sequential function implements this algorithm:

double compute_pi(size_t samples) {
default_random_engine generator;
uniform_real_distribution<> dist(0.0, 1.0);
size_t count = 0;
for (size_t i = 0; i < samples; i++) {
double x = dist(generator), y = dist(generator);
if (x * x + y * y <= 1.0)
count++;
}
return 4.0 * count / samples;
}


We use the default random-generation engine from the <random> header, along with a uniform distribution of real numbers between 0.0 and 1.0. Run sequentially for 100 million samples, the computation takes 1.86 seconds on the author’s iMac computer.

We can parallelize the computation over a fixed set of threads with the following:

double async_compute_pi(size_t samples, size_t num_workers) {
future<double> *results = new future<double>[num_workers];
for (size_t i = 0; i < num_workers; i++) {
results[i] = async(std::launch::async,
compute_pi, samples / num_workers);
}
double total = 0;
for (size_t i = 0; i < num_workers; i++) {
total += results[i].get();
}
delete[] results;

Here, we construct a new task for each worker, launching it on a new thread using the async::launch::async policy. The main thread then waits on each worker thread in turn, accumulating the results from each worker. On the author’s quad-core iMac, the computation takes 0.95 seconds for 100 million total samples with two worker threads, and 0.52 seconds with four worker threads. The latter is a speedup of about 3.6x over the sequential computation.