AtomicQueue.hpp

00001 /***************************************************************************
00002   tag: Peter Soetens  Wed Jan 18 14:11:39 CET 2006  AtomicQueue.hpp
00003 
00004                         AtomicQueue.hpp -  description
00005                            -------------------
00006     begin                : Wed January 18 2006
00007     copyright            : (C) 2006 Peter Soetens
00008     email                : peter.soetens@mech.kuleuven.be
00009 
00010  ***************************************************************************
00011  *   This library is free software; you can redistribute it and/or         *
00012  *   modify it under the terms of the GNU General Public                   *
00013  *   License as published by the Free Software Foundation;                 *
00014  *   version 2 of the License.                                             *
00015  *                                                                         *
00016  *   As a special exception, you may use this file as part of a free       *
00017  *   software library without restriction.  Specifically, if other files   *
00018  *   instantiate templates or use macros or inline functions from this     *
00019  *   file, or you compile this file and link it with other files to        *
00020  *   produce an executable, this file does not by itself cause the         *
00021  *   resulting executable to be covered by the GNU General Public          *
00022  *   License.  This exception does not however invalidate any other        *
00023  *   reasons why the executable file might be covered by the GNU General   *
00024  *   Public License.                                                       *
00025  *                                                                         *
00026  *   This library is distributed in the hope that it will be useful,       *
00027  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00028  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
00029  *   Lesser General Public License for more details.                       *
00030  *                                                                         *
00031  *   You should have received a copy of the GNU General Public             *
00032  *   License along with this library; if not, write to the Free Software   *
00033  *   Foundation, Inc., 59 Temple Place,                                    *
00034  *   Suite 330, Boston, MA  02111-1307  USA                                *
00035  *                                                                         *
00036  ***************************************************************************/
00037 
00038 
00039 #ifndef ORO_QUEUE_LOCK_FREE_HPP
00040 #define ORO_QUEUE_LOCK_FREE_HPP
00041 
00042 #include <vector>
00043 #include "os/oro_atomic.h"
00044 #include "os/CAS.hpp"
00045 #include "BufferPolicy.hpp"
00046 
00047 namespace RTT
00048 {
00065     template< class T, class ReadPolicy = NonBlockingPolicy, class WritePolicy = NonBlockingPolicy>
00066     class AtomicQueue
00067     {
00068     public:
00074         const unsigned int MAX_THREADS;
00075 
00076         typedef T value_t;
00077     private:
00078         typedef std::vector<value_t> BufferType;
00079         typedef typename BufferType::iterator Iterator;
00080         typedef typename BufferType::const_iterator CIterator;
00081         struct Item {
00082             Item()  {
00083                 //ORO_ATOMIC_INIT(count);
00084                 oro_atomic_set(&count,-1);
00085             }
00086             mutable oro_atomic_t count;  // refcount
00087             BufferType data;
00088         };
00089 
00090         struct StorageImpl
00091         {
00092             Item* items;
00093             StorageImpl(size_t alloc) : items( new Item[alloc] ) {
00094             }
00095             ~StorageImpl() {
00096                 delete[] items;
00097             }
00098             Item& operator[](int i) {
00099                 return items[i];
00100             }
00101         };
00102 
00106         typedef StorageImpl* Storage;
00107 
00108         Storage newStorage(size_t alloc, size_t items, bool init = true)
00109         {
00110             Storage st( new StorageImpl(alloc) );
00111             for (unsigned int i=0; i < alloc; ++i) {
00112                 (*st)[i].data.reserve( items ); // pre-allocate
00113             }
00114             // bootstrap the first queue :
00115             if (init) {
00116                 active = &(*st)[0];
00117                 oro_atomic_inc( &active->count );
00118             }
00119 
00120             return st;
00121         }
00122 
00123         Storage bufs;
00124         Item* volatile active;
00125 
00126         // each thread has one 'working' buffer, and one 'active' buffer
00127         // lock. Thus we require to allocate twice as much buffers as threads,
00128         // for all the locks to succeed in a worst case scenario.
00129         inline size_t BufNum() const {
00130             return MAX_THREADS * 2;
00131         }
00132 
00133         WritePolicy write_policy;
00134         ReadPolicy read_policy;
00135 
00136         oro_atomic_t counter;
00137         oro_atomic_t dcounter;
00138     public:
00139         typedef unsigned int size_type;
00140 
00149         AtomicQueue(unsigned int lsize, unsigned int threads = ORONUM_OS_MAX_THREADS )
00150             : MAX_THREADS( threads ), write_policy(lsize), read_policy(0)
00151         {
00152             const unsigned int BUF_NUM = BufNum();
00153             bufs = newStorage( BUF_NUM, lsize );
00154             oro_atomic_set(&counter,0);
00155             oro_atomic_set(&dcounter,0);
00156         }
00157 
00158         ~AtomicQueue() {
00159             delete bufs;
00160         }
00161 
00162         size_type capacity() const
00163         {
00164             size_type res;
00165             Item* orig = lockAndGetActive();
00166             res = orig->data.capacity();
00167             oro_atomic_dec( &orig->count ); // lockAndGetActive
00168             return res;
00169         }
00170 
00171         size_type size() const
00172         {
00173             size_type res;
00174             Item* orig = lockAndGetActive();
00175             res = orig->data.size();
00176             oro_atomic_dec( &orig->count ); // lockAndGetActive
00177             return res;
00178         }
00179 
00184         bool isEmpty() const
00185         {
00186             bool res;
00187             Item* orig = lockAndGetActive();
00188             res = orig->data.empty();
00189             oro_atomic_dec( &orig->count ); // lockAndGetActive
00190             return res;
00191         }
00192 
00197         bool isFull() const
00198         {
00199             bool res;
00200             Item* orig = lockAndGetActive();
00201             res = (orig->data.size() == orig->data.capacity());
00202             oro_atomic_dec( &orig->count ); // lockAndGetActive
00203             return res;
00204         }
00205 
00206         void clear()
00207         {
00208             Item* orig(0);
00209             Item* nextbuf(0);
00210             int items = 0;
00211             do {
00212                 if (orig) {
00213                     oro_atomic_dec(&orig->count);
00214                     oro_atomic_dec(&nextbuf->count);
00215                 }
00216                 orig = lockAndGetActive();
00217                 items = orig->data.size();
00218                 nextbuf = findEmptyBuf(); // find unused Item in bufs
00219             } while ( OS::CAS(&active, orig, nextbuf ) == false );
00220             oro_atomic_dec( &orig->count ); // lockAndGetActive
00221             oro_atomic_dec( &orig->count ); // ref count
00222             oro_atomic_set(&counter,0);
00223             oro_atomic_set(&dcounter,0);
00224         }
00225 
00231         bool enqueue(const T& value)
00232         {
00233             Item* orig=0;
00234             Item* usingbuf(0);
00235             write_policy.pop();
00236             do {
00237                 if (orig) {
00238                     oro_atomic_dec(&orig->count);
00239                     oro_atomic_dec(&usingbuf->count);
00240                 }
00241                 orig = lockAndGetActive();
00242                 if ( orig->data.size() == orig->data.capacity() ) { // check for full
00243                     oro_atomic_dec( &orig->count );
00244                     write_policy.push(); // return our token.
00245                     return false;
00246                 }
00247                 usingbuf = findEmptyBuf(); // find unused Item in bufs
00248                 usingbuf->data = orig->data;
00249                 usingbuf->data.push_back( value );
00250             } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00251             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00252             oro_atomic_dec( &orig->count ); // set queue free
00253             read_policy.push();
00254             return true;
00255         }
00256 
00263         int enqueueCounted(const T& value)
00264         {
00265             if ( enqueue( value ) ) {
00266                 oro_atomic_inc(&counter);
00267                 return oro_atomic_read(&counter);
00268             }
00269             return 0;
00270         }
00271 
00277         bool dequeue( T& result )
00278         {
00279             Item* orig=0;
00280             Item* usingbuf(0);
00281             read_policy.pop();
00282             do {
00283                 if (orig) {
00284                     oro_atomic_dec(&orig->count);
00285                     oro_atomic_dec(&usingbuf->count);
00286                 }
00287                 orig = lockAndGetActive();
00288                 if ( orig->data.empty() ) { // check for empty
00289                     oro_atomic_dec( &orig->count );
00290                     read_policy.push();
00291                     return false;
00292                 }
00293                 usingbuf = findEmptyBuf(); // find unused Item in bufs
00294                 result = orig->data.front();
00295                 CIterator it = ++(orig->data.begin());
00296                 for ( ;  it != orig->data.end(); ++it )
00297                     usingbuf->data.push_back(*it);
00298                 //usingbuf->data.insert( usingbuf->data.end(), it, orig->data.end() ); // ALTERNATIVE. (does it allocate??)
00299             } while ( OS::CAS(&active, orig, usingbuf ) ==false);
00300             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00301             oro_atomic_dec( &orig->count ); // set queue free
00302             write_policy.push();
00303             return true;
00304         }
00305 
00312         int dequeueCounted( T& result )
00313         {
00314             if (dequeue(result) ) {
00315                 oro_atomic_inc(&dcounter);
00316                 return oro_atomic_read(&dcounter);
00317             }
00318             return 0;
00319         }
00320 
00327         template<class MPoolType>
00328         T lockfront(MPoolType& mp) const
00329         {
00330             bool was_locked = false;
00331             Item* orig=0;
00332             T result;
00333             do {
00334                 if (orig) {
00335                     mp.unlock( orig->data.front() );
00336                     oro_atomic_dec(&orig->count);
00337                 }
00338                 orig = lockAndGetActive();
00339                 if ( orig->data.empty() ) { // check for empty
00340                     oro_atomic_dec( &orig->count ); //lockAndGetActive
00341                     return 0;
00342                 }
00343 
00344                 was_locked = mp.lock( orig->data.front() );
00345                 // retry if lock failed or read moved.
00346             } while( !was_locked );
00347             result = orig->data.front();
00348             oro_atomic_dec( &orig->count ); // lockAndGetActive()
00349             return result;
00350         }
00351 
00355         value_t front() const
00356         {
00357             Item* orig = lockAndGetActive();
00358             value_t ret(orig->data.front());
00359             oro_atomic_dec( &orig->count ); //lockAndGetActive
00360             return ret;
00361         }
00362 
00366         value_t back() const
00367         {
00368             Item* orig = lockAndGetActive();
00369             value_t ret(orig->data.back());
00370             oro_atomic_dec( &orig->count ); //lockAndGetActive
00371             return ret;
00372         }
00373 
00374     private:
00378         Item* findEmptyBuf() {
00379             // These two functions are copy/pasted from BufferLockFree.
00380             // If MAX_THREADS is large enough, this will always succeed :
00381             Item* start = &(*bufs)[0];
00382             while( true ) {
00383                 if ( oro_atomic_inc_and_test( &start->count ) )
00384                     break;
00385                 oro_atomic_dec( &start->count );
00386                 ++start;
00387                 if (start == &(*bufs)[0] + BufNum() )
00388                     start = &(*bufs)[0]; // in case of races, rewind
00389             }
00390             start->data.clear();
00391             return start; // unique pointer across all threads
00392         }
00393 
00398         Item* lockAndGetActive() const {
00399             // only operates on active's refcount.
00400             Item* orig=0;
00401             do {
00402                 if (orig)
00403                     oro_atomic_dec( &orig->count );
00404                 orig = active;
00405                 oro_atomic_inc( &orig->count );
00406                 // this synchronisation point is 'aggressive' (a _sufficient_ condition)
00407                 // if active is still equal to orig, the increase of orig->count is
00408                 // surely valid, since no contention (change of active) occured.
00409             } while ( active != orig );
00410             return orig;
00411         }
00412     };
00413 
00414 }
00415 
00416 #endif
Generated on Thu Dec 23 13:22:36 2010 for Orocos Real-Time Toolkit by  doxygen 1.6.3