RPA «Range Partition Adaptors»

Copyright © 2005-2007 Remi Chateauneu

1 Introduction

RPA is an open-source, portable C++ library providing multi-threaded versions of some STL algorithms (transform, accumulate...), working with standard containers and iterators. It can use any kind of thread, mutex or spinlock. It is close, in some respects, to Intel TBB, mptl, HPC++ or PSTL. It all came from an idea of Alexander Stepanov.

1.1 A short example

Imagine you want to apply the STL algorithm std::transform to a pair of big arrays of numbers, with a thread-safe functor :

	#define BIG 1000000
	double Input[ BIG ], Output[ BIG ] ;
	std::transform( Input, Input + BIG, Output, std::negate<double>() );

Given the number of iterations, if you have a dual-processor machine, it would be tempting to parallelize this task, by having two threads, each working on parts of the original input and output arrays. The first thread would compute :

	std::transform( Input, Input + BIG/2, Output, std::negate<double>() );

... and the second thread would compute :

	std::transform( Input + BIG/2, Input + BIG, Output + BIG/2, std::negate<double>() );

Let's quickly check the basic multi-threading constraints: the functor is thread-safe, and each thread reads from and writes to its own disjoint half of the arrays.

There is no reason why it should not work, and you can do it this way:

	rpa::transform(
		rpa::make_range_size( Input, Input + BIG ),
		Output,
		std::negate<double>() )
		( rpa::thread_array< rpa::posix_thread >(2).tree() );

rpa::transform is the parallelized version of std::transform. It uses an array of two threads of type rpa::posix_thread, which are thin and portable wrappers around POSIX threads. You could use any number of threads, and any thread type, as long as it matches a simple interface of three methods.

The number of threads is used to split the input range into two equal parts. Because the output iterator is of the random_access_iterator_tag category, it can be split into two parts as well. Therefore, the two threads can run independently.

1.2 Some properties of this library

Several kinds of threads are provided, with different levels of performance. It is easy to add your own kind of thread, to fully benefit from a specific operating system, or to add your own features.

It is possible to change the kind of thread used by any algorithm, without any other change: you just replace one datatype with another.

Among these kinds of threads, pseudo-threads are provided. They allow the algorithms to be executed transparently, with the same code as with real threads; the goal is to be able to debug and profile the code in single-threaded mode first (see the development methodology chapter).

It is also possible to use your own kind of mutex. This makes it possible to compare the performance boost of different mutex types, or to efficiently use a specific kind of non-portable mutex (spinlocks, futexes, etc.). You may also use any proprietary library, by instantiating its objects with this open-source library: this gives you the advantages of both proprietary and GPL software.

1.3 More examples

We list here different possibilities, which are orthogonal: You can combine any input with any output.

1.3.1 transform reading from a vector and inserting into a list, static scheduling.

We use again range_size which wraps any pair of random_access_iterator_tag iterators, which must simply be thread-safe, so that several threads can read from them simultaneously. This range type formalizes what is called 'static scheduling'.

Several threads will read simultaneously from a vector, which is possible because its size is known in advance. On the other hand, the insertion must be protected by a mutex. RPA provides a wrapper around any kind of output iterator, iterator_lock.

	std::vector<double> inVec ;
	std::list<double> outLst ;

	rpa::posix_mutex mtxOut ;

	rpa::transform(
		rpa::make_range_size( inVec.begin(), inVec.end() ),
		rpa::hold_lock( std::back_inserter( outLst ), mtxOut ),
		std::negate<double>() )
		( rpa::thread_array< rpa::posix_thread >(2).tree() );

1.3.2 transform reading from a list, writing to the screen, with dynamic scheduling.

We introduce range_step, which applies to any type of forward_iterator_tag iterator. The algorithm 'chops' slices of elements from the input range, and each slice is processed by a sub-thread. There is no need to know the total number of elements to process. The size of the slices should reflect the CPU cost of processing one element. The input range must be protected by a mutex, of any type. We use here rpa::posix_mutex, which is a very thin wrapper around a POSIX mutex, with a very simple interface (lock and unlock are the only required methods). This range formalizes what is called 'dynamic scheduling'.

The output iterator must be protected too, by another mutex. We reuse the wrapper rpa::iterator_lock. We will see later that it is possible to buffer the output, to avoid a mutex lock/unlock for each write.

Note that the order of the output elements can, and will, be different from the result of the sequential algorithm. If this is a problem, you can insert the elements into a sorted container such as a std::set.

Note: It is possible to avoid mutex in dynamic scheduling, if you can provide input iterators with atomic access.

	std::list<double> inLst ;
	size_t chunk = 10 ;

	nb_thr_t nbThrds = 10 ;

	rpa::posix_mutex mtxIn ;
	rpa::posix_mutex mtxOut ;

	rpa::transform(
		rpa::make_range_step( inLst.begin(), inLst.end(), chunk, mtxIn ),
		rpa::hold_lock( std::ostream_iterator<double>( std::cout, "," ), mtxOut ),
		std::negate<double>() )
		( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );

1.3.3 transform with std::list as input and output, with interleaved scheduling.

Now, imagine that your input is a very long list, so long that you do not want to compute its size, because you would have to walk to its end and bring it into memory even before processing it. But you would also like to avoid an extra mutex, because its cost would be too high compared to the cost of processing one element. There is a solution, called 'interleaved scheduling', which consists of having N threads, each of them reading one element, then iterating N times to get the next one. As the price of iterating over a list is small, and as no input mutex is required, it may still be worthwhile.

In this example, the output inserts at the end of a std::deque.

For a change, instead of using a functor, we call the plain function double ::cos(double). Of course, this function must be thread-safe.

	std::list<double> inLst ;

	std::deque<double> outDeq ;
	nb_thr_t nbThrds = 10 ;

	rpa::posix_mutex mtxOut ;

	rpa::transform(
		rpa::make_range_jump( inLst.begin(), inLst.end() ),
		rpa::hold_lock( std::back_inserter( outDeq ), mtxOut ),
		::cos )
		( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );

1.3.4 Algorithm std::accumulate with std::list as input.

The algorithm accumulate can be parallelized too, with the same tools. It is a bit simpler because there is no output sequence. There is a variant of this algorithm which allows to specify an addition operator. The other ranges can be used too: range_size, range_jump. Of course, any type of mutex and thread can be used as long as they have the required interface.

The 'threadability' conditions are the same as before: the addition operator and the input iterators must be thread-safe.

	std::list<float> inLst ;
	size_t chunk = 100 ;

	nb_thr_t nbThrds = 3 ;

	rpa::posix_mutex mtxIn ;

	float result = rpa::accumulate(
		rpa::make_range_step( inLst.begin(), inLst.end(), chunk, mtxIn ),
		0.0 )
		( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );

To summarize, many combinations of algorithms and containers can be partly or fully parallelised with significant benefit, depending on the size of the data set and on the time taken by each step: the processing of an individual element, and the cost of an iteration.

1.3.5 Algorithm std::accumulate with std::vector as input.

The big difference here is that the array of threads is statically allocated. This implies that no memory allocation is necessary at all. And, as the input range is of the random_access_iterator_tag category, no mutex is needed either. This is of course very efficient. The function make_thread_tree is a simple wrapper around the beginning of the array of threads and its size. It can also contain a 'head thread', as we will see later.

	std::vector<float> inVec(1000000) ;

	static rpa::posix_thread thrArr[10];

	float result = rpa::accumulate(
		rpa::make_range_size( inVec.begin(), inVec.end() ),
		0.0 )
		( rpa::make_thread_tree( 10, thrArr ) );

1.3.6 Conclusion

The consequences of this approach are that the same code can run with any thread and mutex type, that pseudo-threads allow sequential debugging, and that parallelizing an existing algorithm requires only minimal source changes.

1.4 Credits

This library is strongly inspired by a paper named "Range Partition Adaptors" ( http://www.stepanovpapers.com/ ) :

Range Partition Adaptors: A Mechanism for Parallelizing STL
Matthew H. Austern Ross A. Towle Alexander A. Stepanov
Silicon Graphics, Inc., Mountain View, CA 94043

Range partition adaptors, a new type of adaptor for the C++ Standard Template Library (STL),
can be the basis for a parallel version of the STL.

2 Architecture and concepts

2.1 Ranges

What we call ranges are, conceptually, the sequences of elements which are given to an STL algorithm; they are represented as aggregates based on iterator types. There are two types of ranges: input ranges and output ranges. Range types have nothing to do with iterator types: they are data structures orthogonal to iterator types. In other words, it is possible to combine, as in a matrix, nearly any type of range with any kind of iterator.

2.1.1 Input ranges, a.k.a. range partition adapter.

They are defined by at least a pair of iterators of the same type: the begin and the end. Depending on the input range type, there may be other extra parameters and parameter types. Input range types determine the scheduling policy of most algorithms (see later).

2.1.1.1 Range partition adapter range_size.

The basic idea behind this kind of range is that the input range will be cut into slices of approximately the same number of elements (using the method chop(void)), and each slice will be attributed to a sub-thread. They will all have the same number of elements to process, except the last thread, which gets at least as many elements as, but less than twice as many as, the other threads (using the method tail).

This range type is usable only when the number of input elements can be calculated. That is, the distance between the begin and the end iterators.

2.1.1.2 Range partition adapter range_step.

This range type is usable when the number of input elements is NOT known. It is usable with iterators of the std::random_access_iterator_tag, std::forward_iterator_tag and std::bidirectional_iterator_tag categories, with equivalent performance.

The arguments of this range are of course the beginning and ending iterators.

A third one is the number of steps, that is, the size of the slices that are chopped from ranges, to be processed by the sub-thread.

Another - optional argument is a reference to a mutex. If this argument is not provided, it means that atomic locks will be used.

There is another - optional - argument for this range, which is the minimum number of elements, under which it is not necessary to create a sub-thread.

The strategy behind using this kind of range, is that all sub-threads will chop slices of elements, i.e. pick up work to do, from the input range, one at a time, until the input range is empty. Because this access can only be exclusive, it needs some sort of synchronization. Therefore, there are two distinct sub-types of range_step.

2.1.1.3 Range partition adapter range_jump.

This is an implementation of interleaved scheduling. If there are N threads, each thread picks up one element out of N. The implementation wraps the iterators into another iterator type which silently increments by N when accessing the next element.
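
The idea can be sketched as follows; this is a simplified illustration written for this document, not the library's actual wrapper class:

	#include <cstddef>
	#include <iterator>

	// Illustrative sketch only: a forward iterator adaptor which advances by
	// N underlying steps, stopping at the end of the sequence. Thread i would
	// start on the i-th element of the input sequence.
	template< class ForwardIterator >
	struct jump_iterator_sketch {
		ForwardIterator _cur, _end ;
		std::size_t     _jump ;

		typename std::iterator_traits< ForwardIterator >::reference
		operator*() const { return *_cur ; }

		jump_iterator_sketch & operator++() {
			for( std::size_t i = 0 ; i < _jump && _cur != _end ; ++i )
				++_cur ;
			return *this ;
		}

		bool operator!=( const jump_iterator_sketch & other ) const
			{ return _cur != other._cur ; }
	};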

2.1.2 Output ranges

They are defined by a single iterator, given as an output iterator to the algorithm.

2.2 Slices

The main task of this library is to partition, to chop (Which is the term used in the library) input and output ranges, into slices, these slices being given to sub-threads for execution. At the end of the execution, results given by sub-threads are reduced.

2.2.1 Slices methods

A slice is a sub-range of a range's elements, in input or in output. An input slice, coming from an input range, always has the members begin and end. An output slice just has the member begin. The results of these methods are used as parameters when a sub-thread finally calls an STL algorithm. A slice represents a piece of work to execute, on one of the iterator arguments of the algorithm to parallelize.
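
A simplified sketch of the two kinds of slices (not the library's actual classes) makes the idea concrete; a sub-thread would then call, for example, std::transform( in.begin(), in.end(), out.begin(), f ):

	// Sketch only: an input slice exposes a [begin, end) sub-range, an output
	// slice only exposes a starting point.
	template< class InputIterator >
	struct input_slice_sketch {
		InputIterator _begin, _end ;
		InputIterator begin() const { return _begin ; }
		InputIterator end()   const { return _end ; }
	};

	template< class OutputIterator >
	struct output_slice_sketch {
		OutputIterator _begin ;
		OutputIterator begin() const { return _begin ; }
	};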

2.2.2 Creating slices

For each type of range, several methods can return a slice.

2.2.2.1 Range method chop

This is the first method called for each sub-thread : Its role is to return the first piece of work that a sub-thread will have to execute.

2.2.2.2 Range method chop_again

This is used in dynamic scheduling only, where each sub-thread reuses its current slices each time it chops more data to compute. This cannot apply for range_size and range_jump because in these two cases, there is one and only one slice per sub-thread, so no slice can be reused.

2.2.2.3 Range method tail

This applies only for range_size, for the last sub-thread.

2.3 Scheduling policies.

2.3.1 Determination of the scheduling policy

Most algorithms have one and only one input range; these are the ones we consider here. The input range contains most of the useful information needed to deduce the best possible scheduling policy that can be applied to the algorithm.

Each algorithm has one and only one argument which is a range and which determines which scheduling policy will be applied, and with which parameters. This is why the execution style, the scheduling policy, is mostly determined by the user, by carefully choosing the input range type.

2.3.2 Different types of scheduling policies

RPA provides three classical scheduling policies, implemented by the way the input sequence is cut into several chunks, each of them processed by a separate thread.

There is no general way of parallelizing an algorithm, because the input sequences may have different, incompatible properties, leading to impossibilities or to different thread behaviours.

The RPA library gives three distinct possibilities to control how the threads will process these input sequences.

2.3.2.1 Static or simple scheduling policy, range_size.

It can work only if the input iterator is at least a forward iterator, and if the number of elements is known in advance. It works best when the input iterator is a random-access iterator, but this is not mandatory. The main thread's role is to cut the input range into even intervals: only the last one may have a different size. Then each sub-thread is given a slice of the input range. Accordingly, if there are other ranges (both input and output), the sub-threads are given slices of these ranges in a consistent way.

2.3.2.2 Dynamic scheduling policy, range_step.

This scheduling policy needs the input iterator to be at least a forward iterator. The range can be accessed by only one sub-thread at a time, and must be protected by a mutex (the easiest method, which works with all types of iterators) or accessed atomically. Each sub-thread accesses the ranges, picks up a slice of each of them, and then processes them independently of the others. The slice size is important: if it is too big, only one thread will be running, because it will take all the elements; if it is too small, there will be too much overhead.

2.3.2.3 Interleaved scheduling policy, range_jump.

Interleaved scheduling does not need any mutex, but needs forward iterators. If there are NbThr sub-threads, each picks one element out of every NbThr. There is no need for any lock, but the input forward iterators must be thread-safe (in fact, all iterators, both input and output, must be thread-safe, because there is no protection for them). The order of output elements is lost, as is frequently expected of a parallel algorithm.

2.4 Genericity : Independence from containers

This library is independent of any kind of C++ container, and does not enforce their use. To use a specific container type, the associated definition file (containing template specializations) must be included.

Therefore, RPA is designed to allow the addition of new container types, simply by adding a new include file which implements some template specializations.

2.5 Creation of sub-threads

2.5.1 Recursive creation (Method run )

This is designed to allow terminal recursion optimisation : When the compiler can replace recursive calls by a loop.

Always the same pattern applies:

When there are no more threads, it calls reduction. This method waits for each thread to terminate, with join. Because reduction is called at the end of the recursive calls of run, all the intermediary data created for each sub-thread are still available on the stack. These data are linked together by the class rpa::thread_stack. This class makes it possible to wait for each sub-thread one after the other: any kind of reduction is therefore possible each time a sub-thread terminates.

2.5.2 Thread-specific data stored on the stack of the main thread

Some technical reasons for this design :

2.6 Tools for parallelization

One of the aims of this library is to reuse as much code, containers, functors, etc. as possible while bringing parallelism. The ideal goal is to parallelize an algorithm just by changing one line. However, it is not always possible to take full benefit of multi-threading without more than one-line changes. This is why these specific adapters are proposed.

2.6.1 Buffered output iterator : rpa::obuf_iterator

When writing into an output iterator, it can be desirable to have a separate buffer for each sub-thread :

This library provides a specific iterator adapter, rpa::obuf_iterator, for providing this feature, with specific template specializations. A whole chapter is dedicated to this adapter.

2.6.2 Locking iterators

The wrapper rpa::iterator_lock is provided to protect iterators against simultaneous accesses. It transforms any kind of iterator into an iterator of the std::input_iterator_tag category.

2.6.3 Thread-safe push_back method for containers.

3 Ranges

What we call ranges are, conceptually, the sequences of elements which are given to an STL algorithm; they are represented as aggregates based on iterator types. There are two types of ranges: input ranges and output ranges. Range types have nothing to do with iterator types: they are data structures orthogonal to iterator types. In other words, it is possible to combine, as in a matrix, nearly any type of range with any kind of iterator.

3.1 Input ranges, a.k.a. range partition adapter.

They are defined by at least a pair of iterators of the same type: the begin and the end. Depending on the input range type, there may be other extra parameters and parameter types. Input range types determine the scheduling policy of most algorithms (see later).

3.1.1 Range partition adapter range_size.

The basic idea behind this kind of range is that the input range will be cut into slices of approximately the same number of elements (using the method chop(void)), and each slice will be attributed to a sub-thread. They will all have the same number of elements to process, except the last thread, which gets at least as many elements as, but less than twice as many as, the other threads (using the method tail).

This range type is usable only when the number of input elements can be calculated. That is, the distance between the begin and the end iterators. In a way, giving this number of elements, added to the begin and end iterators, may seem redundant. But, calculating this distance may be a costly operation. Therefore, two constructors are available: One with the size, and the other one without the size: In this case, it is calculated with std::distance(). There is another - optional - argument for this range, which is the minimum number of elements, under which it is not necessary to create a sub-thread. This number is used to reduce the number of actually used threads. It can be based on the cost of incrementing an iterator compared to the cost of processing one element.
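
As an illustration of this splitting, the slice boundaries could be computed as in the following sketch (plain arithmetic written for this document, not the library's code):

	#include <cstddef>

	// Sketch only: bounds of the slice given to thread thrIdx when nbElems
	// elements are divided among nbThreads threads. The last thread also
	// receives the remainder, so it gets at least sliceSize but less than
	// 2 * sliceSize elements, which is what the method tail accounts for.
	void slice_bounds( std::size_t nbElems, std::size_t nbThreads,
	                   std::size_t thrIdx,
	                   std::size_t & first, std::size_t & last )
	{
		std::size_t sliceSize = nbElems / nbThreads ;
		first = thrIdx * sliceSize ;
		last  = ( thrIdx + 1 == nbThreads ) ? nbElems : first + sliceSize ;
	}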

This range type is usable with iterators of type :

Its methods are :

3.1.2 Range partition adapter range_step.

This range type is usable when the number of input elements is NOT known. It is usable with iterators of the std::random_access_iterator_tag, std::forward_iterator_tag and std::bidirectional_iterator_tag categories, with equivalent performance.

The arguments of this range are of course the beginning and ending iterators.

A third one is the number of steps, that is, the size of the slices that are chopped from all ranges each time a sub-thread gets some tasks to compute.

Another, optional, argument is a reference to a mutex. If this argument is not provided, it means that atomic locks will be used: beware that these are much more complicated to use, may require more development, and are much more difficult to test.

There is another - optional - argument for this range, which is the minimum number of elements, under which it is not necessary to create a sub-thread. This number is used to reduce the number of actually used threads. It can be based on the cost of incrementing an iterator compared to the cost of processing one element.

The strategy behind using this kind of range, is that all sub-threads will chop slices of elements, i.e. pick up work to do, from the input range, one at a time, until the input range is empty. Because this access can only be exclusive, it needs some sort of synchronization. Therefore, there are two distinct sub-types of range_step.

3.1.3 Range partition adapter range_jump.

This is an implementation of interleaved scheduling. If there are N threads, each thread picks up one element out of N. The implementation wraps the iterators into another iterator type which silently increments by N when accessing the next element.

The characteristics of this approach are:

3.2 Output ranges

They are defined by a single iterator, given as an output iterator to the algorithm.

4 Algorithms

4.1 Introduction

4.1.1 Why having chosen these algorithms ?

Many features are developed along with the algorithms. Therefore, the order in which the algorithms are developed has a strong impact on which features are available.

4.1.2 Common structure

4.1.2.1 Functor class

Two methods operator()(void) and operator()( thread_tree ).

4.1.2.2 Method run()

4.1.3 Order of output results.

Each time an algorithm has an output iterator, results are written to it. Parallel execution may change the order of these results; in fact, it will always change it.

There are several ways to keep or restore the original output order, or to create a specific order in the output. By original order, we mean the order the output elements would have in sequential mode.

4.2 rpa::transform_t with one input iterator

4.2.1 Introduction

The class rpa::transform_t models the parallel execution of the algorithm rpa::transform. It has several template arguments:

Its development has implied the following features:

This algorithm can also be used to implement the behaviour of std::copy or std::for_each.

4.2.2 Creation

The arguments of a creation of an object of the class rpa::transform_t are :

The helper function rpa::make_transform is designed to make this creation very simple.

4.2.3 Execution

There are two possible execution modes for such a functor:

4.3 rpa::accumulate_t

The class rpa::accumulate_t models the parallel execution of the algorithm std::accumulate. It can also be used to implement the behaviour of the algorithm std::count. It has several template parameters:

4.3.1 Introduction

Its development has implied the following features:

4.3.2 Creation

4.3.3 Execution

4.4 rpa::remove_copy_if_t

4.4.1 Introduction

Its development will imply the following features:

It will of course benefit from all the previous features (Output buffering as an example).

To be implemented. Probably impossible with atomic iterators ?

4.4.2 Creation

4.4.3 Execution

4.5 rpa::transform_t with two input iterators

4.5.1 Introduction

Its development will imply the following features:

It will of course benefit from all the previous features (Output buffering as an example).

4.5.2 Creation

4.5.3 Execution

5 Buffered output iterator : rpa::obuf_iterator

5.1 rpa::obuf_iterator presentation

rpa::obuf_iterator encapsulates output iterators. Its role is to keep the properties of existing STL iterators and, at the same time, allow the template specializations and manipulations needed to parallelize the code. In other words, it is our own thin layer between STL iterators and STL algorithms. Here, this is the general case. This class must also manage the various combinations of parameters for the constructors. It models :

When writing into an output iterator, it can be desirable to have a separate buffer for each sub-thread :

This library provide a specific iterator adapter, rpa::obuf_iterator, for providing this feature, with specific template specialization.

5.1.1 Why thread-specific ?

Many STL algorithms have an output iterator where the result must be written. If several threads execute this algorithm at the same time, they will all write into the same output, which will have to be protected by a mutex: this will be a major performance bottleneck. The idea of using output buffers is to give a buffer to each sub-thread, and to periodically flush these buffers into the output iterator.

5.1.2 Behaviour

Each sub-thread reads its input from the range, as in the normal behaviour. But it writes its results into the output iterator of an output buffer. At the end of the sub-thread's execution (reduction), the buffer is flushed into the output iterator. Depending on the output iterator type, several optimizations are possible. For example :

5.2 rpa::obuf_iterator with and without size limits

These are two very different cases, because the scheduling has to be changed for one of them.

5.2.1 Limited-size buffers : Output buffer with a maximum number of elements.

The scheduling must be changed: each sub-thread has its own functor (which does only a part of the job of the main functor), rescheduled with the dynamic scheduling method (range_step). This is because the standard algorithm can be applied only to a limited number of elements, which may not be known in advance (except for static scheduling). There is therefore a two-level scheduling. Of course, it is tempting to use a fixed-size C-style array as a buffer. But this is not the only interest of fixed-size buffers: the point in using them is that they are never resized after the beginning, so no memory allocation ever takes place after the initial one, which reduces the bottleneck of memory allocation.

5.2.2 Limit-less buffers : Output buffers without size limit.

The original scheduling, chosen by the calling program, is not changed. At the end of the execution of each sub-thread, all buffers are flushed.

5.3 Template parameters of the output iterator adapter rpa::obuf_iterator.

	obuf_iterator< OutputIterator, Buffer, BufferIterator, Mutex >

5.3.1 Parameter OutputIterator

This is the type of iterator that should normally be used in sequential mode. It can typically be a std::back_inserter, std::front_inserter (not implemented yet), std::inserter (not implemented yet) or std::ostream_iterator.

5.3.2 Parameter Buffer

The type of buffer internally used by each thread for storing results before flushing them into the original output iterator.

5.3.3 Parameter BufferIterator

An iterator pointing to a sequence of available buffers. There must be one buffer per thread. If this type is void, the library will create a buffer on the stack of each thread, as an automatic variable. Beware that if the buffer is big and there is no iterator of buffers, the stack may overflow.

5.3.4 Parameter Mutex

Indicates whether this is fixed-size buffering (and then this type must be a valid mutex type that can be locked and unlocked) or infinite-size buffering (and then the mutex type will be void). For infinite-size buffering, no mutex is necessary to protect the output iterator, because the thread-specific buffers are flushed at sub-thread termination, by the main thread. On the other hand, if the buffers have a fixed size, they are periodically flushed into the original output iterator by each of the sub-threads. This original output iterator must therefore be protected by a mutex.

5.4 Template specializations depending on the output iterator type.

They are mandatory for using a given output iterator type, because no generic solution is possible. For the moment, these specializations are provided :

5.4.1 Bufferizing std::back_inserter with rpa::obuf_iterator.

If your algorithm writes into a std::back_insert_iterator you may wonder how it is possible to have several threads inserting objects into it at the same time - you're right.

If rpa::obuf_iterator gets a std::back_insert_iterator, it is able to use its second template parameter as a per-thread output buffer. Therefore, the insertion is not a bottleneck for all threads trying to insert at the same time, because these insertions are done in thread specific buffers.

Each sub-thread is given an input slice of elements whose size is equal to the maximum size of the buffers. This makes sense because algorithms such as std::transform, std::copy, std::for_each, std::remove and std::remove_if do not create elements, and may even remove some of them, so there is certainly enough room to store them in the thread-specific output buffer.

At the end of the processing of the input slice, each thread now has to flush the content of its output buffer to the main algorithm output. This is where many possible optimizations take place :

Note: std::front_insert_iterator and std::insert_iterator are not handled for the moment. This is however planned, using exactly the same techniques successfully used for parallelizing std::back_insert_iterator.

5.4.2 rpa::orow_iterator

5.4.2.1 rpa::orow_iterator and how it generalizes std::ostream_iterator.

This template class has two template parameters :

Although this class can be used exactly like a std::ostream_iterator, it is used with much more benefit as the first template argument of rpa::obuf_iterator.

5.4.2.2 Added features to rpa::orow_iterator

Once this object is created, it is possible to access to the delimiter and to the output stream, with simple setters.

The first template argument is the same; a second, optional, one is added, which is the stream buffer that the output stream is based on. This stream buffer:

5.4.2.3 rpa::orow_iterator

To summarize, rpa::orow_iterator is compatible with std::ostream_iterator, with the extra possibility of knowing the type of the stream buffer. This feature is necessary to speed up the flushing of thread-specific buffers into the output stream.

5.4.3 rpa::obuf_iterator specializations for rpa::orow_iterator

They are provided to speed up the flush of the thread-specific buffers into the output iterator. It is possible to combine this kind of specialization with a template specialization based on the container, especially if the container is based on a std::basic_streambuf. For example, when plain POSIX files are used as per-thread output buffers, and if the output iterator is an rpa::orow_iterator based on a POSIX file too, some specific optimizations are available - and can be expanded.

Therefore, the goal of the adapter rpa::orow_iterator is to behave just like a std::ostream_iterator, with some extra arguments and added features :

5.5 Template specializations depending on the buffer type

This is a different matter : These specializations are not necessary if the buffers types are compliant with STL containers. The point in using them is the benefit of performances.

5.5.1 rpa::obuf_iterator specializations for std::list.

We use the fact that std::list::splice allows a sub-list to be inserted into a list in constant time. It is especially interesting for infinite-size buffers, because each sub-thread has its own buffer, which will be inserted into the global one at nearly no cost. On the other hand, it is much preferable to allocate the list nodes with thread-specific memory allocators.
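
The splice idea in isolation can be sketched as follows (an illustration written for this document, not the library's code):

	#include <list>

	// Flushing a per-thread buffer into the shared output list only relinks
	// nodes, so it runs in constant time, whatever the number of buffered
	// elements.
	void flush_list_buffer( std::list<double> & sharedOutput,
	                        std::list<double> & threadBuffer )
	{
		// With infinite-size buffers, the main thread can do this after
		// joining each sub-thread, so no mutex is needed here.
		sharedOutput.splice( sharedOutput.end(), threadBuffer ) ;
	}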

5.5.2 rpa::obuf_iterator specializations for std::vector.

It is possible to reserve, that is to pre-allocate, a std::vector: this is done once, and avoids repeated reallocations while the N elements are inserted.

5.5.3 rpa::obuf_iterator specializations for rpa::row_buffer.

Up to now, we have considered only STL containers as buffers. But it is possible to temporarily store elements into a file, a socket... anything represented by a std::streambuf. std::istream_iterator and std::ostream_iterator make it possible to have one thread write its results into it, and to have these elements read back and flushed into the algorithm output.

It is possible to use, as a temporary output buffer specific to each thread, a std::basic_streambuf of any type, as long as:

Specializations are available given the specific type of this std::basic_streambuf, whether it is based on a file, on memory, etc...

Extra specializations taking into account the type of the output iterator are also available : Their goal is to speed up the flush of the thread-specific buffers into the output iterator.

5.6 Automatic std::streambuf objects for buffering sub-threads output.

These tools are made for creating std::basic_streambuf derived classes with a default constructor. Having a default constructor allows pools of buffers to be created automatically.

5.6.1 Temporary files used as per-thread buffers.

It is not possible to know in advance how many files will be needed: one per thread, but not all threads may be used. Moreover, it is cumbersome to loop over an array of file names when POSIX functions such as ::tmpnam or ::mkstemp exist for this purpose: file generators are there to wrap these functions.

5.6.2 File generators.

File generators are objects which can generate a temporary file name, a temporary FILE *, or a temporary integer file descriptor. In this context, these temporary files are used as per-thread buffers. Some streambuf constructors may need a file name, some others a FILE *, and some others an integer file descriptor, without any regular pattern.

There are three different types of file generators, which share common properties:

A couple of file generators are provided, based on the usual POSIX functions. It is not mandatory to use them; any other file generator can be created.

5.6.2.1 File generator based on a file name.

rpa::filgen_tmpnam wraps the POSIX function ::tmpnam.

5.6.2.2 File generator based on an integer file descriptor.

rpa::filgen_mkstemp wraps the POSIX function ::mkstemp.

5.6.2.3 File generator based on a FILE pointer.

rpa::filgen_tmpfile wraps the POSIX function ::tmpfile.

5.6.3 File generators conversions.

As seen before, some streambuf's constructors may need a file name, some others a FILE *, and some others a int file descriptor, without regular pattern. On the other hand, different file generators do not all have the same properties. For example, we may wish to create temporary file names having a specific pattern, and, at the same time, use stream buffers whose constructors need a ::FILE pointer.

Therefore, the role of file generators converters is to do the appropriate conversions between a file generator of XXX, into a file generator of YYY.

Not all combinations are available:

5.6.4 Wrapping a streambuf and a file generator with rpa::streambuf_tmp_XXX.

Three template classes are provided to wrap a streambuf type and a file generator, in order to create a new streambuf type which has a default constructor. This default constructor is mandatory to have automatically-created pools of buffers. These three classes all have two template arguments: the streambuf and the file generator.

The very important point about these template classes, is that they have specializations for a couple of streambuf which do not have default constructors. These streambufs can now be used as 'anonymous' buffers, automatically created, one per sub-thread, when they are needed, as easily as a std::basic_stringbuf. These classes are not strictly part of this library, not mandatory, but they make integration into existing software, much easier and smoother. Of course, any kind of specializations for other stream bufs can be added.

Specializations are available for:

5.7 rpa::row_buffer specializations for fast flushes.

When the input and output are based on descriptors, this problem is close to file copying. Some technical information is available here: http://www.developerweb.net/forum/archive/index.php/t-4291.htm.

The template row_buffer can be used just like a container, as second argument of the template obuf_iterator.

row_buffer has three arguments :

This optional third argument can be explicitly given, mostly for performance.

5.7.1 Flushes using memory mapping.

If each sub-thread has a FILE as output buffer, it is possible to map this file in memory. Then, to flush into the output, one just needs to ::mmap() it and copy it to the output iterator. This is done by explicitly using the template classes rpa::augm_stdio_filebuf_fd_mmap, rpa::augm_stdio_filebuf_FILE_mmap, etc., depending on the type of the streambuf used as container. It is possible to override the size of the memory segments.

5.7.2 Flushes using file copies.

Just like memory mapping, it is possible to flush the buffer with plain file copies. It is possible to specify the size of the buffer used for reading and writing.

5.7.3 Flushes using ::sendfile and zero-copy techniques.

The system call ::sendfile() allows, on some platforms, to copy N bytes from one file descriptor to another. It manipulates kernel buffer so that no user copies are necessary.

Note that on Linux 2.6, it does not work with all file descriptors: see why here: http://www.cs.helsinki.fi/linux/linux-kernel/2001-02/0684.html.

And I'd rather tell people using ::sendfile() that you get EINVAL if it isn't able to optimize the transfer.
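
For illustration, such a flush might look like the following sketch (Linux-specific; the names and wrapper are ours, not the library's):

	#include <sys/sendfile.h>
	#include <sys/types.h>
	#include <cstddef>

	// Sketch only: copies nbBytes from the thread's buffer file descriptor to
	// the output descriptor, without any user-space copy. Depending on the
	// kernel version, the output may have to be a socket, and EINVAL is
	// returned when the transfer cannot be optimized.
	ssize_t flush_with_sendfile( int outFd, int bufFd, std::size_t nbBytes )
	{
		off_t offset = 0 ;
		return ::sendfile( outFd, bufFd, &offset, nbBytes ) ;
	}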

5.8 Atomic output iterator buffers.

It is perfectly possible to provide an atomic specialization of an rpa::obuf_iterator, although none is provided here for the moment. Here are its requirements :

6 Archiver pipelines and circular pipelines.

We have seen that the RPA library is able to parallelize algorithms by cutting data ranges into pieces. It is also able to bring extra parallelism with pipelining, by having a data flow running from one thread to another (and to another, and to another), just like Unix commands can be piped with the pipe operator, "|".

Conceptually, the idea consists simply of storing the output of a thread executing an algorithm into a container, and letting another thread, running another algorithm, pick up data from this container, used as a buffer. The difficulties are mostly in the synchronization. Pipelines can have any number of readers and writers.

There are two sorts of pipelines:

These pipelines have an interface compliant with STL containers:

They have two template parameters:

Elements can be read or written by two different ways:

6.1 Conditions

Pipelines need some sort of condition variables for synchronization, called 'conditions'. In short, the class condition implemented in rpa/condition.h is an abstraction of a POSIX condition variable. This may be surprising, because until now the RPA library tries to be as abstract as possible. There are several reasons for this :

Therefore, abstracting POSIX conditions variables is not as general as mutexes and threads, but it is probably the most general architecture for this feature.

6.1.1 Conditions interface

Conditions have a single template parameter which is the mutex type used for synchronization.

They are required to have three methods :

One can see in the following links that it is very easy, by wrapping an existing condition variable, to have this interface :

6.1.2 Available implementations for conditions.

6.1.2.1 Implementation with any mutex type and an atomic boolean.
6.1.2.2 Implementation based on a POSIX condition variable.

6.2 Class pipe_archiver

6.3 Class pipe_circular

A circular buffer is a generalisation of the usual buffer made of a static array, a mutex and two condition variables, such as this one: http://www.cs.albany.edu/~sdc/CSI400/Fal02/Lecture17.html. It is also known as the producer-consumer problem.
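
The underlying pattern can be summarized with the classic POSIX sketch below; it only illustrates the producer-consumer idea, and is not the code of pipe_circular:

	#include <pthread.h>

	#define BUF_SIZE 16
	static double          buffer[ BUF_SIZE ] ;
	static int             count = 0, head = 0, tail = 0 ;
	static pthread_mutex_t mtx      = PTHREAD_MUTEX_INITIALIZER ;
	static pthread_cond_t  notFull  = PTHREAD_COND_INITIALIZER ;
	static pthread_cond_t  notEmpty = PTHREAD_COND_INITIALIZER ;

	void produce( double x )
	{
		pthread_mutex_lock( &mtx ) ;
		while( count == BUF_SIZE )              // wait for free room
			pthread_cond_wait( &notFull, &mtx ) ;
		buffer[ tail ] = x ;
		tail = ( tail + 1 ) % BUF_SIZE ;
		++count ;
		pthread_cond_signal( &notEmpty ) ;      // wake up a reader
		pthread_mutex_unlock( &mtx ) ;
	}

	double consume()
	{
		pthread_mutex_lock( &mtx ) ;
		while( count == 0 )                     // wait for data
			pthread_cond_wait( &notEmpty, &mtx ) ;
		double x = buffer[ head ] ;
		head = ( head + 1 ) % BUF_SIZE ;
		--count ;
		pthread_cond_signal( &notFull ) ;       // wake up a writer
		pthread_mutex_unlock( &mtx ) ;
		return x ;
	}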

6.3.1 STL-compatibility features.

The class pipeline is designed to add pipelining capabilities to the STL classes and algorithms :

6.3.2 RPA features.

The classes pipe_archiver and pipe_circular are designed to be integrated into the RPA library.

Any condition variable and mutex types can be used, as long as the condition variables are based on this mutex type.

6.3.2.1 Derived thread
6.3.2.2 Creation of a range_step out of a circular buffer.

6.4 Method shut() and shutter threads.

By definition, a pipeline can receive data at any moment, in an unpredictable manner. Therefore, by default, it is impossible to tell whether a pipeline is empty or not. So, if a thread is reading input data from a pipeline, it will never finish. There is a mechanism to mark a pipeline as shut, or closed, analogous to an end-of-file marker or a connection closing. Pipeline classes have a method shut(), which sets an associated internal flag. When a thread reads data from a pipeline which is in the shut state, and there are no more data to read, the pipeline is considered empty, that is, begin() == end(), and the reading thread stops waiting.

To activate this mechanism, the writing thread must call the method shut() after all data are written. In some circumstances, it is not convenient to call this method: for example, if the writing thread just runs a parallel algorithm, this thread executes the STL algorithm and then leaves: There is no 'room' for performing extra termination actions. Therefore, a thread derivation is available (shutter_thread) whose effect is, at thread exit, to call the method shut() on the output pipeline.

7 Interface with the Operating system, synchronization objects.

7.1 Portability : Independence from hardware architecture.

7.1.1 Presentation

This library is portable, and independent of any operating system. Many examples, template specializations are available, but no system library is necessary.

This library does not enforce the use of any operating system call, does not do any call to any external function (Except in debug mode, for displaying results ):

There are very specific reasons not to provide any operating-system level feature :

Choosing the 'right' synchronization object is extremely difficult, if at all possible. In some situations, a pthread_mutex_t might be the right choice, and in some others a pthread_spinlock_t will be better. It might even be necessary to select a specific POSIX pthread library release. So, it is not possible to make this choice once and for all in a library.

Synchronization primitives evolve very quickly: they have a specific version for each target architecture and operating system, and they may be buggy. Developing these low-level system objects is a highly specialized, full-time job, even for a single architecture. So, it is not reasonable to select even a subset of synchronization objects. Instead, this library gives all the tools needed to adapt to the ones available, at a given time, on the current platform.

It is a common practice in C++ libraries to separate mechanism from policy. This library leaves the user total freedom to choose or develop their own threads and mutexes, as long as they match a very simple programming interface. By using templates and inlined code, this library guarantees that :

7.1.2 Mutexes interface.

Some properties of the mutex used in the library :

7.1.2.1 Method lock

This method's role is, as expected, to lock the mutex. No return value is expected, and the only way to signal an error is with an exception: there is no mechanism in this library to handle errors of datatypes it knows nothing about.

7.1.2.2 Method unlock

Same as lock : No return code, no error management.

7.1.2.3 Method trylock

As expected, its role is to do a plain trylock of the mutex and, as usual, the return value is false if the mutex was already locked, and true if it was not locked and this method succeeded in locking it.

This method is optional and necessary only for some threads adapters (For reusing already running threads).
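
As an example, a user-supplied mutex matching this interface could be a thin wrapper around a POSIX mutex, as in the sketch below (rpa::posix_mutex plays a similar role in the library, but this is not its actual code):

	#include <pthread.h>

	class my_posix_mutex {
		pthread_mutex_t _mtx ;
	public:
		my_posix_mutex()  { pthread_mutex_init( &_mtx, NULL ) ; }
		~my_posix_mutex() { pthread_mutex_destroy( &_mtx ) ; }
		void lock()    { pthread_mutex_lock( &_mtx ) ; }
		void unlock()  { pthread_mutex_unlock( &_mtx ) ; }
		// Returns true only if the mutex was free and is now locked.
		bool trylock() { return pthread_mutex_trylock( &_mtx ) == 0 ; }
	private:
		my_posix_mutex( const my_posix_mutex & ) ;             // not copyable
		my_posix_mutex & operator=( const my_posix_mutex & ) ;
	};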

7.1.3 Threads interface.

Threads are never copied nor assigned. All their methods must be thread-safe.

7.1.3.1 Method create

This basically works just like the pthread_create function of the POSIX threads library: this method takes a function pointer and executes it. This method is mandatory.

7.1.3.2 Method join

This method, which basically works just like pthread_join, is mandatory too.

7.1.3.3 Method cancel

This method is modelled after pthread_cancel. It is not mandatory, but NO default implementation is provided. If it is available (For exemple based on pthread_cancel), it allows the efficient use of some thread adaptor. It is not mandatory and its use is not implemented yet.

7.1.3.4 Method exec_lock

The implementation of this method is not mandatory. A minimal implementation simply returns false, which means that it was too late for the main thread to execute the function before the sub-thread started it: this behaviour is consistent with the exec_lock requirements.

An implementation of exec_lock (Non portable, of course) can bring a performance benefit, if it is possible for threads to have a fine control of scheduling and priorities.
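
For illustration, the general shape of a user-supplied thread type could be the following sketch (the exact signatures expected by the library may differ; this only shows the idea):

	#include <pthread.h>

	class my_posix_thread {
		pthread_t _thr ;
	public:
		// Works like pthread_create: starts func(arg) in a new thread.
		void create( void * (*func)( void * ), void * arg )
			{ pthread_create( &_thr, NULL, func, arg ) ; }

		// Works like pthread_join: waits for the thread to terminate.
		void join() { pthread_join( _thr, NULL ) ; }

		// Minimal implementation of the optional exec_lock: returning false
		// means the main thread could not run the job itself before the
		// sub-thread started it.
		bool exec_lock() { return false ; }
	};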

7.1.4 Atomic operations

7.1.4.1 Needed features

There are several places where atomic access to data are needed :

The technical needs are :

7.1.4.2 Implementation

On many platforms, some atomic types already exist; they can be very fast, but they are absolutely not standardized, even on a given operating system. On the other hand, when such a data type is not available, it is always possible to provide an implementation based on classic mutexes. The file rpa/atomic.h and the template class atomic provide a way to encapsulate any platform-specific atomic data type and, if none exists, to use a pseudo one, based on the template class atomic_mutex, itself internally based on any type of mutex.

Of course, when no atomic type is available on a given platform, simulating them is a poor solution, and it is much better to use features based on plain mutexes instead. But some applications may lack the necessary flexibility.

In addition to the specific atomic features that a given operating system or processor may provide (for example http://www-128.ibm.com/developerworks/library/pa-atom/ ), there are many libraries, commercial, open-source, etc., which provide the needed features. For example :

The architecture of the library makes it very easy to specialize any atomic operations, in specific use, with no or extremely small overhead (Using inlining). One just need to add a template specialization such as the examples provided in rpa/architectures/SunOS_sparc.h or rpa/architectures/linux_386.h.
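
The mutex-based fallback idea can be sketched as follows (a generic illustration of the atomic_mutex approach written for this document, not the library's actual class):

	// Sketch only: an "atomic" counter emulated with any mutex type that has
	// lock() and unlock().
	template< class Type, class Mutex >
	class atomic_fallback_sketch {
		Type  _value ;
		Mutex _mtx ;
	public:
		atomic_fallback_sketch( Type initial ) : _value( initial ) {}

		// Returns the previous value, then adds the increment, all under lock.
		Type fetch_and_add( Type increment ) {
			_mtx.lock() ;
			Type old = _value ;
			_value += increment ;
			_mtx.unlock() ;
			return old ;
		}
	};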

7.1.5 Compare-and-swap operations.

7.1.5.1 Presentation and needs.

Strictly speaking, 'Compare And Swap' operations are atomic, but they are needed for a very specific feature, totally different from the ones needed by other atomic operations. Here, the need is in dynamic scheduling (range_step), when a new slice must be extracted (with the method chop) from one or several ranges (at least one range_step, atomically synchronized, i.e. without a mutex).

Depending on the type of STL algorithm, we need compare-and-swap operations with several inputs, that is, one input per sequence involved in the algorithm. For example :

7.1.5.2 Implementation of compare-and-swap

On many platforms, some Compare And Swap features already exist; they can be very fast (sometimes using specific processor instructions, such as the Motorola 68000 CAS and CAS2), but they are absolutely not standardized, even on a given operating system. On the other hand, when such a feature is not available, it is always possible to provide an implementation based on classic mutexes.
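
A mutex-based fallback can be sketched as follows (illustration only, not the library's code; the same scheme extends to two words, as with the 68000 CAS2, by comparing and updating both values under the same lock):

	// Sketch only: atomically replaces target by newValue if it still equals
	// expected, and reports whether the swap happened.
	template< class Type, class Mutex >
	bool compare_and_swap_sketch( Mutex & mtx, Type & target,
	                              Type expected, Type newValue )
	{
		bool swapped = false ;
		mtx.lock() ;
		if( target == expected ) {
			target  = newValue ;
			swapped = true ;
		}
		mtx.unlock() ;
		return swapped ;
	}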

7.1.5.3 Possible user implementations (TO BE CONTINUED)

We are aware that this is a very high requirement for architectures, very far from what is actually available. There are on the other hand some libraries which are able to implement a N-level compare_and_swap instruction :

Like for atomic operations, it is very easy to use any library or processor instructions, by specialising the template classes :

TO BE CONTINUED. TEMPLATE SPECIALIZATIONS.

8 Debugging and monitoring adapters.

Multi-threaded programs are exceedingly difficult to debug. A useful technique is to control synchronization objects such as mutexes and threads 'from the inside'. This allows one to check, for example :

There are specific adapters to add these features to existing objects, without changing their basic applicative features. In other words, it is possible to add run-time monitoring features to an application, just by encapsulating some into templates.

The specific architecture of this library allows to stack multiple adapters : Simply because all synchronization objects must have a well-defined interface that can be easily overloaded.

8.1 Functors adapters.

Some algorithms accept a functor, which is a unary function, as argument : std::transform, std::accumulate, std::count_if, etc...

The interface of functors is extremely simple: it is the one of std::unary_function objects.

8.2 Mutexes adapters.

The interface of our mutexes is extremely simple (lock, unlock, trylock), and it is possible to wrap any mutex (especially user-provided mutexes) in a template type. The result is an object which still has the interface of a mutex - and can be used as such - but with extra features.

8.2.1 Mutex adapter mutex_count

Each lock, unlock and trylock operation increments internal counters. Moreover, in debug mode, an invariant is evaluated (number of locks + number of successful trylocks = number of unlocks) for debugging purposes.
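
A generic sketch of such an adapter could look like this (illustrative only, not the exact implementation of mutex_count):

	#include <cassert>

	// Sketch only: still behaves as a mutex, but counts each operation and
	// checks the invariant locks + successful trylocks == unlocks when it is
	// destroyed (i.e. when it is no longer held).
	template< class Mutex >
	class counting_mutex_sketch {
		Mutex         _mtx ;
		unsigned long _nbLocks, _nbTryLocks, _nbUnlocks ;
	public:
		counting_mutex_sketch() : _nbLocks(0), _nbTryLocks(0), _nbUnlocks(0) {}
		void lock()   { _mtx.lock() ;  ++_nbLocks ; }
		void unlock() { ++_nbUnlocks ; _mtx.unlock() ; }
		bool trylock() {
			bool ok = _mtx.trylock() ;
			if( ok ) ++_nbTryLocks ;
			return ok ;
		}
		~counting_mutex_sketch() {
			assert( _nbLocks + _nbTryLocks == _nbUnlocks ) ;
		}
	};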

8.2.2 Mutex adapter mutex_stats : NOT IMPLEMENTED YET

Counts the average number of threads waiting for a mutex, the average time to wait, etc... The plan is to use this adapter to evaluate the parallelism rate.

8.3 Threads adapters.

As long as a thread type has the mandatory minimal interface of the methods create() and join(), it can be used in this library. Therefore, it is very easy to wrap an existing thread type into another one, adding extra features, but still providing the same interface.

8.3.1 Adapter for reusing threads.

The cost of creating a thread may be significant. This is why using a pool of threads, reusing them instead of creating them at each call, is an efficient technique. This adapter creates a single thread and blocks until the create method is called. After the function call terminates, it blocks again until the next call to create. For safety reasons, an efficient technique for debugging an application is to compile it, at first, without thread reuse and, once it is properly debugged, to add the adapter for thread reuse.

8.3.2 Thread adapter thread_counter.

This adapter counts each time a thread is created, cancelled, joined and computes an invariant with these counters, for inconsistency detection at run-time. As usual, the object created with the instantiation of any thread with the adapter is still a thread. Note that thread adapters are stackable.

8.4 Pseudo-threads

Again, multi-threaded applications are difficult to debug. A good technique is to debug and check the code first, without any multi-threading. We propose here a set of 'void' threads, which have exactly the interface of a thread, but execute the code serially. This eliminates all bugs coming from synchronization or race condition problems. When all 'serial' bugs are eliminated, it is simple to switch to multi-threaded mode, simply by replacing one thread type (a 'void' thread) with another one. If all user functors, containers and iterators are thread-safe, no bug should be introduced by the library.

8.4.1 Pseudo-thread thread_crea.

This thread serially executes the function pointer passed to it, at creation time.

8.4.2 Pseudo-thread thread_join.

This thread serially executes the function pointer passed to it, when the thread is join-ed.
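
The two behaviours can be sketched as follows (illustration only, not the library's code):

	// Sketch only: both classes have the create/join interface of a thread,
	// but run the function serially, either at create() or at join() time.
	class pseudo_thread_crea_sketch {
	public:
		void create( void * (*func)( void * ), void * arg ) { func( arg ) ; }
		void join() {}
	};

	class pseudo_thread_join_sketch {
		void * (*_func)( void * ) ;
		void *   _arg ;
	public:
		void create( void * (*func)( void * ), void * arg )
			{ _func = func ; _arg = arg ; }
		void join() { _func( _arg ) ; }
	};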

9 Thread trees : The data structure grouping threads together.

This chapter explains how the sub-threads are given to the algorithms.

9.1 Goal of threads trees

Anyway, the complexity of these subthreads aims at implementing :

Each algorithm, at execution time, receives several threads for execution. These threads are passed to operator()( thread_tree ), in a templatized structure called a thread tree. Threads are gathered into trees to allow recursive multithreading. Each level is itself another tree. At each level, the threads may have a different type, but within the same level they must have the same type (or inherit from the same base class). Everything except the number of sub-threads can be templatized.

9.2 Building a thread tree

A thread tree can be easily built with the helper function make_thread_tree. The parameters of a thread tree are :

There are for the moment two restrictions to the complete use of sub-threads:

It would be possible to build a thread tree with just the beginning and the end of the available threads. But we always need the number of threads, so this constructor gives a chance to reuse this number, instead of having to compute it with std::distance, which may be a costly operation. A simple example of thread_tree construction is:

	thread_tree< pthread_t * > anyThreadTree( 10, new pthread_t[10] );

9.3 'Head' thread for pipelining and detached threads.

**************** Up to this point we kept control. Futures are to be introduced here. ****************

9.4 Using thread_array

This is a helper to allocate 'on-the-fly' an array of sub-threads.

9.5 Passing threads by value, thread_copy.

Threads are normally not copyable, but in some situations thread wrappers might be created on the fly, for example a shutter thread for a pipeline, which is a thread in charge of 'shutting' a pipeline after the last insertion into it. It is of course possible to store this temporary thread in a variable and later manipulate its address, but it is much simpler to use it as a value. This is possible with thread_tree, which can work with thread addresses, as we have just seen, but also with thread values, by wrapping them into a thread_copy, which is a sort of reference-counted smart pointer to a thread, where the last reference is released when the thread finishes.

9.6 NUMA

Not implemented yet. The idea is to have the thread tree match the structure of the machine: if a functor can use several threads, it is worthwhile to have the different nodes of the NUMA machine at the top level of the thread trees; each functor will then receive a node of the NUMA machine, each node containing a set of threads running on the same node.

9.7 Available thread number / Used thread number

The algorithms are not required to use all the threads available in a thread tree. For this, they use, for example, the minimum number of elements under which it is not necessary to create a thread. Each range, given its number of elements, the minimums given, or any other parameter, is free to use any number of threads, as long as it uses them in the order of the thread iterator stored in the thread tree.

10 Development methodology

To get the best benefit from multi-threading in general, and from this library specifically, it is worth following some steps. The idea is to solve one problem at a time, in order to control and isolate problems due to multi-threading, such as performance and race conditions.

10.1 Single-threaded debugging.

First, write and debug your developments only with pseudo-threads such as rpa::thread_crea and rpa::thread_join. At this stage, you will be sure that the application does not have bugs in sequential mode, and therefore that all future bugs come from multi-threading.

10.2 Multi-threaded debugging.

The assumption of this library is that everything must work the same and provide the same results, whatever the type of threads and mutexes you choose. So, in this step, you will use real threads (not pseudo-threads) and mutexes, and test your application. The goal of this step is not to enhance performance, but to check that the code is correct.

Remember that the order of results may be totally different depending on the run, the mutex and threads types, etc...

If any bug appears here, you can assume that it comes from a race condition, because it was tested in single-threaded mode, in the previous step. Theoretically, all race conditions should come from your code, because this library assumes that there are no synchronization dependencies between its code and yours.

A good heuristic to detect race conditions is to try several types of threads and mutexes, with several adapters.
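A minimal sketch of this approach, assuming the pseudo-thread type rpa::thread_crea mentioned in the previous step and the posix_thread wrapper; only the typedef changes between test runs:

// Switch the whole test between pseudo-threads and real POSIX threads by
// changing one typedef; every algorithm call stays exactly the same.
typedef rpa::thread_crea    ThreadType ;   // pseudo-threads: sequential execution
// typedef rpa::posix_thread ThreadType ;  // real threads: exposes race conditions

rpa::thread_array< ThreadType > thrArr( 4 );
// ... run the same algorithm calls with thrArr.tree() and compare the results.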

10.3 Single-threaded profiling

Your application must be fast and efficient even in single-threaded mode, simply because it will very often run on a single thread. So, with pseudo-threads, do a profiling step: compile your application with optimizations, then check that it is fast. This is the time to choose specific kinds of buffers. For the moment, forget about sizing (buffer size, chunk size, etc.), because in single-threaded mode it would give misleading values. You can also choose a scheduling type. Specifically, everything decided at compile time, except thread types and mutex types, should be chosen at this step. The run-time parameters (buffer size, chunks, number of threads) will be determined at the next step and may be changed at run time.

10.4 Multi-threaded profiling.

At this stage, you can assume that your application is debugged, and runs efficiently enough in single-threaded mode. You can then assume that the remaining performance problems come from the multi-threaded execution itself: typically mutex contention, and badly tuned run-time parameters (buffer size, chunk size, number of threads).

Numeric parameters can be dynamically adjusted (See functions giving the number of processors).
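For instance, on most POSIX systems the number of on-line processors can be queried at run time and used to size the thread pool. A minimal sketch; the make_thread_tree call follows the patterns of section 12:

#include <unistd.h>

// Size the thread pool from the number of processors available at run time.
long nbProcs = ::sysconf( _SC_NPROCESSORS_ONLN );
if ( nbProcs < 1 ) nbProcs = 1 ;
std::vector< pthread_t > threadsVector( nbProcs );
// ... pass rpa::make_thread_tree( &threadsVector[0], nbProcs ) to an algorithm.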

10.5 Other problems

11 Resources, stack size for threads.

11.1 Stack sizes

Some stack space is allocated for each thread. Spawning threads is a recursive process: the main thread starts the first sub-thread with its arguments on the stack, then calls the same procedure recursively to start the next sub-thread. The point is to avoid allocating heap space for the threads' arguments, because allocating memory is a lengthy process. Moreover, the way this is written allows tail-call optimization, yielding no recursive calls at all.
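A schematic sketch of this recursive spawning, not the library's actual code; the helper name and signature are hypothetical:

#include <pthread.h>
#include <cstddef>

// Start threads [idx, nb) one after the other; the arguments live on the
// current stack frame. The recursive call is in tail position, so the
// compiler may turn the whole recursion into a plain loop.
void spawn_from( pthread_t * thr, std::size_t idx, std::size_t nb,
                 void * (*task)( void * ), void * args )
{
    if ( idx == nb ) return ;
    ::pthread_create( &thr[ idx ], NULL, task, args );
    spawn_from( thr, idx + 1, nb, task, args );   // tail call
}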

If tail-call optimization does not apply, some stack space is needed for each sub-thread anyway: basically the arguments given to the sub-thread (iterators, etc.).

If thread-specific buffers are used, with an obuf_iterator, more data are needed: the buffers themselves, etc. If these buffers are allocated on the stack, this can imply a very large stack, and probably a stack overrun. It is possible and easy to provide a pool of buffers, already allocated by the main thread and reusable across executions (see 12.2.1.4), and it is strongly advisable to do so.

11.2 Memory allocators

It is strongly advised that each sub-thread use a thread-aware memory allocator: an allocator refers to a memory pool and, if this pool is shared among threads, all of them have to wait at each allocation or deallocation. It is precisely to overcome this problem that so much care is taken in this library to avoid any dynamic memory allocation.

12 Examples.

This library may seem complicated, but it is in fact very simple to use and control. For the sake of clarity, only the most important lines are shown.

12.1 Simple examples without output buffers

12.1.1 Concatenating a std::list of std::string-s.

The order is not taken into account. This example is actually implemented in the tutorial t04. Because the input is in a list, we can suppose that we do not know the number of elements. Therefore, it is worth using dynamic scheduling.

pthread_t threadsArray[10];
pthread_mutex_t mtxAccum = PTHREAD_MUTEX_INITIALIZER ;

std::list< std::string > strLst ;

int nbSteps = 100 ;

std::string concatString
    = rpa::make_accumulate(
        rpa::make_range_step( strLst.begin(), strLst.end(), nbSteps, &mtxAccum ),
        std::string( "" ) )( rpa::make_thread_tree( threadsArray, 10 ) );

12.1.2 The square root of a set of floats, written to a file.

We note that the price of incrementing the iterators is very high compared to the time needed to compute a square root. Therefore, this example is not very realistic: it would be much faster to put the data in a vector. The interleaved schedule does not need a mutex on the input, but writing into the output iterator still requires one.

std::set< float > fltSet ;
size_t threadsNb = 5 ;
std::vector< pthread_t > threadsVector( threadsNb );
pthread_mutex_t mtxFile = PTHREAD_MUTEX_INITIALIZER ;

std::ofstream outFile( "squares.dat" );

rpa::make_transform( rpa::make_range_jump( fltSet.begin(), fltSet.end() ),
                     rpa::make_iterator_lock(
                         std::ostream_iterator< float >( outFile, "-" ),
                         mtxFile ),
                     ::sqrt )( rpa::make_thread_tree( threadsVector ) );

12.1.3 A functor applied to a set of floats, written to a POSIX file.

This is basically the same example as 12.1.2, with the following differences: the output goes to a POSIX file opened with ::fopen, the output iterator is protected by a rpa::posix_file_mutex, the functor is ::pow bound to the exponent 0.5 instead of ::sqrt, and the thread tree is built explicitly from a vector of threads.

std::set< float > fltSet ;
size_t threadsNb = 5 ;
std::vector< pthread_t > threadsVector( threadsNb );
rpa::thread_tree< std::vector< pthread_t >::iterator > thrTree( threadsVector.begin(), threadsNb );

FILE * outFile = ::fopen( "squares.dat", "w" );

rpa::make_transform( rpa::make_range_jump( fltSet.begin(), fltSet.end() ),
                     rpa::make_iterator_lock(
                         std::ostream_iterator< float >( outFile, "-" ),
                         rpa::posix_file_mutex( outFile ) ),
                     std::bind2nd( std::ptr_fun( ::pow ), 0.5 ) )( thrTree );

12.1.4 Calculating the length of strings which are stored in an array.

This shows the ability of the library to work even with 'old-style' objects, which are very often the most efficient ones. The generated code is small and efficient too, and the synchronization between threads is straightforward: they work totally independently. One can note that several threads will access the output array simultaneously, which may cause some cache thrashing, because the end of one output slice is close to the beginning of the slice of the next thread. But if all threads are equally balanced, they progress through their slices at the same pace, so when one thread writes to the end of its slice, the next thread has already written to the beginning of its own slice.

char chrMatrix[ 100 ][ 500 ];
size_t lenStrings[ 100 ];
pthread_t threadsArray[ 5 ];

rpa::make_transform( rpa::make_range_size( chrMatrix, chrMatrix + 100 ),
                     lenStrings,
                     ::strlen )( rpa::make_thread_tree( threadsArray, 5 ) );

12.2 Example with output buffers

In this section, we explain the progressive transformations from simple examples to more sophisticated ones.

12.2.1 Square root of a std::set< double >, appended to a vector.

In all the following examples, we assume the same conditions: five POSIX threads stored in a plain array and gathered in a thread tree, a small input std::set< double >, an output std::vector< double > filled through a back inserter, and a mutex protecting the input range.

All of this is declared in the following header, common to all the code of this example:

#define NB_THREADS 5
pthread_t thrArrs[ NB_THREADS ];
thread_tree< pthread_t * > thrTree( NB_THREADS, thrArrs );

const double cstDbl[] = { 1.41421, 2.71828, 3.14159 };
std::set< double > dblSet( cstDbl, cstDbl + 3 );

typedef std::vector< double > DblVecType ;

DblVecType vecDbl ;

pthread_mutex_t mtxIn = PTHREAD_MUTEX_INITIALIZER ;

12.2.1.1 No output buffers.

The execution without output buffers would be:

rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    std::back_inserter( vecDbl ),
    ::sqrt )( thrTree );

12.2.1.2 The thread-specific buffer is std::deque, without size limit.

Here, we assume thread-specific buffers of type std::deque<double>, without size limit. They are therefore flushed into the output iterator only at the end of each sub-stream. No buffer pool is provided, which means that the buffers must be created at each execution and allocated on the stack of the main thread. The execution with output buffers can be:


rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator<
        std::back_insert_iterator< DblVecType >,
        std::deque< double >,
        void,
        void >( vecDbl ),
    ::sqrt )( thrTree );

12.2.1.3 The thread-specific buffer is std::deque, with a size limit.

Here, we assume thread-specific buffers of 256 elements of type std::deque<double>. The benefit is that the flushing of each output buffer is done by the sub-threads themselves, piece by piece. No buffer pool is provided, with the same consequences as before. The execution with output buffers can be:

pthread_mutex_t mtxOut ;

rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator<
        std::back_insert_iterator< DblVecType >,
        std::deque< double >,
        void,
        pthread_mutex_t >( vecDbl, mtxOut, 256 ),
    ::sqrt )( thrTree );

12.2.1.4 The thread-specific buffer is std::deque, with a size limit and a buffer pool.

Here, we assume thread-specific buffers of 256 elements of type std::deque<double>. The benefit is that the flushing of each output buffer is done by the sub-threads themselves, piece by piece. A buffer pool is provided: the algorithm will use only the buffers given as arguments, which can be reused across several executions, so it is not necessary to create and destroy them each time. Here the buffer pool is a plain C array, but any container can be used, as long as there is one buffer per thread. The execution with output buffers can be:

pthread_mutex_t mtxOut ;

typedef std::deque< double > BufTyp ;

BufTyp bufPool[ NB_THREADS ];

rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator<
        std::back_insert_iterator< DblVecType >,
        std::deque< double >,
        BufTyp[ NB_THREADS ],
        pthread_mutex_t >( vecDbl, bufPool, mtxOut, 256 ),
    ::sqrt )( thrTree );

12.2.2 Square root of the set of doubles, appended to a file. The buffer is a file.

13 Contents
