Orocos Real-Time Toolkit  2.6.0
DataFlowI.cpp
00001 /***************************************************************************
00002   tag: FMTC  Tue Mar 11 21:49:24 CET 2008  DataFlowI.cpp
00003 
00004                         DataFlowI.cpp -  description
00005                            -------------------
00006     begin                : Tue March 11 2008
00007     copyright            : (C) 2008 FMTC
00008     email                : peter.soetens@fmtc.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 #include "DataFlowI.h"
00039 #include "corba.h"
00040 #ifdef CORBA_IS_TAO
00041 #include "DataFlowS.h"
00042 #else
00043 #include "DataFlowC.h"
00044 #endif
00045 #include "../../base/PortInterface.hpp"
00046 #include "../../Logger.hpp"
00047 #include "TaskContextProxy.hpp"
00048 #include "CorbaTypeTransporter.hpp"
00049 #include "../../InputPort.hpp"
00050 #include "../../OutputPort.hpp"
00051 #include "CorbaConnPolicy.hpp"
00052 #include "CorbaLib.hpp"
00053 
00054 #include "RemotePorts.hpp"
00055 #include "RemoteConnID.hpp"
00056 #include <rtt/os/MutexLock.hpp>
00057 
00058 #include <iostream>
00059 
00060 using namespace std;
00061 using namespace RTT::corba;
00062 using namespace RTT::base;
00063 using namespace RTT::types;
00064 using namespace RTT::internal;
00065 
00066 CDataFlowInterface_i::ServantMap CDataFlowInterface_i::s_servant_map;
00067 
00068 CDataFlowInterface_i::CDataFlowInterface_i (RTT::DataFlowInterface* interface, PortableServer::POA_ptr poa)
00069     : mdf(interface), mpoa(PortableServer::POA::_duplicate(poa))
00070 {
00071 }
00072 
00073 CDataFlowInterface_i::~CDataFlowInterface_i ()
00074 {
00075     channel_list.clear();
00076 }
00077 
00078 void CDataFlowInterface_i::registerServant(CDataFlowInterface_ptr objref, RTT::DataFlowInterface* obj)
00079 {
00080     s_servant_map.push_back(
00081             std::make_pair(
00082                 CDataFlowInterface_var(objref),
00083                 obj)
00084             );
00085 }
00086 void CDataFlowInterface_i::deregisterServant(RTT::DataFlowInterface* obj)
00087 {
00088     for (ServantMap::iterator it = s_servant_map.begin();
00089             it != s_servant_map.end(); ++it)
00090     {
00091         if (it->second == obj)
00092         {
00093             s_servant_map.erase(it);
00094             return;
00095         }
00096     }
00097 }
00098 
00099 void CDataFlowInterface_i::clearServants()
00100 {
00101         s_servant_map.clear();
00102 }
00103 
00104 RTT::DataFlowInterface* CDataFlowInterface_i::getLocalInterface(CDataFlowInterface_ptr objref)
00105 {
00106     for (ServantMap::const_iterator it = s_servant_map.begin();
00107             it != s_servant_map.end(); ++it)
00108     {
00109         if (it->first->_is_equivalent(objref))
00110             return it->second;
00111     }
00112     return NULL;
00113 }
00114 
00115 CDataFlowInterface_ptr CDataFlowInterface_i::getRemoteInterface(RTT::DataFlowInterface* dfi, PortableServer::POA_ptr poa)
00116 {
00117     for (ServantMap::const_iterator it = s_servant_map.begin();
00118             it != s_servant_map.end(); ++it)
00119     {
00120         if (it->second == dfi)
00121             return it->first;
00122     }
00123     CDataFlowInterface_i* servant = new CDataFlowInterface_i(dfi, poa );
00124     CDataFlowInterface_ptr server = servant->_this();
00125     servant->_remove_ref();
00126     registerServant( server, dfi);
00127     return server;
00128 }
00129 
00130 PortableServer::POA_ptr CDataFlowInterface_i::_default_POA()
00131 {
00132     return PortableServer::POA::_duplicate(mpoa);
00133 }
00134 
00135 CDataFlowInterface::CPortNames * CDataFlowInterface_i::getPorts() ACE_THROW_SPEC ((
00136           CORBA::SystemException
00137         ))
00138 {
00139     ::RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00140 
00141     RTT::corba::CDataFlowInterface::CPortNames_var pn = new RTT::corba::CDataFlowInterface::CPortNames();
00142     pn->length( ports.size() );
00143 
00144     for (unsigned int i=0; i != ports.size(); ++i )
00145         pn[i] = CORBA::string_dup( ports[i].c_str() );
00146 
00147     return pn._retn();
00148 }
00149 
00150 CDataFlowInterface::CPortDescriptions* CDataFlowInterface_i::getPortDescriptions() ACE_THROW_SPEC ((
00151           CORBA::SystemException
00152         ))
00153 {
00154     RTT::DataFlowInterface::PortNames ports = mdf->getPortNames();
00155     RTT::corba::CDataFlowInterface::CPortDescriptions_var result = new RTT::corba::CDataFlowInterface::CPortDescriptions();
00156     result->length( ports.size() );
00157 
00158     unsigned int j = 0;
00159     for (unsigned int i = 0; i < ports.size(); ++i)
00160     {
00161         CPortDescription port_desc;
00162 
00163         PortInterface* port = mdf->getPort(ports[i]);
00164         port_desc.name = CORBA::string_dup(ports[i].c_str());
00165 
00166         TypeInfo const* type_info = port->getTypeInfo();
00167         if (!type_info || !type_info->getProtocol(ORO_CORBA_PROTOCOL_ID))
00168         {
00169             log(Warning) << "the type of port " << ports[i] << " is not registered into the Orocos type system. It is ignored by the CORBA layer." << endlog();
00170             continue;
00171         }
00172 
00173         port_desc.type_name = CORBA::string_dup(type_info->getTypeName().c_str());
00174         if (dynamic_cast<InputPortInterface*>(port))
00175             port_desc.type = corba::CInput;
00176         else
00177             port_desc.type = corba::COutput;
00178 
00179         result[j++] = port_desc;
00180     }
00181     result->length( j );
00182     return result._retn();
00183 }
00184 
00185 CPortType CDataFlowInterface_i::getPortType(const char * port_name) ACE_THROW_SPEC ((
00186           CORBA::SystemException
00187           ,::RTT::corba::CNoSuchPortException
00188         ))
00189 {
00190     PortInterface* p = mdf->getPort(port_name);
00191     if (p == 0)
00192         throw CNoSuchPortException();
00193 
00194     if (dynamic_cast<InputPortInterface*>(p))
00195         return RTT::corba::CInput;
00196     else return RTT::corba::COutput;
00197 }
00198 
00199 char* CDataFlowInterface_i::getDataType(const char * port_name) ACE_THROW_SPEC ((
00200           CORBA::SystemException
00201           ,::RTT::corba::CNoSuchPortException
00202         ))
00203 {
00204     PortInterface* p = mdf->getPort(port_name);
00205     if ( p == 0)
00206         throw CNoSuchPortException();
00207     return CORBA::string_dup( p->getTypeInfo()->getTypeName().c_str() );
00208 }
00209 
00210 CORBA::Boolean CDataFlowInterface_i::isConnected(const char * port_name) ACE_THROW_SPEC ((
00211           CORBA::SystemException
00212           ,::RTT::corba::CNoSuchPortException
00213         ))
00214 {
00215     PortInterface* p = mdf->getPort(port_name);
00216     if (p == 0)
00217         throw corba::CNoSuchPortException();
00218 
00219     return p->connected();
00220 }
00221 
00222 void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel)
00223 { RTT::os::MutexLock lock(channel_list_mtx);
00224     ChannelList::iterator it=channel_list.begin();
00225     for (; it != channel_list.end(); ++it) {
00226         if (it->first->_is_equivalent (channel) ) {
00227             channel_list.erase(it); // it->first is a _var so releases automatically.
00228             return;
00229         }
00230     }
00231 }
00232 
00233 CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel) ACE_THROW_SPEC ((
00234           CORBA::SystemException
00235           ,::RTT::corba::CNoSuchPortException
00236         ))
00237 {
00238     PortInterface* p = mdf->getPort(reader_port_name);
00239     if (p == 0)
00240         throw corba::CNoSuchPortException();
00241 
00242     InputPortInterface* ip = dynamic_cast<InputPortInterface*>(p);
00243     if (ip == 0)
00244         throw corba::CNoSuchPortException();
00245 
00246     CORBA_CHECK_THREAD();
00247     // lookup the C++ channel that matches the corba channel and
00248     // inform our local port that that C++ channel is ready.
00249     { RTT::os::MutexLock lock(channel_list_mtx);
00250         ChannelList::iterator it=channel_list.begin();
00251         for (; it != channel_list.end(); ++it) {
00252             if (it->first->_is_equivalent (channel) ) {
00253                 try {
00254                     return ip->channelReady( it->second );
00255                 }
00256                 catch(std::exception const& e)
00257                 {
00258                     log(Error) << "call to channelReady threw " << e.what() << endlog();
00259                     throw;
00260                 }
00261             }
00262         }
00263     }
00264     log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <<endlog();
00265     return false;
00266 }
00267 
00268 void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC ((
00269           CORBA::SystemException
00270           ,::RTT::corba::CNoSuchPortException
00271         ))
00272 {
00273     PortInterface* p = mdf->getPort(port_name);
00274     if (p == 0) {
00275         log(Error) << "disconnectPort: No such port: "<< port_name <<endlog();
00276         throw corba::CNoSuchPortException();
00277     }
00278     CORBA_CHECK_THREAD();
00279     p->disconnect();
00280 }
00281 
00282 bool CDataFlowInterface_i::removeConnection(
00283         const char* local_port,
00284         CDataFlowInterface_ptr remote_interface, const char* remote_port) ACE_THROW_SPEC ((
00285                   CORBA::SystemException
00286                   ,::RTT::corba::CNoSuchPortException
00287                 ))
00288 {
00289     PortInterface* port = mdf->getPort(local_port);
00290     // CORBA does not support disconnecting from the input port
00291     if (port == 0) {
00292         log(Error) << "disconnectPort: No such port: "<< local_port <<endlog();
00293         throw corba::CNoSuchPortException();
00294     }
00295     if (dynamic_cast<OutputPortInterface*>(port) == 0) {
00296         log(Error) << "disconnectPort: "<< local_port << " is an input port" << endlog();
00297         throw corba::CNoSuchPortException();
00298     }
00299 
00300     RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(remote_interface);
00301     if (local_interface)
00302     {
00303         PortInterface* other_port = local_interface->getPort(remote_port);
00304         if (!other_port)
00305             return false;
00306 
00307         // Try to disconnect the local port. However, one might have forced
00308         // having a CORBA connection between local ports, so if it fails go on
00309         // with normal CORBA disconnection
00310         if (port->disconnect(other_port))
00311             return true;
00312 
00313     }
00314 
00315     CORBA_CHECK_THREAD();
00316     RemoteConnID rcid(remote_interface, remote_port);
00317     return port->removeConnection( &rcid );
00318 }
00319 
00320 ::CORBA::Boolean CDataFlowInterface_i::createStream( const char* port,
00321                                RTT::corba::CConnPolicy & policy) ACE_THROW_SPEC ((
00322                                       CORBA::SystemException
00323                                       ,::RTT::corba::CNoSuchPortException
00324                                     ))
00325 {
00326     PortInterface* p = mdf->getPort(port);
00327     if (p == 0) {
00328         log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00329         throw corba::CNoSuchPortException();
00330     }
00331 
00332     CORBA_CHECK_THREAD();
00333     RTT::ConnPolicy p2 = toRTT(policy);
00334     if ( p->createStream( p2 ) ) {
00335         policy = toCORBA(p2);
00336         return true;
00337     }
00338     return false;
00339 }
00340 
00341 void CDataFlowInterface_i::removeStream( const char* port, const char* stream_name) ACE_THROW_SPEC ((
00342           CORBA::SystemException
00343           ,::RTT::corba::CNoSuchPortException
00344         ))
00345 {
00346     PortInterface* p = mdf->getPort(port);
00347     if (p == 0) {
00348         log(Error) << "createStream: No such port: "<< p->getName() <<endlog();
00349         throw corba::CNoSuchPortException();
00350     }
00351     CORBA_CHECK_THREAD();
00352     StreamConnID rcid(stream_name);
00353     p->removeConnection( &rcid );
00354 }
00355 
00356 
00357 CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput(
00358         const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00359                   CORBA::SystemException
00360                   ,::RTT::corba::CNoCorbaTransport
00361                   ,::RTT::corba::CNoSuchPortException
00362                 ))
00363 {
00364     Logger::In in("CDataFlowInterface_i::buildChannelOutput");
00365     InputPortInterface* port = dynamic_cast<InputPortInterface*>(mdf->getPort(port_name));
00366     if (port == 0)
00367         throw CNoSuchPortException();
00368 
00369     TypeInfo const* type_info = port->getTypeInfo();
00370     if (!type_info)
00371         throw CNoCorbaTransport();
00372 
00373     CorbaTypeTransporter* transporter =
00374         dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00375     if (!transporter)
00376         throw CNoCorbaTransport();
00377 
00378     CORBA_CHECK_THREAD();
00379     // Convert to RTT policy.
00380     ConnPolicy policy2 = toRTT(corba_policy);
00381 
00382     ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port);
00383     CRemoteChannelElement_i* this_element =
00384         transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00385     this_element->setCDataFlowInterface(this);
00386 
00387     /*
00388      * This part is for out-of band (needs to be factored out).
00389      */
00390     if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00391         // prepare out-of-band transport for this port.
00392         // if user supplied name, use that one.
00393         if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00394             log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00395             log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00396             return RTT::corba::CChannelElement::_nil();
00397         }
00398         RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, false);
00399         // if no user supplied name, pass on the new name.
00400         if ( strlen( corba_policy.name_id.in()) == 0 )
00401             corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00402 
00403         if (ceb) {
00404             // override, insert oob element between corba and endpoint and add a buffer between oob and endpoint.
00405             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(ceb);
00406             ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00407             ceb->setOutput( buf );
00408             buf->setOutput(end);
00409             log(Info) <<"Receiving data for port "<< policy2.name_id << " from out-of-band protocol "<< corba_policy.transport <<endlog();
00410         } else {
00411             log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00412             return RTT::corba::CChannelElement::_nil();
00413         }
00414         //
00415     } else {
00416         // No OOB. omit buffer if in pull
00417         if ( !corba_policy.pull ) {
00418             ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00419             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(buf);
00420             buf->setOutput(end);
00421         } else {
00422             dynamic_cast<ChannelElementBase*>(this_element)->setOutput(end);
00423         }
00424     }
00425 
00426     this_element->_remove_ref();
00427 
00428     // store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
00429     { RTT::os::MutexLock lock(channel_list_mtx);
00430         channel_list.push_back( ChannelList::value_type(this_element->_this(), end->getOutputEndPoint()));
00431     }
00432 
00433     CRemoteChannelElement_var proxy = this_element->_this();
00434     return proxy._retn();
00435 }
00436 
00440 CChannelElement_ptr CDataFlowInterface_i::buildChannelInput(
00441         const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC ((
00442                   CORBA::SystemException
00443                   ,::RTT::corba::CNoCorbaTransport
00444                   ,::RTT::corba::CNoSuchPortException
00445                 ))
00446 {
00447     Logger::In in("CDataFlowInterface_i::buildChannelInput");
00448     // First check validity of user input...
00449     OutputPortInterface* port = dynamic_cast<OutputPortInterface*>(mdf->getPort(port_name));
00450     if (port == 0)
00451         throw CNoSuchPortException();
00452 
00453     TypeInfo const* type_info = port->getTypeInfo();
00454     if (!type_info)
00455         throw CNoCorbaTransport();
00456 
00457     CorbaTypeTransporter* transporter =
00458         dynamic_cast<CorbaTypeTransporter*>(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID));
00459     if (!transporter)
00460         throw CNoCorbaTransport();
00461 
00462     CORBA_CHECK_THREAD();
00463     // Convert to RTT policy.
00464     ConnPolicy policy2 = toRTT(corba_policy);
00465 
00466     // Now create the output-side channel elements.
00467     ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port);
00468 
00469     // The channel element that exposes our channel in CORBA
00470     CRemoteChannelElement_i* this_element;
00471     PortableServer::ServantBase_var servant = this_element = transporter->createChannelElement_i(mdf, mpoa, corba_policy.pull);
00472     this_element->setCDataFlowInterface(this);
00473 
00474     // Attach the corba channel element first (so OOB is after corba).
00475     assert( dynamic_cast<ChannelElementBase*>(this_element) );
00476     start->getOutputEndPoint()->setOutput( dynamic_cast<ChannelElementBase*>(this_element));
00477 
00478     /*
00479      * This part if for out-of band. (needs to be factored out).
00480      */
00481     if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) {
00482         // prepare out-of-band transport for this port.
00483         // if user supplied name, use that one.
00484         if ( type_info->getProtocol(corba_policy.transport) == 0 ) {
00485             log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <<endlog();
00486             log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <<endlog();
00487             throw CNoCorbaTransport();
00488         }
00489         RTT::base::ChannelElementBase::shared_ptr ceb = type_info->getProtocol(corba_policy.transport)->createStream(port, policy2, true);
00490         // if no user supplied name, pass on the new name.
00491         if ( strlen( corba_policy.name_id.in()) == 0 )
00492             corba_policy.name_id = CORBA::string_dup( policy2.name_id.c_str() );
00493 
00494         if (ceb) {
00495             // OOB is added to end of chain.
00496             start->getOutputEndPoint()->setOutput( ceb );
00497             log(Info) <<"Sending data from port "<< policy2.name_id << " to out-of-band protocol "<< corba_policy.transport <<endlog();
00498         } else {
00499             log(Error) << "The type transporter for type "<<type_info->getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<<endlog();
00500             throw CNoCorbaTransport();
00501         }
00502     } else {
00503         // No OOB. Always add output buffer.
00504         ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy));
00505         start->setOutput(buf);
00506         buf->setOutput( dynamic_cast<ChannelElementBase*>(this_element) );
00507     }
00508 
00509 
00510     // Attach to our output port:
00511     port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy2);
00512 
00513     // DO NOT DO THIS: _remove_ref() is tied to the refcount of the ChannelElementBase !
00514     //this_element->_remove_ref();
00515 
00516     // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again.
00517     { RTT::os::MutexLock lock(channel_list_mtx);
00518         channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint()));
00519     }
00520 
00521     return this_element->_this();
00522 }
00523 
00524 
00525 ::CORBA::Boolean CDataFlowInterface_i::createConnection(
00526         const char* writer_port, CDataFlowInterface_ptr reader_interface,
00527         const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC ((
00528                   CORBA::SystemException
00529                   ,::RTT::corba::CNoSuchPortException
00530                 ))
00531 {
00532     Logger::In in("CDataFlowInterface_i::createConnection");
00533     OutputPortInterface* writer = dynamic_cast<OutputPortInterface*>(mdf->getPort(writer_port));
00534     if (writer == 0)
00535         throw CNoSuchPortException();
00536 
00537     CORBA_CHECK_THREAD();
00538     // Check if +reader_interface+ is local. If it is, use the non-CORBA
00539     // connection.
00540     RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface);
00541     if (local_interface && policy.transport == 0)
00542     {
00543         InputPortInterface* reader =
00544             dynamic_cast<InputPortInterface*>(local_interface->getPort(reader_port));
00545         if (!reader)
00546         {
00547             log(Warning) << "CORBA: createConnection() target is not an input port" << endlog();
00548             throw CNoSuchPortException();
00549             return false;
00550         }
00551 
00552         log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " <<
00553            writer_port << " and " << reader_port << endlog();
00554         return writer->createConnection(*reader, toRTT(policy));
00555     }
00556     else
00557         log(Debug) << "CORBA: createConnection() is creating a REMOTE connection between " <<
00558            writer_port << " and " << reader_port << endlog();
00559 
00560     try {
00561         if (reader_interface->getPortType(reader_port) != corba::CInput) {
00562             log(Error) << "Could not create connection: " << reader_port <<" is not an input port."<<endlog();
00563             throw CNoSuchPortException();
00564             return false;
00565         }
00566 
00567         // Creates a proxy to the remote input port
00568         RemoteInputPort port(writer->getTypeInfo(), reader_interface, reader_port, mpoa);
00569         port.setInterface( mdf ); // cheating !
00570         // Connect to proxy.
00571         return writer->createConnection(port, toRTT(policy));
00572     }
00573     catch(CORBA::COMM_FAILURE&) { throw; }
00574     catch(CORBA::TRANSIENT&) { throw; }
00575     catch(...) { throw; }
00576     return false;
00577 }
00578 
00579 // standard constructor
00580 CRemoteChannelElement_i::CRemoteChannelElement_i(RTT::corba::CorbaTypeTransporter const& transport,
00581       PortableServer::POA_ptr poa)
00582     : transport(transport)
00583     , mpoa(PortableServer::POA::_duplicate(poa))
00584     , mdataflow(0)
00585     { }
00586 CRemoteChannelElement_i::~CRemoteChannelElement_i() {}
00587 PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA()
00588 { return PortableServer::POA::_duplicate(mpoa); }
00589 void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC ((
00590           CORBA::SystemException
00591         ))
00592 { this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); }
00593