Orocos Real-Time Toolkit  2.6.0
ExecutionEngine.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:40 CET 2006  ExecutionEngine.cxx
00003 
00004                         ExecutionEngine.cxx -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "Logger.hpp"
00041 #include "ExecutionEngine.hpp"
00042 #include "base/TaskCore.hpp"
00043 #include "rtt-fwd.hpp"
00044 #include "os/MutexLock.hpp"
00045 #include "internal/MWSRQueue.hpp"
00046 #include "TaskContext.hpp"
00047 
00048 #include <boost/bind.hpp>
00049 #include <boost/ref.hpp>
00050 #include <boost/lambda/lambda.hpp>
00051 #include <boost/lambda/bind.hpp>
00052 #include <functional>
00053 #include <algorithm>
00054 
00055 #define ORONUM_EE_MQUEUE_SIZE 100
00056 
00057 namespace RTT
00058 {
00066     using namespace std;
00067     using namespace detail;
00068     using namespace boost;
00069 
00070     ExecutionEngine::ExecutionEngine( TaskCore* owner )
00071         : taskc(owner),
00072           mqueue(new MWSRQueue<DisposableInterface*>(ORONUM_EE_MQUEUE_SIZE) ),
00073           f_queue( new MWSRQueue<ExecutableInterface*>(ORONUM_EE_MQUEUE_SIZE) )
00074     {
00075     }
00076 
00077     ExecutionEngine::~ExecutionEngine()
00078     {
00079         Logger::In in("~ExecutionEngine");
00080 
00081         // make a copy to avoid call-back troubles:
00082         std::vector<TaskCore*> copy = children;
00083         for (std::vector<TaskCore*>::iterator it = copy.begin(); it != copy.end();++it){
00084             (*it)->setExecutionEngine( 0 );
00085         }
00086         assert( children.empty() );
00087 
00088         ExecutableInterface* foo;
00089         while ( f_queue->dequeue( foo ) )
00090             foo->unloaded();
00091 
00092         DisposableInterface* dis;
00093         while ( mqueue->dequeue( dis ) )
00094             dis->dispose();
00095 
00096         delete f_queue;
00097         delete mqueue;
00098     }
00099 
00100     TaskCore* ExecutionEngine::getParent() {
00101         return taskc;
00102     }
00103 
00104     void ExecutionEngine::addChild(TaskCore* tc) {
00105         children.push_back( tc );
00106     }
00107 
00108     void ExecutionEngine::removeChild(TaskCore* tc) {
00109         vector<TaskCore*>::iterator it = find (children.begin(), children.end(), tc );
00110         if ( it != children.end() )
00111             children.erase(it);
00112     }
00113 
00114     void ExecutionEngine::processFunctions()
00115     {
00116         // Execute all loaded Functions :
00117         ExecutableInterface* foo = 0;
00118         int nbr = f_queue->size(); // nbr to process.
00119         // 1. Fetch new ones from queue.
00120         while ( f_queue->dequeue(foo) ) {
00121             assert(foo);
00122             if ( foo->execute() == false ){
00123                 foo->unloaded();
00124                 msg_cond.broadcast(); // required for waitForFunctions() (3rd party thread)
00125             } else {
00126                 f_queue->enqueue( foo );
00127             }
00128             if ( --nbr == 0) // we did a round-trip
00129                 break;
00130         }
00131     }
00132 
00133     bool ExecutionEngine::runFunction( ExecutableInterface* f )
00134     {
00135         if (this->getActivity() && f) {
00136             // We only reject running functions when we're in the FatalError state.
00137             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00138                 return false;
00139             f->loaded(this);
00140             bool result = f_queue->enqueue( f );
00141             // signal work is to be done:
00142             this->getActivity()->trigger();
00143             return result;
00144         }
00145         return false;
00146     }
00147 
00148     struct RemoveMsg : public DisposableInterface {
00149         ExecutableInterface* mf;
00150         ExecutionEngine* mee;
00151         bool found;
00152         RemoveMsg(ExecutableInterface* f, ExecutionEngine* ee)
00153         : mf(f),mee(ee), found(false) {}
00154         virtual void executeAndDispose() {
00155             mee->removeSelfFunction( mf );
00156             found = true; // always true in order to be able to quit waitForMessages.
00157         }
00158         virtual void dispose() {}
00159         virtual bool isError() const { return false;}
00160 
00161     };
00162 
00163     bool ExecutionEngine::removeFunction( ExecutableInterface* f )
00164     {
00165         // Remove from the queue.
00166         if ( !f )
00167             return false;
00168 
00169         if ( !f->isLoaded() )
00170             return true;
00171 
00172         // When not running, just remove.
00173         if ( getActivity() == 0 || !this->getActivity()->isActive() ) {
00174             if ( removeSelfFunction( f ) == false )
00175                 return false;
00176         } else {
00177             // Running: create message on stack.
00178             RemoveMsg rmsg(f,this);
00179             if ( this->process(&rmsg) )
00180                 this->waitForMessages( ! lambda::bind(&ExecutableInterface::isLoaded, f) || lambda::bind(&RemoveMsg::found,boost::ref(rmsg)) );
00181             if (!rmsg.found)
00182                 return false;
00183         }
00184         // unloading was succesful, now notify unloading:
00185         f->unloaded();
00186         return true;
00187     }
00188 
00189     bool ExecutionEngine::removeSelfFunction(ExecutableInterface* f  )
00190     {
00191         // since this function is executed in process messages, it is always safe to execute.
00192         if ( !f )
00193             return false;
00194         int nbr = f_queue->size();
00195         while (nbr != 0) {
00196             ExecutableInterface* foo = 0;
00197             if ( !f_queue->dequeue(foo) )
00198                 return false;
00199             if ( f  == foo) {
00200                 return true;
00201             }
00202             f_queue->enqueue(foo);
00203             --nbr;
00204         }
00205         return true;
00206     }
00207 
00208     bool ExecutionEngine::initialize() {
00209         // nop
00210         return true;
00211     }
00212 
00213     bool ExecutionEngine::hasWork()
00214     {
00215         return !mqueue->isEmpty();
00216     }
00217 
00218     void ExecutionEngine::processMessages()
00219     {
00220         // execute all commands from the AtomicQueue.
00221         // msg_lock may not be held when entering this function !
00222         DisposableInterface* com(0);
00223         {
00224             while ( mqueue->dequeue(com) ) {
00225                 assert( com );
00226                 com->executeAndDispose();
00227             }
00228             // there's no need to hold the lock during
00229             // emptying the queue. But we must hold the
00230             // lock once between excuteAndDispose and the
00231             // broadcast to avoid the race condition in
00232             // waitForMessages().
00233             // This allows us to recurse into processMessages.
00234             MutexLock locker( msg_lock );
00235         }
00236         if ( com )
00237             msg_cond.broadcast(); // required for waitForMessages() (3rd party thread)
00238     }
00239 
00240     bool ExecutionEngine::process( DisposableInterface* c )
00241     {
00242         if ( c && this->getActivity() ) {
00243             // We only reject running functions when we're in the FatalError state.
00244             if (taskc && taskc->mTaskState == TaskCore::FatalError )
00245                 return false;
00246             bool result = mqueue->enqueue( c );
00247             this->getActivity()->trigger();
00248             msg_cond.broadcast(); // required for waitAndProcessMessages() (EE thread)
00249             return result;
00250         }
00251         return false;
00252     }
00253 
00254     void ExecutionEngine::waitForMessages(const boost::function<bool(void)>& pred)
00255     {
00256         if (this->getActivity()->thread()->isSelf())
00257             waitAndProcessMessages(pred);
00258         else
00259             waitForMessagesInternal(pred);
00260     }
00261 
00262 
00263     void ExecutionEngine::waitForFunctions(const boost::function<bool(void)>& pred)
00264     {
00265         if (this->getActivity()->thread()->isSelf())
00266             waitAndProcessFunctions(pred);
00267         else
00268             waitForMessagesInternal(pred); // same as for messages.
00269     }
00270 
00271 
00272     void ExecutionEngine::waitForMessagesInternal(boost::function<bool(void)> const& pred)
00273     {
00274         if ( pred() )
00275             return;
00276         // only to be called from the thread not executing step().
00277         os::MutexLock lock(msg_lock);
00278         while (!pred()) { // the mutex guards that processMessages can not run between !pred and the wait().
00279             msg_cond.wait(msg_lock); // now processMessages may run.
00280         }
00281     }
00282 
00283 
00284     void ExecutionEngine::waitAndProcessMessages(boost::function<bool(void)> const& pred)
00285     {
00286         while ( !pred() ){
00287             // may not be called while holding the msg_lock !!!
00288             this->processMessages();
00289             {
00290                 // only to be called from the thread executing step().
00291                 // We must lock because the cond variable will unlock msg_lock.
00292                 os::MutexLock lock(msg_lock);
00293                 if (!pred()) {
00294                     msg_cond.wait(msg_lock); // now processMessages may run.
00295                 } else {
00296                     return; // do not process messages when pred() == true;
00297                 }
00298             }
00299         }
00300     }
00301 
00302     void ExecutionEngine::waitAndProcessFunctions(boost::function<bool(void)> const& pred)
00303     {
00304         while ( !pred() ){
00305             // may not be called while holding the msg_lock !!!
00306             this->processFunctions();
00307             {
00308                 // only to be called from the thread executing step().
00309                 // We must lock because the cond variable will unlock msg_lock.
00310                 os::MutexLock lock(msg_lock);
00311                 if (!pred()) {
00312                     msg_cond.wait(msg_lock); // now processMessages may run.
00313                 } else {
00314                     return; // do not process messages when pred() == true;
00315                 }
00316             }
00317         }
00318     }
00319 
00320     void ExecutionEngine::step() {
00321         processMessages();
00322         processFunctions();
00323         processChildren(); // aren't these ExecutableInterfaces ie functions ?
00324     }
00325 
00326     void ExecutionEngine::processChildren() {
00327         // only call updateHook in the Running state.
00328         if ( taskc ) {
00329             // A trigger() in startHook() will be ignored, we trigger in TaskCore after startHook finishes.
00330             if ( taskc->mTaskState == TaskCore::Running && taskc->mTargetState == TaskCore::Running ) {
00331                 try {
00332                     taskc->prepareUpdateHook();
00333                     taskc->updateHook();
00334                 } catch(std::exception const& e) {
00335                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00336                     log(Error) << "  " << e.what() << endlog();
00337                     taskc->exception();
00338                 } catch(...){
00339                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00340                     taskc->exception(); // calls stopHook,cleanupHook
00341                 }
00342             }
00343             // in case start() or updateHook() called error(), this will be called:
00344             if (  taskc->mTaskState == TaskCore::RunTimeError ) {
00345                 try {
00346                     taskc->errorHook();
00347                 } catch(std::exception const& e) {
00348                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00349                     log(Error) << "  " << e.what() << endlog();
00350                     taskc->exception();
00351                 } catch(...){
00352                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00353                     taskc->exception(); // calls stopHook,cleanupHook
00354                 }
00355             }
00356         }
00357         if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00358 
00359         // call all children as well.
00360         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00361             if ( (*it)->mTaskState == TaskCore::Running  && (*it)->mTargetState == TaskCore::Running  )
00362                 try {
00363                     (*it)->prepareUpdateHook();
00364                     (*it)->updateHook();
00365                 } catch(std::exception const& e) {
00366                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00367                     log(Error) << "  " << e.what() << endlog();
00368                     (*it)->exception();
00369                 } catch(...){
00370                     log(Error) << "in updateHook(): switching to exception state because of unhandled exception" << endlog();
00371                     (*it)->exception(); // calls stopHook,cleanupHook
00372                 }
00373             if (  (*it)->mTaskState == TaskCore::RunTimeError )
00374                 try {
00375                     (*it)->errorHook();
00376                 } catch(std::exception const& e) {
00377                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00378                     log(Error) << "  " << e.what() << endlog();
00379                     (*it)->exception();
00380                 } catch(...){
00381                     log(Error) << "in errorHook(): switching to exception state because of unhandled exception" << endlog();
00382                     (*it)->exception(); // calls stopHook,cleanupHook
00383                 }
00384             if ( !this->getActivity() || ! this->getActivity()->isRunning() ) return;
00385         }
00386     }
00387 
00388     bool ExecutionEngine::breakLoop() {
00389         bool ok = true;
00390         if (taskc)
00391             ok = taskc->breakUpdateHook();
00392         for (std::vector<TaskCore*>::iterator it = children.begin(); it != children.end();++it) {
00393             ok = (*it)->breakUpdateHook() && ok;
00394             }
00395         return ok;
00396     }
00397 
00398     bool ExecutionEngine::stopTask(TaskCore* task) {
00399         // stop and start where former will call breakLoop() in case of non-periodic.
00400         // this is a forced synchronization point, since stop() will only return when
00401         // step() returned.
00402         if ( getActivity() && this->getActivity()->stop() ) {
00403             this->getActivity()->start();
00404             return true;
00405         }
00406         return false;
00407     }
00408 
00409     void ExecutionEngine::setExceptionTask() {
00410         std::string name;
00411         TaskContext* tc = dynamic_cast<TaskContext*>(taskc);
00412         if (tc)
00413             name = tc->getName();
00414         else if (taskc)
00415             name = "TaskCore";
00416         else
00417             name = "GlobalEngine";
00418         log(Error) << "in "<<name<<": unhandled exception in sent operation." << endlog();
00419         if(taskc)
00420             taskc->exception();
00421     }
00422             
00423 
00424     void ExecutionEngine::finalize() {
00425         // nop
00426     }
00427 
00428 }
00429