Orocos Real-Time Toolkit  2.5.0
TaskContext.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Tue Dec 21 22:43:08 CET 2004  TaskContext.cxx
00003 
00004                         TaskContext.cxx -  description
00005                            -------------------
00006     begin                : Tue December 21 2004
00007     copyright            : (C) 2004 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.ac.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "TaskContext.hpp"
00041 #include "base/ActionInterface.hpp"
00042 #include "plugin/PluginLoader.hpp"
00043 
00044 #include <string>
00045 #include <algorithm>
00046 #include <functional>
00047 #include <boost/bind.hpp>
00048 #include <boost/mem_fn.hpp>
00049 
00050 #include "internal/DataSource.hpp"
00051 #include "internal/mystd.hpp"
00052 #include "internal/MWSRQueue.hpp"
00053 #include "OperationCaller.hpp"
00054 
00055 #include "rtt-config.h"
00056 
00057 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00058 #include "extras/SequentialActivity.hpp"
00059 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00060 #include "Activity.hpp"
00061 #endif
00062 
00063 namespace RTT
00064 {
00065 
00066     using namespace boost;
00067     using namespace std;
00068     using namespace detail;
00069 
00070     TaskContext::TaskContext(const std::string& name, TaskState initial_state /*= Stopped*/)
00071         :  TaskCore( initial_state)
00072            ,portqueue( new MWSRQueue<PortInterface*>(64) )
00073            ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00074 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00075            ,our_act( new SequentialActivity( this->engine() ) )
00076 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00077            ,our_act( new Activity( this->engine(), name ) )
00078 #endif
00079     {
00080         this->setup();
00081     }
00082 
00083     TaskContext::TaskContext(const std::string& name, ExecutionEngine* parent, TaskState initial_state /*= Stopped*/ )
00084         :  TaskCore(parent, initial_state)
00085            ,portqueue( new MWSRQueue<PortInterface*>(64) )
00086            ,tcservice(new Service(name,this) ), tcrequests( new ServiceRequester(name,this) )
00087 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00088            ,our_act( parent ? 0 : new SequentialActivity( this->engine() ) )
00089 #elif defined(ORO_ACT_DEFAULT_ACTIVITY)
00090            ,our_act( parent ? 0 : new Activity( this->engine(), name ) )
00091 #endif
00092     {
00093         this->setup();
00094     }
00095 
00096     void TaskContext::setup()
00097     {
00098         tcservice->setOwner(this);
00099         // from Service
00100         provides()->doc("The interface of this TaskContext.");
00101 
00102         this->addOperation("configure", &TaskContext::configure, this, ClientThread).doc("Configure this TaskContext (= configureHook() ).");
00103         this->addOperation("isConfigured", &TaskContext::isConfigured, this, ClientThread).doc("Is this TaskContext configured ?");
00104         this->addOperation("start", &TaskContext::start, this, ClientThread).doc("Start this TaskContext (= startHook() + updateHook() ).");
00105         this->addOperation("activate", &TaskContext::activate, this, ClientThread).doc("Activate the Execution Engine of this TaskContext.");
00106         this->addOperation("stop", &TaskContext::stop, this, ClientThread).doc("Stop this TaskContext (= stopHook() ).");
00107         this->addOperation("isRunning", &TaskContext::isRunning, this, ClientThread).doc("Is this TaskContext started ?");
00108         this->addOperation("getPeriod", &TaskContext::getPeriod, this, ClientThread).doc("Get the configured execution period. -1.0: no thread associated, 0.0: non periodic, > 0.0: the period.");
00109         this->addOperation("setPeriod", &TaskContext::setPeriod, this, ClientThread).doc("Set the execution period in seconds.").arg("s", "Period in seconds.");
00110         this->addOperation("getCpuAffinity", &TaskContext::getCpuAffinity, this, ClientThread).doc("Get the configured cpu affinity.");
00111         this->addOperation("setCpuAffinity", &TaskContext::setCpuAffinity, this, ClientThread).doc("Set the cpu affinity.").arg("cpu", "Cpu mask.");
00112         this->addOperation("isActive", &TaskContext::isActive, this, ClientThread).doc("Is the Execution Engine of this TaskContext active ?");
00113         this->addOperation("inFatalError", &TaskContext::inFatalError, this, ClientThread).doc("Check if this TaskContext is in the FatalError state.");
00114         this->addOperation("error", &TaskContext::error, this, ClientThread).doc("Enter the RunTimeError state (= errorHook() ).");
00115         this->addOperation("inRunTimeError", &TaskContext::inRunTimeError, this, ClientThread).doc("Check if this TaskContext is in the RunTimeError state.");
00116         this->addOperation("cleanup", &TaskContext::cleanup, this, ClientThread).doc("Reset this TaskContext to the PreOperational state ( =cleanupHook() ).");
00117         this->addOperation("update", &TaskContext::update, this, ClientThread).doc("Execute (call) the update method directly.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00118 
00119         this->addOperation("trigger", &TaskContext::trigger, this, ClientThread).doc("Trigger the update method for execution in the thread of this task.\n Only succeeds if the task isRunning() and allowed by the Activity executing this task.");
00120 
00121         // activity runs from the start.
00122         if (our_act)
00123             our_act->start();
00124     }
00125 
00126         TaskContext::~TaskContext()
00127         {
00128             if (our_act)
00129                 our_act->stop();
00130             // We don't call stop() or cleanup() here since this is
00131             // the responsibility of the subclass. Calling these functions
00132             // here would only lead to calling invalid virtual functions.
00133             // [Rule no 1: Don't call virtual functions in a destructor.]
00134             // [Rule no 2: Don't call virtual functions in a constructor.]
00135             tcservice->clear();
00136 
00137             delete tcrequests;
00138 
00139             // remove from all users.
00140             while( !musers.empty() ) {
00141                 musers.front()->removePeer(this);
00142             }
00143             // since we are destroyed, be sure that the peer no longer
00144             // has a 'user' pointer to us.
00145             while ( !_task_map.empty() ) {
00146                 _task_map.begin()->second->removeUser(this);
00147                 _task_map.erase( _task_map.begin() );
00148             }
00149             // Do not call this->disconnect() !!!
00150             // Ports are probably already destructed by user code.
00151             delete portqueue;
00152         }
00153 
00154     bool TaskContext::connectPorts( TaskContext* peer )
00155     {
00156         bool failure = false;
00157         const std::string& location = this->getName();
00158         Logger::In in( location.c_str()  );
00159 
00160         DataFlowInterface::Ports myports = this->ports()->getPorts();
00161         for (DataFlowInterface::Ports::iterator it = myports.begin();
00162              it != myports.end();
00163              ++it) {
00164 
00165             // Then try to get the peer port's connection
00166             PortInterface* peerport = peer->ports()->getPort( (*it)->getName() );
00167             if ( !peerport ) {
00168                 log(Debug)<< "Peer Task "<<peer->getName() <<" has no Port " << (*it)->getName() << endlog();
00169                 continue;
00170             }
00171 
00172             // Skip if they have the same type
00173             if((dynamic_cast<OutputPortInterface*>(*it) && dynamic_cast<OutputPortInterface*>(peerport)) ||
00174                (dynamic_cast<InputPortInterface*>(*it) &&  dynamic_cast<InputPortInterface*>(peerport)))
00175               {
00176                 log(Debug)<< (*it)->getName() << " and " << peerport->getName() << " have the same type" << endlog();
00177                 continue;
00178               }
00179 
00180             // Try to find a way to connect them
00181             if ( !(*it)->connectTo( peerport ) ) {
00182                 log(Debug)<< "Data flow incompatible between ports "
00183                           << getName() << "." << (*it)->getName() << " and "
00184                           << peer->getName() << "." << (*it)->getName() << endlog();
00185                 failure = true;
00186             }
00187         }
00188         return !failure;
00189     }
00190 
00191     bool TaskContext::connectServices( TaskContext* peer )
00192     {
00193         bool failure = false;
00194         const std::string& location = this->getName();
00195         Logger::In in( location.c_str()  );
00196 
00197         vector<string> myreqs = this->requires()->getRequesterNames();
00198         vector<string> peerreqs = peer->requires()->getRequesterNames();
00199 
00200         this->requires()->connectTo( peer->provides() );
00201         for (vector<string>::iterator it = myreqs.begin();
00202              it != myreqs.end();
00203              ++it) {
00204             ServiceRequester* sr = this->requires(*it);
00205             if ( !sr->ready() ) {
00206                 if (peer->provides()->hasService( *it ))
00207                     sr->connectTo( peer->provides(*it) );
00208                 else {
00209                     log(Debug)<< "Peer Task "<<peer->getName() <<" provides no Service " << *it << endlog();
00210                 }
00211             }
00212         }
00213 
00214         peer->requires()->connectTo( this->provides() );
00215         for (vector<string>::iterator it = peerreqs.begin();
00216                 it != peerreqs.end();
00217                 ++it) {
00218             ServiceRequester* sr = peer->requires(*it);
00219             if ( !sr->ready() ) {
00220                 if (this->provides()->hasService(*it))
00221                     sr->connectTo( this->provides(*it) );
00222                 else
00223                     log(Debug)<< "This Task provides no Service " << *it << " for peer Task "<<peer->getName() <<"."<< endlog();
00224             }
00225         }
00226         return !failure;
00227     }
00228 
00229     bool TaskContext::prepareProvide(const std::string& name) {
00230          return tcservice->hasService(name) || plugin::PluginLoader::Instance()->loadService(name, this);
00231     }
00232 
00233     void TaskContext::addUser( TaskContext* peer )
00234     {
00235         if (peer)
00236             musers.push_back(peer);
00237     }
00238 
00239     void TaskContext::removeUser( TaskContext* peer )
00240     {
00241         Users::iterator it = find(musers.begin(), musers.end(), peer);
00242         if ( it != musers.end() )
00243             musers.erase(it);
00244     }
00245 
00246         bool TaskContext::addPeer( TaskContext* peer, std::string alias )
00247         {
00248             if ( alias.empty() )
00249                 alias = peer->getName();
00250             if ( !peer || _task_map.count( alias ) != 0 )
00251                 return false;
00252             _task_map[ alias ] = peer;
00253             peer->addUser( this );
00254             return true;
00255         }
00256 
00257         void TaskContext::removePeer( const std::string& name )
00258         {
00259             PeerMap::iterator it = _task_map.find( name );
00260             if ( _task_map.end() != it ) {
00261                 it->second->removeUser( this );
00262                 _task_map.erase( _task_map.find( name ) );
00263             }
00264         }
00265 
00266         void TaskContext::removePeer( TaskContext* peer )
00267         {
00268             for( PeerMap::iterator it = _task_map.begin(); it != _task_map.end(); ++it)
00269                 if ( it->second == peer ) {
00270                     peer->removeUser( this );
00271                     _task_map.erase( it );
00272                     return;
00273                 }
00274         }
00275 
00276         bool TaskContext::connectPeers( TaskContext* peer )
00277         {
00278             if ( _task_map.count( peer->getName() ) != 0
00279                  || peer->hasPeer( this->getName() ) )
00280                 return false;
00281             this->addPeer ( peer );
00282             peer->addPeer ( this );
00283             return true;
00284         }
00285 
00286     void TaskContext::disconnect() {
00287         Logger::In in( this->getName().c_str()  );
00288         // disconnect all our ports
00289         DataFlowInterface::Ports myports = this->ports()->getPorts();
00290         for (DataFlowInterface::Ports::iterator it = myports.begin();
00291              it != myports.end();
00292              ++it) {
00293             (*it)->disconnect();
00294         }
00295 
00296         // remove from all users.
00297         while( !musers.empty() ) {
00298             musers.front()->removePeer(this);
00299         }
00300 
00301         while ( !_task_map.empty() ) {
00302             _task_map.begin()->second->removeUser(this);
00303             _task_map.erase( _task_map.begin() );
00304         }
00305     }
00306 
00307         void TaskContext::disconnectPeers( const std::string& name )
00308         {
00309             if ( _task_map.end() != _task_map.find( name ) ) {
00310                 TaskContext* peer = _task_map.find(name)->second;
00311                 this->removePeer(peer);
00312                 peer->removePeer(this);
00313             }
00314         }
00315 
00316         std::vector<std::string> TaskContext::getPeerList() const
00317         {
00318             std::vector<std::string> res;
00319             std::transform(_task_map.begin(), _task_map.end(),
00320                            std::back_inserter( res ),
00321                            select1st<PeerMap::value_type>() );
00322             return res;
00323         }
00324 
00325         bool TaskContext::hasPeer( const std::string& peer_name ) const
00326         {
00327             return _task_map.count( peer_name ) == 1;
00328         }
00329 
00330         TaskContext* TaskContext::getPeer(const std::string& peer_name ) const
00331         {
00332             if (this->hasPeer( peer_name ) )
00333                 return _task_map.find(peer_name)->second;
00334             return 0;
00335         }
00336 
00337     bool TaskContext::setActivity(ActivityInterface* new_act)
00338     {
00339         if (this->isRunning())
00340             return false;
00341         if ( new_act == 0) {
00342 #if defined(ORO_ACT_DEFAULT_SEQUENTIAL)
00343             new_act = new SequentialActivity();
00344 #elseif defined(ORO_ACT_DEFAULT_ACTIVITY)
00345             new_act = new Activity();
00346 #endif
00347         }
00348         new_act->stop();
00349         our_act->stop();
00350         new_act->run( this->engine() );
00351         our_act = ActivityInterface::shared_ptr( new_act );
00352         our_act->start();
00353         return true;
00354     }
00355 
00356     void TaskContext::forceActivity(ActivityInterface* new_act)
00357     {
00358         if (!new_act)
00359             return;
00360         new_act->stop();
00361         our_act->stop();
00362         our_act.reset( new_act );
00363         our_act->run( this->engine() );
00364         our_act->start();
00365     }
00366 
00367     ActivityInterface* TaskContext::getActivity()
00368     {
00369         if (this->engine()->getActivity() != our_act.get() )
00370             return this->engine()->getActivity();
00371         return our_act.get();
00372     }
00373 
00374     void TaskContext::clear()
00375     {
00376         tcservice->clear();
00377     }
00378 
00379     bool TaskContext::ready()
00380     {
00381         return true;
00382     }
00383 
00384     bool connectPorts(TaskContext* A, TaskContext* B) {
00385         return A->connectPorts(B);
00386     }
00387 
00388     bool connectPeers(TaskContext* A, TaskContext* B) {
00389         return A->connectPeers(B);
00390     }
00391 
00392     bool TaskContext::start()
00393     {
00394         if ( this->isRunning() )
00395             return false;
00396 #ifdef ORO_SIGNALLING_PORTS
00397         ports()->setupHandles();
00398 #endif
00399         return TaskCore::start(); // calls startHook()
00400     }
00401 
00402     bool TaskContext::stop()
00403     {
00404         if ( !this->isRunning() )
00405             return false;
00406         if (TaskCore::stop()) { // calls stopHook()
00407 #ifdef ORO_SIGNALLING_PORTS
00408             ports()->cleanupHandles();
00409 #endif
00410             return true;
00411         }
00412         return false;
00413     }
00414 
00415     void TaskContext::dataOnPort(PortInterface* port)
00416     {
00417         portqueue->enqueue( port );
00418         this->getActivity()->trigger();
00419     }
00420 
00421     void TaskContext::dataOnPortCallback(InputPortInterface* port, TaskContext::SlotFunction callback) {
00422         // user_callbacks will only be emitted from updateHook().
00423         MutexLock lock(mportlock);
00424         user_callbacks[port] = callback;
00425     }
00426 
00427     void TaskContext::dataOnPortRemoved(PortInterface* port) {
00428         MutexLock lock(mportlock);
00429         UserCallbacks::iterator it = user_callbacks.find(port);
00430         if (it != user_callbacks.end() ) {
00431             user_callbacks.erase(it);
00432         }
00433     }
00434 
00435     void TaskContext::prepareUpdateHook()
00436     {
00437         MutexLock lock(mportlock);
00438         PortInterface* port = 0;
00439         while ( portqueue->dequeue( port ) == true ) {
00440             UserCallbacks::iterator it = user_callbacks.find(port);
00441             if (it != user_callbacks.end() )
00442                 it->second(port); // fire the user callback
00443         }
00444     }
00445 }
00446