OrocosComponentLibrary  2.7.0
TcpReporting.cpp
00001 /***************************************************************************
00002 
00003                        TcpReporting.cpp -  TCP reporter
00004                            -------------------
00005     begin                : Fri Aug 4 2006
00006     copyright            : (C) 2006 Bas Kemper
00007                            2007-2008 Ruben Smits
00008     email                : kst@ <my name> .be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU Lesser General Public            *
00013  *   License as published by the Free Software Foundation; either          *
00014  *   version 2.1 of the License, or (at your option) any later version.    *
00015  *                                                                         *
00016  *   This library is distributed in the hope that it will be useful,       *
00017  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00018  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00019  *   Lesser General Public License for more details.                       *
00020  *                                                                         *
00021  *   You should have received a copy of the GNU Lesser General Public      *
00022  *   License along with this library; if not, write to the Free Software   *
00023  *   Foundation, Inc., 59 Temple Place,                                    *
00024  *   Suite 330, Boston, MA  02111-1307  USA                                *
00025  *                                                                         *
00026  ***************************************************************************/
00027 
00028 #include <sys/socket.h>
00029 #include <netinet/in.h>
00030 #include <sys/types.h>
00031 #include <errno.h>
00032 
00033 #include "TcpReporting.hpp"
00034 #include <rtt/Activity.hpp>
00035 #include <rtt/Logger.hpp>
00036 #include <rtt/os/Mutex.hpp>
00037 #include "socket.hpp"
00038 #include "socketmarshaller.hpp"
00039 
00040 using RTT::Logger;
00041 using RTT::os::Mutex;
00042 
00043 #include "ocl/Component.hpp"
00044 ORO_LIST_COMPONENT_TYPE(OCL::TcpReporting);
00045 
00046 namespace OCL
00047 {
00052     class ListenThread
00053         : public RTT::Activity
00054     {
00055         private:
00056             bool inBreak;
00057             static ListenThread* _instance;
00058             RTT::SocketMarshaller* _marshaller;
00059             unsigned short _port;
00060             bool _accepting;
00061             int _sock;
00062 
00063             bool listen()
00064             {
00065                 _sock = ::socket(PF_INET, SOCK_STREAM, 0);
00066                 if( _sock < 0 )
00067                 {
00068                     Logger::log() << Logger::Error << "Socket creation failed." << Logger::endl;
00069                     return false;
00070                 }
00071 
00072                 struct sockaddr_in localsocket;
00073                 struct sockaddr remote;
00074                 int adrlen = sizeof(remote);
00075 
00076                 localsocket.sin_family = AF_INET;
00077                 localsocket.sin_port = htons(_port);
00078                 localsocket.sin_addr.s_addr = INADDR_ANY;
00079                 if( ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) ) < 0 )
00080                 {
00081                     /* bind can fail when there is a legitimate server when a
00082                        previous run of orocos has crashed and the kernel does
00083                        not have freed the port yet. TRY_OTHER_PORTS can
00084                        select another port if the bind fails. */
00085                     #define TRY_OTHER_PORTS
00086                     // TODO: remove #define
00087                     #ifdef TRY_OTHER_PORTS
00088                     int i = 1;
00089                     int r = -1;
00090                     while( errno == EADDRINUSE && i < 5 && r < 0 )
00091                     {
00092                         localsocket.sin_port = htons(_port + i);
00093                         r = ::bind(_sock, (struct sockaddr*)&localsocket, sizeof(localsocket) );
00094                         i++;
00095                     }
00096                     if( r >= 0 )
00097                     {
00098                         Logger::log() << Logger::Info << "Port occupied, use port " << (_port+i-1) << " instead." << Logger::endl;
00099                     } else {
00100                     #endif
00101                     if( errno == EADDRINUSE )
00102                     {
00103                         Logger::log() << Logger::Error << "Binding of port failed: address already in use." << Logger::endl;
00104                     } else {
00105                         Logger::log() << Logger::Error << "Binding of port failed with errno " << errno << Logger::endl;
00106                     }
00107                     ::close(_sock);
00108                     return false;
00109                     #ifdef TRY_OTHER_PORTS
00110                     }
00111                     #endif
00112                 }
00113 
00114                 if( ::listen(_sock, 2) < 0 )
00115                 {
00116                     Logger::log() << Logger::Info << "Cannot listen on socket" << Logger::endl;
00117                     ::close(_sock);
00118                     return true;
00119                 }
00120                 while(_accepting)
00121                 {
00122                     int socket = ::accept( _sock, &remote,
00123                                            reinterpret_cast<socklen_t*>(&adrlen) );
00124                     if( socket == -1 )
00125                     {
00126                         return false;
00127                     }
00128                     if( _accepting )
00129                     {
00130                         Logger::log() << Logger::Info << "Incoming connection" << Logger::endl;
00131                         _marshaller->addConnection( new Orocos::TCP::Socket(socket) );
00132                     }
00133                 }
00134                 return true;
00135             }
00136 
00137             ListenThread( RTT::SocketMarshaller* marshaller, unsigned short port )
00138             : Activity(10), _marshaller(marshaller)
00139             {
00140                 inBreak = false;
00141                 removeInstance();
00142                 _accepting = true;
00143                 _port = port;
00144                 Logger::log() << Logger::Info << "Starting server on port " << port << Logger::endl;
00145                 this->Activity::start();
00146             }
00147 
00148         // This method should only be called when theadCreationLock is locked.
00149             void removeInstance()
00150             {
00151               if( _instance )
00152               {
00153                 delete _instance;
00154               }
00155             }
00156 
00157       public:
00158           ~ListenThread()
00159           {
00160               _accepting = false;
00161           }
00162 
00163           virtual void loop()
00164           {
00165               if( !inBreak )
00166               {
00167                   if( !listen() )
00168                   {
00169                       Logger::log() << Logger::Error << "Could not listen on port " << _port << Logger::endl;
00170                   } else {
00171                       Logger::log() << Logger::Info << "Shutting down server" << Logger::endl;
00172                   }
00173               }
00174           }
00175 
00176           virtual bool breakLoop()
00177           {
00178               inBreak = true;
00179               _accepting = false;
00180               ::close( _sock );
00181               // accept still hangs until a new connection has been established
00182               int sock = ::socket(PF_INET, SOCK_STREAM, 0);
00183               if( sock > 0 )
00184               {
00185                   struct sockaddr_in socket;
00186                   socket.sin_family = AF_INET;
00187                   socket.sin_port = htons(_port);
00188                   socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
00189                   ::connect( sock, (struct sockaddr*)&socket, sizeof(socket) );
00190                   ::close( sock );
00191               }
00192               return true;
00193           }
00194 
00195           static void createInstance( RTT::SocketMarshaller* marshaller, unsigned short port = 3142 )
00196           {
00197             // The lock is needed to avoid problems when createInstance is called by two
00198             // different threads (which in reality should not occur very often).
00199             //ListenThread* _oinst = ListenThread::_instance;
00200             ListenThread::_instance = new ListenThread( marshaller, port );
00201             //delete _oinst;
00202           }
00203 
00204           static void destroyInstance()
00205           {
00206               ListenThread::_instance->breakLoop();
00207           }
00208     };
00209     ListenThread* ListenThread::_instance = 0;
00210 }
00211 
00212 namespace OCL
00213 {
00214     TcpReporting::TcpReporting(std::string fr_name /*= "Reporting"*/)
00215         : ReportingComponent( fr_name ),
00216           port_prop("port","port to listen/send to",3142)
00217     {
00218         _finishing = false;
00219         this->properties()->addProperty( port_prop);
00220     }
00221 
00222     TcpReporting::~TcpReporting()
00223     {
00224     }
00225 
00226     const RTT::PropertyBag* TcpReporting::getReport()
00227     {
00228         makeReport2();
00229         return &report;
00230     }
00231 
00232     bool TcpReporting::configureHook(){
00233         port=port_prop.value();
00234         return true;
00235     }
00236 
00237     bool TcpReporting::startHook()
00238     {
00239         RTT::Logger::In in("TcpReporting::startup");
00240         fbody = new RTT::SocketMarshaller(this);
00241         this->addMarshaller( 0, fbody );
00242         ListenThread::createInstance( fbody, port );
00243         return ReportingComponent::startHook();
00244     }
00245 
00246     void TcpReporting::stopHook()
00247     {
00248         _finishing = true;
00249         ListenThread::destroyInstance();
00250         fbody->shutdown();
00251         ReportingComponent::stopHook();
00252         this->removeMarshallers();
00253     }
00254 }