Data Flow 2.0 Example

This mail is to inform you of my impressions of the new data flow
framework. First of all, the concept behind the new structure is given
in the wiki: http://www.orocos.org/wiki/rtt/rtt-2.0/dataflow

The idea is that *outputs* are send-and-forget, while *inputs* specify
a 'policy', e.g. 'I want to read all samples -> so buffer the input',
or 'I want lock-based protection', or ...
The policy is specified in a 'ConnPolicy' object, which you can give to
the input to use as its default, or override when the ports are
connected during deployment.

This is the basic use case of the new code:

#include <rtt/Port.hpp>
using namespace RTT;
 
// Component A:
OutputPort<double> a_output("MyOutput");
//...
double x = ...;
a_output.write( x );
 
// Component B buffers data produced by A (default buf size==20):
bool init_connection = true; // read last written value after connection
bool pull = true;            // fetch data directly from output port during read
InputPort<double> b_input("MyInput", internal::ConnPolicy::buffer(20,
internal::ConnPolicy::LOCK_FREE, init_connection, pull));
//...
double x;
while ( b_input.read( x ) ) {
   // process sample x...
}
// buffer empty
 
// Component C gets the most recent data produced by A:
bool init_connection = true; // read last written value after connection
bool pull = true;            // fetch data directly from output port during read
InputPort<double> c_input("MyInput",
internal::ConnPolicy::data(internal::ConnPolicy::LOCK_FREE,
init_connection, pull));
//...
double x;
if ( c_input.read( x ) ) {
   // use last value of x...
} else {
  // no new data
}
 
// Finally connect some ports. The order/direction of connecting does not
// matter anymore; it will always do as expected!
a_output.connectTo( b_input ); // or: b_input.connectTo( a_output );
a_output.connectTo( c_input ); // or other way around
 
//Change buffer size for B by giving a policy during connectTo:
b_input.disconnect();
b_input.connectTo( a_output, internal::ConnPolicy::buffer(20,
internal::ConnPolicy::LOCK_FREE, init_connection, pull));

Note: ConnPolicy will probably move to RTT or RTT::base.
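
To make the difference between the 'buffer' and 'data' policies concrete,
here is a minimal stand-alone sketch of the read semantics described above.
It does not use RTT at all: BufferedChannel and DataChannel are hypothetical
names made up for this illustration, there is no locking, and dropping a
sample when the buffer is full is my assumption, not necessarily what RTT
does.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-ins for illustration only; these are NOT RTT classes.

// 'Buffer' policy: keep up to `capacity` samples, each read consumes the oldest.
template <typename T>
class BufferedChannel {
public:
    explicit BufferedChannel(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }
    // Assumption of this sketch: a full buffer rejects the new sample.
    bool write(const T& sample) {
        if (queue_.size() >= capacity_) return false;
        queue_.push_back(sample);
        return true;
    }
    // Returns true and fills `sample` as long as unread samples remain.
    bool read(T& sample) {
        if (queue_.empty()) return false;
        sample = queue_.front();
        queue_.pop_front();
        return true;
    }
private:
    std::size_t capacity_;
    std::deque<T> queue_;
};

// 'Data' policy: keep only the most recent sample.
template <typename T>
class DataChannel {
public:
    void write(const T& sample) {
        value_ = sample;
        fresh_ = true;
    }
    // Returns true only if something was written since the previous read.
    bool read(T& sample) {
        if (!fresh_) return false;
        sample = value_;
        fresh_ = false;
        return true;
    }
private:
    T value_{};
    bool fresh_ = false;
};
```

If you write 1.0, 2.0 and 3.0 to both channels, a 'while ( read( x ) )' loop
on the BufferedChannel sees all three samples in order, while the DataChannel
returns only 3.0 once and then reports 'no new data'.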

Since each InputPort takes a default policy (type = DATA,
lock_policy = LOCK_FREE, init = false, pull = false), we can keep using
the old DeploymentComponent + XML scripts. The 'only' addition necessary
is to extend the XML elements so that a connection policy can also be
defined, overriding the default. I propose to update the deployment
manual so that the connection semantics are clearer. What we now call a
'connection', I would propose to call a 'Topic', analogous to ROS. So
you'd define an OutputPort -> Topic and a Topic -> InputPort mapping in
your XML file. We could easily generalize Topic to also allow port
names, so that in simple setups you just set OutputPort -> InputPort.
I'm not even proposing a new XML format here, because when we write:

RTT 1.0, from DeploymentComponent example:

    <struct name="Ports"  type="PropertyBag">
      <simple name="a_output_port" type="string"><value>AConnection</value></simple>
      <simple name="a_input_port" type="string"><value>BConnection</value></simple>
    </struct>

we actually mean to write (what's in a name?):

RTT 2.0:

    <struct name="Topics"  type="PropertyBag">
      <simple name="a_output_port" type="string"><value>ATopic</value></simple>
      <simple name="b_input_port" type="string"><value>BTopic</value></simple>
    </struct>

In 2.0, you need to take care that exactly one port writes to a given
topic (so one OutputPort) and that all other ports on it are InputPorts.
If this is not the case, the deployer refuses to set up the connections.
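
The 'exactly one writer per topic' rule could be checked roughly as
follows. This is only a sketch of the rule itself, not the deployer's code:
PortMapping, Direction, invalid_topics and can_connect are hypothetical names
invented for this example.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical types for illustration; not part of the RTT or deployer API.
enum class Direction { Output, Input };

struct PortMapping {
    std::string port;     // port name, e.g. "a_output_port"
    std::string topic;    // topic name, e.g. "ATopic"
    Direction direction;  // whether the port writes or reads the topic
};

// Returns the topics that do not have exactly one OutputPort, in sorted order.
std::vector<std::string> invalid_topics(const std::vector<PortMapping>& mappings) {
    std::map<std::string, int> writers;  // topic name -> number of OutputPorts
    for (const auto& m : mappings) {
        assert(!m.topic.empty());        // a mapping must name its topic
        int& count = writers[m.topic];   // starts at 0 for a new topic
        if (m.direction == Direction::Output) ++count;
    }
    std::vector<std::string> bad;
    for (const auto& entry : writers) {
        if (entry.second != 1) bad.push_back(entry.first);
    }
    return bad;
}

// The connections may be set up only if every topic has exactly one writer.
bool can_connect(const std::vector<PortMapping>& mappings) {
    return invalid_topics(mappings).empty();
}
```

With one OutputPort and any number of InputPorts on 'ATopic', can_connect
returns true. With two OutputPorts on the same topic, or with a topic that
only has InputPorts, it returns false and invalid_topics names the offending
topic.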

So much for deployment. The whole mechanism is reported to work
transparently over CORBA, but I still need to test that claim
personally.

As before, the receiver can subscribe an RTT::Event to receive
notifications when a new data sample is ready. The scripting interface
of ports consists only of 'bool read( sample )' and 'bool write( sample )'.

Peter