C++ Concurrency in Action: Exception Safety in Parallel Algorithms

Exception Safety in Parallel Algorithms

Exception safety is an essential aspect of good C++ code, and code that uses concurrency is no exception. In fact, parallel algorithms often require more care with regard to exceptions than normal sequential algorithms. If an operation in a sequential algorithm throws an exception, the algorithm only has to ensure that it tidies up after itself to avoid resource leaks and broken invariants; it can merrily allow the exception to propagate to the caller to handle. By contrast, in a parallel algorithm many of the operations will be running on separate threads. In this case the exception cannot be allowed to propagate, as it is on the wrong call stack: if a function spawned on a new thread exits with an exception, the application is terminated.
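The standard does provide a way to carry an exception from one call stack to another. As a minimal sketch (using std::exception_ptr from the final C++11 standard, with a hypothetical worker function), an exception caught on the new thread can be stored and re-thrown on the caller's stack:

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical worker: catches its own exception and stores it,
// rather than letting it escape and terminate the application.
void worker(std::exception_ptr& error)
{
    try
    {
        throw std::runtime_error("failure on worker thread");
    }
    catch(...)
    {
        error=std::current_exception();    // capture for the caller
    }
}

std::string run_and_report()
{
    std::exception_ptr error;
    std::thread t(worker,std::ref(error));
    t.join();                              // back on the caller's stack
    try
    {
        if(error)
            std::rethrow_exception(error); // re-throw where it can be handled
    }
    catch(std::runtime_error const& e)
    {
        return e.what();
    }
    return "no error";
}
```

As we'll see below, std::packaged_task wraps exactly this pattern up for us, so we rarely need to write it by hand.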

This article is based on C++ Concurrency in Action, published June, 2008. It is being reproduced here by permission from Manning Publications. Manning early access books and ebooks are sold exclusively through Manning. Visit the book’s page for more information.


As a concrete example, let’s examine the parallel_accumulate function in listing A.

Listing A: A parallel version of std::accumulate

template<typename Iterator,typename T>
struct accumulate_block
{
    void operator()(Iterator first,Iterator last,T& result)
    {
        result=std::accumulate(first,last,result);                  #10
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);           #1

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<T> results(num_threads);                            #2
    std::vector<std::thread> threads(num_threads-1);                #3

    Iterator block_start=first;                                     #4
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;                             #5
        std::advance(block_end,block_size);
        threads[i]=std::thread(                                     #6
            accumulate_block<Iterator,T>(),
            block_start,block_end,std::ref(results[i]));
        block_start=block_end;                                      #7
    }
    accumulate_block<Iterator,T>()(
        block_start,last,results[num_threads-1]);                   #8

    std::for_each(threads.begin(),threads.end(),
        std::mem_fn(&std::thread::join));

    return std::accumulate(results.begin(),results.end(),init);     #9
}

Now, let’s go through and identify the places where an exception can be thrown: basically, anywhere we call a function we know can throw, or perform an operation on a user-defined type that may throw.

First up, we’ve got the call to distance (#1), which performs operations on the user-supplied iterator type. Since we haven’t yet done any work, and this is on the calling thread, this is fine. Next up, we’ve got the allocation of the results vector (#2), and the threads vector (#3). Again, these are on the calling thread, and we haven’t done any work or spawned any threads, so this is fine. Of course, if the construction of threads throws then the memory allocated for results will have to be cleaned up, but the destructor will take care of that for us.

Skipping over the initialization of block_start (#4) as that’s similarly safe, we come to the operations in the thread-spawning loop (#5, #6 & #7). Once we’ve been through the creation of the first thread at #6 we’re in trouble if we throw any exceptions: our new threads will become detached, and continue running with references to the (now destroyed) elements of results, as well as copies of the supplied iterators. This is not a good place to be.

The call to accumulate_block (#8) can throw with similar problems: our new threads will become detached and continue running. On the other hand, the final call to std::accumulate (#9) can throw without causing any hardship, as all the threads have been joined by this point.

That’s it for the main thread, but there’s more: the calls to accumulate_block on the new threads might throw at #10. We’ve got no catch blocks, so this exception will be left unhandled, and cause the library to call std::terminate() to abort the application.

In case it’s not glaringly obvious: this code is not exception safe.

Adding Exception Safety

OK, so we’ve identified all the possible throw points and the nasty consequences of exceptions. What can we do about it? Let’s start by addressing the issue of the exceptions thrown on our new threads. If we look carefully at what we’re trying to achieve from our new threads, it’s apparent that we’re trying to calculate a result to return, whilst allowing for the possibility that the code might throw an exception. This is precisely what the combination of std::packaged_task and std::unique_future are designed for. If we rearrange our code to use std::packaged_task then we end up with listing B.
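To illustrate the mechanism in isolation (using the name std::future, which the final C++11 standard adopted in place of the draft’s std::unique_future, and a hypothetical sum_block task), a minimal sketch might look like this:

```cpp
#include <future>
#include <numeric>
#include <stdexcept>
#include <thread>
#include <vector>

// A task whose result -- or exception -- is captured in a future.
int sum_block(std::vector<int> const& v)
{
    if(v.empty())
        throw std::runtime_error("empty block");
    return std::accumulate(v.begin(),v.end(),0);
}

int run_on_thread(std::vector<int> const& v)
{
    std::packaged_task<int(std::vector<int> const&)> task(sum_block);
    std::future<int> f=task.get_future();
    std::thread t(std::move(task),std::cref(v));
    t.join();
    return f.get();  // returns the result, or re-throws the stored exception
}
```

Whether sum_block returns normally or throws, nothing escapes on the new thread: the outcome is stored in the shared state, and get() delivers it on the calling thread.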

Listing B: A parallel version of std::accumulate using std::packaged_task

template<typename Iterator,typename T>
struct accumulate_block
{
    T operator()(Iterator first,Iterator last)                      #1
    {
        return std::accumulate(first,last,T());                     #2
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::unique_future<T> > futures(num_threads-1);     #3
    std::vector<std::thread> threads(num_threads-1);

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        std::packaged_task<T(Iterator,Iterator)> task(              #4
            accumulate_block<Iterator,T>());
        futures[i]=std::move(task.get_future());                    #5
        threads[i]=std::thread(std::move(task),block_start,block_end); #6
        block_start=block_end;
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last); #7

    std::for_each(threads.begin(),threads.end(),
        std::mem_fn(&std::thread::join));

    T result=init;                                                  #8
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        result+=futures[i].get();                                   #9
    }
    result+=last_result;                                            #10
    return result;
}

The first change is that the function call operator of accumulate_block now returns the result directly, rather than taking a reference to somewhere to store it (#1) — we’re using std::packaged_task and std::unique_future for the exception safety, so we can use them to transfer the result too. This does require that we explicitly pass in a default-constructed T in the call to std::accumulate (#2), rather than reusing the supplied result value, but that’s a minor change.

The next change is that rather than having a vector of results, we have a vector of futures (#3) to store a std::unique_future for each spawned thread. In the thread-spawning loop, we first create a task for accumulate_block (#4) — std::packaged_task<T(Iterator,Iterator)> declares a task that takes two Iterators and returns a T, which is what our function does. We then get the future for that task (#5), and run that task on a new thread, passing in the start and end of the block to process (#6). When the task runs, the result will be captured in the future, as will any exception thrown.

Since we’ve been using futures, we don’t have a results array, so we must store the result from the final block in a variable (#7) rather than in a slot in the array. We also can’t use std::accumulate to total the results, as we’ve got to get the values out of the futures. Instead, we code a simple loop, starting with the supplied initial value (#8) and adding in the result from each future (#9). If the corresponding task threw an exception, it will have been captured in the future and will now be thrown again by the call to get(). Finally, we add the result from the last block (#10) before returning the overall total to the caller.

So, that’s removed one of the potential problems: exceptions thrown in the worker threads are re-thrown in the main thread. Of course, if more than one of the worker threads throws an exception, only one will be propagated, but that’s not too big a deal. If it really matters, you can use something like std::nested_exception to capture all the exceptions and throw that instead.

The remaining problem is the leaking of threads if an exception is thrown between when we spawn the first thread and when we’ve joined with them all. The simplest solution is to catch any exceptions, join with the threads that are still joinable(), and rethrow the exception:

try
{
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        // ... as before
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);

    std::for_each(threads.begin(),threads.end(),
        std::mem_fn(&std::thread::join));
}
catch(...)
{
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        if(threads[i].joinable())
            threads[i].join();
    }
    throw;
}

Now, this works: all the threads will be joined, whichever way the code leaves the block. However, try-catch blocks are ugly, and we’ve got duplicate code: we’re joining the threads both in the “normal” control flow and in the catch block. Duplicate code is rarely a good thing, as it means more places to change. Instead, let’s extract the join into the destructor of an object; that is, after all, the idiomatic way of cleaning up resources in C++. Here’s our class:

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_):
        threads(threads_)
    {}
    ~join_threads()
    {
        for(unsigned long i=0;i<threads.size();++i)
        {
            if(threads[i].joinable())
                threads[i].join();
        }
    }
};
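As a quick sanity check of the idiom (with a hypothetical spawn_then_throw function standing in for the body of parallel_accumulate), the destructor joins the threads even when an exception escapes partway through:

```cpp
#include <atomic>
#include <stdexcept>
#include <thread>
#include <vector>

class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_):
        threads(threads_)
    {}
    ~join_threads()
    {
        for(unsigned long i=0;i<threads.size();++i)
        {
            if(threads[i].joinable())
                threads[i].join();
        }
    }
};

std::atomic<int> completed(0);

// Hypothetical function: spawns some work, then fails.
void spawn_then_throw()
{
    std::vector<std::thread> threads;
    join_threads joiner(threads);          // joins on every exit path
    for(int i=0;i<4;++i)
        threads.push_back(std::thread([]{ ++completed; }));
    throw std::runtime_error("something went wrong");
    // stack unwinding runs ~join_threads, so no thread is left dangling
}
```

By the time the exception reaches the caller, all four workers have been joined, so none of them can still be touching local state.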

We can then simplify our code to listing C.

Listing C: An exception-safe parallel version of std::accumulate

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=std::distance(first,last);

    if(!length)
        return init;

    unsigned long const min_per_thread=25;
    unsigned long const max_threads=
        (length+min_per_thread-1)/min_per_thread;

    unsigned long const hardware_threads=
        std::thread::hardware_concurrency();

    unsigned long const num_threads=
        std::min(hardware_threads!=0?hardware_threads:2,max_threads);

    unsigned long const block_size=length/num_threads;

    std::vector<std::unique_future<T> > futures(num_threads-1);
    std::vector<std::thread> threads(num_threads-1);
    join_threads joiner(threads);                                   #1

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        std::packaged_task<T(Iterator,Iterator)> task(
            accumulate_block<Iterator,T>());
        futures[i]=std::move(task.get_future());
        threads[i]=std::thread(std::move(task),block_start,block_end);
        block_start=block_end;
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);
    T result=init;
    for(unsigned long i=0;i<(num_threads-1);++i)
    {
        result+=futures[i].get();                                   #2
    }
    result+=last_result;
    return result;
}

Once we’ve created our container of threads, we create an instance of our new class (#1) to join with all the threads on exit. We can then remove our explicit join loop, safe in the knowledge that the threads will be joined however the function exits. Note that the calls to futures[i].get() (#2) will block until the results are ready, so we don’t need to have explicitly joined with the threads at this point. This is unlike the original from listing A, where we needed to have joined with the threads to ensure that the results vector was correctly populated. Not only do we get exception-safe code, but our function is actually shorter because we’ve extracted the join code into our new (reusable) class.

Published December 17th, 2009.
