Orocos Real-Time Toolkit  2.5.0
ConnFactory.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:08 CEST 2009  ConnFactory.cpp
00003 
00004                         ConnFactory.cpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #include "../Port.hpp"
00040 #include "ConnFactory.hpp"
00041 #include "../base/InputPortInterface.hpp"
00042 #include "../DataFlowInterface.hpp"
00043 #include "../types/TypeMarshaller.hpp"
00044 
00045 using namespace std;
00046 using namespace RTT;
00047 using namespace RTT::internal;
00048 
00049 bool LocalConnID::isSameID(ConnID const& id) const
00050 {
00051     LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
00052     if (!real_id)
00053         return false;
00054     else return real_id->ptr == this->ptr;
00055 }
00056 
00057 ConnID* LocalConnID::clone() const {
00058     return new LocalConnID(this->ptr);
00059 }
00060 
00061 bool StreamConnID::isSameID(ConnID const& id) const
00062 {
00063     StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
00064     if (!real_id)
00065         return false;
00066     else return real_id->name_id == this->name_id;
00067 }
00068 
00069 ConnID* StreamConnID::clone() const {
00070     return new StreamConnID(this->name_id);
00071 }
00072 
00073 base::ChannelElementBase::shared_ptr RTT::internal::ConnFactory::createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, const ConnPolicy& policy)
00074 {
00075     // Remote connection
00076     // if the policy's transport is set to zero, use the input ports server protocol,
00077     // otherwise, use the policy's protocol
00078     int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
00079     types::TypeInfo const* type_info = output_port.getTypeInfo();
00080     if (!type_info || input_port.getTypeInfo() != type_info)
00081     {
00082         log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
00083         // There is no type info registered for this type
00084         return base::ChannelElementBase::shared_ptr();
00085     }
00086     else if ( !type_info->getProtocol( transport ) )
00087     {
00088         log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
00089         // This type cannot be marshalled into the right transporter
00090         return base::ChannelElementBase::shared_ptr();
00091     }
00092     else
00093     {
00094         assert( input_port.getConnFactory() );
00095         return input_port.
00096                 getConnFactory()->buildRemoteChannelOutput(output_port, type_info, input_port, policy);
00097     }
00098     return base::ChannelElementBase::shared_ptr();
00099 }
00100 
00101 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) {
00102     // Register the channel's input to the output port.
00103     if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) {
00104         // notify input that the connection is now complete.
00105         if ( input_port.channelReady( channel_input->getOutputEndPoint() ) == false ) {
00106             output_port.disconnect( &input_port );
00107             log(Error) << "The input port "<< input_port.getName()
00108                        << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
00109 
00110             return false;
00111         }
00112         log(Debug) << "Connected output port "<< output_port.getName()
00113                   << " successfully to " << input_port.getName() <<endlog();
00114         return true;
00115     }
00116     // setup failed.
00117     channel_input->disconnect(true);
00118     log(Error) << "The output port "<< output_port.getName()
00119                << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
00120     return false;
00121 }
00122 
00123 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) {
00124     if (policy.transport == 0 ) {
00125         log(Error) << "Need a transport for creating streams." <<endlog();
00126         return false;
00127     }
00128     const types::TypeInfo* type = output_port.getTypeInfo();
00129     if ( type->getProtocol(policy.transport) == 0 ) {
00130         log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00131         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00132         return false;
00133     }
00134     types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
00135     if (ttt) {
00136         int size_hint = ttt->getSampleSize( output_port.getDataSource() );
00137         policy.data_size = size_hint;
00138     } else {
00139         log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00140     }
00141     RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true);
00142             
00143     if ( !chan_stream ) {
00144         log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
00145         return false;
00146     }
00147     chan->setOutput( chan_stream );
00148 
00149     if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) {
00150         log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
00151         return true;
00152     }
00153     // setup failed.
00154     log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
00155     return false;
00156 }
00157 
00158 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
00159     if (policy.transport == 0 ) {
00160         log(Error) << "Need a transport for creating streams." <<endlog();
00161         return false;
00162     }
00163     const types::TypeInfo* type = input_port.getTypeInfo();
00164     if ( type->getProtocol(policy.transport) == 0 ) {
00165         log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
00166         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00167         return false;
00168     }
00169 
00170     // note: don't refcount this final input chan, because no one will
00171     // take a reference to it. It would be destroyed upon return of this function.
00172     RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false);
00173 
00174     if ( !chan ) {
00175         log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog();
00176         return false;
00177     }
00178 
00179     // In stream mode, a buffer is always installed at input side.
00180     //
00181     ConnPolicy policy2 = policy;
00182     policy2.pull = false;
00183     // pass new name upwards.
00184     policy.name_id = policy2.name_id;
00185     conn_id->name_id = policy2.name_id;
00186 
00187     chan->getOutputEndPoint()->setOutput( outhalf );
00188     if ( input_port.channelReady( chan->getOutputEndPoint() ) == true ) {
00189         log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog();
00190         return true;
00191     }
00192     // setup failed: manual cleanup.
00193     chan = 0; // deleted by channelReady() above !
00194     log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog();
00195     return false;
00196 }
00197 
00198 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 
00199                                                                                      base::InputPortInterface& input_port, 
00200                                                                                      ConnPolicy const& policy, 
00201                                                                                      base::ChannelElementBase::shared_ptr output_half, 
00202                                                                                      StreamConnID* conn_id) 
00203 {
00204     // create input half using a transport.
00205     const types::TypeInfo* type = output_port.getTypeInfo();
00206     if ( type->getProtocol(policy.transport) == 0 ) {
00207         log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00208         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00209         return 0;
00210     }
00211 
00212     // we force the creation of a buffer on input side
00213     ConnPolicy policy2 = policy;
00214     policy2.pull = false;
00215     conn_id->name_id = policy2.name_id;
00216 
00217     // check if marshaller supports size hints:
00218     types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) );
00219     if (ttt) {
00220         policy2.data_size = ttt->getSampleSize(  output_port.getDataSource() );
00221     } else {
00222         log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00223     }
00224     // XXX: this seems to be always true
00225     if ( input_port.isLocal() ) {
00226         RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false);
00227         if (ceb_input) {
00228             log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog();
00229         } else {
00230             log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog();
00231             return 0;
00232         }
00233         ceb_input->getOutputEndPoint()->setOutput(output_half);
00234         output_half = ceb_input;
00235     }
00236 
00237     // XXX: this seems to be always true
00238     if ( output_port.isLocal() ) {
00239 
00240         RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true);
00241         if (ceb_output) {
00242             log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog();
00243         } else {
00244             log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog();
00245             return 0;
00246         }
00247         // this mediates the 'channel ready leads to initial data sample'.
00248         // it is probably not necessary, since streams don't assume this relation.
00249         ceb_output->getOutputEndPoint()->setOutput(output_half);
00250         output_half = ceb_output;
00251     }
00252     // Important ! since we made a copy above, we need to set the original to the changed name_id.
00253     policy.name_id = policy2.name_id;
00254     conn_id->name_id = policy2.name_id;
00255 
00256     return output_half;
00257 
00258 }
00259