Concurrent Programming
We now take a brief look at concurrent programming, where a program is structured so that several computations can execute concurrently during overlapping time periods. We focus on aspects of concurrency that are explicitly specified by a programmer, rather than the implicit concurrency provided by compiler optimizations or the underlying system hardware.
Parallel Computing
From the 1970s through the mid-2000s, the speed of individual processor cores grew at an exponential rate. Much of this increase in speed was accomplished by increasing the clock frequency, the rate at which a processor performs basic operations. In the mid-2000s, however, this exponential increase came to an abrupt end, due to power and thermal constraints, and the speed of individual processor cores has increased much more slowly since then. Figure 49 is a graph from Stanford’s CPU database that illustrates this trend:
Instead of increasing clock frequency, CPU manufacturers began to place multiple cores in a single processor, enabling more operations to be performed concurrently.
Parallelism is not a new concept. Large-scale parallel machines have been used for decades, primarily for scientific computing and data analysis. Even in personal computers with a single processor core, operating systems and interpreters have provided the abstraction of concurrency. This is done through context switching, or rapidly switching between different tasks without waiting for them to complete. Thus, multiple programs can run on the same machine concurrently, even if it only has a single processing core.
Given the current trend of increasing the number of processor cores, individual applications must now take advantage of parallelism in order to run faster. Within a single program, computation must be arranged so that as much work can be done in parallel as possible. However, parallelism introduces new challenges in writing correct code, particularly in the presence of shared, mutable state.
For problems that can be solved efficiently in the functional model, with no shared mutable state, parallelism poses few problems. Pure functions provide referential transparency, meaning that expressions can be replaced with their values, and vice versa, without affecting the behavior of a program. This enables expressions that do not depend on each other to be evaluated in parallel. The MapReduce framework is one system that allows functional programs to be specified and run in parallel with minimal programmer effort. Several functional languages, including NESL and Clojure, have been designed with parallelism at their core.
Unfortunately, not all problems can be solved efficiently using functional programming. The Berkeley View project has identified thirteen common computational patterns in science and engineering, only one of which is MapReduce. The remaining patterns require shared state.
In the remainder of this section, we will see how mutable shared state can introduce bugs into parallel programs and a number of approaches to prevent such bugs. We will examine these techniques in the context of two applications, a web crawler and a particle simulator.
Parallelism in Python
Before we dive deeper into the details of parallelism, let us first explore Python’s support for parallel computation. Python provides two means of parallel execution: threading and multiprocessing.
Threading
In threading, multiple “threads” of execution exist within a single interpreter. Each thread executes code independently from the others, though they share the same data. However, the CPython interpreter, the main implementation of Python, only interprets code in one thread at a time, switching between them in order to provide the illusion of parallelism. On the other hand, operations external to the interpreter, such as writing to a file or accessing the network, may run in parallel.
The threading
module contains classes that enable threads to be
created and synchronized. The following is a simple example of a
multithreaded program:
import threading

def thread_hello():
    other = threading.Thread(target=thread_say_hello, args=())
    other.start()
    thread_say_hello()

def thread_say_hello():
    print('hello from', threading.current_thread().name)
>>> thread_hello()
hello from Thread-1
hello from MainThread
The Thread
constructor creates a new thread. It requires a target
function that the new thread should run, as well as the arguments to
that function. Calling start
on a Thread
object marks it ready
to run. The current_thread
function returns the Thread
object
associated with the current thread of execution.
In this example, the prints can happen in any order, since we haven’t synchronized them in any way. The output can even be interleaved on some systems.
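If we instead want the output to appear in a fixed order, the main thread can call the join method on the Thread object, which blocks until that thread has finished. The following is a minimal sketch (the thread_hello_in_order name is ours, not from the text) that prints the spawned thread’s greeting before the main thread’s:

def thread_hello_in_order():
    other = threading.Thread(target=thread_say_hello, args=())
    other.start()
    other.join()        # wait for the other thread to finish its print
    thread_say_hello()  # the main thread prints second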
Multiprocessing
Python also supports multiprocessing, which allows a program to spawn multiple interpreters, or processes, each of which can run code independently. These processes do not generally share data, so any shared state must be communicated between processes. On the other hand, processes execute in parallel according to the level of parallelism provided by the underlying operating system and hardware. Thus, if the CPU has multiple processor cores, Python processes can truly run concurrently.
The multiprocessing
module contains classes for creating and
synchronizing processes. The following is the hello example using
processes:
import multiprocessing

def process_hello():
    other = multiprocessing.Process(target=process_say_hello, args=())
    other.start()
    process_say_hello()

def process_say_hello():
    print('hello from', multiprocessing.current_process().name)
>>> process_hello()
hello from MainProcess
>>> hello from Process-1
As this example demonstrates, many of the classes and functions in
multiprocessing
are analogous to those in threading
. This
example also demonstrates how lack of synchronization affects shared
state, as the display can be considered shared state. Here, the
interpreter prompt from the interactive process appears before the
print output from the other process.
When No Synchronization is Necessary
In some cases, access to shared data need not be synchronized, if concurrent access cannot result in incorrect behavior. The simplest example is read-only data. Since such data is never mutated, all threads will always read the same values regardless of when they access the data.
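For example, multiple threads can safely compute over the same list as long as none of them modifies it. The following sketch (the names are ours) has two threads each summing a different half of a shared list, with no locking required:

data = list(range(100))   # shared data that no thread ever mutates

def sum_half(thread_num):
    half = len(data) // 2
    start = thread_num * half
    total = sum(data[start:start + half])
    print('partial sum', thread_num, 'is', total)

worker = threading.Thread(target=sum_half, args=(1,))
worker.start()
sum_half(0)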
In rare cases, shared data that is mutated may not require synchronization. However, understanding when this is the case requires a deep knowledge of how the interpreter and underlying software and hardware work. Consider the following example:
items = []
flag = []

def consume():
    while not flag:
        pass
    print('items is', items)

def produce():
    for i in range(10):
        items.append(i)
    flag.append('go')

consumer = threading.Thread(target=consume, args=())
consumer.start()
produce()
Here, the producer thread adds items to items
, while the consumer
waits until flag
is non-empty. When the producer finishes adding
items, it adds an element to flag
, allowing the consumer to
proceed.
In most Python implementations, this example will work correctly.
However, a common optimization in other compilers and interpreters,
and even the hardware itself, is to reorder operations within a single
thread that do not depend on each other for data. In such a system,
the statement flag.append('go')
may be moved before the loop,
since neither depends on the other for data. In general, you should
avoid code like this unless you are certain that the underlying system
won’t reorder the relevant operations.
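A more robust alternative, sketched here with the same structure as above, is to use the Event class from the threading module; its set and wait methods provide the necessary synchronization rather than relying on the order of unsynchronized operations:

items = []
flag = threading.Event()

def consume():
    flag.wait()      # block until the producer signals
    print('items is', items)

def produce():
    for i in range(10):
        items.append(i)
    flag.set()       # signal that all items have been added

consumer = threading.Thread(target=consume, args=())
consumer.start()
produce()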
Synchronized Data Structures
The simplest means of synchronizing shared data is to use a data
structure that provides synchronized operations. The queue
module
contains a Queue
class that provides synchronized first-in,
first-out access to data. The put
method adds an item to the
Queue
and the get
method retrieves an item. The class itself
ensures that these methods are synchronized, so items are not lost no
matter how thread operations are interleaved. Here is a
producer/consumer example that uses a Queue
:
from queue import Queue

queue = Queue()

def synchronized_consume():
    while True:
        print('got an item:', queue.get())
        queue.task_done()

def synchronized_produce():
    for i in range(10):
        queue.put(i)
    queue.join()

consumer = threading.Thread(target=synchronized_consume, args=())
consumer.daemon = True
consumer.start()
synchronized_produce()
There are a few changes to this code, in addition to the Queue
and
get
and put
calls. We have marked the consumer thread as a
daemon, which means that the program will not wait for that thread
to complete before exiting. This allows us to use an infinite loop in
the consumer. However, we do need to ensure that the main thread
exits, but only after all items have been consumed from the Queue
.
The consumer calls the task_done
method to inform the Queue
that it is done processing an item, and the main thread calls the
join
method, which waits until all items have been processed,
ensuring that the program exits only after that is the case.
A more complex example that makes use of a Queue
is a parallel
web crawler that searches for dead links on a website.
Locks
When a synchronized version of a particular data structure is not available, we have to provide our own synchronization. A lock is a basic mechanism to do so. It can be acquired by at most one thread, after which no other thread may acquire it until it is released by the thread that previously acquired it.
In Python, the threading
module contains a Lock
class to
provide locking. A Lock
has acquire
and release
methods to
acquire and release the lock, and the class guarantees that only one
thread at a time can acquire it. All other threads that attempt to
acquire a lock while it is already being held are forced to wait until
it is released.
For a lock to protect a particular set of data, all the threads need
to be programmed to follow a rule: no thread will access any of the
shared data unless it owns that particular lock. In effect, all the
threads need to “wrap” their manipulation of the shared data in
acquire
and release
calls for that lock.
The following is an example of two threads incrementing a counter that is protected by a lock, avoiding a race condition:
from threading import Thread, Lock

counter = [0]
counter_lock = Lock()

def increment():
    counter_lock.acquire()
    count = counter[0]
    counter[0] = count + 1
    counter_lock.release()

other = Thread(target=increment, args=())
other.start()
increment()
other.join()
print('count is now', counter[0])
Acquiring the lock prevents another thread from acquiring it and proceeding to increment the counter. When the lock has been acquired, the thread can be assured that no other thread can enter the critical section that is protected by the lock. Once the thread has incremented the counter, it releases the lock so that another thread can access the counter.
In this code, we had to be careful not to return until after we released the lock. In general, we have to ensure that we release a lock when we no longer need it. This can be very error-prone, particularly in the presence of exceptions, so Python locks are context managers that can be used with scope-based resource management:
def increment():
    with counter_lock:
        count = counter[0]
        counter[0] = count + 1
The with
statement ensures that counter_lock
is acquired
before its suite is executed and that it is released when the suite is
exited for any reason.
Operations that must be synchronized with each other must use the same lock. However, two disjoint sets of operations that must be synchronized only with operations in the same set should use two different lock objects to avoid over-synchronization.
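For instance, if a program maintains two counters that are never involved in the same operation, each can be protected by its own lock, so that threads updating one counter never wait on threads updating the other. The following is a minimal sketch with names of our own choosing:

hits = [0]
misses = [0]
hits_lock = Lock()
misses_lock = Lock()

def record_hit():
    with hits_lock:      # protects hits only
        hits[0] = hits[0] + 1

def record_miss():
    with misses_lock:    # protects misses only
        misses[0] = misses[0] + 1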
Barriers
Another way to avoid conflicting access to shared data is to divide a program into phases, ensuring that shared data is mutated in a phase in which no other thread accesses it. A barrier divides a program into phases by requiring all threads to reach it before any of them can proceed. Code that is executed after a barrier cannot be concurrent with code executed before the barrier.
In Python, the threading module provides a barrier in the form of the wait method of a Barrier instance:
counters = [0, 0]
barrier = threading.Barrier(2)

def count(thread_num, steps):
    for i in range(steps):
        other = counters[1 - thread_num]
        barrier.wait() # wait for reads to complete
        counters[thread_num] = other + 1
        barrier.wait() # wait for writes to complete

def threaded_count(steps):
    other = threading.Thread(target=count, args=(1, steps))
    other.start()
    count(0, steps)
    print('counters:', counters)

threaded_count(10)
In this example, reading and writing to shared data take place in different phases, separated by barriers. The writes occur in the same phase, but they are disjoint; this disjointness is necessary to avoid concurrent writes to the same data in the same phase. Since this code is properly synchronized, both counters will always be 10 at the end.
Message Passing
A final mechanism to avoid improper mutation of shared data is to entirely avoid concurrent access to the same data. In Python, using multiprocessing rather than threading naturally results in this, since processes run in separate interpreters with their own data. Any state required by multiple processes can be communicated by passing messages between processes.
The Pipe
function in the multiprocessing
module constructs a
communication channel between processes, returning a pair of
connection endpoints. By default, the connection is duplex, meaning a
two-way channel, though passing in the argument False
results in a
one-way channel. The send
method on a connection sends an object
over the channel, while the recv
method receives an object. The
latter is blocking, meaning that a process that calls recv
will
wait until an object is received.
The following is a producer/consumer example using processes and pipes:
def process_consume(in_pipe):
    while True:
        item = in_pipe.recv()
        if item is None:
            return
        print('got an item:', item)

def process_produce(out_pipe):
    for i in range(10):
        out_pipe.send(i)
    out_pipe.send(None) # done signal

pipe = multiprocessing.Pipe(False)
consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
consumer.start()
process_produce(pipe[1])
The two ends of the pipe are obtained by indexing into the result of Pipe(). Since the pipe is created as a one-way channel, the sender must use the end at index 1 and the receiver the end at index 0.
In this example, we use a None
message to signal the end of
communication. We also passed in one end of the pipe as an argument to
the target function when creating the consumer process. This is
necessary, since state must be explicitly shared between processes.
The multiprocessing
module provides other synchronization
mechanisms for processes, including synchronized queues, locks, and as
of Python 3.3, barriers. For example, a lock or a barrier can be used
to synchronize printing to the screen, avoiding the improper display
output we saw previously.
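For example, a multiprocessing.Lock can be passed to a child process and held while printing, so that output from the two processes is never interleaved. The following is a minimal sketch (the locked_hello and locked_say_hello names are ours):

print_lock = multiprocessing.Lock()

def locked_say_hello(lock):
    with lock:   # only one process prints at a time
        print('hello from', multiprocessing.current_process().name)

def locked_hello():
    other = multiprocessing.Process(target=locked_say_hello,
                                    args=(print_lock,))
    other.start()
    locked_say_hello(print_lock)
    other.join()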
Application Examples
We now examine two application examples in more detail, exploring how the techniques above can be used to properly synchronize access to shared resources.
Web Crawler
A web crawler is a program that systematically browses the Internet. Such a program may have several uses; one example is a crawler that validates links on a website, recursively checking that all links hosted by the site are to valid webpages. This crawler could be implemented with a work queue of URLs that need to be recursively checked and a set of URLs that have already been encountered by the program. Then for each URL in the work queue, the program would:
Load the webpage, parsing it for outgoing links.
For each link on the page:
Check if the link has already been seen.
If the link has not been seen, then add it to both the seen set and the work queue.
Since Python threading enables network requests to be serviced concurrently, this program can be parallelized by using several threads to process different URLs. However, the shared queue and set data structures must be protected from concurrent access.
The work queue can be represented using the synchronized Queue
class, since it ensures that no more than one thread can perform an
operation on the Queue
at a time. However, Python does not provide
a synchronized set, so we must use a lock to protect access to a
normal set:
seen = set()
seen_lock = threading.Lock()

def already_seen(item):
    with seen_lock:
        if item not in seen:
            seen.add(item)
            return False
        return True
A lock is necessary here, in order to prevent another thread from
adding the URL to the set between this thread checking if it is in the
set and adding it to the set. Furthermore, adding to a set is not
atomic, so concurrent attempts to add to a set may corrupt its
internal data. The already_seen() function adds the given item to the set if it is not already there, returning whether or not the item had already been seen.
The following then checks if a URL has been seen and adds it to the work queue if not:
work_queue = Queue()

def queue_url(url):
    if not already_seen(url):
        work_queue.put(url)
The call to already_seen()
ensures that a given URL has not been
seen when it is added to the work queue, so that the URL is only
processed once.
Particle Simulator
A particle simulator simulates the interactions between independent particles within a confined space. Each particle interacts with every other particle; for example, molecules may apply a repulsive force to other molecules based on the distance between them, resulting from the electric field of the electrons in each molecule. This interaction can be computed over the course of many discrete timesteps. A particle has a position, velocity, and acceleration, and a new acceleration is computed in each timestep based on the positions of the other particles. The velocity of the particle is then updated according to its acceleration, and its position according to its velocity.
A natural way to parallelize a particle simulator is to divide the particles among several threads or processes, as illustrated in Figure 50.
Each thread or process is then responsible for computing the forces on its own particles, updating their positions and velocities accordingly. The algorithm for a single timestep on each thread can then be divided into the following phases:
Read the current position of every particle.
For each of its own particles, compute the force resulting from interactions with every other particle, using their current positions.
Update the velocities of its particles based on the forces computed.
Update the positions of its particles based on the new velocities.
In this algorithm, the positions of the particles constitute shared data and must be protected from concurrent access. The multithreaded implementation of the simulator uses barriers to separate phases 1 and 4, which access the shared data. Two barriers are required: one to ensure that every thread finishes reading positions in phase 1 before any thread writes them in phase 4 of the same timestep, and another to ensure that all writes in phase 4 complete before phase 1 of the next timestep begins. The writes in phases 2 and 3 are to each thread's own data, so they need not be synchronized.
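The following sketch shows what a single thread's timestep might look like under this scheme. Here positions, compute_force, dt, and the particle attributes are hypothetical names, positions and velocities are treated as plain numbers for brevity, and barrier is assumed to be a threading.Barrier sized to the number of threads:

def timestep(my_particles):
    # Phase 1: read (snapshot) the current position of every particle.
    current = list(positions)
    barrier.wait()   # all reads complete before any thread writes
    for p in my_particles:
        # Phase 2: compute the force on p from the snapshot.
        force = compute_force(p, current)
        # Phase 3: update this thread's own particle velocities.
        p.velocity = p.velocity + force * dt
        # Phase 4: write the new positions (the shared data).
        positions[p.index] = positions[p.index] + p.velocity * dt
    barrier.wait()   # all writes complete before the next timestep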
An alternative algorithm is to use message passing to send copies of particle positions to other threads or processes. This is the strategy implemented by the multiprocess version of the particle simulator, with pipes used to communicate particle positions between processes in each timestep. A circular pipeline is set up between processes in order to minimize communication. Each process injects its own particles’ positions into its pipeline stage, which eventually go through a full rotation of the pipeline, as shown in Figure 51.
At each step of the rotation, a process applies forces from the positions that are currently in its own pipeline stage on to its own particles, so that after a full rotation, all forces have been applied to its particles.
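A sketch of a single process's timestep under this scheme follows. It assumes that in_pipe and out_pipe connect the process to its neighbors in the ring, that num_procs is the number of processes, and that apply_forces is a hypothetical helper that accumulates forces on this process's own particles from a block of positions; none of these names come from the text:

def pipeline_timestep(my_particles, in_pipe, out_pipe, num_procs):
    # Inject a copy of our own particles' positions into the pipeline.
    block = [p.position for p in my_particles]
    out_pipe.send(block)
    for step in range(num_procs):
        block = in_pipe.recv()             # a block of positions arrives
        apply_forces(my_particles, block)  # accumulate forces on our particles
        if step < num_procs - 1:
            out_pipe.send(block)           # pass it along the ring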
Synchronization Pitfalls
While synchronization methods are effective for protecting shared state, they can also be used incorrectly, failing to accomplish the proper synchronization, over-synchronizing, or causing the program to hang as a result of deadlock.
Under-synchronization
A common pitfall in parallel computing is to neglect to properly synchronize shared accesses. In the set example for the web crawler, we need to synchronize the membership check and insertion together, so that another thread cannot perform an insertion in between these two operations. Failing to synchronize the two operations together is erroneous, even if they are separately synchronized:
def already_seen(item):
    with seen_lock:
        present = item in seen
    if not present:
        with seen_lock:
            seen.add(item)
    return not present
Here, it is possible for one thread to acquire seen_lock
and see
that the item is not in the set. But between releasing the lock and
reacquiring it for insertion, another thread can obtain the lock and
also see that the item is not in the set. This results in both threads
thinking that they inserted the item, potentially resulting in
duplicate work.
Over-synchronization
Another common error is to over-synchronize a program, so that non-conflicting operations cannot occur concurrently. As a trivial example, we can avoid all conflicting access to shared data by acquiring a master lock when a thread starts and only releasing it when a thread completes. This serializes our entire code, so that nothing runs in parallel. In some cases, this can even cause our program to hang indefinitely. For example, consider a consumer/producer program in which the consumer obtains the lock and never releases it:
from time import sleep

items = []
lock = Lock()

def consume():
    with lock:
        while not items:
            sleep(1) # wait for a bit
        print('got an item:', items.pop())

def synchronized_produce():
    with lock:
        for i in range(10):
            items.append(i)
This prevents the producer from producing any items, which in turn prevents the consumer from doing anything since it has nothing to consume.
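One way to fix this, sketched below, is to hold the lock only while actually touching the shared list, releasing it before sleeping so that the producer can make progress:

def consume():
    while True:
        with lock:               # hold the lock only while accessing items
            if items:
                print('got an item:', items.pop())
                return
        sleep(1)                 # wait outside the critical section

def synchronized_produce():
    for i in range(10):
        with lock:               # lock each append, not the whole loop
            items.append(i)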
While this example is trivial, in practice, programmers often over-synchronize their code to some degree, preventing their code from taking complete advantage of the available parallelism.
Deadlock
Because they cause threads or processes to wait on each other, synchronization mechanisms are vulnerable to deadlock, a situation in which two or more threads or processes are stuck, waiting for each other to finish. We have just seen how neglecting to release a lock can cause a thread to get stuck indefinitely. But even if threads or processes do properly release locks, programs can still reach deadlock.
The source of deadlock is a circular wait, illustrated in Figure 52 with processes. A process cannot continue because it is waiting for other processes, which are in turn waiting for the first process to complete.
As an example, we will set up a deadlock with two processes. Suppose they share a duplex pipe and attempt to communicate with each other as follows:
def deadlock(in_pipe, out_pipe):
    item = in_pipe.recv()
    print('got an item:', item)
    out_pipe.send(item + 1)

def create_deadlock():
    pipe = multiprocessing.Pipe()
    other = multiprocessing.Process(target=deadlock,
                                    args=(pipe[0], pipe[1]))
    other.start()
    deadlock(pipe[1], pipe[0])

create_deadlock()
Both processes attempt to receive data first. Recall that the recv
method blocks until an item is available. Since neither process has
sent anything, both will wait indefinitely for the other to send it
data, resulting in deadlock.
Synchronization operations must be properly aligned to avoid deadlock. This may require sending over a pipe before receiving, acquiring multiple locks in the same order, and ensuring that all threads reach the right barrier at the right time.
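For example, when two locks must be held at the same time, having every thread acquire them in the same agreed-upon order prevents a circular wait. The following is a minimal sketch with names of our own choosing, transferring between two counters protected by separate locks:

balance_a = [100]
balance_b = [100]
lock_a = Lock()
lock_b = Lock()

def transfer(amount):
    # Every thread acquires lock_a before lock_b, never the reverse,
    # so no two threads can each hold one lock while waiting for the other.
    with lock_a:
        with lock_b:
            balance_a[0] = balance_a[0] - amount
            balance_b[0] = balance_b[0] + amount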
Conclusion
As we have seen, parallelism presents new challenges in writing correct and efficient code. As the trend of increasing parallelism at the hardware level will continue for the foreseeable future, parallel computation will become more and more important in application programming. There is a very active body of research on making parallelism easier and less error-prone for programmers. Our discussion here serves only as a basic introduction to this crucial area of computer science.
Asynchronous Tasks
In parallelizing a computation, one strategy is to explicitly decompose a program over the set of workers, as we did in the previous section. Another option is to divide the work according to the natural granularity of an operation and to rely on the runtime system to schedule the work appropriately. This latter strategy can be accomplished with asynchronous tasks, where an operation is launched to be computed asynchronously, and its result used at some further point.
In C++11, an asynchronous task can be launched with the async()
function template, contained in the <future>
header. The first
argument to async()
is the function or function object
representing a task, and the remaining arguments are the arguments
with which to invoke that function. The following is a basic example:
#include <future>
#include <iostream>
using namespace std;

void foo(int x, int y) {
    cout << (x + y) << endl;
}

int main() {
    async(foo, 3, 4);
    async(foo, 5, 6);
}
The code above launches separate tasks to compute foo(3, 4)
and
foo(5, 6)
asynchronously. The print outputs 7 and 11 can appear in
any order, since the two tasks aren’t synchronized with respect to
each other, and the outputs can even be interleaved with each other.
The return value of async()
is a future object, which is a proxy
for the result of the asynchronous task. In particular, the
async()
calls above return objects of type future<void>
, since
the return type of foo()
is void
. We can wait on the result
of an asynchronous task by calling the wait()
method of the
corresponding future
object, as in the following:
int main() {
    future<void> f1 = async(foo, 3, 4);
    f1.wait();
    future<void> f2 = async(foo, 5, 6);
    f2.wait();
}
Here, we wait for the first task to complete before launching the second. This ensures that the 7 will appear as output before the 11.
In the case of a function that returns a non-void value, we can also
obtain the result by calling the get()
method of the future
object, which waits until the result is available and then returns the
result. This is particularly useful if we have some computation that
depends on the result of the task, as in the following:
int main() {
    future<int> f1 = async([](int x, int y) {
        return x + y;
    }, 3, 4);
    cout << (f1.get() + 5) << endl;
}
This launches a task to asynchronously call a lambda function, waits for the result and adds 5 to it, and prints the sum.
As a more complex example, let’s consider the tree-recursive computation of the Fibonacci sequence. The following is a sequential function to compute a Fibonacci number:
long long fib(int n) {
    if (n <= 1)
        return n;
    return fib(n - 1) + fib(n - 2);
}
We can observe that the two recursive calls do not depend on each other, so we can compute them asynchronously by launching a separate task for one of the calls. The following code does so:
long long async_fib(int n) {
    if (n <= 1)
        return n;
    future<long long> res1 = async(async_fib, n - 1);
    long long res2 = async_fib(n - 2);
    return res2 + res1.get();
}
This code uses async()
to compute one recursive call, while the
other call is computed in the existing task. We require the result of
the asynchronous task before we can compute the sum and return, so we
use get()
on its future
object in order to obtain its result.
As an aside, we write the two recursive calls in separate statements to ensure that the asynchronous task is launched before the recursive call that takes place in the existing task. Consider the following version that makes both calls in the same statement:
return async(async_fib, n - 1).get() + async_fib(n - 2);
In C++, the order of evaluation of the two operands to +
is
unspecified, so it would be valid for the compiler to produce code
that sequentially computes the right-hand side before launching the
asynchronous task to compute the left-hand side. This would turn the
whole computation into a sequential one. Thus, we need to use
statement sequencing to ensure that the asynchronous task is launched
before the sequential recursive call is made.
Limiting the Number of Tasks
Most implementations of C++ that execute tasks in parallel do so with the use of an internal thread pool, scheduling the tasks among the available threads in the pool. There is significant overhead to computing a function with a task, as it needs to go through the scheduling system and then be dispatched to a thread. As such, we often need to make the granularity of our tasks large enough to amortize this overhead, as well as to reduce the number of tasks to limit the total overhead.
As an example, computing async_fib(15)
on the author’s quad-core
iMac computer takes about 4000 times longer than fib(15)
, using
Clang 8, due to the large number of small tasks that are launched.
Instead, we need to rewrite async_fib()
to do the remaining
computation sequentially when a threshold is reached. The following
does so, using the number of tasks launched so far to determine if the
threshold has been met:
long long async_fib(int n, int tasks, int max_tasks) {
    if (n <= 1)
        return n;
    if (tasks < max_tasks) {
        future<long long> res1 = async(async_fib, n - 1, 2 * tasks,
                                       max_tasks);
        long long res2 = async_fib(n - 2, 2 * tasks, max_tasks);
        return res2 + res1.get();
    } else {
        return fib(n - 1) + fib(n - 2);
    }
}
The function takes in two extra arguments, representing the current
number of tasks and the threshold value for the maximum number of
tasks. If the threshold has not been reached, then the recursion
proceeds as before, launching a new asynchronous task for one of the
calls. In making the recursive calls, we double the number of current
tasks to account for the fact that each step of the recursion doubles
the number of concurrent computations. On the other hand, if the
threshold has been reached, then we do the rest of the computation
sequentially by calling fib()
. Figure 53 is the
task graph for computing async_fib(5, 1, 4)
, limiting the number
of tasks to four.
With the ability to limit the number of tasks, we find that
fib(42)
takes 1.63 seconds on the author’s quad-core iMac, whereas
async_fib(42, 1, 512)
takes 0.47 seconds, about a 3.5x speedup.
The 512-task limit was determined experimentally to be close to the
optimal value.
As another example, let’s write quicksort using asynchronous tasks. First, we write the sequential version as follows:
#include <algorithm>

const size_t CUTOFF = 10;   // cutoff below which we fall back to std::sort

size_t partition(int *A, size_t size) {
    int pivot = A[0];
    size_t start = 1;
    size_t end = size - 1;
    while (start <= end) {
        if (A[start] >= pivot)
            std::swap(A[start], A[end--]);
        else {
            std::swap(A[start - 1], A[start]);
            start++;
        }
    }
    return start - 1;
}

void quicksort(int *A, size_t size) {
    if (size <= CUTOFF) {
        std::sort(A, A + size);
        return;
    }
    int pivot = A[size/2];
    std::swap(A[0], A[size/2]);
    size_t pivot_index = partition(A, size);
    quicksort(A, pivot_index);
    quicksort(A + pivot_index + 1, size - pivot_index - 1);
}
This implements an in-place quicksort, partitioning the input array by
swapping elements to the appropriate side of the pivot. We cut off the
quicksort itself once we reach a small number of elements, since at
that point other sorts such as insertion sort are more efficient. For
simplicity, we use std::sort()
when we reach the cutoff point,
which will be 10 elements in our examples.
As with the Fibonacci sequence, we can launch a separate task to compute one of the recursive calls, limiting ourselves to a maximum number of tasks:
void async_quicksort(int *A, size_t size, int thread_count,
                     int max_tasks) {
    if (size <= CUTOFF) {
        std::sort(A, A + size);
        return;
    }
    int pivot = A[size/2];
    std::swap(A[0], A[size/2]);
    size_t pivot_index = partition(A, size);
    if (thread_count < max_tasks) {
        future<void> rec1 = async(async_quicksort, A, pivot_index,
                                  2 * thread_count, max_tasks);
        async_quicksort(A + pivot_index + 1, size - pivot_index - 1,
                        2 * thread_count, max_tasks);
        rec1.wait();
    } else {
        quicksort(A, pivot_index);
        quicksort(A + pivot_index + 1, size - pivot_index - 1);
    }
}
In order to ensure that the asynchronous recursive call completes
before returning, we call wait()
on its associated future
object. Sorting ten million elements with sequential quicksort()
takes 0.93 seconds on the author’s iMac, while sorting with
async_quicksort()
takes 0.35 seconds with the task limit at 128.
Launch Policy
By default, launching an asynchronous task does not require it to be
immediately run in another thread. Rather, it merely allows the task
to be run concurrently. Equally valid semantically is to defer
execution of the task until the wait()
or get()
method is
called on the associated future
object, obtaining lazy evaluation
of the task.
We can explicitly specify whether the task should be run in a
different thread or deferred until its completion is required. We do
so by specifying std::launch::async
or std::launch::deferred
as the first argument to async()
, before the function to be run:
async(std::launch::async, async_fib, n - 1, 2 * tasks, max_tasks)
Without the policy specifier, the implementation is free to follow either launch policy.
We can use the std::launch::async
policy to partition work over a
fixed set of computational resources, as in multithreading. As an
example, we can estimate the value of \(\pi\) by choosing
random points in the range \([(0, 0), (1, 1)]\) and determining
whether the point lies in the upper-right quadrant of the unit circle,
as illustrated by the shaded area in Figure 54.
The ratio of samples within the circle to total samples approximates \(\frac{\pi}{4}\), the ratio of the area of a quadrant of the unit circle to the area of a unit square. The following sequential function implements this algorithm:
#include <random>

double compute_pi(size_t samples) {
    default_random_engine generator;
    uniform_real_distribution<> dist(0.0, 1.0);
    size_t count = 0;
    for (size_t i = 0; i < samples; i++) {
        double x = dist(generator), y = dist(generator);
        if (x * x + y * y <= 1.0)
            count++;
    }
    return 4.0 * count / samples;
}
We use the default random-generation engine from the <random>
header, along with a uniform distribution of real numbers between 0.0
and 1.0. Run sequentially for 100 million samples, the computation
takes 1.86 seconds on the author’s iMac computer.
We can parallelize the computation over a fixed set of threads with the following:
double async_compute_pi(size_t samples, size_t num_workers) {
    future<double> *results = new future<double>[num_workers];
    for (size_t i = 0; i < num_workers; i++) {
        results[i] = async(std::launch::async,
                           compute_pi, samples / num_workers);
    }
    double total = 0;
    for (size_t i = 0; i < num_workers; i++) {
        total += results[i].get();
    }
    delete[] results;
    return total / num_workers;
}
Here, we construct a new task for each worker, launching it on a new
thread using the std::launch::async
policy. The main thread then
waits on each worker thread in turn, accumulating the results from
each worker. On the author’s quad-core iMac, the computation takes
0.95 seconds for 100 million total samples with two worker threads,
and 0.52 seconds with four worker threads. The latter is a speedup of
about 3.6x over the sequential computation.