Working on the Assembly Line: parallel_pipeline
Pipelining is a common parallel pattern that mimics a traditional manufacturing assembly line. Data flows through a series of pipeline filters and each filter processes the data in some way. Given an incoming stream of data, some of these filters can operate in parallel, and others cannot. For example, in video processing, some operations on frames do not depend on other frames, and so can be done on multiple frames at the same time. On the other hand, some operations on frames require processing prior frames first.
The oneAPI Threading Building Blocks (oneTBB) classes parallel_pipeline and filter implement the pipeline pattern. A simple text processing example demonstrates the use of parallel_pipeline and filter to perform parallel formatting. The example reads a text file, squares each decimal numeral in the text, and writes the modified text to a new file. The pipeline has three filters: an input filter that reads chunks of text, a transform filter that squares the numerals in each chunk, and an output filter that writes the transformed chunks to the new file.
Caution
Since the body object provided to the filters of the parallel_pipeline might be copied, its operator() should not modify the body. Otherwise, the modification might or might not become visible to the thread that invoked parallel_pipeline, depending upon whether operator() is acting on the original or a copy. As a reminder of this nuance, parallel_pipeline requires that the body object's operator() be declared const.
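One way to satisfy this requirement while still tracking state across invocations is to keep the mutable state outside the body. The following sketch is hypothetical (it is not part of the text-processing example); the class name and counter are made up for illustration:

// Hypothetical sketch: a filter body whose operator() is const, as parallel_pipeline
// requires, but which still updates a running count kept outside the body.
#include <cstddef>

class CountingBody {
    size_t* my_count;   // points to state shared by every copy of the body
public:
    CountingBody( size_t* count ) : my_count(count) {}
    int operator()( int item ) const {
        // Safe in a serial filter, where only one invocation runs at a time;
        // a parallel filter would need an atomic counter instead.
        ++*my_count;
        return item;
    }
};

The input filter later in this example uses the same idea: the state that carries a partial numeral from one chunk to the next is kept in a global variable rather than in a data member.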
Assume that the raw file I/O is sequential. The squaring filter can be done in parallel. That is, if you can serially read n chunks very quickly, you can transform each of the n chunks in parallel, as long as they are written in the proper order to the output file. Though the raw I/O is sequential, the formatting of input and output can be moved to the middle filter, and thus be parallel.
To amortize parallel scheduling overheads, the filters operate on chunks of text. Each input chunk is approximately 4000 characters. Each chunk is represented by an instance of class TextSlice:
// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the C++ declaration
    represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)oneapi::tbb::tbb_allocator<char>().allocate( sizeof(TextSlice)+max_size+1 );
        t->logical_end = t->begin();
        t->physical_end = t->begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        oneapi::tbb::tbb_allocator<char>().deallocate((char*)this, sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};
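As a quick illustration of the allocate/append/free protocol, a TextSlice can be used as follows. This snippet is only a usage sketch and is not part of the pipeline example:

// Hypothetical usage sketch for TextSlice, assuming the class definition above.
#include <cassert>

void TextSliceDemo() {
    TextSlice* s = TextSlice::allocate( 16 );   // room for up to 16 characters
    char msg[] = "hello";
    s->append( msg, msg+5 );                    // copy "hello" into the slice
    assert( s->size()==5 && s->avail()==11 );
    s->free();                                  // must be freed with free(), not delete
}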
Below is the top-level code for building and running the pipeline. TextSlice objects are passed between filters using pointers to avoid the overhead of copying a TextSlice.
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    oneapi::tbb::parallel_pipeline(
        ntoken,
        oneapi::tbb::make_filter<void,TextSlice*>(
            oneapi::tbb::filter_mode::serial_in_order, MyInputFunc(input_file) )
        &
        oneapi::tbb::make_filter<TextSlice*,TextSlice*>(
            oneapi::tbb::filter_mode::parallel, MyTransformFunc() )
        &
        oneapi::tbb::make_filter<TextSlice*,void>(
            oneapi::tbb::filter_mode::serial_in_order, MyOutputFunc(output_file) ) );
}
The parameter ntoken to function parallel_pipeline controls the level of parallelism. Conceptually, tokens flow through the pipeline. In a serial in-order filter, each token must be processed serially in order. In a parallel filter, multiple tokens can be processed in parallel by the filter. If the number of tokens were unlimited, there might be a problem where the unordered filter in the middle keeps gaining tokens because the output filter cannot keep up. This situation typically leads to undesirable resource consumption by the middle filter. The parameter to function parallel_pipeline specifies the maximum number of tokens that can be in flight. Once this limit is reached, the pipeline never creates a new token at the input filter until another token is destroyed at the output filter.
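The best token limit depends on the workload. A common rule of thumb, shown below as an assumption rather than something this example prescribes, is to allow a small multiple of the available hardware concurrency so that the parallel filter has enough work in flight without letting chunks pile up:

// Hypothetical sketch for choosing ntoken: a few tokens per available thread.
#include <oneapi/tbb/info.h>

int ChooseTokenLimit() {
    // default_concurrency() reports how many threads oneTBB uses by default.
    return 4 * oneapi::tbb::info::default_concurrency();
}
// e.g., RunPipeline( ChooseTokenLimit(), input_file, output_file );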
The second parameter specifies the sequence of filters. Each filter is constructed by function make_filter<inputType,outputType>(mode,functor).
- The inputType specifies the type of values input by a filter. For the input filter, the type is void.
- The outputType specifies the type of values output by a filter. For the output filter, the type is void.
- The mode specifies whether the filter processes items in parallel, serial in-order, or serial out-of-order.
- The functor specifies how to produce an output value from an input value.
The filters are concatenated with operator&. When concatenating two filters, the outputType of the first filter must match the inputType of the second filter.
The filters can be constructed and concatenated ahead of time. An equivalent version of the previous example that does this follows:
void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    oneapi::tbb::filter<void,TextSlice*> f1( oneapi::tbb::filter_mode::serial_in_order,
                                             MyInputFunc(input_file) );
    oneapi::tbb::filter<TextSlice*,TextSlice*> f2( oneapi::tbb::filter_mode::parallel,
                                                   MyTransformFunc() );
    oneapi::tbb::filter<TextSlice*,void> f3( oneapi::tbb::filter_mode::serial_in_order,
                                             MyOutputFunc(output_file) );
    oneapi::tbb::filter<void,void> f = f1 & f2 & f3;
    oneapi::tbb::parallel_pipeline(ntoken,f);
}
The input filter must be serial_in_order in this example because the filter reads chunks from a sequential file and the output filter must write the chunks in the same order. All serial_in_order filters process items in the same order. Thus, if an item arrives at MyOutputFunc out of the order established by MyInputFunc, the pipeline automatically delays invoking MyOutputFunc::operator() on the item until its predecessors are processed. There is another kind of serial filter, serial_out_of_order, that does not preserve order.
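For instance, if the final stage only accumulated statistics instead of writing ordered text, it could be a serial_out_of_order filter: items are still processed one at a time, but in whatever order they arrive, so no item waits for its predecessors. The following sketch is hypothetical; it reuses TextSlice and MyInputFunc from this example, and the counting stage and its output message are made up for illustration:

// Hypothetical sketch: an order-insensitive final stage using serial_out_of_order.
#include <cstdio>
#include <oneapi/tbb/parallel_pipeline.h>

void RunCountingPipeline( int ntoken, FILE* input_file ) {
    size_t total_chars = 0;
    oneapi::tbb::parallel_pipeline(
        ntoken,
        oneapi::tbb::make_filter<void,TextSlice*>(
            oneapi::tbb::filter_mode::serial_in_order, MyInputFunc(input_file) )
        &
        oneapi::tbb::make_filter<TextSlice*,void>(
            oneapi::tbb::filter_mode::serial_out_of_order,
            [&total_chars]( TextSlice* slice ) {
                total_chars += slice->size();   // serial filter: one invocation at a time
                slice->free();
            } ) );
    printf( "Processed %zu characters\n", total_chars );
}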
The middle filter operates on purely local data. Thus any number of invocations of its functor can run concurrently. Hence it is specified as a parallel filter.
The functor for each filter is now explained in detail. The output functor is the simplest: all it has to do is write a TextSlice to a file and free the TextSlice.
// Functor that writes a TextSlice to a file.
class MyOutputFunc {
    FILE* my_output_file;
public:
    MyOutputFunc( FILE* output_file );
    void operator()( TextSlice* item ) const;
};

MyOutputFunc::MyOutputFunc( FILE* output_file ) :
    my_output_file(output_file)
{
}

void MyOutputFunc::operator()( TextSlice* out ) const {
    size_t n = fwrite( out->begin(), 1, out->size(), my_output_file );
    if( n!=out->size() ) {
        // OutputFileName is a global string defined elsewhere in the example.
        fprintf(stderr,"Can't write into file '%s'\n", OutputFileName);
        exit(1);
    }
    out->free();
}
Method operator() processes a TextSlice. The parameter out points to the TextSlice to be processed. Since it is used for the last filter of the pipeline, it returns void.
The functor for the middle filter is similar, but a bit more complex. It returns a pointer to the TextSlice that it produces.
// Functor that changes each decimal number to its square.
class MyTransformFunc {
public:
    TextSlice* operator()( TextSlice* input ) const;
};

TextSlice* MyTransformFunc::operator()( TextSlice* input ) const {
    // Add terminating null so that strtol works right even if the number is at the end of the input.
    *input->end() = '\0';
    char* p = input->begin();
    TextSlice* out = TextSlice::allocate( 2*MAX_CHAR_PER_INPUT_SLICE );
    char* q = out->begin();
    for(;;) {
        while( p<input->end() && !isdigit(*p) )
            *q++ = *p++;
        if( p==input->end() )
            break;
        long x = strtol( p, &p, 10 );
        // Note: no checking for overflow of the output slice is needed here. The output
        // slice has twice the capacity of the input slice, and the square of a
        // non-negative integer n cannot have more than twice as many digits as n.
        long y = x*x;
        sprintf(q,"%ld",y);
        q = strchr(q,0);
    }
    out->set_end(q);
    input->free();
    return out;
}
The input functor is the most complicated, because it has to ensure that no numeral crosses a boundary. When it finds what could be a numeral crossing into the next slice, it copies the partial numeral to the next slice. Furthermore, it has to indicate when the end of input is reached. It does this by invoking method stop() on a special argument of type flow_control. This idiom is required for any functor used for the first filter of a pipeline.
// Carries the characters of a partial numeral from one invocation of MyInputFunc to the next.
// It is a global rather than a data member because operator() must be const.
TextSlice* next_slice = NULL;

class MyInputFunc {
public:
    MyInputFunc( FILE* input_file_ );
    MyInputFunc( const MyInputFunc& f ) : input_file(f.input_file) { }
    ~MyInputFunc();
    TextSlice* operator()( oneapi::tbb::flow_control& fc ) const;
private:
    FILE* input_file;
};

MyInputFunc::MyInputFunc( FILE* input_file_ ) :
    input_file(input_file_) { }

MyInputFunc::~MyInputFunc() {
}

TextSlice* MyInputFunc::operator()( oneapi::tbb::flow_control& fc ) const {
    // Read characters into the space that is available in the next slice.
    if( !next_slice )
        next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );
    size_t m = next_slice->avail();
    size_t n = fread( next_slice->end(), 1, m, input_file );
    if( !n && next_slice->size()==0 ) {
        // No more characters to process
        fc.stop();
        return NULL;
    } else {
        // Have more characters to process.
        TextSlice* t = next_slice;
        next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );
        char* p = t->end()+n;
        if( n==m ) {
            // Might have read a partial number.
            // If so, transfer the characters of the partial number to the next slice.
            while( p>t->begin() && isdigit(p[-1]) )
                --p;
            assert( p>t->begin() && "Number too large to fit in buffer." );
            next_slice->append( p, t->end()+n );
        }
        t->set_end(p);
        return t;
    }
}
The copy constructor must be defined because the functor is copied when the underlying oneapi::tbb::filter is built from the functor, and again when the pipeline runs.
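To turn the snippets above into a complete program, a driver along the following lines can be used. The includes, the MAX_CHAR_PER_INPUT_SLICE constant, the OutputFileName global, the file names, and the token limit are assumptions filled in for illustration; the original example defines its own equivalents.

// Hypothetical driver assembling the snippets above into one program.
#include <cassert>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <oneapi/tbb/parallel_pipeline.h>
#include <oneapi/tbb/tbb_allocator.h>

const size_t MAX_CHAR_PER_INPUT_SLICE = 4000;   // assumed chunk size (~4000 characters)
const char* OutputFileName = "output.txt";      // assumed; used by MyOutputFunc's error message

// ... definitions of TextSlice, MyInputFunc, MyTransformFunc, MyOutputFunc,
//     and RunPipeline exactly as shown above ...

int main() {
    FILE* input_file = fopen( "input.txt", "r" );
    FILE* output_file = fopen( OutputFileName, "w" );
    if( !input_file || !output_file ) {
        fprintf( stderr, "Cannot open input or output file\n" );
        return 1;
    }
    int ntoken = 8;   // assumed token limit; see the discussion of ntoken above
    RunPipeline( ntoken, input_file, output_file );
    fclose( input_file );
    fclose( output_file );
    return 0;
}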