Orocos Real-Time Toolkit  2.6.0
TaskContext.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Tue Dec 21 22:43:08 CET 2004  TaskContext.cxx
00003 
00004                         TaskContext.cxx -  description
00005                            -------------------
00006     begin                : Tue December 21 2004
00007     copyright            : (C) 2004 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.ac.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "TaskContext.hpp"
00041 #include "base/ActionInterface.hpp"
00042 #include "plugin/PluginLoader.hpp"
00043 
00044 #include <string>
00045 #include <algorithm>
00046 #include <functional>
00047 #include <boost/bind.hpp>
00048 #include <boost/mem_fn.hpp>
00049 
00050 #include "internal/DataSource.hpp"
00051 #include "internal/mystd.hpp"
00052 #include "internal/MWSRQueue.hpp"
00053 #include "OperationCaller.hpp"
00054 
00055 #include "rtt-config.h"
00056 
00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00058 #include "extras/SequentialActivity.hpp"
00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00060 #include "Activity.hpp"
00061 #endif
00062 
00063 namespace RTT
00064 {
00065 
00066     using namespace boost;
00067     using namespace std;
00068     using namespace detail;
00069 
00070     TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
00071         :  TaskCore( initial_state)
00072            ,portqueue( new MWSRQueue<PortInterface*>(64) )
00073            ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00075            ,our_act( new SequentialActivity( this->engine() ) )
00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00077            ,our_act( new Activity( this->engine(), name ) )
00078 #endif
00079     {
00080         this->setup();
00081     }
00082 
00083     TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ )
00084         :  TaskCore(parent, initial_state)
00085            ,portqueue( new MWSRQueue<PortInterface*>(64) )
00086            ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00088            ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) )
00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00090            ,our_act( parent ? 0 : new Activity( this->engine(), name ) )
00091 #endif
00092     {
00093         this->setup();
00094     }
00095 
00096     void TaskContext::setup()
00097     {
00098         tcservice->setOwner(this);
00099         // from Service
00100         provides()->doc("The interface of this TaskContext.");
00101 
00102         this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
00103         this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
00104         this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
00105         this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
00106         this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
00107         this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
00108         this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
00109         this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
00110         this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
00111         this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
00112         this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
00113         this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
00114         this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
00115         this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
00116         this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
00117         this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00118 
00119         this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00120         this->addOperation("loadService", &TaskContext::loadService, this, ClientThread).doc("Loads a service known to RTT into this component.").arg("service_name","The name with which the service is registered by in the PluginLoader.");
00121         // activity runs from the start.
00122         if (our_act)
00123             our_act->start();
00124     }
00125 
00126         TaskContext::~TaskContext()
00127         {
00128             if (our_act)
00129                 our_act->stop();
00130             // We don't call stop() or cleanup() here since this is
00131             // the responsibility of the subclass. Calling these functions
00132             // here would only lead to calling invalid virtual functions.
00133             // [Rule no 1: Don't call virtual functions in a destructor.]
00134             // [Rule no 2: Don't call virtual functions in a constructor.]
00135             tcservice->clear();
00136 
00137             delete tcrequests;
00138 
00139             // remove from all users.
00140             while( !musers.empty() ) {
00141                 musers.front()->removePeer(this);
00142             }
00143             // since we are destroyed, be sure that the peer no longer
00144             // has a 'user' pointer to us.
00145             while ( !_task_map.empty() ) {
00146                 _task_map.begin()->second->removeUser(this);
00147                 _task_map.erase( _task_map.begin() );
00148             }
00149             // Do not call this->disconnect() !!!
00150             // Ports are probably already destructed by user code.
00151             delete portqueue;
00152         }
00153 
00154     bool TaskContext::connectPorts( TaskContext* peer )
00155     {
00156         bool failure = false;
00157         const std::string& location = this->getName();
00158         Logger::In in( location.c_str()  );
00159 
00160         DataFlowInterface::Ports myports = this->ports()->getPorts();
00161         for (DataFlowInterface::Ports::iterator it = myports.begin();
00162              it != myports.end();
00163              ++it) {
00164 
00165             // Then try to get the peer port's connection
00166             PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00167             if ( !peerport ) {
00168                 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00169                 continue;
00170             }
00171 
00172             // Skip if they have the same type
00173             if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
00174                (dynamic_cast<InputPortInterface*>(*it) &&  dynamic_cast<InputPortInterface*>(peerport)))
00175               {
00176                 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
00177                 continue;
00178               }
00179 
00180             // Try to find a way to connect them
00181             if ( !(*it)->connectTo( peerport ) ) {
00182                 log(Debug)<< "Data flow incompatible between ports "
00183                           << getName() << "." << (*it)->getName() << " and "
00184                           << peer->getName() << "." << (*it)->getName() << endlog();
00185                 failure = true;
00186             }
00187         }
00188         return !failure;
00189     }
00190 
00191     bool TaskContext::connectServices( TaskContext* peer )
00192     {
00193         bool failure = false;
00194         const std::string& location = this->getName();
00195         Logger::In in( location.c_str()  );
00196 
00197         vector<string> myreqs = this->requires()->getRequesterNames();
00198         vector<string> peerreqs = peer->requires()->getRequesterNames();
00199 
00200         this->requires()->connectTo( peer->provides() );
00201         for (vector<string>::iterator it = myreqs.begin();
00202              it != myreqs.end();
00203              ++it) {
00204             ServiceRequester* sr = this->requires(*it);
00205             if ( !sr->ready() ) {
00206                 if (peer->provides()->hasService( *it ))
00207                     sr->connectTo( peer->provides(*it) );
00208                 else {
00209                     log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
00210                 }
00211             }
00212         }
00213 
00214         peer->requires()->connectTo( this->provides() );
00215         for (vector<string>::iterator it = peerreqs.begin();
00216                 it != peerreqs.end();
00217                 ++it) {
00218             ServiceRequester* sr = peer->requires(*it);
00219             if ( !sr->ready() ) {
00220                 if (this->provides()->hasService(*it))
00221                     sr->connectTo( this->provides(*it) );
00222                 else
00223                     log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
00224             }
00225         }
00226         return !failure;
00227     }
00228 
00229     bool TaskContext::prepareProvide(const std::string& name) {
00230          return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
00231     }
00232 
00233     bool TaskContext::loadService(const std::string& service_name) {
00234         if ( provides()->hasService(service_name))
00235             return true;
00236         return PluginLoader::Instance()->loadService(service_name, this);
00237     }
00238 
00239     void TaskContext::addUser( TaskContext* peer )
00240     {
00241         if (peer)
00242             musers.push_back(peer);
00243     }
00244 
00245     void TaskContext::removeUser( TaskContext* peer )
00246     {
00247         Users::iterator it = find(musers.begin(), musers.end(), peer);
00248         if ( it != musers.end() )
00249             musers.erase(it);
00250     }
00251 
00252         bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00253         {
00254             if ( alias.empty() )
00255                 alias = peer->getName();
00256             if ( !peer || _task_map.count( alias ) != 0 )
00257                 return false;
00258             _task_map[ alias ] = peer;
00259             peer->addUser( this );
00260             return true;
00261         }
00262 
00263         void TaskContext::removePeer( const std::string& name )
00264         {
00265             PeerMap::iterator it = _task_map.find( name );
00266             if ( _task_map.end() != it ) {
00267                 it->second->removeUser( this );
00268                 _task_map.erase( _task_map.find( name ) );
00269             }
00270         }
00271 
00272         void TaskContext::removePeer( TaskContext* peer )
00273         {
00274             for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00275                 if ( it->second == peer ) {
00276                     peer->removeUser( this );
00277                     _task_map.erase( it );
00278                     return;
00279                 }
00280         }
00281 
00282         bool TaskContext::connectPeers( TaskContext* peer )
00283         {
00284             if ( _task_map.count( peer->getName() ) != 0
00285                  || peer->hasPeer( this->getName() ) )
00286                 return false;
00287             this->addPeer ( peer );
00288             peer->addPeer ( this );
00289             return true;
00290         }
00291 
00292     void TaskContext::disconnect() {
00293         Logger::In in( this->getName().c_str()  );
00294         // disconnect all our ports
00295         DataFlowInterface::Ports myports = this->ports()->getPorts();
00296         for (DataFlowInterface::Ports::iterator it = myports.begin();
00297              it != myports.end();
00298              ++it) {
00299             (*it)->disconnect();
00300         }
00301 
00302         // remove from all users.
00303         while( !musers.empty() ) {
00304             musers.front()->removePeer(this);
00305         }
00306 
00307         while ( !_task_map.empty() ) {
00308             _task_map.begin()->second->removeUser(this);
00309             _task_map.erase( _task_map.begin() );
00310         }
00311     }
00312 
00313         void TaskContext::disconnectPeers( const std::string& name )
00314         {
00315             if ( _task_map.end() != _task_map.find( name ) ) {
00316                 TaskContext* peer = _task_map.find(name)->second;
00317                 this->removePeer(peer);
00318                 peer->removePeer(this);
00319             }
00320         }
00321 
00322         std::vector<std::string> TaskContext::getPeerList() const
00323         {
00324             std::vector<std::string> res;
00325             std::transform(_task_map.begin(), _task_map.end(),
00326                            std::back_inserter( res ),
00327                            select1st<PeerMap::value_type>() );
00328             return res;
00329         }
00330 
00331         bool TaskContext::hasPeer( const std::string& peer_name ) const
00332         {
00333             return _task_map.count( peer_name ) == 1;
00334         }
00335 
00336         TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00337         {
00338             if (this->hasPeer( peer_name ) )
00339                 return _task_map.find(peer_name)->second;
00340             return 0;
00341         }
00342 
00343     bool TaskContext::setActivity(ActivityInterface* new_act)
00344     {
00345         if (this->isRunning())
00346             return false;
00347         if ( new_act == 0) {
00348 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00349             new_act = new SequentialActivity();
00350 #elseif defined(ORO_ACT_DEFAULT_ACTIVITY)
00351             new_act = new Activity();
00352 #endif
00353         }
00354         new_act->stop();
00355         our_act->stop();
00356         new_act->run( this->engine() );
00357         our_act = ActivityInterface::shared_ptr( new_act );
00358         our_act->start();
00359         return true;
00360     }
00361 
00362     void TaskContext::forceActivity(ActivityInterface* new_act)
00363     {
00364         if (!new_act)
00365             return;
00366         new_act->stop();
00367         our_act->stop();
00368         our_act.reset( new_act );
00369         our_act->run( this->engine() );
00370         our_act->start();
00371     }
00372 
00373     ActivityInterface* TaskContext::getActivity()
00374     {
00375         if (this->engine()->getActivity() != our_act.get() )
00376             return this->engine()->getActivity();
00377         return our_act.get();
00378     }
00379 
00380     void TaskContext::clear()
00381     {
00382         tcservice->clear();
00383     }
00384 
00385     bool TaskContext::ready()
00386     {
00387         return true;
00388     }
00389 
00390     bool connectPorts(TaskContext* A, TaskContext* B) {
00391         return A->connectPorts(B);
00392     }
00393 
00394     bool connectPeers(TaskContext* A, TaskContext* B) {
00395         return A->connectPeers(B);
00396     }
00397 
00398     bool TaskContext::start()
00399     {
00400         if ( this->isRunning() )
00401             return false;
00402 #ifdef ORO_SIGNALLING_PORTS
00403         ports()->setupHandles();
00404 #endif
00405         return TaskCore::start(); // calls startHook()
00406     }
00407 
00408     bool TaskContext::stop()
00409     {
00410         if ( !this->isRunning() )
00411             return false;
00412         if (TaskCore::stop()) { // calls stopHook()
00413 #ifdef ORO_SIGNALLING_PORTS
00414             ports()->cleanupHandles();
00415 #endif
00416             return true;
00417         }
00418         return false;
00419     }
00420 
00421     void TaskContext::dataOnPort(PortInterface* port)
00422     {
00423         portqueue->enqueue( port );
00424         this->getActivity()->trigger();
00425     }
00426 
00427     void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
00428         // user_callbacks will only be emitted from updateHook().
00429         MutexLock lock(mportlock);
00430         user_callbacks[port] = callback;
00431     }
00432 
00433     void TaskContext::dataOnPortRemoved(PortInterface* port) {
00434         MutexLock lock(mportlock);
00435         UserCallbacks::iterator it = user_callbacks.find(port);
00436         if (it != user_callbacks.end() ) {
00437             user_callbacks.erase(it);
00438         }
00439     }
00440 
00441     void TaskContext::prepareUpdateHook()
00442     {
00443         MutexLock lock(mportlock);
00444         PortInterface* port = 0;
00445         while ( portqueue->dequeue( port ) == true ) {
00446             UserCallbacks::iterator it = user_callbacks.find(port);
00447             if (it != user_callbacks.end() )
00448                 it->second(port); // fire the user callback
00449         }
00450     }
00451 }
00452