RPA is an open-source, portable C++ library providing multi-threaded versions of some STL algorithms (transform, accumulate...), working with standard containers and iterators. It can use any thread, mutex or spinlock type. In some respects it is close to Intel TBB, mptl, HPC++ or PSTL. It originated from an idea by Alexander Stepanov.
Imagine you want to apply the STL algorithm std::transform
to a pair of big arrays of numbers, with a thread-safe functor :
#define BIG 1000000
double Input[ BIG ], Output[ BIG ];
std::transform( Input, Input + BIG, Output, std::negate<double>() );
Given the number of iterations, if you have a dual-processor machine, it would be tempting to parallelize this task, by having two threads, each working on parts of the original input and output arrays. The first thread would compute :
std::transform( Input, Input + BIG/2, Output, std::negate<double>() );
... and the second thread would compute :
std::transform( Input + BIG/2, Input + BIG, Output + BIG/2, std::negate<double>() );
Let's quickly check the basic multi-threading constraint: std::negate<double>() has no side effects.
There is no reason why it should not work, and you can do it this way:
rpa::transform( rpa::make_range_size( Input, Input + BIG ),
                Output,
                std::negate<double>() )
    ( rpa::thread_array< rpa::posix_thread >(2).tree() );
rpa::transform is the parallelized version of std::transform. Here it uses an array of two threads of type rpa::posix_thread, which are thin, portable wrappers around POSIX threads. You could use any number of threads, and any thread type, as long as they match a simple interface of three methods.
The number of threads is used to split the input range into two equal parts. Because the output iterator is of the random_access_iterator_tag category, it can be split into two parts as well. Therefore, the two threads can run independently.
Any kind of mutex can be used: pthread_mutex_t, pthread_spinlock_t, etc. Several kinds of threads are provided, with different levels of performance. It is easy to add your own kind of thread, to take full advantage of a specific operating system or to add your own features.
It is possible to change the kind of thread used by any algorithm without any other change: you just replace one datatype with another. Among these kinds of threads, pseudo-threads are provided. They execute the algorithms transparently, with the same code as with real threads; their main purpose is debugging and sequential profiling, as described later.
It is also possible to use your own kind of mutex. This makes it possible to compare the performance gain of different mutex types, or to use a specific, non-portable kind of mutex efficiently (spinlocks, futexes, etc.). You may also use any proprietary library, by instantiating its objects with this open-source library: this gives you the advantages of both proprietary and GPL software.
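As an illustration, here is a minimal sketch of what a user-provided mutex type could look like: a thin wrapper around a POSIX spinlock. Only the lock/unlock interface described later in this document is assumed; the class name is hypothetical and not part of RPA.

#include <pthread.h>

// Hypothetical user-supplied mutex type: a thin wrapper around a POSIX
// spinlock, exposing only the lock()/unlock() interface that RPA expects.
class my_spinlock {
    pthread_spinlock_t _spin;
public:
    my_spinlock()  { pthread_spin_init( &_spin, PTHREAD_PROCESS_PRIVATE ); }
    ~my_spinlock() { pthread_spin_destroy( &_spin ); }
    void lock()    { pthread_spin_lock( &_spin ); }
    void unlock()  { pthread_spin_unlock( &_spin ); }
};

Such a type could then be passed wherever the examples below use rpa::posix_mutex.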
We list here different possibilities, which are orthogonal: You can combine any input with any output.
transform reading from a vector and inserting into a list, static scheduling. We use range_size again, which wraps any pair of random_access_iterator_tag iterators; they must simply be thread-safe, so that several threads can read from them simultaneously. This range type formalizes what is called 'static scheduling': several threads will read simultaneously from a vector, which is possible because its size is known in advance. On the other hand, the insertion must be protected by a mutex. RPA provides a wrapper around any kind of output iterator, iterator_lock.
std::vector<double> inVec;
std::list<double> outLst;
rpa::posix_mutex mtxOut;
rpa::transform( rpa::make_range_size( inVec.begin(), inVec.end() ),
                rpa::hold_lock( std::back_inserter( outLst ), mtxOut ),
                std::negate<double>() )
    ( rpa::thread_array< rpa::posix_thread >(2).tree() );
transform reading from a list, writing to the screen, with dynamic scheduling. We introduce range_step, which applies to any type of forward_iterator_tag iterators. The algorithm 'chops', from the input range, 'slices' of elements which are processed by a sub-thread. There is no need to know the total number of elements to process. The size of the slices must reflect the CPU cost of processing one element. The input range must be protected by a mutex, of any type. We use here rpa::posix_mutex, which is a very thin wrapper around a POSIX mutex with a very simple interface (lock and unlock are the only required methods). This range formalizes what is called 'dynamic scheduling'. The output iterator must be protected too, by another mutex. We reuse the wrapper rpa::iterator_lock. We will see later that it is possible to buffer the output to avoid a mutex lock/unlock for each write. Note that the order of the output elements can, and will, be different from the result of the sequential algorithm. If this is a problem, you can insert the elements into a sorted container such as a std::set.
Note: it is possible to avoid a mutex in dynamic scheduling if you can provide input iterators with atomic access.
std::list<double> inLst;
size_t chunk = 10;
nb_thr_t nbThrds = 10;
rpa::posix_mutex mtxIn;
rpa::posix_mutex mtxOut;
rpa::transform( rpa::make_range_step( inLst.begin(), inLst.end(), chunk, mtxIn ),
                rpa::hold_lock( std::ostream_iterator<double>( std::cout, "," ), mtxOut ),
                std::negate<double>() )
    ( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );
transform with std::list as input and a std::deque as output, with interleaved scheduling. Now, imagine that your input is a very long list, so long that you do not want to calculate its size, because you would have to reach its end, and bring it into memory, even before computing anything. But you would like to avoid an extra mutex, because its cost would be too high compared to the cost of processing one element. There is a solution, called 'interleaved scheduling', which consists of having N threads, each of them reading one element, then iterating N times to get the next one. As the price of iterating over a list is small, and as no input mutex is required, it may still be worthwhile. In this example, the output inserts at the end of a std::deque. For a change, instead of using a functor, we just call the function double ::cos(double). Of course, this function must be thread-safe.
std::list<double> inLst;
std::deque<double> outDeq;
nb_thr_t nbThrds = 10;
rpa::posix_mutex mtxOut;
rpa::transform( rpa::make_range_jump( inLst.begin(), inLst.end() ),
                rpa::hold_lock( std::back_inserter( outDeq ), mtxOut ),
                ::cos )
    ( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );
std::accumulate with std::list as input. The algorithm accumulate can be parallelized too, with the same tools. It is a bit simpler because there is no output sequence. There is a variant of this algorithm which allows one to specify an addition operator. The other ranges can be used too: range_size, range_jump. Of course, any type of mutex and thread can be used, as long as they have the required interface. The 'threadability' conditions must take into account that advance(list<T>::const_iterator) takes a time proportional to the number of elements.
std::list<float> inLst;
size_t chunk = 100;
nb_thr_t nbThrds = 3;
rpa::posix_mutex mtxIn;
float result = rpa::accumulate( rpa::make_range_step( inLst.begin(), inLst.end(), chunk, mtxIn ),
                                0.0 )
    ( rpa::thread_array< rpa::posix_thread >(nbThrds).tree() );
To summarize, many algorithm and container configurations can be partly or fully parallelized with a significant benefit, depending on the size of the data set and on the time taken by each step: the individual processing of an element, and the time taken by an iteration.
std::accumulate with std::vector as input. The big difference here is that the array of threads is statically allocated. This implies that no memory allocation is necessary at all. And, as the input range is of the random_access_iterator_tag category, no mutex is needed either. This is of course very efficient. The function make_thread_tree is a simple wrapper around the beginning of the array of threads and its size. It can also contain a 'head thread'; we will see that later.
std::vector<float> inVec(1000000);
static rpa::posix_thread thrArr[10];
float result = rpa::accumulate( rpa::make_range_size( inVec.begin(), inVec.end() ),
                                0.0 )
    ( rpa::make_thread_tree( 10, thrArr ) );
************* Futures *************
The consequences of this approach are that :
This library is strongly inspired by a paper named "Range Partition Adaptors" ( http://www.stepanovpapers.com/ ):
Range Partition Adaptors: A Mechanism for Parallelizing STL. Matthew H. Austern, Ross A. Towle, Alexander A. Stepanov. Silicon Graphics, Inc., Mountain View, CA 94043. "Range partition adaptors, a new type of adaptor for the C++ Standard Template Library (STL), can be the basis for a parallel version of the STL."
What we call ranges are, conceptually, the sequences of elements which are given to an STL algorithm; they are represented as aggregates based on iterator types. There are two types of ranges: input ranges and output ranges. Range types have nothing to do with iterator types: they are data structures which are orthogonal to iterator types. In other words, it is possible to combine, in a matrix, nearly any type of range with any kind of iterator.
Input ranges are defined at least by a pair of iterators of the same type: the begin and the end. Depending on the input range type, there may be extra parameters and parameter types. Input range types determine the scheduling policy of most algorithms (see later).
range_size. The basic idea behind this kind of range is that the input range will be cut into slices of approximately the same number of elements (using the method chop(void)), and each slice will be attributed to a sub-thread. They will all have the same number of elements to process, except the last thread, which gets at least as many elements as, but less than twice as many as, the other threads (using the method tail). This range type is usable only when the number of input elements can be calculated, that is, the distance between the begin and the end iterators.
range_step. This range type is usable when the number of input elements is NOT known. It is usable with iterators of the std::random_access_iterator_tag, std::forward_iterator_tag and std::bidirectional_iterator_tag categories, with equivalent performance. The arguments of this range are of course the beginning and ending iterators. A third one is the step, that is, the size of the slices that are chopped from the ranges to be processed by a sub-thread. Another, optional, argument is a reference to a mutex; if this argument is not provided, atomic locks will be used. There is one more optional argument for this range: the minimum number of elements under which it is not necessary to create a sub-thread.
The strategy behind this kind of range is that all sub-threads chop slices of elements, i.e. pick up work to do, from the input range, one at a time, until the input range is empty. Because this access can only be exclusive, it needs some sort of synchronization. Therefore, there are two distinct sub-types of range_step.
range_step with a mutex (synchronization with a mutex): when reading a slice from the input iterator, this input iterator is locked, protected against the other threads, by a mutex. Any type of mutex can be used; only the two methods lock() and unlock() are used.
range_step with atomic access: this kind of range_step can only work if the input iterator has a compare-and-swap function allowing a slice of several elements to be picked up atomically.
range_jump. This is an implementation of interleaved scheduling: if there are N threads, each thread picks up one element out of N. The implementation wraps the iterators into another iterator type which silently increments by N when accessing the next element.
Output ranges, by contrast, are defined by a single iterator, given as an output iterator to the algorithm.
The main task of this library is to partition, to 'chop' (which is the term used in the library), input and output ranges into slices, these slices being given to sub-threads for execution. At the end of the execution, the results produced by the sub-threads are reduced. A slice is a sub-range of a range's elements, in input or in output. An input slice, coming from an input range, always has the members begin and end. An output slice just has the member begin. The results of these methods are used as parameters when a sub-thread finally calls an STL algorithm. A slice represents a piece of work to execute on one of the iterator arguments of the algorithm to parallelize. For each type of range, several methods can return a slice.
chop: this is the first method called for each sub-thread. Its role is to return the first piece of work that a sub-thread will have to execute.
chop_again: this is used in dynamic scheduling only, where each sub-thread reuses its current slices each time it chops more data to compute. It does not apply to range_size and range_jump because, in these two cases, there is one and only one slice per sub-thread, so no slice can be reused.
tail: this applies only to range_size, for the last sub-thread.
Most algorithms have one and only one input range; these are the ones we consider here. The input range contains most of the useful information needed to deduce the best possible scheduling policy that can be applied to the algorithm. Each algorithm has one and only one argument which is a range, and this range has the role of determining which scheduling policy will be applied, and with which parameters. This is why the execution style, i.e. the scheduling policy, is mostly determined by the user, by carefully choosing the input range type.
RPA provides three classical scheduling policies, implemented by the way the input sequence is cut into several chunks, each of them processed by a separate thread.
There is no general case for parallelizing an algorithm, because the input sequences may have different, incompatible properties, leading to impossibilities or to different thread behaviours. For example:
A sequence with std::random_access_iterator_tag iterators makes it possible to calculate in advance the number of threads and the iterator intervals they will work on.
A sequence which, like its iterators, only supports forward iteration (std::forward_iterator_tag) cannot be split in advance into multiple intervals (one per thread); but, because of the type of the iterators, each thread could 'reserve' a sub-interval of this input sequence. Note that this approach needs some sort of exclusive access to the sequence.
The RPA library gives three distinct possibilities to control how the threads will process these input sequences.
range_size. It can work only if the input iterator is at least a forward iterator, and if the number of elements is known in advance. It works best when the input iterator is a random-access iterator, but this is not mandatory. The main thread's role is to cut the input range into even intervals: only the last one may have a different size. Then each sub-thread is given a slice of the input range. Accordingly, if there are other ranges (both input and output), the sub-threads are given slices of these ranges in a consistent way.
range_step. This scheduling policy requires the input iterator to be at least a forward iterator. The range can be accessed by only one sub-thread at a time, and must be protected either by a mutex (the easiest method, which works with all types of iterators) or atomically. Each sub-thread accesses the ranges, picks up slices from each of them, and then processes them independently of the others. The slice size is important: if it is too big, only one thread will be running because it will take all the elements; if it is too small, there will be too much overhead.
range_jump. Interleaved scheduling does not need any mutex, but needs forward iterators. If there are NbThr sub-threads, each picks up one element out of every NbThr. There is no need for any lock, but the input forward iterators must be thread-safe (in fact, all iterators, both input and output, must be thread-safe, because there is no protection for them). The order of output elements is lost, as can frequently be expected from a parallel algorithm.
This library is independent of any kind of C++ container, and does not enforce their use. To use a specific container type, the associated definition file (containing template specializations) must be included. Therefore, RPA is designed to allow the addition of new container types, simply by adding a new include file implementing some template specializations.
The method run. This is designed to allow terminal recursion optimisation, where the compiler can replace recursive calls by a loop. The same pattern always applies: run chops a slice from each range and starts the sub-thread pointed to by the current thread iterator, with this slice as argument; it then calls the run method again, with the next thread iterator. When there are no more threads, it calls reduction. This method waits for each thread to terminate, with join. Because reduction is called at the end of the recursive calls of run, all the intermediate data created for each sub-thread are still available on the stack. These data are linked together by the class rpa::thread_stack. This class makes it possible to wait for each sub-thread, one after the other: any kind of reduction is therefore possible each time a sub-thread terminates.
Some technical reasons for this design :
One of the aims of this library is to reuse as much code (containers, functors, etc.) as possible while bringing parallelism. The ideal goal is to parallelize an algorithm just by changing one line. However, it is not always possible to take full benefit of multi-threading without more than one-line changes. This is why these specific adapters are proposed.
rpa::obuf_iterator. When writing into an output iterator, it can be desirable to have a separate buffer for each sub-thread. This library provides a specific iterator adapter, rpa::obuf_iterator, for providing this feature, with specific template specializations. A whole chapter is dedicated to this adapter.
The wrapper rpa::iterator_lock is provided for protecting iterators against simultaneous accesses. It transforms any kind of iterator into a std::input_iterator_tag category iterator.
push_back method for containers.
What we call ranges are, conceptually, the sequences of elements which are given to an STL algorithm; they are represented as aggregates based on iterator types. There are two types of ranges: input ranges and output ranges. Range types have nothing to do with iterator types: they are data structures which are orthogonal to iterator types. In other words, it is possible to combine, in a matrix, nearly any type of range with any kind of iterator.
Input ranges are defined at least by a pair of iterators of the same type: the begin and the end. Depending on the input range type, there may be extra parameters and parameter types. Input range types determine the scheduling policy of most algorithms (see later).
range_size. The basic idea behind this kind of range is that the input range will be cut into slices of approximately the same number of elements (using the method chop(void)), and each slice will be attributed to a sub-thread. They will all have the same number of elements to process, except the last thread, which gets at least as many elements as, but less than twice as many as, the other threads (using the method tail). This range type is usable only when the number of input elements can be calculated, that is, the distance between the begin and the end iterators.
In a way, giving this number of elements in addition to the begin and end iterators may seem redundant. But calculating this distance may be a costly operation. Therefore, two constructors are available: one with the size and one without the size; in the latter case, the size is calculated with std::distance(). There is another, optional, argument for this range: the minimum number of elements under which it is not necessary to create a sub-thread. This number is used to reduce the number of threads actually used. It can be based on the cost of incrementing an iterator compared to the cost of processing one element.
This range type is usable with iterators of the following categories:
std::random_access_iterator_tag: strongly preferred.
std::forward_iterator_tag and std::bidirectional_iterator_tag: still usable, but less efficient (because advancing an iterator by many elements cannot be done in constant time).
Its methods are begin(void) and end(void).
range_step. This range type is usable when the number of input elements is NOT known. It is usable with iterators of the std::random_access_iterator_tag, std::forward_iterator_tag and std::bidirectional_iterator_tag categories, with equivalent performance.
The arguments of this range are of course the beginning and ending iterators. A third one is the step, that is, the size of the slices that are chopped from all the ranges each time a sub-thread gets some tasks to compute. Another, optional, argument is a reference to a mutex; if this argument is not provided, atomic locks will be used. Beware that this is much more complicated to use, may require more development, and is much more difficult to test. There is one more optional argument for this range: the minimum number of elements under which it is not necessary to create a sub-thread. This number is used to reduce the number of threads actually used. It can be based on the cost of incrementing an iterator compared to the cost of processing one element.
The strategy behind this kind of range is that all sub-threads chop slices of elements, i.e. pick up work to do, from the input range, one at a time, until the input range is empty. Because this access can only be exclusive, it needs some sort of synchronization. Therefore, there are two distinct sub-types of range_step.
range_step with a mutex (synchronization with a mutex): when reading a slice from the input iterator, this input iterator is locked, protected against the other threads, by a mutex. Any type of mutex can be used; only the two methods lock() and unlock() are used.
range_step with atomic access: this very 'extreme' kind of range_step can only work if the input iterator has a compare-and-swap function allowing a slice of several elements to be picked up atomically. For example, this could be the case for a linked list, using the Motorola 68000 CAS and CAS2 instructions. Of course, it cannot be used with most input containers, but the library can nevertheless use this possibility. For tests, there is an emulation of atomic containers, using a mutex. All of this is rather experimental, but it is clearly worthwhile because the performance can potentially be excellent.
range_jump. This is an implementation of interleaved scheduling: if there are N threads, each thread picks up one element out of N. The implementation wraps the iterators into another iterator type which silently increments by N when accessing the next element. This approach works with both random_access_iterator_tag and forward_iterator_tag iterators.
Output ranges, by contrast, are defined by a single iterator, given as an output iterator to the algorithm.
Many developments are done along with the development of the algorithms. Therefore, the order in which the algorithms are developed has a strong impact on the features which are developed.
Each algorithm object provides two methods, operator()(void) and operator()( thread_tree ), and is executed internally by its run() method.
Whenever an algorithm has an output iterator, results are written to it in a given order. Parallel execution may change this order; in fact, it will almost always change it.
There are several ways to keep or restore the original output order, or to create a specific order in the output. By original order, we mean the order the output elements would have in sequential mode.
rpa::range_size: with this scheduling policy, it is guaranteed that the output order is not changed; it is the same as in sequential mode.
std::back_inserter into a sorted container, buffered with rpa::obuf_iterator and a limited-size buffer. At first glance, writing into a std::back_inserter may imply a serialization of the final insertion; without buffering, this is indeed the case. But when buffering the output with rpa::obuf_iterator and a buffer of limited size, the insertions are done slice by slice, by one sub-thread, while the other sub-threads keep processing.
rpa::transform_t with one input iterator. The class rpa::transform_t models the parallel execution of the algorithm rpa::transform; it has several template arguments. This algorithm can also be used to implement the behaviour of std::copy or std::for_each. The helper function rpa::make_transform is designed to make the creation of an rpa::transform_t object very simple.
There are two possible execution modes for such a functor:
rpa::transform_t::operator()(void): this mode must yield exactly the same results as the plain STL algorithm.
rpa::transform_t::operator()(ThreadTree), where ThreadTree is any type derived from a thread tree. The helper function rpa::make_thread_tree simplifies the creation of a thread tree.
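For illustration, here is a sketch of the two invocation modes, reusing only the helpers already shown in the examples above; the result is discarded and the exact return types are not shown.

double In[ 1000 ], Out[ 1000 ];
static rpa::posix_thread thrArr[ 4 ];

// Sequential mode: operator()(void), same results as the plain STL algorithm.
rpa::transform( rpa::make_range_size( In, In + 1000 ),
                Out,
                std::negate<double>() ) ();

// Parallel mode: operator()(ThreadTree), here a tree of 4 POSIX threads.
rpa::transform( rpa::make_range_size( In, In + 1000 ),
                Out,
                std::negate<double>() )
    ( rpa::make_thread_tree( 4, thrArr ) );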
rpa::accumulate_t. The class rpa::accumulate_t models the parallel execution of the algorithm std::accumulate. It can also be used to implement the behaviour of the algorithm std::count. It has several template parameters; if the functor type is void_functor, the plus operator is applied.
rpa::remove_copy_if_t. Its development will imply the following features: it will of course benefit from all the previous features (output buffering, for example). To be implemented. Probably impossible with atomic iterators?
rpa::transform_t with two input iterators. Its development will imply the following features: it will of course benefit from all the previous features (output buffering, for example).
rpa::obuf_iterator presentation. rpa::obuf_iterator encapsulates output iterators. Its role is to keep the properties of existing STL iterators and, at the same time, to allow template specializations and all the manipulations needed to parallelize the code. In other words, it is our own thin layer between STL iterators and STL algorithms. Here, this is the general case. This class must also manage the various combinations of parameters for the constructors. It models:
rpa::orow_iterator and std::back_insert_iterator.
std::basic_streambuf (using the rpa::row_buffer class).
When writing into an output iterator, it can be desirable to have a separate buffer for each sub-thread. This library provides a specific iterator adapter, rpa::obuf_iterator, for providing this feature, with specific template specializations.
Many STL algorithms have an output iterator where the result must be written. If several threads execute this algorithm at the same time, they will all write into the same output, which will have to be protected by a mutex: this will be a major performance bottleneck. The idea of using output buffers is to give a buffer to each sub-thread, and periodically flush these buffers into the output iterator.
Each sub-thread reads its input from the range, as in the normal behaviour, but it writes its results into the output iterator of an output buffer. At the end of the sub-thread's execution (reduction), the buffer is flushed into the output iterator. Depending on the output iterator type, several optimizations are possible. For example:
If the output iterator is a std::back_insert_iterator< std::list > and the thread-specific buffer is a std::list, it is possible to flush the buffer into the output with the method std::list::splice, which is very fast.
If the buffer is a std::stringbuf and the output is a std::output_iterator to a POSIX file, it is possible to flush the buffer with one single ::fwrite: this is much faster than writing one byte after the other.
rpa::obuf_iterator with and without size limits. These are two very different cases, because the scheduling has to be changed. With a size limit, the scheduling must be changed: each sub-thread has its own functor (which does only a part of the job of the main functor), rescheduled with the dynamic scheduling method (range_step). This is because the standard algorithm can only be applied to a limited number of elements, which may not be known in advance (except with static scheduling). There is therefore a two-level scheduling.
Of course, it is tempting to use a fixed-size C-style array as a buffer, but this is not the only interest of fixed-size buffers: the point of using them is that they are never resized after the initial allocation, so no memory allocation will ever take place after it. This reduces the memory-allocation bottleneck.
Without a size limit, the original scheduling chosen by the calling program is not changed. At the end of the execution of each sub-thread, all buffers are flushed.
rpa::obuf_iterator< OutputIterator, Buffer, BufferIterator, Mutex >
OutputIterator: the type of iterator that would normally be used in sequential mode. It can typically be a std::back_inserter, a std::front_inserter (not implemented yet), a std::inserter (not implemented yet) or a std::ostream_iterator.
Buffer: the type of buffer internally used by each thread for storing results before flushing them into the original output iterator.
BufferIterator: an iterator pointing to a sequence of available buffers; there must be one buffer per thread. If this type is void, the library will create a buffer on the stack of each thread, as an automatic variable. Beware that if the buffer is big and there is no iterator of buffers, the stack may overflow.
Mutex: indicates whether this is fixed-size buffering (in which case this type must be a valid mutex type, which can be locked and unlocked) or infinite-size buffering (in which case the mutex type must be void). For infinite-size buffering, no mutex is necessary to protect the output iterator, because the thread-specific buffers are flushed at sub-thread termination, by the main thread. On the other hand, if the buffers have a fixed size, they are periodically flushed into the original output iterator by each of the sub-threads; this original output iterator must therefore be protected by a mutex.
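As a purely illustrative sketch (the constructor arguments are not shown, and the exact combination is an assumption, not library documentation), the four template parameters could be combined as follows for unlimited-size buffering into a std::list:

// Hypothetical instantiation of the four template parameters:
// - final output iterator: a back-inserter into a std::list<double>
// - per-thread buffer type: a std::list<double>
// - BufferIterator void: each thread creates its buffer on its own stack
// - Mutex void: infinite-size buffering, flushed only at sub-thread termination
typedef rpa::obuf_iterator<
    std::back_insert_iterator< std::list<double> >,
    std::list<double>,
    void,
    void > buffered_list_output;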
Template specializations are mandatory for using a given output iterator type, because no generic solution is possible. For the moment, the following specializations are provided:
std::back_inserter with rpa::obuf_iterator. If your algorithm writes into a std::back_insert_iterator, you may wonder how it is possible to have several threads inserting objects into it at the same time - and you're right. If rpa::obuf_iterator gets a std::back_insert_iterator, it is able to use its second template parameter as a per-thread output buffer. Therefore, the insertion is not a bottleneck for all the threads trying to insert at the same time, because these insertions are done in thread-specific buffers. Each sub-thread is given an input slice of elements whose size is equal to the maximum size of the buffers. This makes sense because algorithms such as std::transform, std::copy, std::for_each, std::remove and std::remove_if do not create elements, and may even remove some of them, so there is certainly enough room to store them in the thread-specific output buffer.
At the end of the processing of the input slice, each thread then has to flush the content of its output buffer to the main algorithm output. This is where many possible optimizations take place.
Note: std::front_insert_iterator and std::insert_iterator are not taken into account for the moment. This is nevertheless planned, using exactly the same techniques successfully used for parallelizing std::back_insert_iterator.
rpa::orow_iterator, and how it generalizes std::ostream_iterator. This template class has two template parameters: the first one is the same as for std::ostream_iterator; the second one is the type of std::basic_streambuf the output stream is based on, which makes it perfectly compatible with std::ostream_iterator. The role of this stream buffer type is to make it possible to optimize the flushing of buffers into this output stream. Although this class can be used exactly like a std::ostream_iterator, it is most beneficial as the first template argument of rpa::obuf_iterator.
rpa::orow_iterator. Once this object is created, it is possible to access the delimiter and the output stream with simple setters. The first template argument is the same as for std::ostream_iterator; a second, optional, one is added: the stream buffer type that the output stream is based on. This stream buffer type defaults to the plain std::basic_streambuf; in this case, no template specialization is provided.
To summarize, rpa::orow_iterator is compatible with std::ostream_iterator, with the extra possibility of knowing the type of the stream buffer. This feature is necessary to speed up the flushing of thread-specific buffers into the output stream.
rpa::obuf_iterator specializations for rpa::orow_iterator. They are provided for speeding up the flush of the thread-specific buffers into the output iterator. It is possible to combine this kind of specialization with a template specialization based on the container, especially if the container is based on a std::basic_streambuf. For example, when plain POSIX files are used as per-thread output buffers, and the output iterator is an rpa::orow_iterator based on a POSIX file too, some specific optimizations are available - and more can be added. Therefore, the goal of the adapter rpa::orow_iterator is to behave just like a std::ostream_iterator, with some extra arguments and added features.
This is a different matter: these specializations are not necessary if the buffer types are compliant with STL containers. The point of using them is the performance benefit.
rpa::obuf_iterator specializations for std::list. We use the fact that std::list::splice makes it possible to insert a sub-list into a list in constant time. It is especially interesting for infinite-size buffers, because each sub-thread has its own buffer, which will be inserted into the global one at nearly no cost. On the other hand, it is much preferable to allocate the list nodes with thread-specific memory allocators.
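The underlying trick can be illustrated with plain STL code, independently of RPA: splicing a per-thread buffer into the global output list re-links nodes instead of copying elements.

#include <list>

// Standalone illustration: flushing a per-thread buffer into the global
// output with std::list::splice. No elements are copied; the nodes of
// threadBuffer are re-linked into output in constant time, and
// threadBuffer is left empty, ready for the next slice.
void flush_buffer( std::list<double> &output, std::list<double> &threadBuffer )
{
    output.splice( output.end(), threadBuffer );
}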
rpa::obuf_iterator specializations for std::vector. It is possible to reserve, i.e. pre-allocate, a std::vector: this operation is done in constant time and avoids reallocations while the N elements are appended.
rpa::obuf_iterator specializations for rpa::row_buffer. Up to now, we have considered only STL containers as buffers. But it is possible to temporarily store elements into a file, a socket... anything represented by a std::streambuf. std::istream_iterator and std::ostream_iterator make it possible to have one thread write its results into it, and to have these elements read back and flushed into the algorithm output. It is possible to use, as thread-specific temporary output buffers, a std::basic_streambuf of any type, as long as the delimiter used by the std::ostream_iterator allows the resulting input stream to be deserialized unambiguously, by alternately reading values and the delimiter. Specializations are available depending on the specific type of this std::basic_streambuf, whether it is based on a file, on memory, etc.
Extra specializations taking into account the type of the output iterator are also available : Their goal is to speed up the flush of the thread-specific buffers into the output iterator.
std::streambuf objects for buffering sub-thread output. These tools are made for creating std::basic_streambuf derived classes with a default constructor. Having a default constructor makes it possible to create pools of buffers automatically. It is not possible to know in advance how many files will be needed: one per thread, but maybe not all threads will be used. Moreover, it is cumbersome to loop over an array of file names when POSIX functions such as ::tmpnam or ::mkstemp exist for this purpose: file generators are there to wrap these functions. File generators are objects which can generate a temporary file name, a temporary FILE *, or a temporary integer file descriptor. In this context, these temporary files are used as per-thread buffers.
Some streambuf constructors may need a file name, some others a FILE *, and some others an int file descriptor, without any regular pattern. There are three different types of file generators, which share a common property: an operator()(void) const method, which always returns the same value. The data type can be a const char *, an int or a FILE *.
A couple of file generators are provided, based on the usual POSIX functions. It is not mandatory to use them; any other file generator can be created.
Generators of file names: rpa::filgen_tmpnam wraps the POSIX function ::tmpnam; rpa::filgen_mkstemp wraps the POSIX function ::mkstemp.
Generator of FILE pointers: rpa::filgen_tmpfile wraps the POSIX function ::tmpfile.
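A user-defined file generator only has to expose the operator()(void) const interface described above. Here is a hypothetical sketch of a generator of integer file descriptors, built on ::mkstemp; it is not part of the library.

#include <cstdlib>
#include <unistd.h>

// Hypothetical file generator: creates one temporary file at construction
// time and always returns the same file descriptor from operator().
class my_filegen_mkstemp {
    int _fd;
public:
    my_filegen_mkstemp() {
        char tmpl[] = "/tmp/rpa_bufXXXXXX";
        _fd = ::mkstemp( tmpl );   // creates and opens the temporary file
        ::unlink( tmpl );          // the file disappears when the fd is closed
    }
    ~my_filegen_mkstemp() { if ( _fd >= 0 ) ::close( _fd ); }
    int operator()(void) const { return _fd; }
};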
As seen before, some streambuf constructors may need a file name, some others a FILE *, and some others an int file descriptor, without any regular pattern. On the other hand, different file generators do not all have the same properties. For example, we may wish to create temporary file names having a specific pattern and, at the same time, use stream buffers whose constructors need a ::FILE pointer. Therefore, the role of file generator converters is to do the appropriate conversion from a file generator of one type into a file generator of another. Not all combinations are available:
rpa::filegen_name_to_FILE: the constructor uses ::fopen to open the file name, and operator()(void) const returns a FILE *.
rpa::filegen_name_to_fd: uses ::open to open the file name.
rpa::filegen_fd_to_FILE: uses ::fdopen to transform an int file descriptor into a FILE *.
rpa::filegen_FILE_to_fd: uses ::fileno to extract the file descriptor of a POSIX FILE *.
rpa::streambuf_tmp_XXX. Three template classes are there to wrap a streambuf type and a file generator, in order to create a new streambuf type which has a default constructor. This default constructor is mandatory for automatically-created pools of buffers. These three classes all have two template arguments: the streambuf and the file generator.
rpa::streambuf_tmp_name must get a file generator whose operator()(void) method returns a const char *.
rpa::streambuf_tmp_file must get a file generator whose operator()(void) method returns a FILE *.
rpa::streambuf_tmp_fd must get a file generator whose operator()(void) method returns an int.
The very important point about these template classes is that they have specializations for a couple of streambufs which do not have default constructors. These streambufs can now be used as 'anonymous' buffers, automatically created, one per sub-thread, when they are needed, as easily as a std::basic_stringbuf. These classes are not strictly part of this library and are not mandatory, but they make integration into existing software much easier and smoother. Of course, any kind of specialization for other stream buffers can be added.
Specializations are available for:
std::basic_filebuf
__gnu_cxx::stdio_filebuf
(Before gcc 3.3)
__gnu_cxx::stdio_filebuf
(After gcc 3.3)
__gnu_cxx::stdio_sync_filebuf
rpa::row_buffer specializations for fast flushes. When the input and output are based on descriptors, this problem is close to file copying. Some technical information is available here: http://www.developerweb.net/forum/archive/index.php/t-4291.htm.
The template row_buffer can be used just like a container, as the second argument of the template obuf_iterator. row_buffer has three arguments; the optional third argument can be given explicitly, mostly for performance reasons.
If each sub-thread has a FILE as output buffer, it is possible to map this file in memory. Then, to flush into the output, one just needs to ::mmap() it and copy it to the output iterator. This is done by explicitly using the template classes rpa::augm_stdio_filebuf_fd_mmap, rpa::augm_stdio_filebuf_FILE_mmap, etc., depending on the type of the streambuf used as container. It is possible to override the size of the memory segments.
Just like memory mapping, it is possible to flush the buffer with plain file copies. It is possible to specify the size of the buffer used for reading and writing.
::sendfile and zero-copy techniques. The system call ::sendfile() allows, on some platforms, N bytes to be copied from one file descriptor to another. It manipulates kernel buffers so that no user-space copies are necessary. Note that on Linux 2.6 it does not work with plain file descriptors; see why here: http://www.cs.helsinki.fi/linux/linux-kernel/2001-02/0684.html. "And I'd rather tell people using ::sendfile() that you get EINVAL if it isn't able to optimize the transfer."
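As an illustration of the idea (not RPA code), a flush routine along these lines could try ::sendfile first and tell its caller to fall back to a plain read()/write() copy when the call is refused:

#include <sys/sendfile.h>
#include <errno.h>
#include <unistd.h>

// Standalone sketch (partial writes not handled): try to flush 'count' bytes
// from the per-thread buffer descriptor to the output descriptor with
// zero-copy; report whether the caller must fall back to read()/write().
bool flush_with_sendfile( int outFd, int inFd, off_t offset, size_t count )
{
    ssize_t sent = ::sendfile( outFd, inFd, &offset, count );
    if ( sent < 0 && ( errno == EINVAL || errno == ENOSYS ) )
        return false;   // sendfile cannot optimize this transfer: use a plain copy
    return sent == (ssize_t)count;
}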
It is perfectly possible to provide an atomic specialization of an rpa::obuf_iterator, although none is provided here for the moment. Here are its requirements:
The Mutex parameter of rpa::obuf_iterator should not be void (because a void mutex is for unlimited-size buffers only), but a mutex type whose lock and unlock operations do nothing (because the buffer is atomic).
The 'flush' operation of the buffer into the output iterator should be atomic.
We have seen that the RPA library is able to parallelize algorithms by cutting data ranges into pieces. It is also able to bring extra parallelism with pipelining, by having a data flow run from one thread to another (and to another, and so on), just like Unix commands can be piped with the pipe operator, "|".
Conceptually, the idea simply consists of storing the output of a thread executing an algorithm into a container, and letting another thread, running another algorithm, pick up data from this container, used as a buffer. The difficulties are mostly in the synchronization. Pipelines can have any number of readers and writers.
There are two sorts of pipelines (pipe_archiver and pipe_circular, described below).
These pipelines have an interface compliant with STL containers:
begin() and end() methods;
value_type, iterator, etc. are defined.
They have two template parameters. Elements can be read or written in two different ways: one element at a time, or several at once. For example, the rpa::transform parallel algorithm can read several elements at once from a pipeline, with a single condition-variable synchronization.
Pipelines need some sort of condition variables for synchronization, called 'conditions'. In short, the class condition implemented in rpa/condition.h is an abstraction of a POSIX condition variable. This may be surprising, because until now the RPA library has tried to be as abstract as possible. There are several reasons for this. Abstracting POSIX condition variables is not as general as abstracting mutexes and threads, but it is probably the most general architecture for this feature. Conditions have a single template parameter, which is the mutex type used for synchronization. They are required to have three methods:
signal.
wait.
One can see in the following links that it is very easy, by wrapping an existing condition variable, to provide this interface:
pipe_archiver
pipe_circular. A circular buffer is a generalisation of the usual buffer made of a static array, a mutex and two condition variables, such as this one: http://www.cs.albany.edu/~sdc/CSI400/Fal02/Lecture17.html. This is also known as the producer-consumer problem.
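As a standalone illustration of this classic scheme (not the actual pipe_circular implementation), here is a tiny bounded buffer built from a static array, a POSIX mutex and two condition variables:

#include <pthread.h>

// Standalone producer-consumer sketch: a fixed-size circular buffer of
// doubles protected by one mutex and two condition variables.
class tiny_circular {
    enum { SIZE = 16 };
    double          _data[ SIZE ];
    int             _count, _head, _tail;
    pthread_mutex_t _mtx;
    pthread_cond_t  _notEmpty, _notFull;
public:
    tiny_circular() : _count(0), _head(0), _tail(0) {
        pthread_mutex_init( &_mtx, 0 );
        pthread_cond_init( &_notEmpty, 0 );
        pthread_cond_init( &_notFull, 0 );
    }
    void push( double x ) {
        pthread_mutex_lock( &_mtx );
        while ( _count == SIZE )                  // wait for a free slot
            pthread_cond_wait( &_notFull, &_mtx );
        _data[ _tail ] = x;
        _tail = ( _tail + 1 ) % SIZE;
        ++_count;
        pthread_cond_signal( &_notEmpty );
        pthread_mutex_unlock( &_mtx );
    }
    double pop() {
        pthread_mutex_lock( &_mtx );
        while ( _count == 0 )                     // wait for an element
            pthread_cond_wait( &_notEmpty, &_mtx );
        double x = _data[ _head ];
        _head = ( _head + 1 ) % SIZE;
        --_count;
        pthread_cond_signal( &_notFull );
        pthread_mutex_unlock( &_mtx );
        return x;
    }
};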
The class pipeline is designed to add pipelining capabilities to the STL classes and algorithms: pipeline::back_insert is provided and is compatible with the helper function std::back_inserter; begin() and end() can be used with any STL algorithm. The classes pipe_archiver and pipe_circular are designed to be integrated into the RPA library. Any condition variable and mutex types can be used, as long as the condition variables are based on this mutex type.
shut() and shutter threads. By definition, a pipeline can receive data at any moment, in an unpredictable manner. Therefore, by default, it is impossible to tell whether a pipeline is empty or not. So, if a thread is reading input data from a pipeline, it will never finish. There is a mechanism to set a pipeline as shut, closed, analogous to an end-of-file marker or to closing a connection. Pipeline classes have a method shut(), which sets an associated internal flag. When a thread reads data from a pipeline which is in the shut state, and there are no more data to read, the pipeline is considered as empty, that is begin() == end(), and the reading thread stops waiting. To activate this mechanism, the writing thread must call the method shut() after all data are written. In some circumstances, it is not convenient to call this method: for example, if the writing thread just runs a parallel algorithm, this thread executes the STL algorithm and then leaves: there is no 'room' for performing extra termination actions. Therefore, a thread derivation is available (shutter_thread) whose effect is, at thread exit, to call the method shut() on the output pipeline.
This library is portable and independent of any operating system. Many examples and template specializations are available, but no system library is necessary. This library does not enforce the use of any operating system call, and does not call any external function (except in debug mode, for displaying results). There are very specific reasons not to provide any operating-system level feature:
Choosing the 'right' synchronization object is extremely difficult, if at all possible. In some situations a pthread_mutex_t might be the right choice, and in some others a pthread_spinlock_t will be better. It might be necessary to select a specific POSIX pthread library release. So it is not possible to make this choice in a library.
Synchronization primitives evolve very quickly: they have a specific version for each target architecture and operating system, and they may be buggy. Developing these low-level system objects is a full job in itself, even for one architecture; it is a highly specialized job. So it is not reasonable to select even a subset of synchronization objects. Instead, this library gives all the tools to adapt to the ones available, at a given time and on the current platform.
It is a common practice in C++ libraries to separate mechanism and policy. This library leaves the user total freedom to choose or develop their own threads and mutexes, as long as they match a very simple programming interface. By using templates and inlined code, this library guarantees that:
Some properties of the mutexes used in the library: lock and unlock are required; trylock is optional.
lock: this method's role is, as expected, to lock the mutex. No return value is expected, and the only way to signal an error is with an exception: there is no mechanism in this library to handle errors of datatypes it knows nothing about.
unlock: same as lock: no return code, no error management.
trylock: as expected, its role is to do a plain trylock of the mutex and, as usual, the return value is false if the mutex was already locked, and true if it was not locked but this method succeeded in locking it now. This method is optional and necessary only for some thread adapters (for reusing already-running threads).
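A wrapper around a plain POSIX mutex with exactly this interface might look like the following sketch; the class name is hypothetical (rpa::posix_mutex presumably plays this role in the library).

#include <pthread.h>

// Hypothetical mutex type matching the interface above: lock() and unlock()
// return nothing, trylock() returns true only if the mutex was acquired.
class my_posix_mutex {
    pthread_mutex_t _mtx;
public:
    my_posix_mutex()  { pthread_mutex_init( &_mtx, 0 ); }
    ~my_posix_mutex() { pthread_mutex_destroy( &_mtx ); }
    void lock()       { pthread_mutex_lock( &_mtx ); }
    void unlock()     { pthread_mutex_unlock( &_mtx ); }
    bool trylock()    { return pthread_mutex_trylock( &_mtx ) == 0; }
};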
Threads are never copied nor assigned. All their methods must be thread-safe.
create: this basically works just like the pthread_create function of the POSIX thread library: this method gets a function pointer and executes it. This method is mandatory.
join: this method, which basically works just like pthread_join, is mandatory too.
cancel: this method is modelled after pthread_cancel. It is not mandatory, and NO default implementation is provided. If it is available (for example, based on pthread_cancel), it allows the efficient use of some thread adapters. Its use is not implemented yet.
exec_lock: the implementation of this method is not mandatory. A minimal implementation simply returns false, which means that it was too late for the main thread to execute the function before the sub-thread started it: this behaviour is consistent with the exec_lock requirements. An implementation of exec_lock (non-portable, of course) can bring a performance benefit, if it is possible for threads to have fine control of scheduling and priorities.
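The exact signature of create is not spelled out here; assuming it takes a start routine and its argument, as pthread_create does, a minimal user-defined thread type could look like this sketch:

#include <pthread.h>

// Hypothetical thread type with the two mandatory methods. The signature of
// create() is an assumption modelled directly on pthread_create.
class my_posix_thread {
    pthread_t _thr;
public:
    void create( void *(*startRoutine)(void *), void *arg ) {
        pthread_create( &_thr, 0, startRoutine, arg );
    }
    void join() {
        pthread_join( _thr, 0 );
    }
};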
There are several places where atomic access to data is needed:
iterator_lock< It, void >.
The technical needs are:
On many platforms, some atomic types already exist and can be very fast, but they are absolutely not standardized, even on a given operating system. On the other hand, when such a data type is not available, it is always possible to provide an implementation based on classic mutexes. The file rpa/atomic.h and the template class atomic provide a way to encapsulate any platform-specific atomic data type and, if none exists, to use a pseudo one, using the template class atomic_mutex, internally based on any type of mutex. Of course, when no atomic type is available on a given platform, simulating one is a poor solution, and it is much better to use features based on a plain mutex instead. But some applications may lack the necessary flexibility.
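The fallback idea can be illustrated independently of the library: an 'atomic' counter whose operations are in fact serialized by a mutex (this is not the rpa::atomic_mutex code, just the principle).

#include <pthread.h>

// Standalone sketch of the mutex-based fallback: the counter behaves
// atomically from the point of view of its callers, but every operation
// pays the cost of a lock/unlock pair.
class pseudo_atomic_counter {
    long            _value;
    pthread_mutex_t _mtx;
public:
    pseudo_atomic_counter( long v = 0 ) : _value(v) { pthread_mutex_init( &_mtx, 0 ); }
    long fetch_and_add( long delta ) {
        pthread_mutex_lock( &_mtx );
        long old = _value;
        _value += delta;
        pthread_mutex_unlock( &_mtx );
        return old;
    }
};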
In addition to the specific atomic features that a given operating system or processor may provide (for example http://www-128.ibm.com/developerworks/library/pa-atom/ ), there are many libraries, commercial, open-source, etc., which provide the needed features. For example:
The architecture of the library makes it very easy to specialize any atomic operation for a specific use, with no or extremely small overhead (using inlining). One just needs to add a template specialization such as the examples provided in rpa/architectures/SunOS_sparc.h or rpa/architectures/linux_386.h.
Strictly speaking, 'Compare And Swap' operations are atomic, but they are needed for a very specific feature, which is totally different from the ones needed by other atomic operations. Here, the need is in dynamic scheduling (range_step), when a new slice must be extracted (with the method chop) from one or several ranges (at least one range_step, atomically synchronized, i.e. without a mutex).
Depending on the type of STL algorithm, we need compare-and-swap operations with several inputs, that is, one input per sequence involved in the algorithm. For example:
std::accumulate needs a plain compare-and-swap, with one argument (aka CAS).
std::transform or std::remove need a double compare-and-swap (aka CAS2).
std::inner_product will need a triple compare-and-swap (not implemented yet).
On many platforms, some compare-and-swap features already exist and can be very fast (sometimes specific processor instructions, such as the Motorola 68000 CAS and CAS2), but they are absolutely not standardized, even on a given operating system. On the other hand, when such a feature is not available, it is always possible to provide an implementation based on classic mutexes.
We are aware that this is a very high requirement for architectures, very far from what is actually available. There are, on the other hand, some libraries which are able to implement an N-level compare-and-swap instruction: CAS and CASN, which conditionally update the content of one word or of a collection of memory words (plus many atomic operations). As with atomic operations, it is very easy to use any library or processor instructions, by specialising the template classes:
template< typename Type > class cas1 ...
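The exact member expected by cas1 is not reproduced here; as an illustration of the kind of primitive such a specialization would delegate to, here is a single-word compare-and-swap built on the GCC __sync builtins (an assumption about the toolchain, not about RPA):

// Standalone single-word CAS: atomically replace *ptr by newVal if and only
// if it still contains oldVal, and report whether the exchange happened.
// Relies on the GCC/Clang __sync builtins.
template< typename Type >
bool compare_and_swap( Type *ptr, Type oldVal, Type newVal )
{
    return __sync_bool_compare_and_swap( ptr, oldVal, newVal );
}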
TO BE CONTINUED. TEMPLATE SPECIALIZATIONS.
Multi-threaded programs are exceedingly difficult to debug. A useful technique is to control synchronization objects such as mutexes and threads 'from the inside'. This makes it possible, for example, to check:
There are specific adapters to add these features to existing objects, without changing their basic applicative features. In other words, it is possible to add run-time monitoring features to an application just by encapsulating some objects into templates.
The specific architecture of this library makes it possible to stack multiple adapters, simply because all synchronization objects must have a well-defined interface that can easily be overloaded.
Some algorithms accept a functor, which is a unary function, as argument: std::transform, std::accumulate, std::count_if, etc. The interface of functors is extremely simple: it is the one of std::unary_function objects.
The interface of our mutexes is extremely simple (lock, unlock, trylock), and it is possible to wrap any mutex (especially user mutexes) in a template type. The results are objects which still have the interface of a mutex - and can be used as such - but with extra features.
mutex_count. Each lock, unlock and trylock operation increments internal counters. Moreover, in debug mode, an invariant is evaluated (number of locks + number of successful trylocks = number of unlocks) for debugging purposes.
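The adapter pattern itself is simple; here is a sketch of such a counting adapter (not the library's actual mutex_count code) which wraps any mutex type and can still be used as one:

// Sketch of a counting mutex adapter: it forwards lock/unlock/trylock to the
// wrapped mutex and counts the calls while the mutex is held.
template< class Mutex >
class counting_mutex {
    Mutex _mtx;
public:
    unsigned long nbLocks, nbUnlocks, nbTryLocks;
    counting_mutex() : nbLocks(0), nbUnlocks(0), nbTryLocks(0) {}
    void lock()    { _mtx.lock(); ++nbLocks; }
    void unlock()  { ++nbUnlocks; _mtx.unlock(); }
    bool trylock() {
        bool ok = _mtx.trylock();
        if (ok) ++nbTryLocks;       // count only successful trylocks
        return ok;
    }
};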
mutex_stats: NOT IMPLEMENTED YET. Counts the average number of threads waiting for a mutex, the average waiting time, etc. The plan is to use this adapter to evaluate the parallelism rate.
As long as a thread type has the mandatory minimal interface of the methods create() and join(), it can be used in this library. Therefore, it is very easy to wrap an existing thread type into another one, adding extra features but still providing the same interface.
The cost of creating a thread may be significant. This is why using a pool of threads, reusing them instead of creating them at each call, is an efficient technique. This adapter creates a single thread and blocks until the create method is called. After the function call terminates, it blocks again until the next call to create. For safety reasons, an efficient technique for debugging an application is to compile it, at first, without thread reuse and, once it is properly debugged, to add the adapter for thread reuse.
thread_counter. This adapter counts each time a thread is created, cancelled or joined, and computes an invariant with these counters, for inconsistency detection at run-time. As usual, the object created by instantiating any thread with the adapter is still a thread. Note that thread adapters are stackable.
Again, multi-threaded applications are difficult to debug. A good technique is to debug and check an application, first, without any multi-threading. We propose here a set of 'void' threads, which have exactly the interface of a thread but execute the code serially. This eliminates all bugs coming from synchronization or race-condition problems. When all 'serial' bugs are eliminated, it is simple to switch to multi-threaded mode, simply by replacing one thread type (a 'void' thread) by another. If all user functors, containers and iterators are thread-safe, no bug should be introduced by the library.
thread_crea. This thread serially executes the function pointer it is given, at creation time.
thread_join. This thread serially executes the function pointer it is given, when the thread is join-ed.
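The idea behind such a 'void' thread can be sketched in a few lines (hypothetical code, not the library's thread_crea): create() simply runs the function in the calling thread, and join() has nothing left to do.

// Hypothetical pseudo-thread: same interface as a real thread, but the
// function is executed serially, in the calling thread, at creation time.
class serial_thread_crea {
public:
    void create( void *(*startRoutine)(void *), void *arg ) {
        startRoutine( arg );   // run immediately, in the current thread
    }
    void join() {}             // nothing to wait for
};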
This chapter explains how the sub-threads are given to the algorithms. The complexity of these sub-threads aims at implementing:
Each algorithm, at execution time, receives several threads for execution. These threads are passed to operator()( thread_tree ). They are passed in a templatized structure called a thread tree. Threads are gathered into trees to allow recursive multithreading. Each level is itself another tree. At each level, the threads may have a different type; but, on the same level, they must have the same type (or inherit from the same base class). All types, except the number of sub-threads, can be templatized. A thread tree can easily be built with the helper function make_thread_tree.
The parameters of a thread tree are :
void
.
There are, for the moment, two restrictions to the complete use of sub-threads: rpa::transform_t does not have an operator() accepting a thread tree, but simply void.
It would be possible to build a thread tree with just the beginning and the end of the available threads. But we always need the number of threads, so this constructor gives a chance to reuse this number, instead of having to compute it with std::distance, which may be a costly operation. A simple example of thread_tree construction is:
thread_tree< pthread_t * > anyThreadTree( 10, new pthread_t[10] );
**************** Up to this point we kept control. Introduce futures here. **************
thread_array
This is a helper to allocate 'on-the-fly' an array of sub-threads.
thread_copy. Threads are normally not copyable, but in some situations thread wrappers might be created on the fly, for example a shutter thread for a pipeline, which is a thread in charge of 'shutting' a pipeline after the last insertion into it. It is of course possible to store this temporary thread in a variable and later manipulate its address, but it is much simpler to use it as a value. This is possible with thread_tree, which can work with thread addresses, as we have just seen, but with thread values too, by wrapping them into a thread_copy, which is a sort of reference-counted smart pointer to a thread, whose count is decremented when the thread finishes.
Not implemented yet. The idea is to have the thread tree match the structure of the machine: if a functor can itself use several threads, it is worth putting the different nodes of a NUMA machine at the top level of the thread tree. Each functor then receives one node of the NUMA machine, each node containing a set of threads running on that node.
The algorithms are not forced to use all the threads available in a thread tree. They may rely, for example, on a minimum number of elements below which it is not worth creating a thread. Each range, given its number of elements, the minimum supplied, or any other parameter, is free to use any number of threads, as long as it uses them in the order of the thread iterator stored in the thread tree.
To get the best out of multi-threading in general, and of this library in particular, it is worth following a few steps. The idea is to solve one problem at a time, in order to control and isolate the problems caused by multi-threading, such as performance and race conditions.
First, write and debug your code with pseudo-threads only, such as rpa::thread_crea and rpa::thread_join. At this stage, you can be sure that the application has no bugs in sequential mode, and therefore that all future bugs will come from multi-threading itself: race conditions, synchronization, or performance issues.
The assumption of this library is that everything must work the same and produce the same results, whatever the types of threads and mutexes you choose. So, in this step, you use real threads (not pseudo-threads) and mutexes, and test your application. The goal of this step is not to improve performance, but to check that the code is correct.
Remember that the order of the results may be totally different from one run to another, and may depend on the mutex and thread types, etc.
If any bug appears here, you can assume that it comes from a race condition, because the code was already tested in single-threaded mode in the previous step. Theoretically, all race conditions should come from your own code, because this library assumes that there are no synchronization dependencies between its code and yours.
A good heuristic for detecting race conditions is to try several types of threads and mutexes, with several adapters.
Your application must be fast and efficient even in single-threaded mode, simply because it will very often run on a single thread. So, with pseudo-threads, do a profiling step: compile your application with optimizations and check that it is fast. This is the time to choose specific kinds of buffers. For the moment, forget about sizing (buffer size, chunk size, etc.), because measurements made in single-threaded mode would give misleading values. You can also choose a scheduling type; see the sketch just below. In short, everything decided at compile time, except thread and mutex types, must be chosen at this step. The run-time parameters (buffer size, chunks, number of threads) will be determined at the next step and may be changed at run-time.
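As an illustration (a sketch only; the range helpers are those used throughout this manual, while the containers, the chunk size of 100 and the mutex name are assumptions), the compile-time scheduling choice boils down to the choice of a range wrapper:
std::vector< double > vec ;
std::list< double > lst ;
std::set< double > st ;
pthread_mutex_t mtxIn ;
// Static scheduling: the size is known and the iterators are random-access.
rpa::make_range_size( vec.begin(), vec.end() );
// Dynamic scheduling: unknown size, chunks of 100 elements taken under a mutex.
rpa::make_range_step( lst.begin(), lst.end(), 100, &mtxIn );
// Interleaved scheduling: no mutex is needed on the input side.
rpa::make_range_jump( st.begin(), st.end() );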
At this stage, you can assume that your application is debugged and runs efficiently enough in single-threaded mode. You can then assume that the remaining performance problems come from multi-threading itself: for example, the cost of creating threads (which can be reduced with the rpa::thread_fast adapter), or too high a resource cost per thread (in which case you will have to use fewer threads).
Numeric parameters can be adjusted dynamically (see the functions giving the number of processors).
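For instance (a sketch using the standard POSIX call sysconf rather than whichever helper this library provides, which is not named here), the number of threads can be derived from the processors available at run-time:
// Requires <unistd.h>; _SC_NPROCESSORS_ONLN is a widespread POSIX extension.
long nbProcs = ::sysconf( _SC_NPROCESSORS_ONLN );
if ( nbProcs < 1 ) nbProcs = 1 ;   // fall back to a single thread
// Size the thread array from the number of processors currently online.
rpa::thread_array< rpa::posix_thread > workers( (size_t)nbProcs );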
Some space is allocated on the stack for each thread. Spawning threads is a recursive process: the main thread starts the first sub-thread with its arguments on the stack, then calls the same procedure recursively to start the next sub-thread. The point is that no heap allocation is needed for the threads, because allocating memory is a lengthy process. Moreover, the way this is programmed allows tail-call optimization, which can remove the recursive calls entirely.
If tail-call optimization does not apply, some stack space is still needed for each sub-thread: basically the arguments given to the sub-thread (iterators, etc.).
If thread-specific buffers are used, with an obuf_iterator, more data are needed: the buffers themselves, etc. If these buffers are allocated on the stack, this can imply a very big stack and probably a stack overrun. It is, however, possible and easy to provide a pool of buffers, already allocated by the main thread and reusable; doing so is strongly advised.
It is also strongly advised that each sub-thread use a thread-aware memory allocator: an allocator refers to a memory pool and, if this pool is shared among threads, all of them have to wait at each allocation or deallocation. It is precisely to overcome this problem that so much care is taken in this library to avoid any dynamic memory allocation.
This library may seem complicated, but it is in fact very simple to use and control. For clarity, only the most important lines are shown in the following examples.
Accumulating a std::list of std::string-s.
The order of the elements is not taken into account. This example is actually implemented in tutorial t04. Because the input is a list, we can suppose that the number of elements is not known in advance; it is therefore worth using dynamic scheduling.
pthread_t threadsArray[ 10 ];
pthread_mutex_t mtxAccum ;
std::list< std::string > strLst ;
int nbSteps = 100 ;
std::string concatString = rpa::make_accumulate(
    rpa::make_range_step( strLst.begin(), strLst.end(), nbSteps, &mtxAccum ),
    std::string() )
    ( rpa::make_thread_tree( threadsArray, 10 ) );
We note that the cost of incrementing the iterators is very high compared to the time needed to compute a square root, so this example is not very realistic: it would be much faster to put the data in a vector. The interleaved schedule does not need a mutex on the input side; writing into the output iterator, however, still requires one.
std::set< float > fltSet ;
size_t threadsNb = 5 ;
std::vector< pthread_t > threadsVector( threadsNb );
pthread_mutex_t mtxFile ;
std::ofstream outFile( "squares.dat" );
rpa::make_transform(
    rpa::make_range_jump( fltSet.begin(), fltSet.end() ),
    rpa::make_iterator_lock( std::ostream_iterator< float >( outFile, "-" ), mtxFile ),
    ::sqrt )
    ( rpa::make_thread_tree( threadsVector ) );
This is basically the same example as 8.1.2. The differences are that the thread tree is built explicitly from a vector of pthread_t, the output is protected by a different mutex type (rpa::posix_file_mutex), and the square root is computed with the power function instead of ::sqrt.
std::set< float > fltSet ;
size_t threadsNb = 5 ;
std::vector< pthread_t > threadsVector( threadsNb );
rpa::thread_tree< std::vector< pthread_t >::iterator > thrTree( threadsVector.begin(), threadsNb );
std::ofstream outFile( "squares.dat" );
rpa::posix_file_mutex mtxFile ;
rpa::make_transform(
    rpa::make_range_jump( fltSet.begin(), fltSet.end() ),
    rpa::make_iterator_lock( std::ostream_iterator< float >( outFile, "-" ), mtxFile ),
    std::bind2nd( std::ptr_fun< double, double, double >( ::pow ), 0.5 ) )
    ( thrTree );
This shows the ability of the library to work even with 'old-style' objects, which are very often the most efficient ones. The generated code is very small and efficient too; the synchronization between threads is straightforward, since they work totally independently. Note that several threads access the output array simultaneously, which may cause some cache thrashing, the end of one output slice being close to the beginning of the next thread's slice. But if all threads are equally balanced, they progress through their slices at the same pace, so by the time one thread writes to the end of its slice, the next thread has already written to the beginning of its own.
char chrMatrix[ 100 ][ 500 ];
size_t lenStrings[ 100 ];
pthread_t threadsArray[ 5 ];
rpa::make_transform(
    rpa::make_range_size( chrMatrix, chrMatrix + 100 ),
    lenStrings,
    ::strlen )
    ( rpa::make_thread_tree( threadsArray, 5 ) );
In this section, we explain the progressive transformations from simple examples to more sophisticated ones.
transform reading from a std::set< double >, appended to a vector.
In all the following examples, we assume the same conditions: the input is a std::set< double > read under a mutex with dynamic scheduling, and the output is appended to a std::vector< double > through a std::back_insert_iterator.
All this is declared in the following header, common to all the code related to this example:
#define NB_THREADS 5
pthread_t thrArrs[ NB_THREADS ];
thread_tree< pthread_t * > thrTree( NB_THREADS, thrArrs );
const double cstDbl[] = { 1.41421, 2.71828, 3.14159 };
std::set< double > dblSet( cstDbl, cstDbl + 3 );
typedef std::vector< double > DblVecType ;
DblVecType vecDbl ;
pthread_mutex_t mtxIn ;
The execution without output buffers would be:
rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    std::back_inserter( vecDbl ),
    ::sqrt )
    ( thrTree );
Output buffers in a std::deque, without size limit.
Here, we assume thread-specific buffers of type std::deque<double>, without a size limit. They are therefore flushed into the output iterator at the end of each sub-stream. No buffer pool is provided, which means the buffers must be created at each execution and are allocated on the stack of the main thread.
The execution with output buffers can be:
rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator< std::back_insert_iterator< DblVecType >,
                        std::deque< double >, void, void >( vecDbl ),
    ::sqrt )
    ( thrTree );
Output buffers in a std::deque, with a size limit.
Here, we assume thread-specific buffers of 256 elements, of type std::deque<double>. The benefit is that the flushing of each output buffer is done by the sub-threads themselves, piece by piece. No buffer pool is provided, with the same consequences as before.
The execution with output buffers can be:
pthread_mutex_t mtxOut ;
rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator< std::back_insert_iterator< DblVecType >,
                        std::deque< double >, void, pthread_mutex_t >( vecDbl, mtxOut, 256 ),
    ::sqrt )
    ( thrTree );
Output buffers in a std::deque, with a size limit and a buffer pool.
Here, we assume thread-specific buffers of 256 elements, of type std::deque<double>. The benefit is that the flushing of each output buffer is done by the sub-threads themselves, piece by piece. A buffer pool is provided: the algorithm will exclusively use the buffers given as arguments, which can be reused across several executions, so it is not necessary to create and delete them each time. The pool here is a plain C array, but any container can be used, as long as there is one buffer per thread.
The execution with output buffers can be:
pthread_mutex_t mtxOut ;
typedef std::deque< double > BufTyp ;
BufTyp bufPool[ NB_THREADS ];
rpa::make_transform(
    rpa::make_range_step( dblSet.begin(), dblSet.end(), 10, &mtxIn ),
    rpa::obuf_iterator< std::back_insert_iterator< DblVecType >,
                        std::deque< double >, BufTyp[ NB_THREADS ], pthread_mutex_t >( vecDbl, bufPool, mtxOut, 256 ),
    ::sqrt )
    ( thrTree );
transform reading from a vector and inserting into a list, static scheduling.
transform reading from a list, writing to the screen, with dynamic scheduling.
transform with std::list as input and output, with interleaved scheduling.
std::accumulate with std::list as input.
std::accumulate with std::vector as input.
rpa::obuf_iterator presentation.
rpa::obuf_iterator with and without size limits.
std::streambuf objects for buffering sub-threads output.
rpa::row_buffer specializations for fast flushes.