[PATCH] operations: initial implementation of a non-cloning operation.

OperationCaller gets a 'setCloning(bool)' option to indicate whether clones
may be created using rt_malloc. It defaults to true (the 2.0.0 behaviour).
If you turn this off, no clones are created from the rt-memory pool, but the
OperationCaller can then only send/call one operation at a time.

Users must be aware that collectIfDone() may return with results
before the next call/send can be done. This is because the results
may be ready sooner than the internal logic for preparing the next
call/send has finished. It's a multi-threaded system, remember. The only
absolute guarantee is given by collect(), which blocks until
the OperationCaller is available again for a next send/call.

+ Unit test

Signed-off-by: Peter Soetens <peter [..] ...>
---
rtt/OperationCaller.hpp | 25 +++++++++++-
rtt/base/OperationCallerBase.hpp | 8 ++++
rtt/internal/InvokerSignature.hpp | 2 +-
rtt/internal/LocalOperationCaller.hpp | 43 ++++++++++++++-------
rtt/internal/RemoteOperationCaller.hpp | 3 +-
rtt/transports/corba/TaskContextProxy.cpp | 4 +-
tests/operation_test.cpp | 58 +++++++++++++++++++++++++++++
7 files changed, 123 insertions(+), 20 deletions(-)

diff --git a/rtt/OperationCaller.hpp b/rtt/OperationCaller.hpp
index db24f14..136affe 100644
--- a/rtt/OperationCaller.hpp
+++ b/rtt/OperationCaller.hpp
@@ -140,8 +140,10 @@ namespace RTT
mname = m.mname;
mcaller = m.mcaller;
this->impl = m.impl;
- if (this->impl)
+ if (this->impl) {
this->impl.reset( this->impl->cloneI(mcaller) );
+ this->impl->setSelf( this->impl );
+ }
return *this;
}

@@ -163,6 +165,7 @@ namespace RTT
} else {
if (this->impl) {
this->impl.reset( this->impl->cloneI(mcaller) );
+ this->impl->setSelf( this->impl );
}
}
}
@@ -304,7 +307,8 @@ namespace RTT
OperationCaller(std::string name, OperationInterfacePart* orp, ExecutionEngine* caller = 0)
: Base( OperationCallerBasePtr(new internal::RemoteOperationCaller<Signature>(orp,caller) ) ),
mname(name)
- {}
+ {
+ }
#endif

/**
@@ -334,6 +338,7 @@ namespace RTT
if ( this->impl ) {
this->mcaller = caller;
this->impl->setCaller(caller);
+ this->impl->setSelf( this->impl );
}
return ready();
}
@@ -357,6 +362,8 @@ namespace RTT
*/
void setOperationCallerImpl( OperationCallerBasePtr new_impl) const {
this->impl = new_impl;
+ if (this->impl)
+ this->impl->setSelf( this->impl );
}

void setCaller(ExecutionEngine* caller) {
@@ -364,6 +371,18 @@ namespace RTT
if (this->impl)
this->impl->setCaller(caller);
}
+
+ /**
+ * Turn on or off the rt-cloning of invocation objects.
+ * Turn this off if you don't want to rely on rt_malloc for
+ * this operation. If false, only one operation at a time can be
+ * invoked using this object. If true, any number of calls/sends
+ * can be done, as long as the rt_malloc pool is not exhausted.
+ */
+ void setCloning(bool on_off) {
+ if (on_off && this->impl)
+ this->impl->setSelf( this->impl );
+ }
protected:
/**
* If no local implementation of an operation could be found,
@@ -384,6 +403,7 @@ namespace RTT
if (this->impl->ready()) {
log(Debug) << "Constructed OperationCaller from remote implementation '"<< mname<<"'."<< endlog();
this->impl->setCaller(mcaller);
+ this->impl->setSelf( this->impl );
} else {
this->impl.reset(); // clean up.
log(Error) << "Tried to construct OperationCaller from incompatible operation '"<< mname<<"'."<< endlog();
@@ -394,6 +414,7 @@ namespace RTT
} else {
// finally clone and set caller on clone.
this->impl.reset( this->impl->cloneI(mcaller) );
+ this->impl->setSelf( this->impl );
}
}

diff --git a/rtt/base/OperationCallerBase.hpp b/rtt/base/OperationCallerBase.hpp
index 12cd3c8..d2588c9 100644
--- a/rtt/base/OperationCallerBase.hpp
+++ b/rtt/base/OperationCallerBase.hpp
@@ -81,6 +81,14 @@ namespace RTT
virtual void setExecutor(ExecutionEngine* ee) = 0;

virtual void setCaller(ExecutionEngine* ee) = 0;
+
+ /**
+ * Since all Operation implementations are ref-counted, this method is
+ * called to inform the implementation of the shared pointer that holds
+ * a reference to it. The implementation may ignore this call if it
+ * does not require this information.
+ */
+ virtual void setSelf(boost::shared_ptr<OperationCallerBase> self) = 0;
};
}
}
diff --git a/rtt/internal/InvokerSignature.hpp b/rtt/internal/InvokerSignature.hpp
index e156abf..d6e6e35 100644
--- a/rtt/internal/InvokerSignature.hpp
+++ b/rtt/internal/InvokerSignature.hpp
@@ -63,7 +63,7 @@ namespace RTT
typedef typename boost::function_traits<F>::result_type result_type;

InvokerSignature() : impl() {}
- InvokerSignature(ToInvoke implementation) : impl(implementation) {}
+ InvokerSignature(ToInvoke implementation) : impl(implementation) { if (impl) impl->setSelf(impl); }
~InvokerSignature() {}

/**
diff --git a/rtt/internal/LocalOperationCaller.hpp b/rtt/internal/LocalOperationCaller.hpp
index 72c7418..ace83ba 100644
--- a/rtt/internal/LocalOperationCaller.hpp
+++ b/rtt/internal/LocalOperationCaller.hpp
@@ -42,6 +42,8 @@
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
+#include <boost/lambda/lambda.hpp>
+#include <boost/lambda/bind.hpp>
#include <string>
#include "Invoker.hpp"
#include "../base/OperationCallerBase.hpp"
@@ -89,6 +91,7 @@ namespace RTT
typedef boost::function_traits<Signature> traits;

typedef boost::shared_ptr<LocalOperationCallerImpl> shared_ptr;
+ typedef boost::weak_ptr<LocalOperationCallerImpl> weak_ptr;

virtual void setExecutor(ExecutionEngine* ee) {
if (met == OwnThread)
@@ -131,9 +134,7 @@ namespace RTT
* not called, this object will not be destroyed.
*/
void dispose() {
- //this->~LocalOperationCallerImpl();
- //oro_rt_free(this);
- self.reset();
+ self.reset();
}

ExecutionEngine* getMessageProcessor() const { return myengine; }
@@ -141,13 +142,12 @@ namespace RTT
SendHandle<Signature> do_send(shared_ptr cl) {
assert(myengine); // myengine must be either the caller's engine or GlobalEngine::Instance().
//std::cout << "Sending clone..."<<std::endl;
+ this->retv.executed = false; // necessary for no-cloning case.
if ( myengine->process( cl.get() ) ) {
cl->self = cl;
return SendHandle<Signature>( cl );
} else {
- // cleanup. Done by shared_ptr.
- //cl->~OperationCallerBase();
- //oro_rt_free(cl);
+ // cleanup is done by the shared_ptr 'cl'.
return SendHandle<Signature>();
}
}
@@ -285,25 +285,29 @@ namespace RTT
return SendNotReady;
}

+ void block_impl() {
+ caller->waitForMessages( !boost::lambda::var( this->self ) && boost::lambda::bind(&Store::RStoreType::isExecuted,boost::lambda::var(this->retv)) );
+ }
+
SendStatus collect_impl() {
- caller->waitForMessages( boost::bind(&Store::RStoreType::isExecuted,boost::ref(this->retv)) );
+ block_impl();
return this->collectIfDone_impl();
}
template<class T1>
SendStatus collect_impl( T1& a1 ) {
- caller->waitForMessages( boost::bind(&Store::RStoreType::isExecuted,boost::ref(this->retv)) );
+ block_impl();
return this->collectIfDone_impl(a1);
}

template<class T1, class T2>
SendStatus collect_impl( T1& a1, T2& a2 ) {
- caller->waitForMessages( boost::bind(&Store::RStoreType::isExecuted,boost::ref(this->retv)) );
+ block_impl();
return this->collectIfDone_impl(a1,a2);
}

template<class T1, class T2, class T3>
SendStatus collect_impl( T1& a1, T2& a2, T3& a3 ) {
- caller->waitForMessages( boost::bind(&Store::RStoreType::isExecuted,boost::ref(this->retv)) );
+ block_impl();
return this->collectIfDone_impl(a1,a2,a3);
}

@@ -556,6 +560,10 @@ namespace RTT

virtual shared_ptr cloneRT() const = 0;

+ void setSelf(boost::shared_ptr<base::OperationCallerBase<FunctionT> > sharedself) {
+ this->weak_self = boost::dynamic_pointer_cast<LocalOperationCallerImpl>( sharedself );
+ }
+
protected:
ExecutionEngine* myengine;
ExecutionEngine* caller;
@@ -566,7 +574,8 @@ namespace RTT
* This refcount is real-time since both shared_ptr and object
* were allocated with the rt_allocator class.
*/
- typename base::OperationCallerBase<FunctionT>::shared_ptr self;
+ shared_ptr self;
+ weak_ptr weak_self;
};

/**
@@ -651,9 +660,15 @@ namespace RTT

typename LocalOperationCallerImpl<Signature>::shared_ptr cloneRT() const
{
- //void* obj = oro_rt_malloc(sizeof(LocalOperationCallerImpl<Signature>));
- //return new(obj) LocalOperationCaller<Signature>(*this);
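- return boost::allocate_shared<LocalOperationCaller<Signature> >(os::rt_allocator<LocalOperationCaller<Signature> >(), *this);
+ if ( this->weak_self.expired() )
+ // rt_malloc cloning case:
+ return boost::allocate_shared<LocalOperationCaller<Signature> >(os::rt_allocator<LocalOperationCaller<Signature> >(), *this);
+ else {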
+ // no cloning case:
+ if (this->self)
+ throw SendFailure; // we are in the process of being sent
+ return this->weak_self.lock();
+ }
}
};
}
diff --git a/rtt/internal/RemoteOperationCaller.hpp b/rtt/internal/RemoteOperationCaller.hpp
index 24f8f51..eeb27b2 100644
--- a/rtt/internal/RemoteOperationCaller.hpp
+++ b/rtt/internal/RemoteOperationCaller.hpp
@@ -103,7 +103,8 @@ namespace RTT
virtual void setCaller(ExecutionEngine* ee) {
}

-
+ void setSelf(boost::shared_ptr<base::OperationCallerBase + }
/**
* Call this operator if the RemoteOperationCaller takes no arguments.
*
diff --git a/rtt/transports/corba/TaskContextProxy.cpp b/rtt/transports/corba/TaskContextProxy.cpp
index cd916f1..87ed064 100644
--- a/rtt/transports/corba/TaskContextProxy.cpp
+++ b/rtt/transports/corba/TaskContextProxy.cpp
@@ -648,10 +648,10 @@ namespace RTT
try {
if (CORBA::is_nil(mtask))
return 0;
- corba::CTaskContext_ptr ct = mtask->getPeer( peer_name.c_str() );
+ corba::CTaskContext_var ct = mtask->getPeer( peer_name.c_str() );
if ( CORBA::is_nil(ct) )
return 0;
- return TaskContextProxy::Create( ct );
+ return TaskContextProxy::Create( ct.in() );
} catch(...) {
mtask = CTaskContext::_nil();
}
diff --git a/tests/operation_test.cpp b/tests/operation_test.cpp
index 79b16f2..761438f 100644
--- a/tests/operation_test.cpp
+++ b/tests/operation_test.cpp
@@ -24,6 +24,7 @@
#include <rtt/Service.hpp>
#include <rtt/OperationCaller.hpp>
#include <rtt/TaskContext.hpp>
+#include <rtt/extras/SlaveActivity.hpp>

using namespace std;
using namespace RTT::detail;
@@ -39,6 +40,7 @@ public:

tc.provides()->addOperation("op0", &OperationTest::func0, this);
tc.provides()->addOperation("op1", &OperationTest::func1, this);
+ tc.provides()->addOperation("Ownop1", &OperationTest::func1, this, OwnThread);
tc.provides()->addOperation("op2", &OperationTest::func2, this);
tc.provides()->addOperation("op3", &OperationTest::func3, this);
tc.provides()->addOperation("op4", &OperationTest::func4, this);
@@ -232,5 +234,61 @@ BOOST_AUTO_TEST_CASE( testOperationCallAndSignal )
BOOST_CHECK_EQUAL( -1.0, sig );
}

+BOOST_AUTO_TEST_CASE( testOperationNoCloning)
+{
+ OperationCaller<double(int)> caller( tc.provides()->getOperation("Ownop1"));
+ BOOST_REQUIRE(caller.ready());
+ caller.setCloning(false);
+
+ // First check a normal call:
+ double result = caller.call(3);
+ BOOST_CHECK_EQUAL(result, 2);
+ result = 0.0;
+ // Now check a normal send:
+ SendHandle<double(int)> sh = caller.send(4);
+ BOOST_CHECK( sh.ready() );
+ BOOST_CHECK_EQUAL( sh.collect(result), SendSuccess);
+ BOOST_CHECK_EQUAL( result, 2);
+
+ // Prepare for a send + send
+ tc.stop();
+ BOOST_REQUIRE( tc.setActivity(new SlaveActivity() ) );
+
+ // Do send #1 (succeeds)
+ result = 0.0;
+ sh = caller.send(5);
+ BOOST_CHECK( sh.ready() );
+ BOOST_CHECK_EQUAL( sh.collectIfDone(result), SendNotReady);
+ // Do send #2 (fails, no cloning)
+ SendHandle<double(int)> sh2;
+ try {
+ BOOST_CHECK_THROW( sh2 = caller.send(6) , SendStatus); // fails since not done.
+ } catch(...) {}
+ BOOST_CHECK( !sh2.ready() );
+ BOOST_CHECK_EQUAL( sh.collectIfDone(result), SendNotReady);
+ // process send #1
+ BOOST_CHECK( tc.update() );
+ BOOST_REQUIRE_EQUAL( sh.collect(result), SendSuccess);
+ BOOST_CHECK_EQUAL(sh.ret(), 2);
+ BOOST_CHECK_EQUAL(result, 2);
+
+ // Prepare for a send + call
+ result = 0.0;
+ sh = caller.send(7);
+ BOOST_CHECK( sh.ready() );
+ BOOST_CHECK_EQUAL( sh.collectIfDone(result), SendNotReady);
+ // Do call (fails, no cloning)
+ try {
+ BOOST_CHECK_THROW( caller.call(8) , SendStatus); // fails since not done.
+ } catch(...) {}
+ BOOST_CHECK_EQUAL( sh.collectIfDone(result), SendNotReady);
+ // process the pending send
+ BOOST_CHECK( tc.update() );
+ BOOST_REQUIRE_EQUAL( sh.collect(result), SendSuccess);
+ // install default activity again:
+ tc.setActivity( new Activity() );
+ BOOST_CHECK_EQUAL( caller.call(9), 2.0);
+}
+
BOOST_AUTO_TEST_SUITE_END()