Orocos Real-Time Toolkit  2.6.0
ConnFactory.hpp
00001 /***************************************************************************
00002   tag: Peter Soetens  Thu Oct 22 11:59:08 CEST 2009  ConnFactory.hpp
00003 
00004                         ConnFactory.hpp -  description
00005                            -------------------
00006     begin                : Thu October 22 2009
00007     copyright            : (C) 2009 Peter Soetens
00008     email                : peter@thesourcworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_CONN_FACTORY_HPP
00040 #define ORO_CONN_FACTORY_HPP
00041 
00042 #include <string>
00043 #include "Channels.hpp"
00044 #include "ConnInputEndPoint.hpp"
00045 #include "ConnOutputEndPoint.hpp"
00046 #include "../base/PortInterface.hpp"
00047 #include "../base/InputPortInterface.hpp"
00048 #include "../base/OutputPortInterface.hpp"
00049 #include "../DataFlowInterface.hpp"
00050 
00051 #include "../base/DataObject.hpp"
00052 #include "../base/DataObjectUnSync.hpp"
00053 #include "../base/Buffer.hpp"
00054 #include "../base/BufferUnSync.hpp"
00055 #include "../Logger.hpp"
00056 
00057 namespace RTT
00058 { namespace internal {
00059 
00063     struct LocalConnID : public ConnID
00064     {
00065         base::PortInterface const* ptr;
00066         LocalConnID(base::PortInterface const* obj)
00067             : ptr(obj) {}
00068         virtual ConnID* clone() const;
00069         virtual bool isSameID(ConnID const& id) const;
00070     };
00071 
00075     struct RTT_API StreamConnID : public ConnID
00076     {
00077         std::string name_id;
00078         StreamConnID(const std::string& name)
00079             : name_id(name) {}
00080         virtual ConnID* clone() const;
00081         virtual bool isSameID(ConnID const& id) const;
00082     };
00083 
00084 
00091     class RTT_API ConnFactory
00092     {
00093     public:
00094         virtual ~ConnFactory() {}
00095 
00100         virtual base::InputPortInterface* inputPort(std::string const& name) const = 0;
00101 
00106         virtual base::OutputPortInterface* outputPort(std::string const& name) const = 0;
00107 
00114         virtual base::ChannelElementBase::shared_ptr buildDataStorage(ConnPolicy const& policy) const = 0;
00115 
00122         virtual base::ChannelElementBase::shared_ptr buildChannelOutput(base::InputPortInterface& port) const = 0;
00129         virtual base::ChannelElementBase::shared_ptr buildChannelInput(base::OutputPortInterface& port) const = 0;
00130 
00138         template<typename T>
00139         static base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy, const T& initial_value = T())
00140         {
00141             if (policy.type == ConnPolicy::DATA)
00142             {
00143                 typename base::DataObjectInterface<T>::shared_ptr data_object;
00144                 switch (policy.lock_policy)
00145                 {
00146 #ifndef OROBLD_OS_NO_ASM
00147                 case ConnPolicy::LOCK_FREE:
00148                     data_object.reset( new base::DataObjectLockFree<T>(initial_value) );
00149                     break;
00150 #else
00151         case ConnPolicy::LOCK_FREE:
00152             RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00153 #endif
00154                 case ConnPolicy::LOCKED:
00155                     data_object.reset( new base::DataObjectLocked<T>(initial_value) );
00156                     break;
00157                 case ConnPolicy::UNSYNC:
00158                     data_object.reset( new base::DataObjectUnSync<T>(initial_value) );
00159                     break;
00160                 }
00161 
00162                 ChannelDataElement<T>* result = new ChannelDataElement<T>(data_object);
00163                 return result;
00164             }
00165             else if (policy.type == ConnPolicy::BUFFER || policy.type == ConnPolicy::CIRCULAR_BUFFER)
00166             {
00167                 base::BufferInterface<T>* buffer_object = 0;
00168                 switch (policy.lock_policy)
00169                 {
00170 #ifndef OROBLD_OS_NO_ASM
00171                 case ConnPolicy::LOCK_FREE:
00172                     buffer_object = new base::BufferLockFree<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00173                     break;
00174 #else
00175         case ConnPolicy::LOCK_FREE:
00176             RTT::log(Warning) << "lock free connection policy is unavailable on this system, defaulting to LOCKED" << RTT::endlog();
00177 #endif
00178                 case ConnPolicy::LOCKED:
00179                     buffer_object = new base::BufferLocked<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00180                     break;
00181                 case ConnPolicy::UNSYNC:
00182                     buffer_object = new base::BufferUnSync<T>(policy.size, initial_value, policy.type == ConnPolicy::CIRCULAR_BUFFER);
00183                     break;
00184                 }
00185                 return new ChannelBufferElement<T>(typename base::BufferInterface<T>::shared_ptr(buffer_object));
00186             }
00187             return NULL;
00188         }
00189 
00198         template<typename T>
00199         static base::ChannelElementBase::shared_ptr buildChannelInput(OutputPort<T>& port, ConnID* conn_id, base::ChannelElementBase::shared_ptr output_channel)
00200         {
00201             assert(conn_id);
00202             base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00203             if (output_channel)
00204                 endpoint->setOutput(output_channel);
00205             return endpoint;
00206         }
00207 
00218         template<typename T>
00219         static base::ChannelElementBase::shared_ptr buildBufferedChannelInput(OutputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr output_channel)
00220         {
00221             assert(conn_id);
00222             base::ChannelElementBase::shared_ptr endpoint = new ConnInputEndpoint<T>(&port, conn_id);
00223             base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, port.getLastWrittenValue() );
00224             endpoint->setOutput(data_object);
00225             if (output_channel)
00226                 data_object->setOutput(output_channel);
00227             return endpoint;
00228         }
00229 
00237         template<typename T>
00238         static base::ChannelElementBase::shared_ptr buildChannelOutput(InputPort<T>& port, ConnID* conn_id)
00239         {
00240             assert(conn_id);
00241             base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00242             return endpoint;
00243         }
00244 
00254         template<typename T>
00255         static base::ChannelElementBase::shared_ptr buildBufferedChannelOutput(InputPort<T>& port, ConnID* conn_id, ConnPolicy const& policy, T const& initial_value = T() )
00256         {
00257             assert(conn_id);
00258             base::ChannelElementBase::shared_ptr endpoint = new ConnOutputEndpoint<T>(&port, conn_id);
00259             base::ChannelElementBase::shared_ptr data_object = buildDataStorage<T>(policy, initial_value);
00260             data_object->setOutput(endpoint);
00261             return data_object;
00262         }
00263 
00273         template<typename T>
00274         static bool createConnection(OutputPort<T>& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy)
00275         {
00276             if ( !output_port.isLocal() ) {
00277                 log(Error) << "Need a local OutputPort to create connections." <<endlog();
00278                 return false;
00279             }
00280 
00281             InputPort<T>* input_p = dynamic_cast<InputPort<T>*>(&input_port);
00282 
00283             // This is the input channel element of the output half
00284             base::ChannelElementBase::shared_ptr output_half = 0;
00285             if (input_port.isLocal() && policy.transport == 0)
00286             {
00287                 // Local connection
00288                 if (!input_p)
00289                 {
00290                     log(Error) << "Port " << input_port.getName() << " is not compatible with " << output_port.getName() << endlog();
00291                     return false;
00292                 }
00293                 // local ports, create buffer here.
00294                 output_half = buildBufferedChannelOutput<T>(*input_p, output_port.getPortID(), policy, output_port.getLastWrittenValue());
00295             }
00296             else
00297             {
00298                 // if the input is not local, this is a pure remote connection,
00299                 // if the input *is* local, the user requested to use a different transport
00300                 // than plain memory, rare case, but we accept it. The unit tests use this for example
00301                 // to test the OOB transports.
00302                 if ( !input_port.isLocal() ) {
00303                     output_half = createRemoteConnection( output_port, input_port, policy);
00304                 } else
00305                     output_half = createOutOfBandConnection<T>( output_port, *input_p, policy);
00306             }
00307 
00308             if (!output_half)
00309                 return false;
00310 
00311             // Since output is local, buildChannelInput is local as well.
00312             // This this the input channel element of the whole connection
00313             base::ChannelElementBase::shared_ptr channel_input =
00314                 buildChannelInput<T>(output_port, input_port.getPortID(), output_half);
00315 
00316             return createAndCheckConnection(output_port, input_port, channel_input, policy );
00317         }
00318 
00326         template<class T>
00327         static bool createStream(OutputPort<T>& output_port, ConnPolicy const& policy)
00328         {
00329             StreamConnID *sid = new StreamConnID(policy.name_id);
00330             RTT::base::ChannelElementBase::shared_ptr chan = buildChannelInput( output_port, sid, base::ChannelElementBase::shared_ptr() );
00331             return createAndCheckStream(output_port, policy, chan, sid);
00332         }
00333 
00335         static bool createAndCheckStream(base::OutputPortInterface& output_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr chan, StreamConnID* conn_id);
00336 
00344         template<class T>
00345         static bool createStream(InputPort<T>& input_port, ConnPolicy const& policy)
00346         {
00347             StreamConnID *sid = new StreamConnID(policy.name_id);
00348             RTT::base::ChannelElementBase::shared_ptr outhalf = buildChannelOutput( input_port, sid );
00349             if ( createAndCheckStream(input_port, policy, outhalf, sid) )
00350                 return true;
00351             input_port.removeConnection(sid);
00352             return false;
00353         }
00354 
00355     protected:
00356         static bool createAndCheckConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, base::ChannelElementBase::shared_ptr channel_input, ConnPolicy policy);
00357 
00358         static bool createAndCheckStream(base::InputPortInterface& input_port, ConnPolicy const& policy, base::ChannelElementBase::shared_ptr outhalf, StreamConnID* conn_id);
00359 
00360         static base::ChannelElementBase::shared_ptr createRemoteConnection(base::OutputPortInterface& output_port, base::InputPortInterface& input_port, ConnPolicy const& policy);
00361 
00369         template<class T>
00370         static base::ChannelElementBase::shared_ptr createOutOfBandConnection(OutputPort<T>& output_port, InputPort<T>& input_port, ConnPolicy const& policy) {
00371             StreamConnID* conn_id = new StreamConnID(policy.name_id);
00372             RTT::base::ChannelElementBase::shared_ptr output_half = ConnFactory::buildChannelOutput<T>(input_port, conn_id);
00373             return createAndCheckOutOfBandConnection( output_port, input_port, policy, output_half, conn_id);
00374         }
00375 
00376         static base::ChannelElementBase::shared_ptr createAndCheckOutOfBandConnection( base::OutputPortInterface& output_port,
00377                                                                                        base::InputPortInterface& input_port,
00378                                                                                        ConnPolicy const& policy,
00379                                                                                        base::ChannelElementBase::shared_ptr output_half,
00380                                                                                        StreamConnID* conn_id);
00381     };
00382 
00383         typedef boost::shared_ptr<ConnFactory> ConnFactoryPtr;
00384 
00385 
00386     }
00387 }
00388 
00389 #endif
00390