Use Concurrency Limits

Use Concurrency Limits#

To control the number of instances of a single node, you can use the concurrency limit on the node. To cause it to reject messages after it reaches its concurrency limit, you construct it as a “rejecting” node.

A function node is constructed with one or more template arguments. The third argument controls the buffer policy used by the node, and is by default queueing. With a queueing policy, a function_node that has reached its concurrency limit still accepts incoming messages, but buffers them internally. If the policy is set to rejecting the node will instead reject the incoming messages.

template < typename Input,
           typename Output = continue_msg,
           graph_buffer_policy = queueing >
class function_node;

For example, you can control the number of big objects in flight in a graph by placing a rejecting function_node downstream of an input_node, as is done below:

graph g;


int src_count = 0;
int number_of_objects = 0;
int max_objects = 3;


input_node< big_object * > s( g, [&]( oneapi::tbb::flow_control& fc ) -> big_object* {
    if ( src_count < M ) {
      big_object* v = new big_object();
      ++src_count;
      return v;
    } else {
      fc.stop();
      return nullptr;
    }
} );
s.activate();

function_node< big_object *, continue_msg, rejecting > f( g, 3,
    []( big_object *v ) -> continue_msg {
    spin_for(1);
       delete v;
    return continue_msg();
} );


make_edge( s, f );
g.wait_for_all();

The function_node will operate on at most three big objects concurrently. The node’s concurrency threshold that limits the node to three concurrent invocations. When the function_node is running three instances concurrently, it will start rejecting incoming messages from the input_node, causing the input_node to buffer its last created object and temporarily stop invoking its body object. Whenever the function_node drops below its concurrency limit, it will pull new messages from the input_node. At most four big objects will exist simultaneously, three in the function_node and one buffered in the input_node.