Orocos Real-Time Toolkit  2.6.0
RemoteChannelElement.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:07 CEST 2009  RemoteChannelElement.hpp
00003 
00004                         RemoteChannelElement.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H
00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H
00041 
00042 #include "DataFlowI.h"
00043 #include "CorbaTypeTransporter.hpp"
00044 #include "CorbaDispatcher.hpp"
00045 
00046 namespace RTT {
00047 
00048     namespace corba {
00049 
00057     template<typename T>
00058     class RemoteChannelElement 
00059         : public CRemoteChannelElement_i
00060         , public base::ChannelElement<T>
00061     {
00062         typename internal::ValueDataSource<T>::shared_ptr value_data_source;
00063         typename internal::LateReferenceDataSource<T>::shared_ptr ref_data_source;
00064         typename internal::LateConstReferenceDataSource<T>::shared_ptr const_ref_data_source;
00065 
00069         bool valid;
00073         bool pull;
00074 
00076             typename base::ChannelElement<T>::value_t sample;
00077 
00078         DataFlowInterface* msender;
00079 
00083             CORBA::Any* write_any;
00084 
00085             PortableServer::ObjectId_var oid;
00086 
00087     public:
00093         RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull)
00094         : CRemoteChannelElement_i(transport, poa),
00095           value_data_source(new internal::ValueDataSource<T>),
00096           ref_data_source(new internal::LateReferenceDataSource<T>),
00097           const_ref_data_source(new internal::LateConstReferenceDataSource<T>),
00098               valid(true), pull(is_pull),
00099           msender(sender),
00100               write_any(new CORBA::Any)
00101             {
00102                 // Big note about cleanup: The RTT will dispose this object through
00103                 // the ChannelElement<T> refcounting. So we only need to inform the
00104                 // POA that our object is dead in disconnect().
00105                 // CORBA refcount-managed servants must start with a refcount of
00106                 // 1
00107                 this->ref();
00108                 oid = mpoa->activate_object(this);
00109                 // Force creation of dispatcher.
00110                 CorbaDispatcher::Instance(msender);
00111             }
00112 
00113             ~RemoteChannelElement()
00114             {
00115                 delete write_any;
00116             }
00117 
00119             void _add_ref()
00120             { this->ref(); }
00122             void _remove_ref()
00123             { this->deref(); }
00124 
00125 
00129             CORBA::Boolean remoteSignal() ACE_THROW_SPEC ((
00130                   CORBA::SystemException
00131                 ))
00132             { return base::ChannelElement<T>::signal(); }
00133 
00134             bool signal()
00135             {
00136                 // forward too.
00137                 base::ChannelElementBase::signal();
00138                 // intercept signal if no remote side set.
00139                 if ( CORBA::is_nil(remote_side.in()) )
00140                     return true;
00141                 // Remember that signal() is called in the context of the one
00142                 // that wrote the data, so we must decouple here to keep hard-RT happy.
00143                 // the dispatch thread must read the data and send it over by calling transferSample().
00144                 CorbaDispatcher::Instance(msender)->dispatchChannel( this );
00145 
00146                 return valid;
00147             }
00148 
00149             virtual void transferSamples() {
00150                 if (!valid)
00151                     return;
00152                 //log(Debug) <<"transfering..." <<endlog();
00153                 // in push mode, transfer all data, in pull mode, only signal once for each sample.
00154                 if ( pull ) {
00155                     try
00156                     { valid = remote_side->remoteSignal(); }
00157 #ifdef CORBA_IS_OMNIORB
00158                     catch(CORBA::SystemException& e)
00159                     {
00160                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog();
00161                         valid = false;
00162                     }
00163 #endif
00164                     catch(CORBA::Exception& e)
00165                     {
00166                         log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog();
00167                         valid = false;
00168                     }
00169                 } else {
00170                     //log(Debug) <<"...read..."<<endlog();
00171                     while ( this->read(sample, false) == NewData && valid) {
00172                         //log(Debug) <<"...write..."<<endlog();
00173                         if ( this->write(sample) == false )
00174                             valid = false;
00175                         //log(Debug) <<"...next read?..."<<endlog();
00176                     }
00177                 }
00178                 //log(Debug) <<"... done." <<endlog();
00179 
00180             }
00181 
00185             void disconnect() ACE_THROW_SPEC ((
00186                   CORBA::SystemException
00187                 )) {
00188                 // disconnect both local and remote side.
00189                 // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!!
00190                 try {
00191                     if ( ! CORBA::is_nil(remote_side.in()) )
00192                         remote_side->remoteDisconnect(true);
00193                 }
00194                 catch(CORBA::Exception&) {}
00195 
00196                 try { this->remoteDisconnect(true); }
00197                 catch(CORBA::Exception&) {}
00198             }
00199 
00200             void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00201                   CORBA::SystemException
00202                 ))
00203             {
00204                 base::ChannelElement<T>::disconnect(writer_to_reader);
00205 
00206                 // Because we support out-of-band transports, we must cleanup more thoroughly.
00207                 // an oob channel may be sitting at our other end. If not, this is a nop.
00208                 base::ChannelElement<T>::disconnect(!writer_to_reader);
00209 
00210                 // Will fail at shutdown if all objects are already deactivated
00211                 try {
00212                     if (mdataflow)
00213                         mdataflow->deregisterChannel(_this());
00214                     mpoa->deactivate_object(oid);
00215                 }
00216                 catch(CORBA::Exception&) {}
00217             }
00218 
00222             void disconnect(bool writer_to_reader) ACE_THROW_SPEC ((
00223                   CORBA::SystemException
00224                 ))
00225             {
00226                 try {
00227                     if ( ! CORBA::is_nil(remote_side.in()) )
00228                         remote_side->remoteDisconnect(writer_to_reader);
00229                 }
00230                 catch(CORBA::Exception&) {}
00231 
00232                 base::ChannelElement<T>::disconnect(writer_to_reader);
00233 
00234                 // Will fail at shutdown if all objects are already deactivated
00235                 try {
00236                     if (mdataflow)
00237                         mdataflow->deregisterChannel(_this());
00238                     mpoa->deactivate_object(oid);
00239                 }
00240                 catch(CORBA::Exception&) {}
00241             }
00242 
00243             FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data)
00244             {
00245                 if (!valid)
00246                     return NoData;
00247 
00248                 // try to read locally first
00249                 FlowStatus fs;
00250                 CFlowStatus cfs;
00251                 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) )
00252                     return fs;
00253 
00254                 // go through corba
00255                 CORBA::Any_var remote_value;
00256                 try
00257                 {
00258                     if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) )
00259                     {
00260                         ref_data_source->setPointer(&sample);
00261                         transport.updateFromAny(&remote_value.in(), ref_data_source);
00262                         return (FlowStatus)cfs;
00263                     }
00264                     else
00265                         return NoData;
00266                 }
00267 #ifdef CORBA_IS_OMNIORB
00268                 catch(CORBA::SystemException& e)
00269                 {
00270                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog();
00271                     valid = false;
00272                     return NoData;
00273                 }
00274 #endif
00275                 catch(CORBA::Exception& e)
00276                 {
00277                     log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog();
00278                     valid = false;
00279                     return NoData;
00280                 }
00281             }
00282 
00286             CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC ((
00287                   CORBA::SystemException
00288                 ))
00289             {
00290 
00291                 FlowStatus fs;
00292                 if ( (fs = base::ChannelElement<T>::read(value_data_source->set(), copy_old_data)) )
00293                 {
00294                     sample = transport.createAny(value_data_source);
00295                     if ( sample != 0) {
00296                         return (CFlowStatus)fs;
00297                     }
00298                     // this is a programmatic error and should never happen during run-time.
00299                     log(Error) << "CORBA Transport failed to create Any for " << value_data_source->getTypeName() << " while it should have!" <<endlog();
00300                 }
00301                 // we *must* return something in sample.
00302                 sample = new CORBA::Any();
00303                 return CNoData;
00304             }
00305 
00306             bool write(typename base::ChannelElement<T>::param_t sample)
00307             {
00308                 // try to write locally first
00309                 if (base::ChannelElement<T>::write(sample))
00310                     return true;
00311                 // go through corba
00312                 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present.");
00313                 try
00314                 {
00315                     // There is a trick. We allocate on the stack, but need to
00316                     // provide shared pointers. Manually increment refence count
00317                     // (the stack "owns" the object)
00318                     const_ref_data_source->setPointer(&sample);
00319                     transport.updateAny(const_ref_data_source, *write_any);
00320                     remote_side->write(*write_any); 
00321                     return true;
00322                 }
00323 #ifdef CORBA_IS_OMNIORB
00324                 catch(CORBA::SystemException& e)
00325                 {
00326                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog();
00327                     return false;
00328                 }
00329 #endif
00330                 catch(CORBA::Exception& e)
00331                 {
00332                     log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog();
00333                     return false;
00334                 }
00335             }
00336 
00340             bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC ((
00341                   CORBA::SystemException
00342                 ))
00343             {
00344                 transport.updateFromAny(&sample, value_data_source);
00345                 return base::ChannelElement<T>::write(value_data_source->rvalue());
00346             }
00347 
00348             virtual bool data_sample(typename base::ChannelElement<T>::param_t sample)
00349             {
00350                 // we don't pass it on through CORBA (yet).
00351                 // If an oob transport is used, that one will send it through.
00352                 typename base::ChannelElement<T>::shared_ptr output =
00353                     this->getOutput();
00354                 if (output)
00355                     return base::ChannelElement<T>::data_sample(sample);
00356                 return true;
00357             }
00358 
00362             virtual bool inputReady() {
00363                 // signal to oob transport if any.
00364                 typename base::ChannelElement<T>::shared_ptr input =
00365                     this->getInput();
00366                 if (input)
00367                     return base::ChannelElement<T>::inputReady();
00368                 return true;
00369             }
00370 
00371         };
00372     }
00373 }
00374 
00375 #endif
00376