AtomicQueue

Hi,

I remember a problem with the AtomicQueue implementation before rtt-1.18.
Does anyone remember what that was all about? We are still on a pre-rtt-1.18
version, and I fear we've run into this problem.

Kind regards,
Butch.

AtomicQueue

On Mon, Dec 14, 2009 at 15:54, Butch Slayer <butch [dot] slayers [..] ...> wrote:
> Hi,
>
> I remember a problem with the AtomicQueue implementation before rtt-1.18.
> Does anyone remember what that was all about? We are still on a pre-rtt-1.18
> version, and I fear we've run into this problem.

Bug #307: https://www.fmtc.be/bugzilla/orocos/show_bug.cgi?id=307

The initial implementation of AQ was not safe against 'overruns'. If a
low-prio thread was inserting/removing an item, and two other high-prio
threads were massively inserting/removing pointers, the low-prio
thread could, if woken up at the right moment, assume its
insertion/removal was fine when it wasn't, because the buffer had
round-tripped back to the same point.

If you only have two threads, the old impl was fine. We had a
discussion about this back in 2007; see the bug report.
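
For those who don't remember the details: it is essentially the ABA problem
of compare-and-swap. A boiled-down sketch (hypothetical code, not the actual
pre-1.18 implementation) of why a CAS on a bare ring pointer is not enough
once a third thread enters the game:

#include <atomic>

struct Cell { void* data; };

Cell ring[8];                         // the circular buffer
std::atomic<Cell*> wptr(&ring[0]);    // shared write pointer

// Simplified writer in the style of the old pointer-based scheme.
Cell* advance_w_unsafe()
{
    Cell *oldval, *newval;
    do {
        oldval = wptr.load();
        newval = (oldval + 1 == ring + 8) ? ring : oldval + 1;
        // ABA hazard: if this thread is preempted here while two or more
        // other threads keep enqueueing/dequeueing, wptr can travel all the
        // way around the ring and land on 'oldval' again. The CAS below
        // then succeeds even though the queue state this thread observed
        // is long gone.
    } while ( !wptr.compare_exchange_weak(oldval, newval) );
    return oldval;  // may already have been handed out to another writer!
}

(With only two threads, the full/empty checks keep the pointers from lapping
each other, which is why the old implementation was fine in that case.)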

Peter

AtomicQueue

On Tue, Dec 15, 2009 at 10:19, Butch Slayer <butch [dot] slayers [..] ...> wrote:
>
>
> 2009/12/14 Butch Slayer <butch [dot] slayers [..] ...>
>>
>>
>> 2009/12/14 Peter Soetens <peter [..] ...>
>>> [Peter's reply snipped; quoted in full above]
>>
>> Oh crap, I've got two threads (even three is possible) writing data into
>> a buffer.
>> I'd better use a BufferLocked here...
>>
>>
> Another problem arises with BufferLocked: it uses mutexes, of which we
> don't have many to spare.
> And since an AtomicQueue is used quite a lot throughout Orocos, I have to
> deal with it.
> The idea is that I create a locked queue for which a locking mechanism can
> be supplied upon creation. In my case the locking will be no more than
> disabling and enabling of interrupts. In other cases a mutex can be used.
> The defaults will be configured through a header file.
>
> What is your stance on this?

Why don't you use the new AtomicQueue implementation? It requires
more memory, but you can specify the max number of threads in the
constructor.

Providing a separate locking mechanism can only be done in a new
class, similar to BufferLocked, but certainly differently named (like
BufferSharedLocked). I would only do this as a very last resort.

Peter

AtomicQueue

On Tue, 15 Dec 2009, Peter Soetens wrote:

> On Tue, Dec 15, 2009 at 10:19, Butch Slayer <butch [dot] slayers [..] ...> wrote:
>> [earlier messages snipped]
>>
>> What is your stance on this?
>
> Why don't you use the new AtomicQueue implementation? It requires
> more memory, but you can specify the max number of threads in the
> constructor.
>
> Providing a separate locking mechanism can only be done in a new
> class, similar to BufferLocked, but certainly differently named (like
> BufferSharedLocked). I would only do this as a very last resort.
>
> Peter

A more fundamental (hence 2.0...?) answer: buffers are a shared resource,
_hence_ they should have a mechanism that sharing components can use to
_coordinate_ access to the resource. In buffer terms, this is very similar
to the well-known "low water/high water" coordination pattern. Applications
that neglect the finiteness of resources will, sooner or later, pay for
ignoring these coordination events... Such events _could_ be part of the
RTT, or, at least, your application should provide some similar
coordination mechanism. (Unless your system is so centralized and simple
that maximum loads can be predicted deterministically.)
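
To make the pattern concrete: a minimal sketch of low-water/high-water
coordination (the WatermarkedBuffer class, the callback names and the
thresholds are all hypothetical, not RTT API):

#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

// Producers are told to back off when the buffer crosses the high watermark
// and may resume once consumers drain it below the low watermark.
template <class T>
class WatermarkedBuffer
{
public:
    WatermarkedBuffer(std::size_t low, std::size_t high,
                      std::function<void()> on_low,
                      std::function<void()> on_high)
        : low_(low), high_(high), on_low_(on_low), on_high_(on_high) {}

    void push(const T& v)
    {
        std::lock_guard<std::mutex> g(m_);
        q_.push_back(v);
        if (q_.size() == high_)
            on_high_();              // coordination event: back off
    }

    bool pop(T& v)
    {
        std::lock_guard<std::mutex> g(m_);
        if (q_.empty())
            return false;
        v = q_.front();
        q_.pop_front();
        if (q_.size() == low_)
            on_low_();               // coordination event: resume
        return true;
    }

private:
    std::size_t low_, high_;
    std::function<void()> on_low_, on_high_;
    std::mutex m_;
    std::deque<T> q_;
};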

Herman

AtomicQueue

2009/12/15 Butch Slayer <butch [dot] slayers [..] ...>

>
>
> 2009/12/15 Peter Soetens <peter [..] ...>
>
>> > [earlier messages snipped]
>> >
>> > What is your stance on this?
>>
>> Why don't you use the new AtomicQueue implementation? It requires
>> more memory, but you can specify the max number of threads in the
>> constructor.
>>
>> Providing a separate locking mechanism can only be done in a new
>> class, similar to BufferLocked, but certainly differently named (like
>> BufferSharedLocked). I would only do this as a very last resort.
>>
>> Peter
>>
> We are tight on memory too...
>
> What about a BufferLocked with locking policies as template arguments?
> For example:
> template<class T, class Locker = OS::MutexLock, class Lock = OS::Mutex>
> class BufferLocked
> {
>     ...
> };
>
> Everything is left as-is except for the definition of the Mutex and the
> call to the MutexLock.
>
> And it is a minor fix.
>
> Butch.
>
Forwarding this so that the rest of the mailing list can enjoy it too; I
messed up with the 'reply all' button. Sorry.

Butch.
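
For the archives, a minimal sketch of the policy idea in the quoted mail
(all types hypothetical; the real BufferLocked and RTT's OS::Mutex /
OS::MutexLock differ in detail):

#include <deque>

// Hypothetical lock policies, sketched after the proposal above: the buffer
// picks its locking mechanism at compile time via template arguments.
struct IrqLock
{
    void lock()   { /* disable_irq(); */ }   // "lock" by masking interrupts
    void unlock() { /* enable_irq(); */ }
};

template <class Lock>
struct ScopedLocker  // RAII helper in the style of OS::MutexLock
{
    explicit ScopedLocker(Lock& l) : l_(l) { l_.lock(); }
    ~ScopedLocker() { l_.unlock(); }
    Lock& l_;
};

template <class T, class Lock, template <class> class Locker = ScopedLocker>
class BufferLockedSketch
{
public:
    bool Push(const T& item)
    {
        Locker<Lock> guard(lock_);   // whatever Lock/Locker were chosen
        buf_.push_back(item);
        return true;
    }
    bool Pop(T& item)
    {
        Locker<Lock> guard(lock_);
        if (buf_.empty())
            return false;
        item = buf_.front();
        buf_.pop_front();
        return true;
    }
private:
    Lock lock_;
    std::deque<T> buf_;              // not real-time safe; illustration only
};

// usage: BufferLockedSketch<int, IrqLock> irq_buf;
// or, with <mutex>: BufferLockedSketch<int, std::mutex> mtx_buf;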

AtomicQueue

Hi,

I've patched the pre-1.18 AtomicQueue implementation. It now works fine
for my case:

Index: orocos/corelib/buffers/current/include/AtomicQueue.hpp
===================================================================
--- orocos/corelib/buffers/current/include/AtomicQueue.hpp  (revision 2900)
+++ orocos/corelib/buffers/current/include/AtomicQueue.hpp  (working copy)
@@ -58,13 +58,19 @@
         //typedef _T* T;
         const int _size;
         typedef std::pair<T, int> C;
-        typedef volatile C* volatile WriteType;
-        typedef volatile C* volatile const ReadType;
+//        typedef volatile C* volatile WriteType;
+//        typedef volatile C* volatile const ReadType;
         typedef volatile C* CachePtrType;
         typedef C* volatile CacheObjType;
         typedef C ValueType;
         typedef C* PtrType;
 
+        union SIndexes
+        {
+            unsigned long _value;
+            unsigned short _index[2];
+        };
+
         /**
          * The pointer to the buffer can be cached,
          * the contents are volatile.
@@ -72,16 +78,11 @@
         CachePtrType _buf;
 
         /**
-         * A volatile pointer to a volatile cell.
+         * The indexes are packed into one double word.
+         * Therefore the read and write index can be read and written atomically.
          */
-        WriteType _wptr;
+        volatile SIndexes _indxes;
 
-        /**
-         * This pointer is also writable because we
-         * set it to zero to indicate it has been read.
-         */
-        WriteType _rptr;
-
         WritePolicy write_policy;
         ReadPolicy read_policy;
 
@@ -91,24 +92,29 @@
          */
         CachePtrType advance_w()
        {
-            CachePtrType oldval, newval;
+            SIndexes oldval, newval;
+            bool full = false;
             do {
-                oldval = _wptr;
-                newval = oldval+1;
-                if ( newval >= _buf+_size )
-                    newval -= _size;
+                oldval._value = _indxes._value; /* points to a free, writable cell */
+                newval._value = oldval._value;  /* will point to the next writable cell */
                 // check for full :
-                if ( newval->first != 0 || newval == _rptr || newval == _rptr + _size )
+                if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
+                {
                     return 0;
+                }
+                newval._index[0]++;
+                if ( newval._index[0] >= _size )
+                    newval._index[0] = 0;
                 // if ptr is unchanged, replace it with newval.
-            } while ( !OS::CAS( &_wptr, oldval, newval) );
+            } while ( !OS::CAS( &_indxes._value, oldval._value, newval._value) );
             // from here on :
             // oldval is 'unique', other preempting threads
             // will have a different value for oldval, as
             // _wptr advances.
-
+            if ( full == true )
+                return 0;
             // return the old position to write to :
-            return oldval;
+            return &_buf[ oldval._index[0] ];
         }
         /**
          * Atomic advance and wrap of the Read pointer.
@@ -116,24 +122,29 @@
          */
         CachePtrType advance_r()
         {
-            CachePtrType oldval, newval;
+            SIndexes oldval, newval;
             do {
-                oldval = _rptr;
-                newval = oldval+1;
-                if ( newval >= _buf+_size )
-                    newval -= _size;
+                oldval._value = _indxes._value;
+                newval._value = oldval._value;
                 // check for empty :
-                if ( oldval->first == 0 )
+                if ( newval._index[0] == newval._index[1] )
+                {
+                    //EnableIrq(ic);
                     return 0;
-                // if ptr is unchanged, replace it with newval.
-            } while ( !OS::CAS( &_rptr, oldval, newval) );
+                    //empty=true;
+                }
+                newval._index[1]++;
+                if ( newval._index[1] >= _size )
+                    newval._index[1] = 0;
+                // if indexes are unchanged, replace them with newval.
+            } while ( !OS::CAS( &_indxes._value, oldval._value, newval._value) );
             // from here on :
             // oldval is 'unique', other preempting threads
             // will have a different value for oldval, as
             // _rptr advances.
 
             // return the old position to read from :
-            return oldval;
+            return &_buf[ oldval._index[1] ];
         }
 
         // non-copyable !
@@ -166,7 +177,7 @@
             // two cases where the queue is full :
             // if wptr is one behind rptr or if wptr is at end
             // and rptr at beginning.
-            return _wptr->first != 0 || _wptr == _rptr - 1 || _wptr == _rptr+_size - 1;
+            return _indxes._index[0] == _indxes._index[1] - 1 || _indxes._index[0] == _indxes._index[1] + _size - 1;
         }
 
         /**
@@ -176,7 +187,7 @@
         bool isEmpty() const
         {
             // empty if nothing to read.
-            return _rptr->first == 0;
+            return _indxes._index[0] == _indxes._index[1];
         }
 
         /**
@@ -192,7 +203,7 @@
          */
         size_type size() const
         {
-            int c = (_wptr - _rptr);
+            int c = (_indxes._index[0] - _indxes._index[1]);
             return c >= 0 ? c : c + _size;
         }
 
@@ -255,7 +266,7 @@
          */
         const T front() const
         {
-            return _rptr;
+            return _buf[ _indxes._index[1] ].first;
         }
 
         /**
@@ -272,12 +283,12 @@
             do {
                 if (was_locked)
                     mp.unlock(loc->first);
-                loc = _rptr;
+                loc = &_buf[ _indxes._index[1] ];
                 if (loc->first == 0)
                     return 0;
                 was_locked = mp.lock(loc->first);
                 // retry if lock failed or read moved.
-            } while( !was_locked || loc != _rptr ); // obstruction detection.
+            } while( !was_locked || loc != &_buf[ _indxes._index[1] ] ); // obstruction detection.
             return loc->first;
         }
 
@@ -311,8 +322,7 @@
                 }
                 _buf[i].second = i+1; // init the counters
             }
-            _rptr = _buf;
-            _wptr = _buf;
+            _indxes._value = 0;
             write_policy.reset( _size - 1 );
             read_policy.reset(0);
         }

Butch.
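
For readers skimming the patch: its core trick, restated as a self-contained
sketch in standard C++ (std::atomic instead of RTT's OS::CAS; all names are
hypothetical). Packing both indexes into one word means a single CAS
validates and updates them together; when the queue wraps around, the packed
value changes and the stale CAS fails instead of succeeding:

#include <atomic>
#include <cstdint>

struct PackedIndexes
{
    static const std::uint16_t SIZE = 16;    // ring capacity + 1
    std::atomic<std::uint32_t> packed;       // high half: write, low half: read

    PackedIndexes() : packed(0) {}

    // Returns the claimed write slot, or -1 if the ring is full.
    int claim_write_slot()
    {
        std::uint32_t oldv = packed.load();
        for (;;) {
            std::uint16_t w = oldv >> 16, r = oldv & 0xFFFF;
            if ( (w + 1) % SIZE == r )       // full
                return -1;
            std::uint32_t newv = (std::uint32_t((w + 1) % SIZE) << 16) | r;
            if ( packed.compare_exchange_weak(oldv, newv) )
                return w;                    // slot w is now exclusively ours
            // oldv was refreshed by the failed CAS; retry.
        }
    }
};

Extracting the halves with shifts rather than a union also avoids depending
on endianness and on sizeof(unsigned long), but the idea is the same as in
the patch.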

AtomicQueue

On Wednesday 23 December 2009 14:25:25 Butch Slayer wrote:
> Hi,
>
> I've patched the pre-1.18 AtomicQueue implementation. It now works fine
> for my case:

I have difficulties applying this patch. Could you send the complete file?

Thanks,
Peter

>
> [patch snipped; quoted in full above]
>

AtomicQueue

I'm sorry for the delay, here is the file.

/***************************************************************************
  tag: Peter Soetens  Tue Dec 21 22:43:07 CET 2004  AtomicQueue.hpp

                        AtomicQueue.hpp -  description
                           -------------------
    begin                : Tue December 21 2004
    copyright            : (C) 2004 Peter Soetens
    email                : peter [dot] soetens [..] ...

 ***************************************************************************
 *   This library is free software; you can redistribute it and/or         *
 *   modify it under the terms of the GNU Lesser General Public            *
 *   License as published by the Free Software Foundation; either          *
 *   version 2.1 of the License, or (at your option) any later version.    *
 *                                                                         *
 *   This library is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU     *
 *   Lesser General Public License for more details.                       *
 *                                                                         *
 *   You should have received a copy of the GNU Lesser General Public      *
 *   License along with this library; if not, write to the Free Software   *
 *   Foundation, Inc., 59 Temple Place,                                    *
 *   Suite 330, Boston, MA  02111-1307  USA                                *
 *                                                                         *
 ***************************************************************************/

#ifndef ORO_CORELIB_ATOMIC_QUEUE_HPP
#define ORO_CORELIB_ATOMIC_QUEUE_HPP

#include "os/CAS.hpp"
#include "BufferPolicy.hpp"
#include <utility>

namespace RTT
{
    /**
     * Create an atomic, non-blocking single ended queue (FIFO) for storing
     * a pointer \a T by value. It is a
     * Many Readers, Many Writers implementation
     * based on the atomic Compare And Swap instruction. Any number of threads
     * may access the queue concurrently.
     * @warning You can not store null pointers.
     * @param T The pointer type to be stored in the Queue.
     * Example : AtomicQueue< A* > is a queue of pointers to A.
     * @param ReadPolicy The Policy to block (wait) on \a empty (during dequeue),
     * using \a BlockingPolicy, or to return \a false, using \a NonBlockingPolicy (Default).
     * This does not influence partially filled queue behaviour.
     * @param WritePolicy The Policy to block (wait) on \a full (during enqueue),
     * using \a BlockingPolicy, or to return \a false, using \a NonBlockingPolicy (Default).
     * This does not influence partially filled buffer behaviour.
     * @ingroup CoreLibBuffers
     */
    template<class T, class ReadPolicy = NonBlockingPolicy, class WritePolicy = NonBlockingPolicy>
    class AtomicQueue
    {
        //typedef _T* T;
        const int _size;
        typedef std::pair<T, int> C;
//        typedef volatile C* volatile WriteType;
//        typedef volatile C* volatile const ReadType;
        typedef volatile C* CachePtrType;
        typedef C* volatile CacheObjType;
        typedef C ValueType;
        typedef C* PtrType;

        union SIndexes
        {
            unsigned long _value;
            unsigned short _index[2];
        };

        /**
         * The pointer to the buffer can be cached,
         * the contents are volatile.
         */
        CachePtrType _buf;

        /**
         * The indexes are packed into one double word.
         * Therefore the read and write index can be read and written atomically.
         */
        volatile SIndexes _indxes;

        WritePolicy write_policy;
        ReadPolicy read_policy;

        /**
         * Atomic advance and wrap of the Write pointer.
         * Return the old position or zero if queue is full.
         */
        CachePtrType advance_w()
        {
            SIndexes oldval, newval;
            bool full = false;
            do {
                oldval._value = _indxes._value; /* points to a free, writable cell */
                newval._value = oldval._value;  /* will point to the next writable cell */
                // check for full :
                if ( (newval._index[0] == newval._index[1] - 1) || (newval._index[0] == newval._index[1] + _size - 1) )
                {
                    return 0;
                }
                newval._index[0]++;
                if ( newval._index[0] >= _size )
                    newval._index[0] = 0;
                // if ptr is unchanged, replace it with newval.
            } while ( !OS::CAS( &_indxes._value, oldval._value, newval._value) );
            // from here on :
            // oldval is 'unique', other preempting threads
            // will have a different value for oldval, as
            // _wptr advances.
            if ( full == true )
                return 0;
            // return the old position to write to :
            return &_buf[ oldval._index[0] ];
        }

        /**
         * Atomic advance and wrap of the Read pointer.
         * Return the data position or zero if queue is empty.
         */
        CachePtrType advance_r()
        {
            SIndexes oldval, newval;
            do {
                oldval._value = _indxes._value;
                newval._value = oldval._value;
                // check for empty :
                if ( newval._index[0] == newval._index[1] )
                {
                    //EnableIrq(ic);
                    return 0;
                    //empty=true;
                }
                newval._index[1]++;
                if ( newval._index[1] >= _size )
                    newval._index[1] = 0;
                // if indexes are unchanged, replace them with newval.
            } while ( !OS::CAS( &_indxes._value, oldval._value, newval._value) );
            // from here on :
            // oldval is 'unique', other preempting threads
            // will have a different value for oldval, as
            // _rptr advances.

            // return the old position to read from :
            return &_buf[ oldval._index[1] ];
        }

        // non-copyable !
        AtomicQueue( const AtomicQueue<T>& );
    public:
        typedef unsigned int size_type;

        /**
         * Create an AtomicQueue with queue size \a size.
         * @param size The size of the queue, should be 1 or greater.
         */
        AtomicQueue( unsigned int size )
            : _size(size+1), write_policy(size), read_policy(0)
        {
            _buf = new C[_size];
            this->clear();
        }

        ~AtomicQueue()
        {
            delete[] _buf;
        }

        /**
         * Inspect if the Queue is full.
         * @return true if full, false otherwise.
         */
        bool isFull() const
        {
            // two cases where the queue is full :
            // if wptr is one behind rptr or if wptr is at end
            // and rptr at beginning.
            return _indxes._index[0] == _indxes._index[1] - 1 || _indxes._index[0] == _indxes._index[1] + _size - 1;
        }

        /**
         * Inspect if the Queue is empty.
         * @return true if empty, false otherwise.
         */
        bool isEmpty() const
        {
            // empty if nothing to read.
            return _indxes._index[0] == _indxes._index[1];
        }

        /**
         * Return the maximum number of items this queue can contain.
         */
        size_type capacity() const
        {
            return _size - 1;
        }

        /**
         * Return the number of elements in the queue.
         */
        size_type size() const
        {
            int c = (_indxes._index[0] - _indxes._index[1]);
            return c >= 0 ? c : c + _size;
        }

        /**
         * Enqueue an item.
         * @param value The value to enqueue.
         * @return false if queue is full, true if queued.
         */
        bool enqueue(const T& value)
        {
            if ( value == 0 )
                return false;
            write_policy.pop();
            CachePtrType loc = advance_w();
            if ( loc == 0 )
                return false;
            loc->first = value;
            read_policy.push();
            return true;
        }

        /**
         * Enqueue an item and return its 'ticket' number.
         * @param value The value to enqueue.
         * @return zero if the queue is full, the 'ticket' number otherwise.
         */
        int enqueueCounted(const T& value)
        {
            if ( value == 0 )
                return 0;
            write_policy.pop();
            CachePtrType loc = advance_w();
            if ( loc == 0 )
                return 0;
            loc->first = value;
            read_policy.push();
            return loc->second;
        }

        /**
         * Dequeue an item.
         * @param result The value dequeued.
         * @return false if queue is empty, true if dequeued.
         */
        bool dequeue( T& result )
        {
            read_policy.pop();
            CachePtrType loc = advance_r();
            if ( loc == 0 )
                return false;
            result = loc->first;
            loc->second += _size; // give the cell a new number.
            loc->first = 0;       // this releases the cell to write to.
            write_policy.push();
            return true;
        }

        /**
         * Return the next to be read value.
         */
        const T front() const
        {
            return _buf[ _indxes._index[1] ].first;
        }

        /**
         * Return the next to be read value and lock
         * it in a MemoryPool, such that it is not freed.
         * The returned pointer must be unlock()'ed by the
         * user's code.
         */
        template<class MPoolType>
        T lockfront(MPoolType& mp) const
        {
            CachePtrType loc = 0;
            bool was_locked = false;
            do {
                if (was_locked)
                    mp.unlock(loc->first);
                loc = &_buf[ _indxes._index[1] ];
                if (loc->first == 0)
                    return 0;
                was_locked = mp.lock(loc->first);
                // retry if lock failed or read moved.
            } while( !was_locked || loc != &_buf[ _indxes._index[1] ] ); // obstruction detection.
            return loc->first;
        }

        /**
         * Dequeue an item and return the same 'ticket' number it got when it was queued.
         * @param result The value dequeued.
         * @return zero if the queue is empty, the 'ticket' number otherwise.
         */
        int dequeueCounted( T& result )
        {
            read_policy.pop();
            CachePtrType loc = advance_r();
            if ( loc == 0 )
                return 0;
            result = loc->first;
            int nr = loc->second;
            loc->second += _size; // give the cell a new number.
            loc->first = 0;       // this releases the cell to write to.
            write_policy.push();
            return nr;
        }

        /**
         * Clear all contents of the Queue and thus make it empty.
         */
        void clear()
        {
            for(int i = 0 ; i != _size; ++i) {
                if ( _buf[i].first != 0 ) {
                    _buf[i].first = 0;
                }
                _buf[i].second = i+1; // init the counters
            }
            _indxes._value = 0;
            write_policy.reset( _size - 1 );
            read_policy.reset(0);
        }

    };

}

#endif

AtomicQueue

On Wed, Dec 23, 2009 at 14:25, Butch Slayer <butch [dot] slayers [..] ...> wrote:
> Hi,
>
> I've patched the pre-1.18 AtomicQueue implementation. It now works fine
> for my case:

I'll test it in my setup too. It's a clever/better way of solving the
issue in comparison to my solution. We should also check that the size of
the AQ does not exceed 2^(8*sizeof(unsigned short)) - 1 (i.e. 65535).

Peter

AtomicQueue

Hi,

I was just wondering... AtomicQueue now has a multi-writer/multi-reader
implementation.
Does anyone use it that way? It doesn't make much sense to me.

Kind regards,
Butch.

AtomicQueue

On Monday 22 March 2010 08:38:13 Butch Slayer wrote:
> Hi,
>
> I was just wondering... AtomicQueue now has a multi-writer/multi-reader
> implementation.
> Does anyone use it that way? It doesn't make much sense to me.

For RTT 2.x only, this is where AtomicQueue is used, and in which form:
* Operations: the asynchronous message dispatching in the Execution Engine
uses a multi-writer/single-reader queue.
* Signals (former Events): the ListLockFree implementation uses an MPool
(which uses AtomicQueue internally) for storing blobs of data in a memory
pool, in a multi-writer/multi-reader setting of the pool and the list.
* Data Flow: the BufferLockFree implementation uses an AtomicQueue for
storing blobs of data in a memory pool, in a single-writer/single-reader
setting.
* Data Flow: the Port's ConnectionManager implementation uses a ListLockFree
to store connections to other ports. Normally single-reader/single-writer,
but it actually does not need to be thread-safe at all (the API should
specify this, @nts, see bug #722), since no addX() method of the TaskContext
is thread-safe.

Summarizing, only the Signals implementation really needs mw/mr atomic
queues, since we allow connection/disconnection of signal handlers in
real-time (should we, or is this a 1.x artifact?). The next case is
multi-writer/single-reader, which we cannot discard; data flow is always
single/single.

In RTT 1.x, the data flow is mw/mr, the Events are mw/mr, the command/event
processors are mw/sr, and scripting uses ListLockFree to store pointers to
programs and state machines (not so in 2.x).

So in 1.x, it is even harder to give up mw/mr, since data flow depends on it
through BufferLockFree.

Peter

AtomicQueue

2010/3/22 Peter Soetens <peter [dot] soetens [..] ...>

> On Monday 22 March 2010 08:38:13 Butch Slayer wrote:
> > [earlier messages snipped]
>
> [Peter's summary snipped; quoted in full above]
>
> So in 1.x, it is even harder to give up mw/mr, since data flow depends on
> it through BufferLockFree.

Hallo,

If we take the MemoryPool out of the equation, we don't need mw/mr, even in
1.x?

Butch.

AtomicQueue

On Tuesday 30 March 2010 09:46:35 Butch Slayer wrote:
> 2010/3/22 Peter Soetens <peter [dot] soetens [..] ...>
>
> > [earlier messages snipped]
>
> If we take the MemoryPool out of the equation, we don't need mw/mr, even in
> 1.x?

You mean for AtomicQueue? It seems so, but we never know what the user will
do or what we will invent in the future. The only solution that will always
work is to provide the different implementations and use the one that is
suitable for the task. For example, using a mw/sr AQ in the EE for incoming
messages makes perfect sense. I think MemoryPool will always be MW/MR,
looking at the evidence of its current use.

So giving MemoryPool an implementation that is MW/MR but does not rely on
AQ, and writing a MW/SR AQ for the EE, starts making sense...

Peter

AtomicQueue

2010/3/31 Peter Soetens <peter [..] ...>

> On Tuesday 30 March 2010 09:46:35 Butch Slayer wrote:
> > [earlier messages snipped]
> >
> > If we take the MemoryPool out of the equation, we don't need mw/mr, even
> > in 1.x?
>
> You mean for AtomicQueue? It seems so, but we never know what the user
> will do or what we will invent in the future. The only solution that will
> always work is to provide the different implementations and use the one
> that is suitable for the task. For example, using a mw/sr AQ in the EE for
> incoming messages makes perfect sense. I think MemoryPool will always be
> MW/MR, looking at the evidence of its current use.
>
> So giving MemoryPool an implementation that is MW/MR but does not rely on
> AQ, and writing a MW/SR AQ for the EE, starts making sense...
>
> Peter
>
Peter,

That is what I am thinking about. A mw/mr AQ never made sense to me.
Dropping the mr constraint will enable a more efficient implementation in
terms of both speed and memory footprint, which is my main concern.

As you pointed out before, we don't know if anyone will ever need a mw/mr
AQ. So why not choose the most appropriate queue implementation through
configuration?

Butch.

AtomicQueue

On Wed, Mar 31, 2010 at 13:22, Butch Slayer <butch [dot] slayers [..] ...> wrote:
>
>
> 2010/3/31 Peter Soetens <peter [..] ...>
>>
>> [earlier messages snipped]
>
> Peter,
>
> That is what I am thinking about. A mw/mr AQ never made sense to me.
> Dropping the mr constraint will enable a more efficient implementation in
> terms of both speed and memory footprint, which is my main concern.
>
> As you pointed out before, we don't know if anyone will ever need a mw/mr
> AQ. So why not choose the most appropriate queue implementation through
> configuration?

I think our situation (being: mw/mr == costly, but only seldom necessary)
justifies a case-specific implementation, depending on what we are doing in
the code.

To summarize: MPool always needs to be mw/mr, but we'll implement it without
depending on AQueue. We will provide two AQueue implementations: an
efficient mw/sr one, and we keep the existing fat one for wherever we need
mw/mr.

I would name the classes differently, i.e. explicitly for their use case,
and let their type be decided at compile time. Also, the 'counted' feature
of AQueue is dropped; counting needs to be done in user code if necessary.
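
For instance, a sketch of such user-side counting (assuming a plain queue of
std::pair<T,int>; the Queue type and all names are hypothetical):

#include <atomic>
#include <utility>

// If the queue no longer hands out tickets, callers that need them can tag
// items themselves before enqueueing.
template <class T, class Queue>
int enqueue_counted(Queue& q, const T& value, std::atomic<int>& counter)
{
    int ticket = ++counter;                          // caller-side ticket
    if ( !q.enqueue( std::make_pair(value, ticket) ) )
        return 0;                                    // full: no ticket issued
    return ticket;
}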

Peter

AtomicQueue

>
> If we take the MemoryPool out of the equation, we don't need mw/mr,
> even in 1.x?
Connection management needs to be mw/mr.

AtomicQueue

Peter Soetens wrote:
> * Data Flow: the Port's ConnectionManager implementation uses a
> ListLockFree to store connections to other ports. Normally
> single-reader/single-writer, but it actually does not need to be
> thread-safe at all (the API should specify this, @nts, see bug #722),
> since no addX() method of the TaskContext is thread-safe.

Does it not? What about connections/disconnections at runtime?

AtomicQueue

On Monday 22 March 2010 12:51:40 Sylvain Joyeux wrote:
> Peter Soetens wrote:
> > * Data Flow: the Port's ConnectionManager implementation uses a
> > ListLockFree to store connections to other ports. Normally
> > single-reader/single-writer, but it actually does not need to be
> > thread-safe at all (the API should specify this, @nts, see bug #722),
> > since no addX() method of the TaskContext is thread-safe.
>
> Does it not? What about connections/disconnections at runtime?

Oops, I was confusing two things. addX() is not thread-safe, connectTo *is*,
so my statement above is false. CM needs to be thread-safe in order to add
connections at run-time.

Peter

AtomicQueue

2009/12/23 Peter Soetens <peter [..] ...>

> On Wed, Dec 23, 2009 at 14:25, Butch Slayer <butch [dot] slayers [..] ...>
> wrote:
> > Hi,
> >
> > I've patched the pre-1.18 AtomicQueue implementation. It now works fine
> > for my case:
>
> I'll test it in my setup too. It's a clever/better way of solving the
> issue in comparison to my solution. We should also check that the size of
> the AQ does not exceed 2^(8*sizeof(unsigned short)) - 1 (i.e. 65535).
>
> Peter
>

Yes, that is a constraint. For my case it is sufficient; most buffers hold
on the order of hundreds of items.

How should we handle such a case? Create a queue of maximum size if the
requested size is greater than 65535?

Butch.

AtomicQueue

On Wednesday 23 December 2009 15:51:25 Butch Slayer wrote:
> > [earlier messages snipped]
>
> Yes, that is a constraint. For my case it is sufficient; most buffers hold
> on the order of hundreds of items.

65535 items is also close to the practical limit of a lock-free algorithm.
If you have larger queues, a lock will probably be more efficient. I do not
have experimental proof of this, though.

>
> How should we handle such a case? Create a queue of maximum size if the
> requested size is greater than 65535?

I'd recommend throwing in the constructor if exceptions are enabled. If they
are not enabled, I would indeed cap to 65535, with an error logged.
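
A sketch of what that could look like (hypothetical helper; the actual
constructor and logging facility may differ):

#include <climits>
#include <stdexcept>

// Hypothetical size guard: the packed-index scheme stores each index in an
// unsigned short, so the capacity must stay below USHRT_MAX (65535).
inline unsigned int checked_queue_size(unsigned int requested)
{
#ifdef __EXCEPTIONS   // g++ defines this when exceptions are enabled
    if ( requested >= USHRT_MAX )
        throw std::length_error("AtomicQueue: size must be < 65535");
#else
    if ( requested >= USHRT_MAX )
        requested = USHRT_MAX - 1;   // cap, and log an error here
#endif
    return requested;
}

// usage in the constructor's initializer list:
//   AtomicQueue( unsigned int size )
//       : _size( checked_queue_size(size) + 1 ), ...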

Peter