Orocos Real-Time Toolkit  2.5.0
ConnFactory.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:08 CEST 2009  ConnFactory.hpp
00003 
00004                         ConnFactory.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_CONN_FACTORY_HPP
00040 #define ORO_CONN_FACTORY_HPP
00041 
00042 #include <string>
00043 #include "Channels.hpp"
00044 #include "ConnInputEndPoint.hpp"
00045 #include "ConnOutputEndPoint.hpp"
00046 #include "../base/PortInterface.hpp"
00047 #include "../base/InputPortInterface.hpp"
00048 #include "../base/OutputPortInterface.hpp"
00049 #include "../DataFlowInterface.hpp"
00050 
00051 #include "../base/DataObject.hpp"
00052 #include "../base/DataObjectUnSync.hpp"
00053 #include "../base/Buffer.hpp"
00054 #include "../base/BufferUnSync.hpp"
00055 #include "../Logger.hpp"
00056 
00057 namespace RTT
00058 { namespace internal {
00059 
00063     struct LocalConnID : public ConnID
00064     {
00065         base::PortInterface const* ptr;
00066         LocalConnID(base::PortInterface const* obj)
00067             : ptr(obj) {}
00068         virtual ConnID* clone() const;
00069         virtual bool isSameID(ConnID const& id) const;
00070     };
00071 
00075     struct RTT_API StreamConnID : public ConnID
00076     {
00077         std::string name_id;
00078         StreamConnID(const std::string& name)
00079             : name_id(name) {}
00080         virtual ConnID* clone() const;
00081         virtual bool isSameID(ConnID const& id) const;
00082     };
00083 
00084 
00091     class RTT_API ConnFactory
00092     {
00093     public:
00094 
00100         virtual base::ChannelElementBase::shared_ptr buildRemoteChannelOutput(
00101                 base::OutputPortInterface& output_port,
00102                 types::TypeInfo const* type_info,
00103                 base::InputPortInterface& input, const ConnPolicy& policy) = 0;
00104 
00112         template<typename T>
00113         static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
00114         {
00115             if (policy.type == ConnPolicy::DATA)
00116             {
00117                 typename base::DataObjectInterface<T>::shared_ptr data_object;
00118                 switch (policy.lock_policy)
00119                 {
00120 #ifndef OROBLD_OS_NO_ASM
00121                 case ConnPolicy::LOCK_FREE:
00122                     data_object.reset( new base::DataObjectLockFree<T>(initial_value) );
00123                     break;
00124 #else
00125         case ConnPolicy::LOCK_FREE:
00126             RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00127 #endif
00128                 case ConnPolicy::LOCKED:
00129                     data_object.reset( new base::DataObjectLocked<T>(initial_value) );
00130                     break;
00131                 case ConnPolicy::UNSYNC:
00132                     data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
00133                     break;
00134                 }
00135 
00136                 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object);
00137                 return result;
00138             }
00139             else if (policy.type == ConnPolicy::BUFFER)
00140             {
00141                 base::BufferInterface<T>* buffer_object = 0;
00142                 switch (policy.lock_policy)
00143                 {
00144 #ifndef OROBLD_OS_NO_ASM
00145                 case ConnPolicy::LOCK_FREE:
00146                     buffer_object = new base::BufferLockFree<T>(policy.size, initial_value);
00147                     break;
00148 #else
00149         case ConnPolicy::LOCK_FREE:
00150             RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00151 #endif
00152                 case ConnPolicy::LOCKED:
00153                     buffer_object = new base::BufferLocked<T>(policy.size, initial_value);
00154                     break;
00155                 case ConnPolicy::UNSYNC:
00156                     buffer_object = new base::BufferUnSync<T>(policy.size, initial_value);
00157                     break;
00158                 }
00159                 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object));
00160             }
00161             return NULL;
00162         }
00163 
00172         template<typename T>
00173         static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel)
00174         {
00175             assert(conn_id);
00176             base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00177             if (output_channel)
00178                 endpoint->setOutput(output_channel);
00179             return endpoint;
00180         }
00181 
00192         template<typename T>
00193         static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel)
00194         {
00195             assert(conn_id);
00196             base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00197             base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() );
00198             endpoint->setOutput(data_object);
00199             if (output_channel)
00200                 data_object->setOutput(output_channel);
00201             return endpoint;
00202         }
00203 
00211         template<typename T>
00212         static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id)
00213         {
00214             assert(conn_id);
00215             base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00216             return endpoint;
00217         }
00218 
00228         template<typename T>
00229         static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() )
00230         {
00231             assert(conn_id);
00232             base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00233             base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value);
00234             data_object->setOutput(endpoint);
00235             return data_object;
00236         }
00237 
00247         template<typename T>
00248         static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
00249         {
00250             if ( !output_port.isLocal() ) {
00251                 log(Error) << "Need a local OutputPort to create connections." <<endlog();
00252                 return false;
00253             }
00254 
00255             InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
00256 
00257             // This is the input channel element of the output half
00258             base::ChannelElementBase::shared_ptr output_half = 0;
00259             if (input_port.isLocal() && policy.transport == 0)
00260             {
00261                 // Local connection
00262                 if (!input_p)
00263                 {
00264                     log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
00265                     return false;
00266                 }
00267                 // local ports, create buffer here.
00268                 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue());
00269             }
00270             else
00271             {
00272                 // if the input is not local, this is a pure remote connection,
00273                 // if the input *is* local, the user requested to use a different transport
00274                 // than plain memory, rare case, but we accept it. The unit tests use this for example
00275                 // to test the OOB transports.
00276                 if ( !input_port.isLocal() ) {
00277                     output_half = createRemoteConnection( output_port, input_port, policy);
00278                 } else
00279                     output_half = createOutOfBandConnection<T>( output_port, *input_p, policy);
00280             }
00281 
00282             if (!output_half)
00283                 return false;
00284 
00285             // Since output is local, buildChannelInput is local as well.
00286             // This this the input channel element of the whole connection
00287             base::ChannelElementBase::shared_ptr channel_input =
00288                 buildChannelInput<T>(output_port, input_port.getPortID(), output_half);
00289 
00290             return createAndCheckConnection(output_port, input_port, channel_input, policy );
00291         }
00292 
00300         template<class T>
00301         static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
00302         {
00303             StreamConnID *sid = new StreamConnID(policy.name_id);
00304             RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() );
00305             return createAndCheckStream(output_port, policy, chan, sid);
00306         }
00307 
00309         static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id);
00310 
00318         template<class T>
00319         static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
00320         {
00321             StreamConnID *sid = new StreamConnID(policy.name_id);
00322             RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid );
00323             if ( createAndCheckStream(input_port, policy, outhalf, sid) )
00324                 return true;
00325             input_port.removeConnection(sid);
00326             return false;
00327         }
00328 
00329     protected:
00330         static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy);
00331 
00332         static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id);
00333 
00334         static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
00335 
00343         template<class T>
00344         static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) {
00345             StreamConnID* conn_id = new StreamConnID(policy.name_id);
00346             RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id);
00347             return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id);
00348         }
00349 
00350         static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
00351                                                                                        base::InputPortInterface& input_port,
00352                                                                                        ConnPolicy const& policy,
00353                                                                                        base::ChannelElementBase::shared_ptr output_half,
00354                                                                                        StreamConnID* conn_id);
00355     };
00356 
00357 }}
00358 
00359 #endif
00360