Orocos Real-Time Toolkit  2.6.0
Dispatcher.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:07 CEST 2009  Dispatcher.hpp
00003 
00004                         Dispatcher.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 
00040 #include "../../os/MutexLock.hpp"
00041 #include "../../Activity.hpp"
00042 #include "../../base/ChannelElementBase.hpp"
00043 #include "../../Logger.hpp"
00044 #include <map>
00045 #include <sys/select.h>
00046 #include <mqueue.h>
00047 
00048 namespace RTT { namespace mqueue { class Dispatcher; } }
00049 
00050 namespace RTT {
00051     namespace mqueue {
00052         RTT_API void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00053         RTT_API void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00054 
00062         class Dispatcher : public Activity
00063         {
00064             friend void intrusive_ptr_add_ref(const RTT::mqueue::Dispatcher* p );
00065             friend void intrusive_ptr_release(const RTT::mqueue::Dispatcher* p );
00066             mutable os::AtomicInt refcount;
00067             static Dispatcher* DispatchI;
00068 
00069             typedef std::map<mqd_t,base::ChannelElementBase*> MQMap;
00070             MQMap mqmap;
00071 
00072             fd_set socks;        /* Socket file descriptors we want to wake up for, using select() */
00073 
00074             int highsock;        /* Highest #'d file descriptor, needed for select() */
00075 
00076             bool do_exit;
00077 
00078             os::Mutex maplock;
00079 
00080             Dispatcher( const std::string& name)
00081             : Activity(ORO_SCHED_RT, os::HighestPriority, 0.0, 0, name),
00082               highsock(0), do_exit(false)
00083               {}
00084 
00085             ~Dispatcher() {
00086                 Logger::In in("Dispatcher");
00087                 log(Info) << "Dispacher cleans up: no more work."<<endlog();
00088                 stop();
00089                 DispatchI = 0;
00090             }
00091 
00092             void build_select_list() {
00093 
00094                 /* First put together fd_set for select(), which will
00095                    consist of the sock veriable in case a new connection
00096                    is coming in, plus all the sockets we have already
00097                    accepted. */
00098 
00099 
00100                 /* FD_ZERO() clears out the fd_set called socks, so that
00101                     it doesn't contain any file descriptors. */
00102 
00103                 FD_ZERO(&socks);
00104                 highsock = 0;
00105 
00106                 /* Loops through all the possible connections and adds
00107                     those sockets to the fd_set */
00108                 os::MutexLock lock(maplock);
00109                 for (MQMap::const_iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00110                     FD_SET( it->first, &socks);
00111                     if ( int(it->first) > highsock)
00112                         highsock = int(it->first);
00113                 }
00114             }
00115 
00116             void read_socks() {
00117                 /* OK, now socks will be set with whatever socket(s)
00118                    are ready for reading.*/
00119 
00120                 /* Run through our sockets and check to see if anything
00121                     happened with them, if so 'service' them. */
00122                 os::MutexLock lock(maplock);
00123                 for (MQMap::iterator it = mqmap.begin(); it != mqmap.end(); ++it) {
00124                     if ( FD_ISSET( it->first, &socks) ) {
00125                         //log(Debug) << "New data on " << it->first <<endlog();
00126                         it->second->signal();
00127                     }
00128                 }
00129             }
00130 
00131         public:
00132             typedef boost::intrusive_ptr<Dispatcher> shared_ptr;
00133 
00134             static Dispatcher::shared_ptr Instance() {
00135                 if ( DispatchI == 0) {
00136                     DispatchI = new Dispatcher("MQueueDispatch");
00137                     DispatchI->start();
00138                 }
00139                 return DispatchI;
00140             }
00141 
00142             void addQueue( mqd_t mqdes, base::ChannelElementBase* chan ) {
00143                 Logger::In in("Dispatcher");
00144                 if (mqdes < 0) {
00145                     log(Error) <<"Invalid mqd_t given to MQueue Dispatcher." <<endlog();
00146                     return;
00147                 }
00148                 log(Debug) <<"Dispatcher is monitoring mqdes "<< mqdes <<endlog();
00149                 os::MutexLock lock(maplock);
00150                 // we add a refcount per channel we monitor.
00151                 if (mqmap.count(mqdes) == 0)
00152                     refcount.inc();
00153                 mqmap[mqdes] = chan;
00154             }
00155 
00156             void removeQueue(mqd_t mqdes) {
00157                 Logger::In in("Dispatcher");
00158                 log(Debug) <<"Dispatcher drops mqdes "<< mqdes <<endlog();
00159                 os::MutexLock lock(maplock);
00160                 if (mqmap.count(mqdes)) {
00161                     mqmap.erase( mqmap.find(mqdes) );
00162                     refcount.dec();
00163                 }
00164             }
00165 
00166             bool initialize() {
00167                 do_exit = false;
00168                 return true;
00169             }
00170 
00171             void loop() {
00172                 struct timeval timeout;  /* Timeout for select */
00173                 int readsocks;       /* Number of sockets ready for reading */
00174                 while (1) { /* select loop */
00175                     build_select_list();
00176                     timeout.tv_sec = 0;
00177                     timeout.tv_usec = 50000;
00178 
00179                     /* The first argument to select is the highest file
00180                         descriptor value plus 1.*/
00181 
00182                     readsocks = select(highsock+1, &socks, (fd_set *) 0,
00183                       (fd_set *) 0, &timeout);
00184 
00185                     /* select() returns the number of sockets that had
00186                         things going on with them -- i.e. they're readable. */
00187 
00188                     /* Once select() returns, the original fd_set has been
00189                         modified so it now reflects the state of why select()
00190                         woke up. i.e. If file descriptor 4 was originally in
00191                         the fd_set, and then it became readable, the fd_set
00192                         contains file descriptor 4 in it. */
00193 
00194                     if (readsocks < 0) {
00195                         log(Error) <<"Dispatcher failed to select on message queues. Stopped thread."<<endlog();
00196                         return;
00197                     }
00198                     if (readsocks == 0) {
00199                         // nop
00200                     } else // readsocks > 0
00201                         read_socks();
00202 
00203                     if ( do_exit )
00204                         return;
00205                 } /* while(1) */
00206             }
00207 
00208             bool breakLoop() {
00209                 do_exit = true;
00210                 return true;
00211             }
00212         };
00213     }
00214 }
00215