Patch for event driven ports

The patch in the next mail builds on Sylvain's proposal for event-driven
data ports. It shows how I think a more efficient implementation
can be done, without requiring EventDrivenActivity. The main
difference for the user is that you need to use ports()->addEventPort
to specify which ports should trigger events.
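
From the user's side, the split between plain ports and event ports could look roughly like the sketch below. It uses stand-in types (Port, Interface) that are assumptions made for illustration and are not the RTT classes. The callback here is invoked synchronously, whereas the patch delivers it through the task's event processor.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration; these are NOT the RTT classes.
struct Port {
    std::string name;
    std::vector<std::function<void(Port*)>> listeners;
    explicit Port(const std::string& n) : name(n) {}
    // Called by a writer when new data arrives on this port.
    void signal() { for (auto& l : listeners) l(this); }
};

class Interface {
    std::vector<Port*> all_ports;
    std::vector<Port*> event_ports;
public:
    bool addPort(Port* p) {
        for (Port* q : all_ports)
            if (q->name == p->name) return false;   // reject duplicate names
        all_ports.push_back(p);
        return true;
    }
    // Adds the port and marks it as one that triggers the task.
    bool addEventPort(Port* p, std::function<void(Port*)> callback = nullptr) {
        if (!addPort(p)) return false;
        if (callback) p->listeners.push_back(callback);
        event_ports.push_back(p);
        return true;
    }
    const std::vector<Port*>& getEventPorts() const { return event_ports; }
};

// Registers one plain port and one event port, then writes to both.
// Returns how many times the callback fired.
int example_event_port() {
    Port plain("status"), driving("setpoint");
    Interface iface;
    int fired = 0;
    iface.addPort(&plain);
    iface.addEventPort(&driving, [&fired](Port*) { ++fired; });
    plain.signal();    // no listener registered: nothing happens
    driving.signal();  // the callback runs once
    return fired;
}
```

Only the port registered through addEventPort appears in getEventPorts(), which is the list a data-driven task would walk in start().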

A logical next step is to extend this so that one can react to these
ports from the scripting interface. This requires changes in the port
and connection classes. Given Sylvain's ongoing work, it may not be
worth investing time in that (necessary) extension at
this moment.

Peter

[PATCH] Improved data driven tasks.

This patch removes the dependency on EventDrivenActivity for DataDrivenTask
components. The management of event-triggering ports is now done in DataFlowInterface
(where port management belongs), and DataDrivenTask relies only on information
in that class. The new DataFlowInterface::addEventPort method sets which ports
should fire events for this task. The DataDrivenTask::start method picks this up and
registers a callback, which collects the data for updateHook(updated_ports).

---
src/DataDrivenTask.cpp | 56 +++++++++++++++-----------------------------
src/DataDrivenTask.hpp | 11 ++++++--
src/DataFlowInterface.cpp | 28 ++++++++++++++++++++++
src/DataFlowInterface.hpp | 37 +++++++++++++++++++++++++++--
src/EventService.hpp | 4 +++
src/PortInterface.cpp | 16 ++++++------
src/PortInterface.hpp | 17 ++-----------
7 files changed, 104 insertions(+), 65 deletions(-)

diff --git a/src/DataDrivenTask.cpp b/src/DataDrivenTask.cpp
index 177be44..8351433 100644
--- a/src/DataDrivenTask.cpp
+++ b/src/DataDrivenTask.cpp
@@ -11,53 +11,35 @@ DataDrivenTask::DataDrivenTask(const std::string& name, ExecutionEngine* parent,

bool DataDrivenTask::start()
{
- EventDrivenActivity* activity = dynamic_cast<EventDrivenActivity*>(engine()->getActivity());
- if (activity)
+ size_t port_count = 0;
+ const DataFlowInterface::Ports& ports = this->ports()->getEventPorts();
+ for (DataFlowInterface::Ports::const_iterator it = ports.begin(); it != ports.end(); ++it)
{
- size_t port_count = 0;
- log(Info) << getName() << " is attached to an EventDrivenActivity. Registering ports." << endlog();
- DataFlowInterface::Ports ports = this->ports()->getPorts();
- for (DataFlowInterface::Ports::iterator it = ports.begin(); it != ports.end(); ++it)
+ int porttype = (*it)->getPortType();
+ if (porttype == PortInterface::ReadPort || porttype == PortInterface::ReadWritePort)
{
- int porttype = (*it)->getPortType();
- if (porttype == PortInterface::ReadPort || porttype == PortInterface::ReadWritePort)
- {
- PortInterface::NewDataEvent* ev = (*it)->getNewDataEvent();
- if (!activity->addEvent(ev))
- return false;
- port_count++;
-
- log(Info) << getName() << " will be triggered when new data is available on " << (*it)->getName() << endlog();
- }
+ (*it)->getNewDataOnPortEvent()->connect(boost::bind(&DataDrivenTask::dataOnPort, this, _1), this->events()->getEventProcessor());
+ port_count++;
+ log(Info) << getName() << " will be triggered when new data is available on " << (*it)->getName() << endlog();
}
- updated_ports.resize(port_count);
- }
- else
- {
- log(Warning) << "activity is not an EventDrivenActivity" << endlog();
}
-
+ updated_ports.reserve(port_count);
return TaskContext::start();
}

-void DataDrivenTask::updateHook()
+void DataDrivenTask::dataOnPort(PortInterface* port)
{
- EventDrivenActivity* activity = dynamic_cast<EventDrivenActivity*>(engine()->getActivity());
- if (activity)
- {
- typedef std::vector< Event<void()>* > TriggerSet;
- TriggerSet const& updates = activity->getWakeupEvents();
- updated_ports.clear();
-
- for (TriggerSet::const_iterator it = updates.begin(); it != updates.end(); ++it)
- {
- PortInterface::NewDataEvent* ev = static_cast<PortInterface::NewDataEvent*>(*it);
- if (ev)
- updated_ports.push_back(ev->getPort());
- }
- }
+ // Since this handler is executed in our thread, we are always running.
+ if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() )
+ updated_ports.push_back(port);
+ // this is in essence superfluous. We are already triggered.
+ //this->getActivity()->trigger();
+}

+void DataDrivenTask::updateHook()
+{
updateHook(updated_ports);
+ updated_ports.clear();
}

void DataDrivenTask::updateHook(std::vector<PortInterface*> const& updated_ports)
diff --git a/src/DataDrivenTask.hpp b/src/DataDrivenTask.hpp
index 9ae33de..e9b2d3b 100644
--- a/src/DataDrivenTask.hpp
+++ b/src/DataDrivenTask.hpp
@@ -7,13 +7,18 @@
namespace RTT {
class DataDrivenTask : public TaskContext
{
+ protected:
std::vector< PortInterface* > updated_ports;
-
+ /**
+ * This callback is called each time data arrived on an
+ * event port.
+ */
+ void dataOnPort(PortInterface*);
public:
/**
* Create a DataDrivenTask.
*
- * It's ExecutionEngine will be newly constructed with private
+ * It's ExecutionEngine will be newly constructed with private
* processing of commands, events, programs and state machines.
*
* @param name The name of this component.
@@ -23,7 +28,7 @@ namespace RTT {
DataDrivenTask( const std::string& name, TaskState initial_state = Stopped );

/**
- * Create a DataDrivenTask.
+ * Create a DataDrivenTask.
*
* Its commands programs and state machines are processed by \a parent.
* Use this constructor to share execution engines among task contexts, such that
diff --git a/src/DataFlowInterface.cpp b/src/DataFlowInterface.cpp
index 759d83e..5823b24 100644
--- a/src/DataFlowInterface.cpp
+++ b/src/DataFlowInterface.cpp
@@ -74,6 +74,16 @@ namespace RTT
return true;
}

+ bool DataFlowInterface::addEventPort(PortInterface* port, PortInterface::NewDataOnPortEvent::SlotFunction callback) {
+ if (this->addPort(port)) {
+ if (callback)
+ port->getNewDataOnPortEvent()->connect(callback, mparent->events()->getEventProcessor() );
+ eports.push_back(port);
+ return true;
+ }
+ return false;
+ }
+
bool DataFlowInterface::addPort(PortInterface* port, std::string description) {
if (this->addPort(port) == false)
return false;
@@ -86,6 +96,21 @@ namespace RTT
return true;
}

+ bool DataFlowInterface::addEventPort(PortInterface* port, std::string description, PortInterface::NewDataOnPortEvent::SlotFunction callback) {
+ if (this->addPort(port, description)) {
+ if (callback)
+ port->getNewDataOnPortEvent()->connect(callback, mparent->events()->getEventProcessor() );
+ eports.push_back(port);
+ return true;
+ }
+ return false;
+ }
+
+ const DataFlowInterface::Ports& DataFlowInterface::getEventPorts() const
+ {
+ return eports;
+ }
+
void DataFlowInterface::removePort(const std::string& name) {
for ( PortStore::iterator it(mports.begin());
it != mports.end();
@@ -93,6 +118,9 @@ namespace RTT
if ( it->first->getName() == name ) {
if (mparent)
mparent->removeObject( name );
+ Ports::iterator ep = find(eports.begin(), eports.end(),it->first);
+ if ( ep!= eports.end() )
+ eports.erase( ep );
mports.erase(it);
return;
}
diff --git a/src/DataFlowInterface.hpp b/src/DataFlowInterface.hpp
index a0ea188..e61b1af 100644
--- a/src/DataFlowInterface.hpp
+++ b/src/DataFlowInterface.hpp
@@ -53,9 +53,6 @@ namespace RTT
*/
class DataFlowInterface
{
- typedef std::vector<std::pair<PortInterface*,std::string> > PortStore;
- PortStore mports;
- OperationInterface* mparent;
public:
/**
* A sequence of pointers to ports.
@@ -86,6 +83,23 @@ namespace RTT
bool addPort(PortInterface* port);

/**
+ * Add an Event triggering Port to this task. It is only added to the C++
+ * interface and can not be used in scripting.
+ * @param port The port to add.
+ * @return true if the port could be added, false if already added.
+ * @param callback (Optional) provide a function which will be called asynchronously
+ * when new data arrives on this port. You can add more functions by using the port
+ * directly using PortInterface::getNewDataOnPortEvent().
+ */
+ bool addEventPort(PortInterface* port, PortInterface::NewDataOnPortEvent::SlotFunction callback = PortInterface::NewDataOnPortEvent::SlotFunction() );
+
+ /**
+ * Returns the list of all ports emitting events when new
+ * data arrives.
+ */
+ const Ports& getEventPorts() const;
+
+ /**
* Add a Port to the interface of this task. It is added to
* both the C++ interface and the scripting interface.
* @param port The port to add.
@@ -94,6 +108,17 @@ namespace RTT
bool addPort(PortInterface* port, std::string description);

/**
+ * Add an Event triggering Port to the interface of this task. It is added to
+ * both the C++ interface and the scripting interface.
+ * @param port The port to add.
+ * @param description A user readable description of this port.
+ * @param callback (Optional) provide a function which will be called asynchronously
+ * when new data arrives on this port. You can add more functions by using the port
+ * directly using PortInterface::getNewDataOnPortEvent().
+ */
+ bool addEventPort(PortInterface* port, std::string description, PortInterface::NewDataOnPortEvent::SlotFunction callback = PortInterface::NewDataOnPortEvent::SlotFunction() );
+
+ /**
* Remove a Port from this interface.
* @param port The port to remove.
*/
@@ -156,6 +181,12 @@ namespace RTT
* all associated TaskObjects.
*/
void clear();
+ protected:
+ typedef std::vector<std::pair<PortInterface*,std::string> > PortStore;
+ Ports eports;
+ PortStore mports;
+ OperationInterface* mparent;
+
};

}
diff --git a/src/EventService.hpp b/src/EventService.hpp
index 768d792..821692e 100644
--- a/src/EventService.hpp
+++ b/src/EventService.hpp
@@ -145,6 +145,10 @@ namespace RTT
* @param Signature Signature of the Event, for example void(int, int)
* @param ename The name of the event to lookup.
* @return A shared pointer which is to be assigned to an Event object.
+ * @internal We need Signature for two reasons: 1. for consistency with the general
+ * getPrimitive(name) API in RTT and 2. because we may need the type to reconstruct the
+ * object dynamically, for example, when the Event is remote. See the CommandRepository
+ * for an example where this is necessary (see also RTT::RemoteCommand )
*/
template
boost::shared_ptr getEvent(const std::string& ename)
diff --git a/src/PortInterface.cpp b/src/PortInterface.cpp
index 787684f..7ceac96 100644
--- a/src/PortInterface.cpp
+++ b/src/PortInterface.cpp
@@ -43,11 +43,11 @@
namespace RTT
{

- PortInterface::PortInterface(const std::string& name) : portname(name), new_data_event(0) {}
+ PortInterface::PortInterface(const std::string& name) : portname(name), new_data_on_port_event(0) {}

PortInterface::~PortInterface()
{
- delete new_data_event;
+ delete new_data_on_port_event;

}

@@ -208,14 +208,14 @@ namespace RTT

void PortInterface::signal()
{
- if (new_data_event)
- (*new_data_event)();
+ if (new_data_on_port_event)
+ (*new_data_on_port_event)(this);
}

- PortInterface::NewDataEvent* PortInterface::getNewDataEvent()
+ PortInterface::NewDataOnPortEvent* PortInterface::getNewDataOnPortEvent()
{
- if (!new_data_event)
- new_data_event = new NewDataEvent(this);
- return new_data_event;
+ if (!new_data_on_port_event)
+ new_data_on_port_event = new NewDataOnPortEvent( this->getName() );
+ return new_data_on_port_event;
}
}
diff --git a/src/PortInterface.hpp b/src/PortInterface.hpp
index 9c972a3..8e13836 100644
--- a/src/PortInterface.hpp
+++ b/src/PortInterface.hpp
@@ -55,22 +55,11 @@ namespace RTT
class PortInterface
{
public:
- class NewDataEvent : public Event< void() >
- {
- PortInterface* port;
-
- public:
- NewDataEvent(PortInterface* port)
- : Event< void() >(port->getName() + "Trigger")
- , port(port) {}
-
- PortInterface* getPort() const { return port; }
- };
-
+ typedef Event<void(PortInterface*)> NewDataOnPortEvent;
protected:
friend class ConnectionInterface;
std::string portname;
- NewDataEvent* new_data_event;
+ NewDataOnPortEvent* new_data_on_port_event;
PortInterface(const std::string& name);

/**
@@ -121,7 +110,7 @@ namespace RTT
* Returns an Event object which is triggered every time new data is
* made available on this port
*/
- NewDataEvent* getNewDataEvent();
+ NewDataOnPortEvent* getNewDataOnPortEvent();

/**
* Get the ConnectionModel of this port.

[PATCH] Improved data driven tasks.

> + (*it)->getNewDataOnPortEvent()->connect(boost::bind(&DataDrivenTask::dataOnPort, this, _1), this->events()->getEventProcessor());
I did not know about this one. Is this for getting the callback called into our
own thread ? How is it actually implemented ?

> + // Since this handler is executed in our thread, we are always running.
> + if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() )
> + updated_ports.push_back(port);
> + // this is in essence superfluous. We are already triggered.
> + //this->getActivity()->trigger();
Why are we already triggered ??? How does that manage the race condition I
mentioned in earlier mails ?

More general remarks:
* EventDrivenActivity should in my opinion not go away because being
event-driven is useful. I actually plan to use it in my new dataflow code for
the CORBA transport.

* I like the idea of separating triggering and non-triggering ports.
* One thing I don't like: the new DataFlowInterface API, related to "event
ports" is actually only valid for DataDrivenTask. What is an event port for
TaskContexts ?

Would it work to merge your DataDrivenTask into TaskContext, allowing to either
use NonPeriodicActivity (purely data driven) or PeriodicActivity (data + period)
? That would give more sense to the modification to DataFlowInterface.

[PATCH] Improved data driven tasks.

On Monday 27 October 2008 18:59:17 Sylvain Joyeux wrote:
> > + (*it)->getNewDataOnPortEvent()->connect(boost::bind(&DataDrivenTask::dataOnPort, this, _1), this->events()->getEventProcessor());
>
> I did not know about this one. Is this for getting the callback called into
> our own thread ? How is it actually implemented ?

Yes, the callback will be executed in the event processor of our task, hence,
in our own thread.
The callback's arguments are stored in a data object (held by an
EventCatcher) and retrieved later by the EventProcessor. In case
of 'overruns', by default, only the first one is stored and the others are
discarded. You can get the inverse by passing an AsynStorageType of
EventProcessor::OnlyLast as the last argument of the connect call. In that case,
a DataObjectLockFree is used to store the data and the most recent sample is
kept. After the data is stored, trigger() is called.
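
The first-versus-last behaviour on overruns can be modelled with a small slot that holds at most one pending sample. The sketch below is single-threaded and uses hypothetical names (StoragePolicy, PendingSlot); it shows the policy only and makes no attempt at the lock-free storage described above.

```cpp
#include <cassert>

// Hypothetical names; a simplified, single-threaded model of the policy only.
enum class StoragePolicy { OnlyFirst, OnlyLast };

template <class T>
class PendingSlot {
    StoragePolicy policy;
    bool pending = false;
    T stored{};
public:
    explicit PendingSlot(StoragePolicy p) : policy(p) {}

    // Called by the emitter. Returns true if the value was stored.
    bool emit(const T& value) {
        if (pending && policy == StoragePolicy::OnlyFirst)
            return false;           // overrun: keep the first sample
        stored = value;             // first sample, or OnlyLast overwrite
        pending = true;
        return true;
    }

    // Called later by the processor. Returns false if nothing is pending.
    bool process(T& out) {
        if (!pending) return false;
        out = stored;
        pending = false;
        return true;
    }
};

// Emits 1, 2, 3 before processing and returns what the processor sees.
int value_seen_after_overrun(StoragePolicy policy) {
    PendingSlot<int> slot(policy);
    slot.emit(1);
    slot.emit(2);
    slot.emit(3);
    int out = 0;
    slot.process(out);
    return out;
}
```

With OnlyFirst the processor sees the oldest unprocessed sample; with OnlyLast it sees the newest. In both cases the intermediate sample is lost.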

We are not relying heavily on this mechanism in the current implementation,
since the only data passed is the pointer to the port, which is always the
same.

I don't like this asynchronous Event implementation though. It was designed
with zero-malloc and lock-free in mind and that strongly restricted the
possibilities and it stresses memory usage heavily. I'd prefer to have every
event recorded (not just first xor last). But this will require a kind of
free heap to allocate storage from (which requires locks and mallocs :-).

>
> > + // Since this handler is executed in our thread, we are always running.
> > + if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() )
> > + updated_ports.push_back(port);
> > + // this is in essence superfluous. We are already triggered.
> > + //this->getActivity()->trigger();
>
> Why are we already triggered ???

The callback is executed in the EventProcessor, which only executes after a
trigger(). updateHook() is called (as last) after the EventProcessor. So it
all fits.
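
The ordering described here (queued callbacks first, updateHook() last) can be sketched as a single-threaded model. The Task struct and its member names are assumptions for illustration and do not reproduce the ExecutionEngine; ports are identified by name to keep the example short.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical single-threaded model; not RTT code.
struct Task {
    std::vector<std::function<void()>> queued;   // callbacks waiting for the event processor
    std::vector<std::string> updated_ports;      // filled by the callbacks
    std::vector<std::size_t> seen_per_step;      // what each updateHook() observed

    // An event emitted elsewhere queues a callback (and would trigger the task).
    void emitDataOnPort(const std::string& port) {
        queued.push_back([this, port] { dataOnPort(port); });
    }
    // Records each port at most once per cycle, as in the patch.
    void dataOnPort(const std::string& port) {
        if (std::find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end())
            updated_ports.push_back(port);
    }
    void updateHook() {
        seen_per_step.push_back(updated_ports.size());
        updated_ports.clear();
    }
    // One wake-up: the event processor runs first, updateHook() last.
    void step() {
        std::vector<std::function<void()>> work;
        work.swap(queued);
        for (auto& cb : work) cb();
        updateHook();
    }
};

std::vector<std::size_t> example_step_order() {
    Task t;
    t.emitDataOnPort("P1");
    t.emitDataOnPort("P1");   // duplicate is recorded once
    t.emitDataOnPort("P2");
    t.step();                 // updateHook() sees P1 and P2
    t.step();                 // nothing new
    return t.seen_per_step;
}
```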

> How does that manage the race condition I
> mentioned in earlier mails ?

The only potential race we have is in PeriodicThread, which manages the
trigger() behaviour. I have to look into it, we wouldn't have a race if we
had binary semaphores. So we need to emulate that or so. Since the rest is
serialised in the TC's thread, there are no races over there.

>
> More general remarks:
> * EventDrivenActivity should in my opinion not go away because being
> event-driven is useful. I actually plan to use it in my new dataflow
> code for the CORBA transport.

I see it in the same category as IRQActivity and others...

>
> * I like the idea of separating triggering and non-triggering ports.

Me too :-) Having a 'driving' data port looked like a realistic use case.

> * One thing I don't like: the new DataFlowInterface API, related to "event
> ports" is actually only valid for DataDrivenTask. What is an event port
> for TaskContexts ?
>
> Would it work to merge your DataDrivenTask into TaskContext, allowing to
> either use NonPeriodicActivity (purely data driven) or PeriodicActivity
> (data + period) ? That would give more sense to the modification to
> DataFlowInterface.

My patch was just the 'minimal effort' to get the flow I had in mind. What you
propose is a logical next step imho too.

Peter

[PATCH] Improved data driven tasks.

> > > + // Since this handler is executed in our thread, we are always running.
> > > + if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() )
> > > + updated_ports.push_back(port);
> > > + // this is in essence superfluous. We are already triggered.
> > > + //this->getActivity()->trigger();
> >
> > Why are we already triggered ???
>
> The callback is executed in the EventProcessor, which only executes after a
> trigger(). updateHook() is called (as last) after the EventProcessor. So it
> all fits.

Just to be sure. You are saying:
the event processor is executed AFTER a trigger()

Which I interpret as: the event gets queued, and the callback will be called in
the next trigger, before updateHook(), while, in our case we want:
trigger() is called BECAUSE OF an event.

> > How does that manage the race condition I
> > mentioned in earlier mails ?
>
> The only potential race we have is in PeriodicThread, which manages the
> trigger() behaviour. I have to look into it, we wouldn't have a race if we
> had binary semaphores. So we need to emulate that or so. Since the rest is
> serialised in the TC's thread, there are no races over there.
Mmmm ... Just to be sure again. Given how trigger() is implemented I actually
doubt the following situation will not arise:
* a port P1 is updated. The task gets triggered and the scheduler yields to
it.
* the event processor executes the callback for P1.
* updateHook() gets called
- during the execution of updateHook(), another port P2 gets updated
- P2 *does* call trigger(), but that is ignored because updateHook()
is already running
* updateHook() returns, the task goes to sleep, and we missed one update (i.e.
it will have to wait for the next trigger)

This is the specific situation I tried to avoid in EventDrivenActivity
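
The scenario above can be replayed deterministically under the assumption it rests on, namely that a trigger arriving while the task is running is dropped. The sketch below models that assumed behaviour only, with hypothetical names; it does not describe how RTT's trigger() is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model of the ASSUMED behaviour, not of RTT itself:
// a trigger that arrives while the task is running is dropped.
struct DroppingTask {
    bool running = false;
    bool wake_pending = false;
    std::vector<std::string> fresh;       // ports updated since the last updateHook()
    std::vector<std::string> handled;     // ports already seen by updateHook()

    void trigger() {
        if (running) return;              // the assumption under discussion
        wake_pending = true;
    }
    void portUpdated(const std::string& port) {
        fresh.push_back(port);
        trigger();
    }
    // Runs one cycle if a wake-up is pending. `during` simulates another
    // thread writing a port while updateHook() is executing.
    bool cycle(const std::string& during = "") {
        if (!wake_pending) return false;  // task stays asleep
        wake_pending = false;
        running = true;
        handled.insert(handled.end(), fresh.begin(), fresh.end());
        fresh.clear();
        if (!during.empty()) portUpdated(during);
        running = false;
        return true;
    }
};

// Returns the number of port updates left unhandled while the task sleeps.
std::size_t stranded_updates() {
    DroppingTask t;
    t.portUpdated("P1");
    t.cycle("P2");          // P2 arrives during updateHook()
    bool woke = t.cycle();  // no pending wake-up: the task sleeps
    return woke ? 0 : t.fresh.size();
}
```

In this model P2 stays unhandled until some later, unrelated trigger wakes the task.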

> > More general remarks:
> > * EventDrivenActivity should in my opinion not go away because being
> > event-driven is useful. I actually plan to use it in my new dataflow
> > code for the CORBA transport.
>
> I see it in the same category as IRQActivity and others...
Agreed.

[PATCH] Improved data driven tasks.

On Monday 27 October 2008 22:05:06 Sylvain Joyeux wrote:
> > > > + // Since this handler is executed in our thread, we are always running.
> > > > + if (find(updated_ports.begin(), updated_ports.end(), port) == updated_ports.end() )
> > > > + updated_ports.push_back(port);
> > > > + // this is in essence superfluous. We are already triggered.
> > > > + //this->getActivity()->trigger();
> > >
> > > Why are we already triggered ???
> >
> > The callback is executed in the EventProcessor, which only executes after
> > a trigger(). updateHook() is called (as last) after the EventProcessor.
> > So it all fits.
>
> Just to be sure. You are saying:
> the event processor is executed AFTER a trigger()
>
> Which I interpret as: the event gets queued, and the callback will be
> called in the next trigger, before updateHook(), while, in our case we
> want: trigger() is called BECAUSE OF an event.

Yes, and this is what happens. The trigger() is done synchronously with the
event being emitted (so in Event::emit). It's the EventCatcher that calls
trigger().

>
> > > How does that manage the race condition I
> > > mentioned in earlier mails ?
> >
> > The only potential race we have is in PeriodicThread, which manages the
> > trigger() behaviour. I have to look into it, we wouldn't have a race if
> > we had binary semaphores. So we need to emulate that or so. Since the
> > rest is serialised in the TC's thread, there are no races over there.
>
> Mmmm ... Just to be sure again. Given how trigger() is implemented I
> actually doubt the following situation will not arise:
> * a port P1 is updated. The task gets triggered and the scheduler yields
> to it.
> * the event processor executes the callback for P1.
> * updateHook() gets called
> - during the execution of updateHook(), another port P2 gets updated
> - P2 *does* call trigger(), but that is ignored because updateHook()
> is already running

Where/who decides that trigger() is ignored ?

> * updateHook() returns, the task goes to sleep, and we missed one update
> (i.e. it will have to wait for the next trigger)

This will only happen if there is a bug in the trigger() implementation. In
NonPeriodicActivity, trigger is defined as:
bool NonPeriodicActivity::trigger() {
return SingleThread::isActive() ? SingleThread::start() : false;
}

And start() will raise the semaphore. The idea of trigger() is: if called, the
thread must run (at least) once completely after trigger() returns.
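
The "runs at least once after trigger() returns" guarantee can be illustrated with a counter standing in for the semaphore. This is a single-threaded sketch with hypothetical names (TriggeredLoop, runIfTriggered); a real thread would block on the semaphore where this model returns false.

```cpp
#include <cassert>
#include <functional>

// Hypothetical single-threaded model; `sem` stands in for the OS semaphore
// that the real thread would block on.
class TriggeredLoop {
    bool active = false;
    unsigned sem = 0;
public:
    void activate() { active = true; }
    bool start() { ++sem; return true; }              // raise the semaphore
    bool trigger() { return active ? start() : false; }

    // One pass of the thread loop: returns false where a real thread would block.
    bool runIfTriggered(const std::function<void()>& step) {
        if (sem == 0) return false;
        --sem;
        step();
        return true;
    }
};

// P2 triggers while the step for P1 is still running.
// Returns how many complete steps ran.
int steps_run_for_overlapping_triggers() {
    TriggeredLoop loop;
    loop.activate();
    int steps = 0;
    bool p2_sent = false;
    loop.trigger();                                   // P1
    auto step = [&] {
        ++steps;
        if (!p2_sent) { p2_sent = true; loop.trigger(); }   // P2 arrives mid-step
    };
    while (loop.runIfTriggered(step)) {}
    return steps;
}
```

Because the trigger issued during the first step raises the counter again, a second complete step runs before the loop would go back to sleep.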

Peter

[PATCH] Improved data driven tasks.

> > Mmmm ... Just to be sure again. Given how trigger() is implemented I
> > actually doubt the following situation will not arise:
> > * a port P1 is updated. The task gets triggered and the scheduler yields
> > to it.
> > * the event processor executes the callback for P1.
> > * updateHook() gets called
> > - during the execution of updateHook(), another port P2 gets updated
> > - P2 *does* call trigger(), but that is ignored because updateHook()
> > is already running
>
> Where/who decides that trigger() is ignored ?
>
> > * updateHook() returns, the task goes to sleep, and we missed one update
> > (i.e. it will have to wait for the next trigger)
>
> This will only happen if there is a bug in the trigger() implementation. In
> NonPeriodicActivity, trigger is defined as:
> bool NonPeriodicActivity::trigger() {
> return SingleThread::isActive() ? SingleThread::start() : false;
> }
>
> And start() will raise the semaphore. The idea of trigger() is: if called, the
> thread must run (at least) once completely after trigger() returns.

OK. It appears that I misread this code a while ago ... thus assuming false
things.

Great !