Orocos Real-Time Toolkit  2.6.0
AtomicMWSRQueue.hpp
00001 /***************************************************************************
00002   tag: The SourceWorks  Tue Sep 7 00:55:18 CEST 2010  AtomicMWSRQueue.hpp
00003 
00004                         AtomicMWSRQueue.hpp -  description
00005                            -------------------
00006     begin                : Tue September 07 2010
00007     copyright            : (C) 2010 The SourceWorks
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP
00040 #define ORO_CORELIB_ATOMIC_MWSR_QUEUE_HPP
00041 
00042 #include "../os/CAS.hpp"
00043 #include <utility>
00044 
00045 namespace RTT
00046 {
00047     namespace internal
00048     {
00058         template<class T>
00059         class AtomicMWSRQueue
00060         {
00061             //typedef _T* T;
00062             const int _size;
00063             typedef T C;
00064             typedef volatile C* CachePtrType;
00065             typedef C* volatile CacheObjType;
00066             typedef C ValueType;
00067             typedef C* PtrType;
00068 
00076             union SIndexes
00077             {
00078                 unsigned long _value;
00079                 unsigned short _index[2];
00080             };
00081 
00086             CachePtrType _buf;
00087 
00092             volatile SIndexes _indxes;
00093 
00098             CachePtrType advance_w()
00099             {
00100                 SIndexes oldval, newval;
00101                 do
00102                 {
00103                     oldval._value = _indxes._value; /*Points to a free writable pointer.*/
00104                     newval._value = oldval._value; /*Points to the next writable pointer.*/
00105                     // check for full :
00106                     if ((newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1))
00107                     {
00108                         return 0;
00109                     }
00110                     newval._index[0]++;
00111                     if (newval._index[0] >= _size)
00112                         newval._index[0] = 0;
00113                     // if ptr is unchanged, replace it with newval.
00114                 } while (!os::CAS(&_indxes._value, oldval._value, newval._value));
00115                 // frome here on :
00116                 // oldval is 'unique', other preempting threads
00117                 // will have a different value for oldval, as
00118                 // _wptr advances. As long as oldval has not been written,
00119                 // rptr will not advance and wptr will remain stuck behind it.
00120                 // return the old position to write to :
00121                 return &_buf[oldval._index[0]];
00122             }
00123 
00128             bool advance_r(T& result)
00129             {
00130                 SIndexes oldval, newval;
00131                 // read it:
00132                 oldval._value = _indxes._value;
00133                 result = _buf[oldval._index[1]];
00134                 // return it if not yet written:
00135                 if ( !result )
00136                     return false;
00137                 // got it, clear field.
00138                 _buf[oldval._index[1]] = 0;
00139 
00140                 // move pointer:
00141                 do
00142                 {
00143                     // re-read indxes, since we are the only reader,
00144                     // _index[1] will not have changed since entry of this function
00145                     oldval._value = _indxes._value;
00146                     newval._value = oldval._value;
00147                     ++newval._index[1];
00148                     if (newval._index[1] >= _size)
00149                         newval._index[1] = 0;
00150 
00151                     // we need to CAS since the write pointer may have moved.
00152                     // this moves read pointer only:
00153                 } while (!os::CAS(&_indxes._value, oldval._value, newval._value));
00154 
00155                 return true;
00156             }
00157 
00158             // non-copyable !
00159             AtomicMWSRQueue(const AtomicMWSRQueue<T>&);
00160         public:
00161             typedef unsigned int size_type;
00162 
00167             AtomicMWSRQueue(unsigned int size) :
00168                 _size(size + 1)
00169             {
00170                 _buf = new C[_size];
00171                 this->clear();
00172             }
00173 
00174             ~AtomicMWSRQueue()
00175             {
00176                 delete[] _buf;
00177             }
00178 
00183             bool isFull() const
00184             {
00185                 // two cases where the queue is full :
00186                 // if wptr is one behind rptr or if wptr is at end
00187                 // and rptr at beginning.
00188                 SIndexes val;
00189                 val._value = _indxes._value;
00190                 return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
00191             }
00192 
00197             bool isEmpty() const
00198             {
00199                 // empty if nothing to read.
00200                 SIndexes val;
00201                 val._value = _indxes._value;
00202                 return val._index[0] == val._index[1];
00203             }
00204 
00208             size_type capacity() const
00209             {
00210                 return _size - 1;
00211             }
00212 
00216             size_type size() const
00217             {
00218                 SIndexes val;
00219                 val._value = _indxes._value;
00220                 int c = (val._index[0] - val._index[1]);
00221                 return c >= 0 ? c : c + _size;
00222             }
00223 
00229             bool enqueue(const T& value)
00230             {
00231                 if (value == 0)
00232                     return false;
00233                 CachePtrType loc = advance_w();
00234                 if (loc == 0)
00235                     return false;
00236                 *loc = value;
00237                 return true;
00238             }
00239 
00247             bool dequeue(T& result)
00248             {
00249                 T tmpresult;
00250                 if (advance_r(tmpresult) ) {
00251                     result = tmpresult;
00252                     return true;
00253                 }
00254                 return false;
00255             }
00256 
00260             const T front() const
00261             {
00262                 return _buf[_indxes._index[1]];
00263             }
00264 
00268             void clear()
00269             {
00270                 for (int i = 0; i != _size; ++i)
00271                 {
00272                     _buf[i] = 0;
00273                 }
00274                 _indxes._value = 0;
00275             }
00276 
00277         };
00278 
00279     }
00280 }
00281 #endif