Using parallelism with boost::future

by Jens Weller

Part of my new year resolution was to do weekly backups. Back in 2013 I already had written a small application, which writes several directories in one zip archive. I continued the work on this in the last days.

The reason I chose to use wxWidgets for this application is, that it supports writing and modifying zip archives, but up until now I just wrote a new zip archive every time. Now I wanted to rewrite this part, to update the same zip file every time. For this I need to create a list of all files in the directories, and a list with the modified files. A classic producer consumer situation.

While I'm fine with that the application locks up kind of hard during writing a GB zip file (its only job), I'd like to be as fast as possible. Thats why I decided to parallelize the part of the application that reads the file paths via boost::filesystem. The two short functions running inside the producers are quite similar to what I already presented earlier on boost::filesystem:

std::vector<string> modifiedFiles(const std::string& dir,const std::time_t& last_modified)
{
    std::vector<string> files;
    fs::path path_dir = dir;
    for(const fs::directory_entry& entry: fs::recursive_directory_iterator(path_dir))
    {
        auto p = entry.path();
        if(fs::is_regular_file(p) && fs::last_write_time(p) > last_modified)
            files.emplace_back(p.string());
    }
    return files;
}

std::vector<string> readAllFiles(const std::string& dir)
{
    std::vector<string> files;
    fs::path path_dir = dir;
    for(const fs::directory_entry& entry: fs::recursive_directory_iterator(path_dir))
    {
        if(fs::is_regular_file(entry.path()))
            files.emplace_back(entry.path().string());
    }
    return files;
}

Main difference: the first one is checking if the file has been modified after the given timestamp, where the other simply adds every file to a vector. I'm only interested in regular files, not directories. And, for wxWidgets needed is the native pathformat, hence using string() instead of generic_string(). Its probably faster to combine both functions, but for an example its better to split them up.

Parallelism with boost::future

First, yes I could be using the standard for this too. Except that boost::future already has what is currently planned for C++17 (or maybe already in std::experimental), and I do trust boost here a little more. Launching some code in parallel is very easy with the async function, which returns a future:

using entry_ptr = std::unique_ptr;
std::map<wxString,entry_ptr> entry_map;
auto entry_f = boost::async([&entry_map,&inzip](){
    entry_ptr entry(inzip.GetNextEntry());
    while(entry.get() != nullptr )
    {
        entry_map[entry->GetInternalName()]=std::move(entry);
        entry.reset(inzip.GetNextEntry());
    }
});

Lambdas are your friend with futures, you'll see them a lot. This is the code reading the entries from a zip archive, which also can run in parallel while I scan the file system...

The consumer is launched very similar, it calls a single function called consume, which is doing the work, this is the first version:

void consume(boost::mutex& m, const std::vector<wxString>& files,std::vector<boost::future>& tasks,const std::vector<wxString>& filter,std::function<void(const wxString&)> insert)
{
    wxString file;
    size_t s = 0;
    {
        boost::lock_guard guard(m);
        s = files.size();
    }
    if(s == 0 && !tasks.empty())
        wait_for_update(m,files,tasks,s);
    for(size_t i = 0; i < s ;)
    {
        {
            boost::lock_guard guard(m);
            file = files[i];
            s = files.size();
        }
        ++i;
        if(!checkFilter(file,filter))
            insert(file);
        if(i == s)
        {
            {
                boost::lock_guard guard(m);
                if(files.size() > s)
                    s = files.size();
                return;
            }
            if(!tasks.empty())
                wait_for_update(m,files,tasks,s);
        }
    }
}

Lots of locking for doing stuff on the vector. As other futures are writing to the vector, it always needs to be protected by a locked mutex, even when calling size. The main job of the consumer is to filter the files, currently by simply checking if a certain string occurs in the file path. The callback insert is a little hack. It allows the caller to decide what to do with the filtered value, in my situation, it is either to insert it into a vector, or a map, representing the external and internal path for the zip file.

And here is the point, where I have to say: above code with locks is all wrong. Do not combine futures like this with locks to have "easy" shared state. Shared state is essentially a global variable, across threads. It is safe to use it, as long as your locks are correct. Also the to goal of parallelism is to do things fast in parallel, each time you lock a mutex, you let one thread wait for the other. And this style is known to be error prone, deadlocks and other errors can occur. This is the refactored version, where each future returns a vector of file paths:

void consume(std::vector<boost::future<std::vector>>& tasks,const std::vector& filter,std::function<void(const wxString&)> insert)
{
    auto it = boost::wait_for_any(tasks.begin(),tasks.end());
    while(it != tasks.end())
    {
        for(const auto& file : it->get())
        {
            if(!checkFilter(file,filter))
                insert(file);
        }
        tasks.erase(it);
        it = boost::wait_for_any(tasks.begin(),tasks.end());
    }
}

It is not only much shorter, also it will only block, when there currently is no work. It will wait at the beginning for the first future to end, and then only if no future is ready to process. It is also a lot shorter, as locks are not needed, and less complex, no extra scopes for lock_guard and the function wait_for_update is not needed anymore. The reason I used the locking was, that - in this version of boost - without #define BOOST_RESULT_OF_USE_DECLTYPE in front of the boost thread include, boost::async will only return future<void>.

Launching the producers is also quite simple:

for(const wxString& dir:dirs)
{
    tasks.emplace_back(boost::async([sdir = dir.ToStdString(),lastrun](){
        return modifiedFiles(sdir,lastrun);
    }));
    allfiletasks.emplace_back(boost::async([sdir = dir.ToStdString(),&filter](){
         return readAllFiles(sdir);
    }));
}

The result of the calculation, in this case a vector of wxStrings is simply returned from the future. My code first launches the producers, then the consumer, and then the first future you saw above, reading and indexing the zip archive, which is needed later to obtain the entries, which do not need to be updated.

The last job, to write the actual new zip archive, can only be done when all features are finished, this is also easy to handle with wait_for_all:

boost::wait_for_all(consumer,allfile_consumer,entry_f);

wait_for_all will return when all listed futures have finished running. I already wait in each of the consumers for the launched producers, so no need to list them here. Removing the mutexes and locks saved over 50 lines of code, as some functions were not needed anymore, and all locks, mutexes and a few variables could simply disappear. The producers never stop until they are done, the consumers will only wait, if there is no available data from a finished future.

Locks & Synchronization

As you noticed, my code was full of lock_guard, as it is an easy way to lock a given mutex for the rest of the scope in a exception safe way. boost also offers shared_mutex, which is useful when you have more then one thread reading from a shared asset. As long as threads only read shared assets, it should be safe to do it in parallel, but if you have a producer thread adding new items to your container, you'll need to get an exclusive lock for this. So, boost offers with shared_mutex a way to share locks for reading, but to obtain exclusive locks for changing the shared asset. More details on thread synchronosation.

Locks are not good for the performance, and also can introduce errors into your code if you don't handle them carefully. Deadlocks can occur, and debugging multithreaded code is not always fun. The best solution is one, where you don't need to lock, so that each thread can run as fast as it can. Boost offers lockfree containers, which are an alternative, as long as your types are trivial.

.then

boost::future has also support for .then, which allows you to append a continuation to a future, which will be exectuted after the successfull execution of the code inside the future. The upcoming coroutines/resumable functions with await will make this even easier in the standard once C++17 is out. For now, .then is an easy way to attach a handler to a future, which gets executed with the result:

auto future = boost::async([](){return 42;}).then([](boost::future<int> future){return what_does_it_actually_mean(future.get());};

Yet, this chaining of futures is not leading to a good code quality, debugging and errortracking can be difficult. But it allows you to implement useful, multithreaded pipelines in your code. Also, .then returns a future, representing the result of the future inside .then.

Go back

Follow Meeting C++

tl_files/mcpp/yt.pngtl_files/mcpp/gplus-50.pngtl_files/mcpp/twitter.pngtl_files/mcpp/facebook.png