Orocos Real-Time Toolkit
2.5.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Thu Oct 22 11:59:07 CEST 2009 RemoteChannelElement.hpp 00003 00004 RemoteChannelElement.hpp - description 00005 ------------------- 00006 begin : Thu October 22 2009 00007 copyright : (C) 2009 Peter Soetens 00008 email : peter@thesourcworks.com 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU General Public * 00013 * License as published by the Free Software Foundation; * 00014 * version 2 of the License. * 00015 * * 00016 * As a special exception, you may use this file as part of a free * 00017 * software library without restriction. Specifically, if other files * 00018 * instantiate templates or use macros or inline functions from this * 00019 * file, or you compile this file and link it with other files to * 00020 * produce an executable, this file does not by itself cause the * 00021 * resulting executable to be covered by the GNU General Public * 00022 * License. This exception does not however invalidate any other * 00023 * reasons why the executable file might be covered by the GNU General * 00024 * Public License. * 00025 * * 00026 * This library is distributed in the hope that it will be useful, * 00027 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00028 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00029 * Lesser General Public License for more details. * 00030 * * 00031 * You should have received a copy of the GNU General Public * 00032 * License along with this library; if not, write to the Free Software * 00033 * Foundation, Inc., 59 Temple Place, * 00034 * Suite 330, Boston, MA 02111-1307 USA * 00035 * * 00036 ***************************************************************************/ 00037 00038 00039 #ifndef CORBA_REMOTE_CHANNEL_ELEMENT_H 00040 #define CORBA_REMOTE_CHANNEL_ELEMENT_H 00041 00042 #include "DataFlowI.h" 00043 #include "CorbaTypeTransporter.hpp" 00044 #include "CorbaDispatcher.hpp" 00045 00046 namespace RTT { 00047 00048 namespace corba { 00049 00057 template<typename T> 00058 class RemoteChannelElement 00059 : public CRemoteChannelElement_i 00060 , public base::ChannelElement<T> 00061 { 00062 typename internal::ValueDataSource<T>::shared_ptr data_source; 00063 00067 bool valid; 00071 bool pull; 00072 00074 typename base::ChannelElement<T>::value_t sample; 00075 00076 DataFlowInterface* msender; 00077 00081 CORBA::Any* write_any; 00082 00083 PortableServer::ObjectId_var oid; 00084 00085 public: 00091 RemoteChannelElement(CorbaTypeTransporter const& transport, DataFlowInterface* sender, PortableServer::POA_ptr poa, bool is_pull) 00092 : CRemoteChannelElement_i(transport, poa), 00093 data_source(new internal::ValueDataSource<T>), valid(true), pull(is_pull), 00094 msender(sender), 00095 write_any(new CORBA::Any) 00096 { 00097 // Big note about cleanup: The RTT will dispose this object through 00098 // the ChannelElement<T> refcounting. So we only need to inform the 00099 // POA that our object is dead in disconnect(). 00100 // CORBA refcount-managed servants must start with a refcount of 00101 // 1 00102 this->ref(); 00103 oid = mpoa->activate_object(this); 00104 // Force creation of dispatcher. 00105 CorbaDispatcher::Instance(msender); 00106 } 00107 00108 ~RemoteChannelElement() 00109 { 00110 delete write_any; 00111 } 00112 00114 void _add_ref() 00115 { this->ref(); } 00117 void _remove_ref() 00118 { this->deref(); } 00119 00120 00124 CORBA::Boolean remoteSignal() ACE_THROW_SPEC (( 00125 CORBA::SystemException 00126 )) 00127 { return base::ChannelElement<T>::signal(); } 00128 00129 bool signal() 00130 { 00131 // forward too. 00132 base::ChannelElementBase::signal(); 00133 // intercept signal if no remote side set. 00134 if ( CORBA::is_nil(remote_side.in()) ) 00135 return true; 00136 // Remember that signal() is called in the context of the one 00137 // that wrote the data, so we must decouple here to keep hard-RT happy. 00138 // the dispatch thread must read the data and send it over by calling transferSample(). 00139 CorbaDispatcher::Instance(msender)->dispatchChannel( this ); 00140 00141 return valid; 00142 } 00143 00144 virtual void transferSamples() { 00145 if (!valid) 00146 return; 00147 //log(Debug) <<"transfering..." <<endlog(); 00148 // in push mode, transfer all data, in pull mode, only signal once for each sample. 00149 if ( pull ) { 00150 try 00151 { valid = remote_side->remoteSignal(); } 00152 #ifdef CORBA_IS_OMNIORB 00153 catch(CORBA::SystemException& e) 00154 { 00155 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << " " << e.NP_minorString() << endlog(); 00156 valid = false; 00157 } 00158 #endif 00159 catch(CORBA::Exception& e) 00160 { 00161 log(Error) << "caught CORBA exception while signalling our remote endpoint: " << e._name() << endlog(); 00162 valid = false; 00163 } 00164 } else { 00165 //log(Debug) <<"...read..."<<endlog(); 00166 while ( this->read(sample, false) == NewData && valid) { 00167 //log(Debug) <<"...write..."<<endlog(); 00168 if ( this->write(sample) == false ) 00169 valid = false; 00170 //log(Debug) <<"...next read?..."<<endlog(); 00171 } 00172 } 00173 //log(Debug) <<"... done." <<endlog(); 00174 00175 } 00176 00180 void disconnect() ACE_THROW_SPEC (( 00181 CORBA::SystemException 00182 )) { 00183 // disconnect both local and remote side. 00184 // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!! 00185 try { 00186 if ( ! CORBA::is_nil(remote_side.in()) ) 00187 remote_side->remoteDisconnect(true); 00188 } 00189 catch(CORBA::Exception&) {} 00190 00191 try { this->remoteDisconnect(true); } 00192 catch(CORBA::Exception&) {} 00193 } 00194 00195 void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC (( 00196 CORBA::SystemException 00197 )) 00198 { 00199 base::ChannelElement<T>::disconnect(writer_to_reader); 00200 00201 // Because we support out-of-band transports, we must cleanup more thoroughly. 00202 // an oob channel may be sitting at our other end. If not, this is a nop. 00203 base::ChannelElement<T>::disconnect(!writer_to_reader); 00204 00205 // Will fail at shutdown if all objects are already deactivated 00206 try { 00207 if (mdataflow) 00208 mdataflow->deregisterChannel(_this()); 00209 mpoa->deactivate_object(oid); 00210 } 00211 catch(CORBA::Exception&) {} 00212 } 00213 00217 void disconnect(bool writer_to_reader) ACE_THROW_SPEC (( 00218 CORBA::SystemException 00219 )) 00220 { 00221 try { 00222 if ( ! CORBA::is_nil(remote_side.in()) ) 00223 remote_side->remoteDisconnect(writer_to_reader); 00224 } 00225 catch(CORBA::Exception&) {} 00226 00227 base::ChannelElement<T>::disconnect(writer_to_reader); 00228 00229 // Will fail at shutdown if all objects are already deactivated 00230 try { 00231 if (mdataflow) 00232 mdataflow->deregisterChannel(_this()); 00233 mpoa->deactivate_object(oid); 00234 } 00235 catch(CORBA::Exception&) {} 00236 } 00237 00238 FlowStatus read(typename base::ChannelElement<T>::reference_t sample, bool copy_old_data) 00239 { 00240 // try to read locally first 00241 FlowStatus fs; 00242 CFlowStatus cfs; 00243 if ( (fs = base::ChannelElement<T>::read(sample, copy_old_data)) ) 00244 return fs; 00245 // go through corba 00246 CORBA::Any_var remote_value; 00247 try 00248 { 00249 if ( remote_side && (cfs = remote_side->read(remote_value, copy_old_data) ) ) 00250 { 00251 RTT::internal::ReferenceDataSource<T> data_source(sample); 00252 00253 // This is a workaround since updateFromAny requires a 00254 // shared_ptr, but we want to allocate the 00255 // ReferenceDataSource on the stack 00256 data_source.ref(); 00257 RTT::base::DataSourceBase::shared_ptr ptr(&data_source); 00258 00259 transport.updateFromAny(&remote_value.in(), ptr); 00260 return (FlowStatus)cfs; 00261 } 00262 else 00263 return NoData; 00264 } 00265 #ifdef CORBA_IS_OMNIORB 00266 catch(CORBA::SystemException& e) 00267 { 00268 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << " " << e.NP_minorString() << endlog(); 00269 return NoData; 00270 } 00271 #endif 00272 catch(CORBA::Exception& e) 00273 { 00274 log(Error) << "caught CORBA exception while reading a remote channel: " << e._name() << endlog(); 00275 return NoData; 00276 } 00277 } 00278 00282 CFlowStatus read(::CORBA::Any_out sample, bool copy_old_data) ACE_THROW_SPEC (( 00283 CORBA::SystemException 00284 )) 00285 { 00286 00287 FlowStatus fs; 00288 if ( (fs = base::ChannelElement<T>::read(data_source->set(), copy_old_data)) ) 00289 { 00290 sample = transport.createAny(data_source); 00291 if ( sample != 0) { 00292 return (CFlowStatus)fs; 00293 } 00294 // this is a programmatic error and should never happen during run-time. 00295 log(Error) << "CORBA Transport failed to create Any for " << data_source->getTypeName() << " while it should have!" <<endlog(); 00296 } 00297 // we *must* return something in sample. 00298 sample = new CORBA::Any(); 00299 return CNoData; 00300 } 00301 00302 bool write(typename base::ChannelElement<T>::param_t sample) 00303 { 00304 data_source->set(sample); 00305 // try to write locally first 00306 if (base::ChannelElement<T>::write(sample)) 00307 return true; 00308 // go through corba 00309 assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present."); 00310 try 00311 { 00312 transport.updateAny(data_source, *write_any); 00313 remote_side->write(*write_any); 00314 return true; 00315 } 00316 #ifdef CORBA_IS_OMNIORB 00317 catch(CORBA::SystemException& e) 00318 { 00319 log(Error) << "caught CORBA exception while marshalling: " << e._name() << " " << e.NP_minorString() << endlog(); 00320 return false; 00321 } 00322 #endif 00323 catch(CORBA::Exception& e) 00324 { 00325 log(Error) << "caught CORBA exception while marshalling: " << e._name() << endlog(); 00326 return false; 00327 } 00328 } 00329 00333 bool write(const ::CORBA::Any& sample) ACE_THROW_SPEC (( 00334 CORBA::SystemException 00335 )) 00336 { 00337 transport.updateFromAny(&sample, data_source); 00338 return base::ChannelElement<T>::write(data_source->rvalue()); 00339 } 00340 00341 virtual bool data_sample(typename base::ChannelElement<T>::param_t sample) 00342 { 00343 // we don't pass it on through CORBA (yet). 00344 // If an oob transport is used, that one will send it through. 00345 typename base::ChannelElement<T>::shared_ptr output = 00346 this->getOutput(); 00347 if (output) 00348 return base::ChannelElement<T>::data_sample(sample); 00349 return true; 00350 } 00351 00355 virtual bool inputReady() { 00356 // signal to oob transport if any. 00357 typename base::ChannelElement<T>::shared_ptr input = 00358 this->getInput(); 00359 if (input) 00360 return base::ChannelElement<T>::inputReady(); 00361 return true; 00362 } 00363 00364 }; 00365 } 00366 } 00367 00368 #endif 00369