Orocos Real-Time Toolkit  2.5.0
AtomicQueue.hpp
00001 /***************************************************************************
00002   tag: The SourceWorks  Tue Sep 7 00:55:18 CEST 2010  AtomicQueue.hpp
00003 
00004                         AtomicQueue.hpp -  description
00005                            -------------------
00006     begin                : Tue September 07 2010
00007     copyright            : (C) 2010 The SourceWorks
00008     email                : peter@thesourceworks.com
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_CORELIB_ATOMIC_QUEUE_HPP
00040 #define ORO_CORELIB_ATOMIC_QUEUE_HPP
00041 
00042 #include "../os/CAS.hpp"
00043 #include <utility>
00044 
00045 namespace RTT
00046 {
00047     namespace internal {
00069     template<class T>
00070     class AtomicQueue
00071     {
00072         const int _size;
00073         typedef T C;
00074         typedef volatile C* CachePtrType;
00075         typedef C* volatile CacheObjType;
00076         typedef C  ValueType;
00077         typedef C* PtrType;
00078 
00079         union SIndexes
00080         {
00081             unsigned long _value;
00082             unsigned short _index[2];
00083         };
00084 
00089         CachePtrType  _buf;
00090 
00095         volatile SIndexes _indxes;
00096 
00104         CachePtrType recover_r() const
00105         {
00106             // The implementation starts from the read pointer,
00107             // and wraps around until all fields were scanned.
00108             // As such, the out-of-order elements will at least
00109             // be returned in their relative order.
00110             SIndexes start;
00111             start._value = _indxes._value;
00112             unsigned short r = start._index[1];
00113             while( r != _size) {
00114                 if (_buf[r])
00115                     return &_buf[r];
00116                 ++r;
00117             }
00118             r = 0;
00119             while( r != start._index[1]) {
00120                 if (_buf[r])
00121                     return &_buf[r];
00122                 ++r;
00123             }
00124             return 0;
00125         }
00126 
00131         CachePtrType propose_w()
00132         {
00133             SIndexes oldval, newval;
00134             do {
00135                 oldval._value = _indxes._value; /*Points to a free writable pointer.*/
00136                 newval._value = oldval._value; /*Points to the next writable pointer.*/
00137                 // check for full on a *Copy* of oldval:
00138                 if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
00139                 {
00140                     // note: in case of high contention, there might be existing empty fields
00141                     // in _buf that aren't used.
00142                     return 0;
00143                 }
00144                 ++newval._index[0];
00145                 if ( newval._index[0] == _size )
00146                     newval._index[0] = 0;
00147                 // if ptr is unchanged, replace it with newval.
00148             } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
00149 
00150             // the returned field may contain data, in that case, the caller needs to retry.
00151             return &_buf[ oldval._index[0] ];
00152         }
00157         CachePtrType propose_r()
00158         {
00159             SIndexes oldval, newval;
00160             do {
00161                 oldval._value = _indxes._value;
00162                 newval._value = oldval._value;
00163                 // check for empty on a *Copy* of oldval:
00164                 if ( newval._index[0] == newval._index[1] )
00165                 {
00166                     // seldom: R and W are indicating empty, but 'lost' fields
00167                     // are to be picked up. Return these
00168                     // that would have been read eventually after some writes.
00169                     return recover_r();
00170                 }
00171                 ++newval._index[1];
00172                 if ( newval._index[1] == _size )
00173                     newval._index[1] = 0;
00174 
00175             } while ( !os::CAS( &_indxes._value, oldval._value, newval._value) );
00176             // the returned field may contain *no* data, in that case, the caller needs to retry.
00177             // as such r will advance until it hits a data sample or write pointer.
00178             return &_buf[oldval._index[1] ];
00179         }
00180 
00181         // non-copyable !
00182         AtomicQueue( const AtomicQueue<T>& );
00183     public:
00184         typedef unsigned int size_type;
00185 
00190         AtomicQueue( unsigned int size )
00191             : _size(size+1)
00192         {
00193             _buf= new C[_size];
00194             this->clear();
00195         }
00196 
00197         ~AtomicQueue()
00198         {
00199             delete[] _buf;
00200         }
00201 
00206         bool isFull() const
00207         {
00208             // two cases where the queue is full :
00209             // if wptr is one behind rptr or if wptr is at end
00210             // and rptr at beginning.
00211             SIndexes val;
00212             val._value = _indxes._value;
00213             return val._index[0] == val._index[1] - 1 || val._index[0] == val._index[1] + _size - 1;
00214         }
00215 
00220         bool isEmpty() const
00221         {
00222             // empty if nothing to read.
00223             SIndexes val;
00224             val._value = _indxes._value;
00225             return val._index[0] == val._index[1] && recover_r() == 0;
00226         }
00227 
00231         size_type capacity() const
00232         {
00233             return _size -1;
00234         }
00235 
00241         size_type size() const
00242         {
00243             int c = 0, ret = 0;
00244             while (c != _size ) {
00245                 if (_buf[c++] )
00246                     ++ret;
00247             }
00248             return ret;
00249             //int c = (_indxes._index[0] - _indxes._index[1]);
00250             //return c >= 0 ? c : c + _size;
00251         }
00252 
00258         bool enqueue(const T& value)
00259         {
00260             if ( value == 0 )
00261                 return false;
00262             CachePtrType loc;
00263             C null = 0;
00264             do {
00265                 loc = propose_w();
00266                 if ( loc == 0 )
00267                     return false; //full
00268                 // if loc contains a zero, write it, otherwise, re-try.
00269             } while( !os::CAS(loc, null, value));
00270             return true;
00271         }
00272 
00278         bool dequeue( T& result )
00279         {
00280             CachePtrType loc;
00281             C null = 0;
00282             do {
00283                 loc = propose_r();
00284                 if ( loc == 0 )
00285                     return false; // empty
00286                 result = *loc;
00287                 // if loc still contains result, clear it, otherwise, re-try.
00288             } while( result == 0 || !os::CAS(loc, result, null) );
00289             assert(result);
00290             return true;
00291         }
00292 
00296         const T front() const
00297         {
00298             return _buf[_indxes._index[1] ];
00299         }
00300 
00304         void clear()
00305         {
00306             for(int i = 0 ; i != _size; ++i) {
00307                 _buf[i] = 0;
00308             }
00309             _indxes._value = 0;
00310         }
00311 
00312     };
00313 
00314 }}
00315 
00316 #endif