Orocos Real-Time Toolkit  2.5.0
RemoteChannelElement.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:07 CEST 2009  RemoteChannelElement.hpp
00003 
00004                         RemoteChannelElement.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041 
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045 
00046 namespace RTT {
00047 
00048     namespace corba {
00049 
00057     template<typename T>
00058     class RemoteChannelElement 
00059         : public CRemoteChannelElement_i
00060         , public base::ChannelElement<T>
00061     {
00062         typename internal::ValueDataSource<T>::shared_ptr data_source;
00063 
00067         bool valid;
00071         bool pull;
00072 
00074             typename base::ChannelElement<T>::value_t sample;
00075 
00076         DataFlowInterface* msender;
00077 
00081             CORBA::Any* write_any;
00082 
00083             PortableServer::ObjectId_var oid;
00084 
00085     public:
00091         RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00092         : CRemoteChannelElement_i(transport, poa),
00093           data_source(new internal::ValueDataSource<T>), valid(true), pull(is_pull),
00094           msender(sender),
00095               write_any(new CORBA::Any)
00096             {
00097                 // Big note about cleanup: The RTT will dispose this object through
00098                 // the ChannelElement<T> refcounting. So we only need to inform the
00099                 // POA that our object is dead in disconnect().
00100                 // CORBA refcount-managed servants must start with a refcount of
00101                 // 1
00102                 this->ref();
00103                 oid = mpoa->activate_object(this);
00104                 // Force creation of dispatcher.
00105                 CorbaDispatcher::Instance(msender);
00106             }
00107 
00108             ~RemoteChannelElement()
00109             {
00110                 delete write_any;
00111             }
00112 
00114             void _add_ref()
00115             { this->ref(); }
00117             void _remove_ref()
00118             { this->deref(); }
00119 
00120 
00124             CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00125                   CORBA::SystemException
00126                 ))
00127             { return base::ChannelElement<T>::signal(); }
00128 
00129             bool signal()
00130             {
00131                 // forward too.
00132                 base::ChannelElementBase::signal();
00133                 // intercept signal if no remote side set.
00134                 if ( CORBA::is_nil(remote_side.in()) )
00135                     return true;
00136                 // Remember that signal() is called in the context of the one
00137                 // that wrote the data, so we must decouple here to keep hard-RT happy.
00138                 // the dispatch thread must read the data and send it over by calling transferSample().
00139                 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00140 
00141                 return valid;
00142             }
00143 
00144             virtual void transferSamples() {
00145                 if (!valid)
00146                     return;
00147                 //log(Debug) <<"transfering..." <<endlog();
00148                 // in push mode, transfer all data, in pull mode, only signal once for each sample.
00149                 if ( pull ) {
00150                     try
00151                     { valid = remote_side->remoteSignal(); }
00152 #ifdef CORBA_IS_OMNIORB
00153                     catch(CORBA::SystemException& e)
00154                     {
00155                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00156                         valid = false;
00157                     }
00158 #endif
00159                     catch(CORBA::Exception& e)
00160                     {
00161                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00162                         valid = false;
00163                     }
00164                 } else {
00165                     //log(Debug) <<"...read..."<<endlog();
00166                     while ( this->read(sample, false) == NewData && valid) {
00167                         //log(Debug) <<"...write..."<<endlog();
00168                         if ( this->write(sample) == false )
00169                             valid = false;
00170                         //log(Debug) <<"...next read?..."<<endlog();
00171                     }
00172                 }
00173                 //log(Debug) <<"... done." <<endlog();
00174 
00175             }
00176 
00180             void disconnect() ACE_THROW_SPEC ((
00181                   CORBA::SystemException
00182                 )) {
00183                 // disconnect both local and remote side.
00184                 // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!!
00185                 try {
00186                     if ( ! CORBA::is_nil(remote_side.in()) )
00187                         remote_side->remoteDisconnect(true);
00188                 }
00189                 catch(CORBA::Exception&) {}
00190 
00191                 try { this->remoteDisconnect(true); }
00192                 catch(CORBA::Exception&) {}
00193             }
00194 
00195             void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00196                   CORBA::SystemException
00197                 ))
00198             {
00199                 base::ChannelElement<T>::disconnect(writer_to_reader);
00200 
00201                 // Because we support out-of-band transports, we must cleanup more thoroughly.
00202                 // an oob channel may be sitting at our other end. If not, this is a nop.
00203                 base::ChannelElement<T>::disconnect(!writer_to_reader);
00204 
00205                 // Will fail at shutdown if all objects are already deactivated
00206                 try {
00207                     if (mdataflow)
00208                         mdataflow->deregisterChannel(_this());
00209                     mpoa->deactivate_object(oid);
00210                 }
00211                 catch(CORBA::Exception&) {}
00212             }
00213 
00217             void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00218                   CORBA::SystemException
00219                 ))
00220             {
00221                 try {
00222                     if ( ! CORBA::is_nil(remote_side.in()) )
00223                         remote_side->remoteDisconnect(writer_to_reader);
00224                 }
00225                 catch(CORBA::Exception&) {}
00226 
00227                 base::ChannelElement<T>::disconnect(writer_to_reader);
00228 
00229                 // Will fail at shutdown if all objects are already deactivated
00230                 try {
00231                     if (mdataflow)
00232                         mdataflow->deregisterChannel(_this());
00233                     mpoa->deactivate_object(oid);
00234                 }
00235                 catch(CORBA::Exception&) {}
00236             }
00237 
00238             FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00239             {
00240                 // try to read locally first
00241                 FlowStatus fs;
00242                 CFlowStatus cfs;
00243                 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00244                     return fs;
00245                 // go through corba
00246                 CORBA::Any_var remote_value;
00247                 try
00248                 {
00249                     if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00250                     {
00251                         RTT::internal::ReferenceDataSource<T> data_source(sample);
00252 
00253                         // This is a workaround since updateFromAny requires a
00254                         // shared_ptr, but we want to allocate the
00255                         // ReferenceDataSource on the stack
00256                         data_source.ref();
00257                         RTT::base::DataSourceBase::shared_ptr ptr(&data_source);
00258 
00259                         transport.updateFromAny(&remote_value.in(), ptr);
00260                         return (FlowStatus)cfs;
00261                     }
00262                     else
00263                         return NoData;
00264                 }
00265 #ifdef CORBA_IS_OMNIORB
00266                 catch(CORBA::SystemException& e)
00267                 {
00268                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00269                     return NoData;
00270                 }
00271 #endif
00272                 catch(CORBA::Exception& e)
00273                 {
00274                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00275                     return NoData;
00276                 }
00277             }
00278 
00282             CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00283                   CORBA::SystemException
00284                 ))
00285             {
00286 
00287                 FlowStatus fs;
00288                 if ( (fs = base::ChannelElement<T>::read(data_source->set(), copy_old_data)) )
00289                 {
00290                     sample = transport.createAny(data_source);
00291                     if ( sample != 0) {
00292                         return (CFlowStatus)fs;
00293                     }
00294                     // this is a programmatic error and should never happen during run-time.
00295                     log(Error) << "CORBA Transport failed to create Any for " << data_source->getTypeName() << " while it should have!" <<endlog();
00296                 }
00297                 // we *must* return something in sample.
00298                 sample = new CORBA::Any();
00299                 return CNoData;
00300             }
00301 
00302             bool write(typename base::ChannelElement<T>::param_t sample)
00303             {
00304                 data_source->set(sample);
00305                 // try to write locally first
00306                 if (base::ChannelElement<T>::write(sample))
00307                     return true;
00308                 // go through corba
00309                 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00310                 try
00311                 {
00312                     transport.updateAny(data_source, *write_any);
00313                     remote_side->write(*write_any); 
00314                     return true;
00315                 }
00316 #ifdef CORBA_IS_OMNIORB
00317                 catch(CORBA::SystemException& e)
00318                 {
00319                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00320                     return false;
00321                 }
00322 #endif
00323                 catch(CORBA::Exception& e)
00324                 {
00325                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00326                     return false;
00327                 }
00328             }
00329 
00333             bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00334                   CORBA::SystemException
00335                 ))
00336             {
00337                 transport.updateFromAny(&sample, data_source);
00338                 return base::ChannelElement<T>::write(data_source->rvalue());
00339             }
00340 
00341             virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00342             {
00343                 // we don't pass it on through CORBA (yet).
00344                 // If an oob transport is used, that one will send it through.
00345                 typename base::ChannelElement<T>::shared_ptr output =
00346                     this->getOutput();
00347                 if (output)
00348                     return base::ChannelElement<T>::data_sample(sample);
00349                 return true;
00350             }
00351 
00355             virtual bool inputReady() {
00356                 // signal to oob transport if any.
00357                 typename base::ChannelElement<T>::shared_ptr input =
00358                     this->getInput();
00359                 if (input)
00360                     return base::ChannelElement<T>::inputReady();
00361                 return true;
00362             }
00363 
00364         };
00365     }
00366 }
00367 
00368 #endif
00369