Orocos Real-Time Toolkit  2.6.0
ConnFactory.cpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:08 CEST 2009  ConnFactory.cpp
00003 
00004                         ConnFactory.cpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #include "../Port.hpp"
00040 #include "ConnFactory.hpp"
00041 #include "../base/InputPortInterface.hpp"
00042 #include "../DataFlowInterface.hpp"
00043 #include "../types/TypeMarshaller.hpp"
00044 
00045 using namespace std;
00046 using namespace RTT;
00047 using namespace RTT::internal;
00048 
00049 bool LocalConnID::isSameID(ConnID const& id) const
00050 {
00051     LocalConnID const* real_id = dynamic_cast<LocalConnID const*>(&id);
00052     if (!real_id)
00053         return false;
00054     else return real_id->ptr == this->ptr;
00055 }
00056 
00057 ConnID* LocalConnID::clone() const {
00058     return new LocalConnID(this->ptr);
00059 }
00060 
00061 bool StreamConnID::isSameID(ConnID const& id) const
00062 {
00063     StreamConnID const* real_id = dynamic_cast<StreamConnID const*>(&id);
00064     if (!real_id)
00065         return false;
00066     else return real_id->name_id == this->name_id;
00067 }
00068 
00069 ConnID* StreamConnID::clone() const {
00070     return new StreamConnID(this->name_id);
00071 }
00072 
00073 base::ChannelElementBase::shared_ptr RTT::internal::ConnFactory::createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, const ConnPolicy& policy)
00074 {
00075     // Remote connection
00076     // if the policy's transport is set to zero, use the input ports server protocol,
00077     // otherwise, use the policy's protocol
00078     int transport = policy.transport == 0 ? input_port.serverProtocol() : policy.transport;
00079     types::TypeInfo const* type_info = output_port.getTypeInfo();
00080     if (!type_info || input_port.getTypeInfo() != type_info)
00081     {
00082         log(Error) << "Type of port " << output_port.getName() << " is not registered into the type system, cannot marshal it into the right transporter" << endlog();
00083         // There is no type info registered for this type
00084         return base::ChannelElementBase::shared_ptr();
00085     }
00086     else if ( !type_info->getProtocol( transport ) )
00087     {
00088         log(Error) << "Type " << type_info->getTypeName() << " cannot be marshalled into the requested transporter (id:"<< transport<<")." << endlog();
00089         // This type cannot be marshalled into the right transporter
00090         return base::ChannelElementBase::shared_ptr();
00091     }
00092     else
00093     {
00094         return input_port.
00095                 buildRemoteChannelOutput(output_port, type_info, input_port, policy);
00096     }
00097     return base::ChannelElementBase::shared_ptr();
00098 }
00099 
00100 bool ConnFactory::createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy) {
00101     // Register the channel's input to the output port.
00102     if ( output_port.addConnection( input_port.getPortID(), channel_input, policy ) ) {
00103         // notify input that the connection is now complete.
00104         if ( input_port.channelReady( channel_input->getOutputEndPoint() ) == false ) {
00105             output_port.disconnect( &input_port );
00106             log(Error) << "The input port "<< input_port.getName()
00107                        << " could not successfully read from the connection from output port " << output_port.getName() <<endlog();
00108 
00109             return false;
00110         }
00111         log(Debug) << "Connected output port "<< output_port.getName()
00112                   << " successfully to " << input_port.getName() <<endlog();
00113         return true;
00114     }
00115     // setup failed.
00116     channel_input->disconnect(true);
00117     log(Error) << "The output port "<< output_port.getName()
00118                << " could not successfully use the connection to input port " << input_port.getName() <<endlog();
00119     return false;
00120 }
00121 
00122 bool ConnFactory::createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id) {
00123     if (policy.transport == 0 ) {
00124         log(Error) << "Need a transport for creating streams." <<endlog();
00125         return false;
00126     }
00127     const types::TypeInfo* type = output_port.getTypeInfo();
00128     if ( type->getProtocol(policy.transport) == 0 ) {
00129         log(Error) << "Could not create transport stream for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00130         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00131         return false;
00132     }
00133     types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*> ( type->getProtocol(policy.transport) );
00134     if (ttt) {
00135         int size_hint = ttt->getSampleSize( output_port.getDataSource() );
00136         policy.data_size = size_hint;
00137     } else {
00138         log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00139     }
00140     RTT::base::ChannelElementBase::shared_ptr chan_stream = type->getProtocol(policy.transport)->createStream(&output_port, policy, true);
00141             
00142     if ( !chan_stream ) {
00143         log(Error) << "Transport failed to create remote channel for output stream of port "<<output_port.getName() << endlog();
00144         return false;
00145     }
00146     chan->setOutput( chan_stream );
00147 
00148     if ( output_port.addConnection( new StreamConnID(policy.name_id), chan, policy) ) {
00149         log(Info) << "Created output stream for output port "<< output_port.getName() <<endlog();
00150         return true;
00151     }
00152     // setup failed.
00153     log(Error) << "Failed to create output stream for output port "<< output_port.getName() <<endlog();
00154     return false;
00155 }
00156 
00157 bool ConnFactory::createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id) {
00158     if (policy.transport == 0 ) {
00159         log(Error) << "Need a transport for creating streams." <<endlog();
00160         return false;
00161     }
00162     const types::TypeInfo* type = input_port.getTypeInfo();
00163     if ( type->getProtocol(policy.transport) == 0 ) {
00164         log(Error) << "Could not create transport stream for port "<< input_port.getName() << " with transport id " << policy.transport <<endlog();
00165         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00166         return false;
00167     }
00168 
00169     // note: don't refcount this final input chan, because no one will
00170     // take a reference to it. It would be destroyed upon return of this function.
00171     RTT::base::ChannelElementBase::shared_ptr chan = type->getProtocol(policy.transport)->createStream(&input_port,policy, false);
00172 
00173     if ( !chan ) {
00174         log(Error) << "Transport failed to create remote channel for input stream of port "<<input_port.getName() << endlog();
00175         return false;
00176     }
00177 
00178     // In stream mode, a buffer is always installed at input side.
00179     //
00180     ConnPolicy policy2 = policy;
00181     policy2.pull = false;
00182     // pass new name upwards.
00183     policy.name_id = policy2.name_id;
00184     conn_id->name_id = policy2.name_id;
00185 
00186     chan->getOutputEndPoint()->setOutput( outhalf );
00187     if ( input_port.channelReady( chan->getOutputEndPoint() ) == true ) {
00188         log(Info) << "Created input stream for input port "<< input_port.getName() <<endlog();
00189         return true;
00190     }
00191     // setup failed: manual cleanup.
00192     chan = 0; // deleted by channelReady() above !
00193     log(Error) << "Failed to create input stream for input port "<< input_port.getName() <<endlog();
00194     return false;
00195 }
00196 
00197 base::ChannelElementBase::shared_ptr ConnFactory::createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port, 
00198                                                                                      base::InputPortInterface& input_port, 
00199                                                                                      ConnPolicy const& policy, 
00200                                                                                      base::ChannelElementBase::shared_ptr output_half, 
00201                                                                                      StreamConnID* conn_id) 
00202 {
00203     // create input half using a transport.
00204     const types::TypeInfo* type = output_port.getTypeInfo();
00205     if ( type->getProtocol(policy.transport) == 0 ) {
00206         log(Error) << "Could not create out-of-band transport for port "<< output_port.getName() << " with transport id " << policy.transport <<endlog();
00207         log(Error) << "No such transport registered. Check your policy.transport settings or add the transport for type "<< type->getTypeName() <<endlog();
00208         return 0;
00209     }
00210 
00211     // we force the creation of a buffer on input side
00212     ConnPolicy policy2 = policy;
00213     policy2.pull = false;
00214     conn_id->name_id = policy2.name_id;
00215 
00216     // check if marshaller supports size hints:
00217     types::TypeMarshaller* ttt = dynamic_cast<types::TypeMarshaller*>( type->getProtocol(policy.transport) );
00218     if (ttt) {
00219         policy2.data_size = ttt->getSampleSize(  output_port.getDataSource() );
00220     } else {
00221         log(Debug) <<"Could not determine sample size for type " << type->getTypeName() << endlog();
00222     }
00223     // XXX: this seems to be always true
00224     if ( input_port.isLocal() ) {
00225         RTT::base::ChannelElementBase::shared_ptr ceb_input = type->getProtocol(policy.transport)->createStream(&input_port, policy2, false);
00226         if (ceb_input) {
00227             log(Info) <<"Receiving data for port "<<input_port.getName() << " from out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id<<endlog();
00228         } else {
00229             log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << input_port.getName()<<endlog();
00230             return 0;
00231         }
00232         ceb_input->getOutputEndPoint()->setOutput(output_half);
00233         output_half = ceb_input;
00234     }
00235 
00236     // XXX: this seems to be always true
00237     if ( output_port.isLocal() ) {
00238 
00239         RTT::base::ChannelElementBase::shared_ptr ceb_output = type->getProtocol(policy.transport)->createStream(&output_port, policy2, true);
00240         if (ceb_output) {
00241             log(Info) <<"Redirecting data for port "<< output_port.getName() << " to out-of-band protocol "<< policy.transport << " with id "<< policy2.name_id <<endlog();
00242         } else {
00243             log(Error) << "The type transporter for type "<<type->getTypeName()<< " failed to create a remote channel for port " << output_port.getName()<<endlog();
00244             return 0;
00245         }
00246         // this mediates the 'channel ready leads to initial data sample'.
00247         // it is probably not necessary, since streams don't assume this relation.
00248         ceb_output->getOutputEndPoint()->setOutput(output_half);
00249         output_half = ceb_output;
00250     }
00251     // Important ! since we made a copy above, we need to set the original to the changed name_id.
00252     policy.name_id = policy2.name_id;
00253     conn_id->name_id = policy2.name_id;
00254 
00255     return output_half;
00256 
00257 }
00258