Orocos Real-Time Toolkit  2.5.0
TaskContextServer.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:09:49 CET 2006  TaskContextServer.cxx
00003 
00004                         TaskContextServer.cxx -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@fmtc.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "TaskContextServer.hpp"
00041 #include "TaskContextProxy.hpp"
00042 #include "corba.h"
00043 #ifdef CORBA_IS_TAO
00044 #include "TaskContextS.h"
00045 #include <orbsvcs/CosNamingC.h>
00046 // ACE Specific, for printing exceptions.
00047 #include <ace/SString.h>
00048 #include "tao/TimeBaseC.h"
00049 #include "tao/Messaging/Messaging.h"
00050 #include "tao/Messaging/Messaging_RT_PolicyC.h"
00051 #else
00052 #include <omniORB4/Naming.hh>
00053 #endif
00054 #include "TaskContextC.h"
00055 #include "TaskContextI.h"
00056 #include "DataFlowI.h"
00057 #include "POAUtility.h"
00058 #include <iostream>
00059 #include <fstream>
00060 
00061 #include "../../os/threads.hpp"
00062 #include "../../Activity.hpp"
00063 
00064 namespace RTT
00065 {namespace corba
00066 {
00067     using namespace std;
00068 
          // Registry of every live TaskContextServer, keyed by the TaskContext it serves.
          // The constructor inserts itself here and the destructor erases itself again;
          // Create() and CreateServer() consult it to return an existing server.
00069     std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
00070 
          // The activity created by ThreadOrb() and stopped/deleted by DestroyOrb().
          // Zero means ThreadOrb() has not started one.
00071     base::ActivityInterface* TaskContextServer::orbrunner = 0;
00072 
          // Set to true by DoShutdownOrb() right before orb->shutdown() is called.
          // CleanupServers() does nothing once this flag is set.
00073     bool TaskContextServer::is_shutdown = false;
00074 
          // Stringified object reference (IOR) of each server, keyed by TaskContext.
          // Filled in by the constructor, erased by the destructor, read by getIOR().
00075     std::map<TaskContext*, std::string> TaskContextServer::iors;
00076 
00077   TaskContextServer::~TaskContextServer()
00078   {
00079     Logger::In in("~TaskContextServer()");
00080     servers.erase(mtaskcontext);
00081 
00082     // Remove taskcontext ior reference
00083     iors.erase(mtaskcontext);
00084 
00085     PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
00086     mpoa->deactivate_object(oid);
00087 
00088     if (muse_naming) {
00089         try {
00090             CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
00091             CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
00092 
00093             if (CORBA::is_nil( rootNC.in() ) ) {
00094                 log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' could not find CORBA Naming Service."<<endlog();
00095             } else {
00096                 // Nameserver found...
00097                 CosNaming::Name name;
00098                 name.length(2);
00099                 name[0].id = CORBA::string_dup("TaskContexts");
00100                 name[1].id = CORBA::string_dup( mtaskcontext->getName().c_str() );
00101                 try {
00102                     rootNC->unbind(name);
00103                     log(Info) << "Successfully removed CTaskContext '"<<mtaskcontext->getName()<<"' from CORBA Naming Service."<<endlog();
00104                 }
00105                 catch( ... ) {
00106                     log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed."<<endlog();
00107                 }
00108             }
00109         } catch (...) {
00110             log(Warning) << "CTaskContext '"<< mtaskcontext->getName() << "' unbinding failed from CORBA Naming Service."<<endlog();
00111         }
00112     }
00113   }
00114 
00115 
00116 
00117 
00118     TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
00119       : mtaskcontext(taskc), muse_naming(use_naming)
00120     {
00121         Logger::In in("TaskContextServer()");
00122         servers[taskc] = this;
00123         try {
00124             // Each server has its own POA.
00125             // The server's objects have their own poa as well.
00126             CORBA::Object_var poa_object =
00127                 orb->resolve_initial_references ("RootPOA");
00128             mpoa = PortableServer::POA::_narrow(poa_object);
00129             PortableServer::POAManager_var poa_manager =
00130                 mpoa->the_POAManager ();
00131 
00132             //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1);
00133             //            poa_manager->activate ();
00134 
00135             // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type
00136             // But watch out: we need implicit activation, our you will get exceptions upon ->_this()
00137             // The POA for the Server's objects:
00138 //             PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa,
00139 //                                                               poa_manager,
00140 //                                                               std::string(taskc->getName() + "OBJPOA").c_str(),
00141 //                                                               0, 0); // Not persistent, allow implicit.
00142 
00143             // The servant : TODO : cleanup servant in destructor !
00144             RTT_corba_CTaskContext_i* serv;
00145             mtask_i = serv = new RTT_corba_CTaskContext_i( taskc, mpoa );
00146             mtask   = serv->activate_this();
00147 
00148             // Store reference to iors
00149             CORBA::String_var ior = orb->object_to_string( mtask.in() );
00150             iors[taskc] = std::string( ior.in() );
00151 
00152             if ( use_naming ) {
00153                 CORBA::Object_var rootObj;
00154                 CosNaming::NamingContext_var rootNC;
00155                 try {
00156                     rootObj = orb->resolve_initial_references("NameService");
00157                     rootNC = CosNaming::NamingContext::_narrow(rootObj);
00158                 } catch (...) {}
00159 
00160                 if (CORBA::is_nil( rootNC ) ) {
00161                     std::string  err("CTaskContext '" + taskc->getName() + "' could not find CORBA Naming Service.");
00162                     if (require_name_service) {
00163                         servers.erase(taskc);
00164                         log(Error) << err << endlog();
00165                         servers.erase(taskc);
00166                         throw IllegalServer(err);
00167                     }
00168                     else
00169                     {
00170                         log(Warning) << err << endlog();
00171                         log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00172 
00173                         // this part only publishes the IOR to a file.
00174                         CORBA::String_var ior = orb->object_to_string( mtask.in() );
00175                         std::cerr << ior.in() <<std::endl;
00176                         {
00177                             // write to a file as well.
00178                             std::string iorname( taskc->getName());
00179                             iorname += ".ior";
00180                             std::ofstream file_ior( iorname.c_str() );
00181                             file_ior << ior.in() <<std::endl;
00182                         }
00183                         return;
00184                     }
00185                 }
00186                 log(Info) << "CTaskContext '"<< taskc->getName() << "' found CORBA Naming Service."<<endlog();
00187                 // Nameserver found...
00188                 CosNaming::Name name;
00189                 name.length(1);
00190                 name[0].id = CORBA::string_dup("TaskContexts");
00191                 CosNaming::NamingContext_var controlNC;
00192                 try {
00193                     controlNC = rootNC->bind_new_context(name);
00194                 }
00195                 catch( CosNaming::NamingContext::AlreadyBound&) {
00196                     log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
00197                     // NOP.
00198                 }
00199 
00200                 name.length(2);
00201                 name[1].id = CORBA::string_dup( taskc->getName().c_str() );
00202                 try {
00203                     rootNC->bind(name, mtask );
00204                     log(Info) << "Successfully added CTaskContext '"<<taskc->getName()<<"' to CORBA Naming Service."<<endlog();
00205                 }
00206                 catch( CosNaming::NamingContext::AlreadyBound&) {
00207                     log(Warning) << "CTaskContext '"<< taskc->getName() << "' already bound to CORBA Naming Service."<<endlog();
00208                     log() <<"Trying to rebind...";
00209                     try {
00210                         rootNC->rebind(name, mtask);
00211                     } catch( ... ) {
00212                         log() << " failed!"<<endlog();
00213                         return;
00214                     }
00215                     log() << " done. New CTaskContext bound to Naming Service."<<endlog();
00216                 }
00217             } // use_naming
00218             else {
00219                 log(Info) <<"CTaskContext '"<< taskc->getName() << "' is not using the CORBA Naming Service."<<endlog();
00220                 log() <<"Writing IOR to 'std::cerr' and file '" << taskc->getName() <<".ior'"<<endlog();
00221 
00222                 // this part only publishes the IOR to a file.
00223                 CORBA::String_var ior = orb->object_to_string( mtask.in() );
00224                 std::cerr << ior.in() <<std::endl;
00225                 {
00226                     // write to a file as well.
00227                     std::string iorname( taskc->getName());
00228                     iorname += ".ior";
00229                     std::ofstream file_ior( iorname.c_str() );
00230                     file_ior << ior.in() <<std::endl;
00231                 }
00232                 return;
00233             }
00234         }
00235         catch (CORBA::Exception &e) {
00236             log(Error) << "CORBA exception raised!" << endlog();
00237             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00238         }
00239 
00240     }
00241 
00242     void TaskContextServer::CleanupServers() {
00243         if ( !CORBA::is_nil(orb) && !is_shutdown) {
00244             log(Info) << "Cleaning up TaskContextServers..."<<endlog();
00245             while ( !servers.empty() ){
00246                 delete servers.begin()->second;
00247                 // note: destructor will self-erase from map !
00248             }
00249             CDataFlowInterface_i::clearServants();
00250             log() << "Cleanup done."<<endlog();
00251         }
00252     }
00253 
00254     void TaskContextServer::CleanupServer(TaskContext* c) {
00255         if ( !CORBA::is_nil(orb) ) {
00256             ServerMap::iterator it = servers.find(c);
00257             if ( it != servers.end() ){
00258                 log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
00259                 delete it->second; // destructor will do the rest.
00260                 // note: destructor will self-erase from map !
00261             }
00262         }
00263     }
00264 
00265     void TaskContextServer::ShutdownOrb(bool wait_for_completion)
00266     {
00267         Logger::In in("ShutdownOrb");
00268         DoShutdownOrb(wait_for_completion);
00269     }
00270 
00271     void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
00272     {
00273         if (is_shutdown) {
00274             log(Info) << "Orb already down..."<<endlog();
00275             return;
00276         }
00277         if ( CORBA::is_nil(orb) ) {
00278             log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
00279             return;
00280         }
00281 
00282         try {
00283             CleanupServers(); // can't do this after an orb->shutdown().
00284             log(Info) << "Orb Shutdown...";
00285             is_shutdown = true;
00286             if (wait_for_completion)
00287                 log(Info)<<"waiting..."<<endlog();
00288             orb->shutdown( wait_for_completion );
00289             log(Info) << "done." << endlog();
00290         }
00291         catch (CORBA::Exception &e) {
00292             log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
00293             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00294             return;
00295         }
00296     }
00297 
00298 
00299     void TaskContextServer::RunOrb()
00300     {
00301         if ( CORBA::is_nil(orb) ) {
00302             log(Error) << "RunOrb...failed! Orb is nil." << endlog();
00303             return;
00304         }
00305         try {
00306             log(Info) <<"Entering orb->run()."<<endlog();
00307             orb->run();
00308             log(Info) <<"Breaking out of orb->run()."<<endlog();
00309         }
00310         catch (CORBA::Exception &e) {
00311             log(Error) << "Orb Run : CORBA exception raised!" << endlog();
00312             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00313         }
00314     }
00315 
00319     class OrbRunner
00320         : public Activity
00321     {
00322     public:
00323         OrbRunner()
00324             : Activity(RTT::os::LowestPriority)
00325         {}
00326         void loop()
00327         {
00328             Logger::In in("OrbRunner");
00329             TaskContextServer::RunOrb();
00330         }
00331 
00332         bool breakLoop()
00333         {
00334             return true;
00335         }
00336 
00337         void finalize()
00338         {
00339             Logger::In in("OrbRunner");
00340             log(Info) <<"Safely stopped."<<endlog();
00341         }
00342     };
00343 
00344     void TaskContextServer::ThreadOrb()
00345     {
00346         Logger::In in("ThreadOrb");
00347         if ( CORBA::is_nil(orb) ) {
00348             log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
00349             return;
00350         }
00351         if (orbrunner != 0) {
00352             log(Error) <<"Orb already running in a thread."<<endlog();
00353         } else {
00354             log(Info) <<"Starting Orb in a thread."<<endlog();
00355             orbrunner = new OrbRunner();
00356 
00357             orbrunner->start();
00358         }
00359     }
00360 
00361     void TaskContextServer::DestroyOrb()
00362     {
00363         Logger::In in("DestroyOrb");
00364         if ( CORBA::is_nil(orb) ) {
00365             log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
00366             return;
00367         }
00368 
00369         if (orbrunner) {
00370             orbrunner->stop();
00371             delete orbrunner;
00372             orbrunner = 0;
00373         }
00374 
00375         try {
00376             // Destroy the POA, waiting until the destruction terminates
00377             //poa->destroy (1, 1);
00378             CleanupServers();
00379             orb->destroy();
00380             rootPOA = 0;
00381             orb = 0;
00382             log(Info) <<"Orb destroyed."<<endlog();
00383         }
00384         catch (CORBA::Exception &e) {
00385             log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
00386             log() << CORBA_EXCEPTION_INFO(e) << endlog();
00387         }
00388 
00389     }
00390 
00391     TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service) {
00392         if ( CORBA::is_nil(orb) )
00393             return 0;
00394 
00395         if ( servers.count(tc) ) {
00396             log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00397             return servers.find(tc)->second;
00398         }
00399 
00400         // create new:
00401         log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00402         try {
00403             TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00404             return cts;
00405         }
00406         catch( IllegalServer& is ) {
00407             cerr << is.what() << endl;
00408         }
00409         return 0;
00410     }
00411 
00412     CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
00413         if ( CORBA::is_nil(orb) )
00414             return CTaskContext::_nil();
00415 
00416         if ( servers.count(tc) ) {
00417             log(Debug) << "Returning existing TaskContextServer for "<<tc->getName()<<endlog();
00418             return CTaskContext::_duplicate( servers.find(tc)->second->server() );
00419         }
00420 
00421         for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
00422             if ( (it->first) == tc ) {
00423                 log(Debug) << "Returning server of Proxy for "<<tc->getName()<<endlog();
00424                 return CTaskContext::_duplicate(it->second);
00425             }
00426 
00427         // create new:
00428         log(Info) << "Creating new TaskContextServer for "<<tc->getName()<<endlog();
00429         try {
00430             TaskContextServer* cts = new TaskContextServer(tc, use_naming, require_name_service);
00431             return CTaskContext::_duplicate( cts->server() );
00432         }
00433         catch( IllegalServer& is ) {
00434             cerr << is.what() << endl;
00435         }
00436         return CTaskContext::_nil();
00437     }
00438 
00439 
00440     corba::CTaskContext_ptr TaskContextServer::server() const
00441     {
00442         // we're not a factory function, so we don't _duplicate.
00443         return mtask.in();
00444     }
00445 
00446     std::string TaskContextServer::getIOR(TaskContext* tc)
00447     {
00448     IorMap::const_iterator it = iors.find(tc);
00449     if (it != iors.end())
00450         return it->second;
00451 
00452     return std::string("");
00453     }
00454 
00455 }}