OrocosComponentLibrary
2.7.0
|
00001 /*************************************************************************** 00002 tag: Peter Soetens Mon May 10 19:10:38 CEST 2004 ReportingComponent.cxx 00003 00004 ReportingComponent.cxx - description 00005 ------------------- 00006 begin : Mon May 10 2004 00007 copyright : (C) 2004 Peter Soetens 00008 email : peter.soetens@mech.kuleuven.ac.be 00009 00010 *************************************************************************** 00011 * This library is free software; you can redistribute it and/or * 00012 * modify it under the terms of the GNU Lesser General Public * 00013 * License as published by the Free Software Foundation; either * 00014 * version 2.1 of the License, or (at your option) any later version. * 00015 * * 00016 * This library is distributed in the hope that it will be useful, * 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of * 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * 00019 * Lesser General Public License for more details. * 00020 * * 00021 * You should have received a copy of the GNU Lesser General Public * 00022 * License along with this library; if not, write to the Free Software * 00023 * Foundation, Inc., 59 Temple Place, * 00024 * Suite 330, Boston, MA 02111-1307 USA * 00025 * * 00026 ***************************************************************************/ 00027 00028 #include "ReportingComponent.hpp" 00029 #include <rtt/Logger.hpp> 00030 00031 // Impl. 00032 #include "EmptyMarshaller.hpp" 00033 #include <rtt/marsh/PropertyDemarshaller.hpp> 00034 #include <rtt/marsh/PropertyMarshaller.hpp> 00035 #include <iostream> 00036 #include <fstream> 00037 #include <exception> 00038 #include <boost/algorithm/string.hpp> 00039 00040 #include "ocl/Component.hpp" 00041 #include <rtt/types/PropertyDecomposition.hpp> 00042 #include <boost/lexical_cast.hpp> 00043 00044 ORO_CREATE_COMPONENT_TYPE() 00045 00046 00047 namespace OCL 00048 { 00049 using namespace std; 00050 using namespace RTT; 00051 using namespace RTT::detail; 00052 00055 class CheckSizeDataSource : public ValueDataSource<bool> 00056 { 00057 mutable int msize; 00058 DataSource<int>::shared_ptr mds; 00059 DataSource<bool>::shared_ptr mupstream; 00060 public: 00061 CheckSizeDataSource(int size, DataSource<int>::shared_ptr ds, DataSource<bool>::shared_ptr upstream) 00062 : msize(size), mds(ds), mupstream(upstream) 00063 {} 00067 bool get() const{ 00068 // it's very important to first check upstream, because 00069 // if upstream changed size, downstream might already be corrupt ! 00070 // (downstream will be corrupt upon capacity changes upstream) 00071 bool result = true; 00072 if (mupstream) 00073 result = (mupstream->get() && msize == mds->get()); 00074 else 00075 result = (msize == mds->get()); 00076 msize = mds->get(); 00077 return result; 00078 } 00079 }; 00080 00087 bool memberDecomposition( base::DataSourceBase::shared_ptr dsb, PropertyBag& targetbag, DataSource<bool>::shared_ptr& resized) 00088 { 00089 assert(dsb); 00090 00091 vector<string> parts = dsb->getMemberNames(); 00092 if ( parts.empty() ) { 00093 return false; 00094 } 00095 00096 targetbag.setType( dsb->getTypeName() ); 00097 00098 // needed for recursion. 00099 auto_ptr< Property<PropertyBag> > recurse_bag( new Property<PropertyBag>("recurse_bag","") ); 00100 // First at the explicitly listed parts: 00101 for(vector<string>::iterator it = parts.begin(); it != parts.end(); ++it ) { 00102 // we first force getMember to get to the type, then we do it again but with a reference set. 00103 DataSourceBase::shared_ptr part = dsb->getMember( *it ); 00104 if (!part) { 00105 log(Error) <<"memberDecomposition: Inconsistent type info for "<< dsb->getTypeName() << ": reported to have part '"<<*it<<"' but failed to return it."<<endlog(); 00106 continue; 00107 } 00108 if ( !part->isAssignable() ) { 00109 // For example: the case for size() and capacity() in SequenceTypeInfo 00110 log(Debug)<<"memberDecomposition: Part "<< *it << ":"<< part->getTypeName() << " is not changeable."<<endlog(); 00111 continue; 00112 } 00113 // now the reference magic: 00114 DataSourceBase::shared_ptr ref = part->getTypeInfo()->buildReference( 0 ); 00115 dsb->getTypeInfo()->getMember( dynamic_cast<Reference*>(ref.get() ), dsb, *it); // fills in ref 00116 // newpb will contain a reference to the port's datasource data ! 00117 PropertyBase* newpb = part->getTypeInfo()->buildProperty(*it,"Part", ref); 00118 if ( !newpb ) { 00119 log(Error)<< "Decomposition failed because Part '"<<*it<<"' is not known to type system."<<endlog(); 00120 continue; 00121 } 00122 // finally recurse or add it to the target bag: 00123 if ( !memberDecomposition( ref, recurse_bag->value(), resized) ) { 00124 assert( recurse_bag->value().empty() ); 00125 // finally: check for conversions (enums use this): 00126 base::DataSourceBase::shared_ptr converted = newpb->getTypeInfo()->convertType( dsb ); 00127 if ( converted && converted != dsb ) { 00128 // converted contains another type. 00129 targetbag.add( converted->getTypeInfo()->buildProperty(*it, "", converted) ); 00130 delete newpb; 00131 } else 00132 targetbag.ownProperty( newpb ); // leaf 00133 } else { 00134 recurse_bag->setName(*it); 00135 // setType() is done by recursive of self. 00136 targetbag.ownProperty( recurse_bag.release() ); //recursed. 00137 recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") ); 00138 delete newpb; // since we recursed, the recurse_bag now 'embodies' newpb. 00139 } 00140 } 00141 00142 // Next get the numbered parts. This is much more involved since sequences may be resizable. 00143 // We keep track of the size, and if that changes, we will have to force a re-decomposition 00144 // of the sequence's internals. 00145 DataSource<int>::shared_ptr size = DataSource<int>::narrow( dsb->getMember("size").get() ); 00146 if (size) { 00147 int msize = size->get(); 00148 for (int i=0; i < msize; ++i) { 00149 string indx = boost::lexical_cast<string>( i ); 00150 DataSourceBase::shared_ptr item = dsb->getMember(indx); 00151 resized = new CheckSizeDataSource( msize, size, resized ); 00152 if (item) { 00153 if ( !item->isAssignable() ) { 00154 // For example: the case for size() and capacity() in SequenceTypeInfo 00155 log(Warning)<<"memberDecomposition: Item '"<< indx << "' of type "<< dsb->getTypeName() << " is not changeable."<<endlog(); 00156 continue; 00157 } 00158 // finally recurse or add it to the target bag: 00159 PropertyBase* newpb = item->getTypeInfo()->buildProperty( indx,"",item); 00160 if ( !memberDecomposition( item, recurse_bag->value(), resized) ) { 00161 targetbag.ownProperty( newpb ); // leaf 00162 } else { 00163 delete newpb; 00164 recurse_bag->setName( indx ); 00165 // setType() is done by recursive of self. 00166 targetbag.ownProperty( recurse_bag.release() ); //recursed. 00167 recurse_bag.reset( new Property<PropertyBag>("recurse_bag","") ); 00168 } 00169 } 00170 } 00171 } 00172 if (targetbag.empty() ) 00173 log(Debug) << "memberDecomposition: "<< dsb->getTypeName() << " returns an empty property bag." << endlog(); 00174 return true; 00175 } 00176 00177 ReportingComponent::ReportingComponent( std::string name /*= "Reporting" */ ) 00178 : TaskContext( name ), 00179 report("Report"), snapshotted(false), 00180 writeHeader("WriteHeader","Set to true to start each report with a header.", true), 00181 decompose("Decompose","Set to false in order to not decompose the port data. The marshaller must be able to handle this itself for this to work.", true), 00182 insnapshot("Snapshot","Set to true to enable snapshot mode. This will cause a non-periodic reporter to only report data upon the snapshot() operation.",false), 00183 synchronize_with_logging("Synchronize","Set to true if the timestamp should be synchronized with the logging",false), 00184 report_data("ReportData","A PropertyBag which defines which ports or components to report."), 00185 report_policy( ConnPolicy::data(ConnPolicy::LOCK_FREE,true,false) ), 00186 onlyNewData(false), 00187 starttime(0), 00188 timestamp("TimeStamp","The time at which the data was read.",0.0) 00189 { 00190 this->provides()->doc("Captures data on data ports. A periodic reporter will sample each added port according to its period, a non-periodic reporter will write out data as it comes in, or only during a snapshot() if the Snapshot property is true."); 00191 00192 this->properties()->addProperty( writeHeader ); 00193 this->properties()->addProperty( decompose ); 00194 this->properties()->addProperty( insnapshot ); 00195 this->properties()->addProperty( synchronize_with_logging); 00196 this->properties()->addProperty( report_data); 00197 this->properties()->addProperty( "ReportPolicy", report_policy).doc("The ConnPolicy for the reporter's port connections."); 00198 this->properties()->addProperty( "ReportOnlyNewData", onlyNewData).doc("Turn on in order to only write out NewData on ports and omit unchanged ports. Turn off in order to sample and write out all ports (even old data)."); 00199 // Add the methods, methods make sure that they are 00200 // executed in the context of the (non realtime) caller. 00201 00202 this->addOperation("snapshot", &ReportingComponent::snapshot , this, RTT::OwnThread).doc("Take a new shapshot of all data and cause them to be written out."); 00203 this->addOperation("screenComponent", &ReportingComponent::screenComponent , this, RTT::ClientThread).doc("Display the variables and ports of a Component.").arg("Component", "Name of the Component"); 00204 this->addOperation("reportComponent", &ReportingComponent::reportComponent , this, RTT::ClientThread).doc("Add a peer Component and report all its data ports").arg("Component", "Name of the Component"); 00205 this->addOperation("unreportComponent", &ReportingComponent::unreportComponent , this, RTT::ClientThread).doc("Remove all Component's data ports from reporting.").arg("Component", "Name of the Component"); 00206 this->addOperation("reportData", &ReportingComponent::reportData , this, RTT::ClientThread).doc("Add a Component's Property or attribute for reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the Data to report. A property's or attribute's name."); 00207 this->addOperation("unreportData", &ReportingComponent::unreportData , this, RTT::ClientThread).doc("Remove a Data object from reporting.").arg("Component", "Name of the Component").arg("Data", "Name of the property or attribute."); 00208 this->addOperation("reportPort", &ReportingComponent::reportPort , this, RTT::ClientThread).doc("Add a Component's OutputPort for reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port."); 00209 this->addOperation("unreportPort", &ReportingComponent::unreportPort , this, RTT::ClientThread).doc("Remove a Port from reporting.").arg("Component", "Name of the Component").arg("Port", "Name of the Port."); 00210 00211 } 00212 00213 ReportingComponent::~ReportingComponent() {} 00214 00215 00216 bool ReportingComponent::addMarshaller( marsh::MarshallInterface* headerM, marsh::MarshallInterface* bodyM) 00217 { 00218 boost::shared_ptr<marsh::MarshallInterface> header(headerM); 00219 boost::shared_ptr<marsh::MarshallInterface> body(bodyM); 00220 if ( !header && !body) 00221 return false; 00222 if ( !header ) 00223 header.reset( new EmptyMarshaller() ); 00224 if ( !body) 00225 body.reset( new EmptyMarshaller()); 00226 00227 marshallers.push_back( std::make_pair( header, body ) ); 00228 return true; 00229 } 00230 00231 bool ReportingComponent::removeMarshallers() 00232 { 00233 marshallers.clear(); 00234 return true; 00235 } 00236 00237 void ReportingComponent::cleanupHook() 00238 { 00239 root.clear(); // uses shared_ptr. 00240 deletePropertyBag( report ); 00241 } 00242 00243 bool ReportingComponent::configureHook() 00244 { 00245 Logger::In in("ReportingComponent"); 00246 00247 // we make a copy to be allowed to iterate over and exted report_data: 00248 PropertyBag bag = report_data.value(); 00249 00250 if ( bag.empty() ) { 00251 log(Error) <<"No port or component configuration loaded."<<endlog(); 00252 log(Error) <<"Please use marshalling.loadProperties(), reportComponent() (scripting) or LoadProperties (XML) in order to fill in ReportData." <<endlog(); 00253 return false; 00254 } 00255 00256 bool ok = true; 00257 PropertyBag::const_iterator it = bag.getProperties().begin(); 00258 while ( it != bag.getProperties().end() ) 00259 { 00260 Property<std::string>* compName = dynamic_cast<Property<std::string>* >( *it ); 00261 if ( !compName ) 00262 log(Error) << "Expected Property \"" 00263 << (*it)->getName() <<"\" to be of type string."<< endlog(); 00264 else if ( compName->getName() == "Component" ) { 00265 std::string name = compName->value(); // we will delete this property ! 00266 this->unreportComponent( name ); 00267 ok &= this->reportComponent( name ); 00268 } 00269 else if ( compName->getName() == "Port" ) { 00270 string cname = compName->value().substr(0, compName->value().find(".")); 00271 string pname = compName->value().substr( compName->value().find(".")+1, string::npos); 00272 if (cname.empty() || pname.empty() ) { 00273 log(Error) << "The Port value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the port name." <<endlog(); 00274 ok = false; 00275 continue; 00276 } 00277 this->unreportPort(cname,pname); 00278 ok &= this->reportPort(cname, pname); 00279 } 00280 else if ( compName->getName() == "Data" ) { 00281 string cname = compName->value().substr(0, compName->value().find(".")); 00282 string pname = compName->value().substr( compName->value().find(".")+1, string::npos); 00283 if (cname.empty() || pname.empty() ) { 00284 log(Error) << "The Data value '"<<compName->getName()<< "' must at least consist of a component name followed by a dot and the property/attribute name." <<endlog(); 00285 ok = false; 00286 continue; 00287 } 00288 this->unreportData(cname,pname); 00289 ok &= this->reportData(cname, pname); 00290 } 00291 else { 00292 log(Error) << "Expected \"Component\", \"Port\" or \"Data\", got " 00293 << compName->getName() << endlog(); 00294 ok = false; 00295 } 00296 ++it; 00297 } 00298 return ok; 00299 } 00300 00301 bool ReportingComponent::screenComponent( const std::string& comp ) 00302 { 00303 Logger::In in("ReportingComponent::screenComponent"); 00304 log(Error) << "not implemented." <<comp<<endlog(); 00305 return false; 00306 } 00307 00308 bool ReportingComponent::screenImpl( const std::string& comp, std::ostream& output) 00309 { 00310 Logger::In in("ReportingComponent"); 00311 TaskContext* c = this->getPeer(comp); 00312 if ( c == 0) { 00313 log(Error) << "Unknown Component: " <<comp<<endlog(); 00314 return false; 00315 } 00316 output << "Screening Component '"<< comp << "' : "<< endl << endl; 00317 PropertyBag* bag = c->properties(); 00318 if (bag) { 00319 output << "Properties :" << endl; 00320 for (PropertyBag::iterator it= bag->begin(); it != bag->end(); ++it) 00321 output << " " << (*it)->getName() << " : " << (*it)->getDataSource() << endl; 00322 } 00323 ConfigurationInterface::AttributeNames atts = c->provides()->getAttributeNames(); 00324 if ( !atts.empty() ) { 00325 output << "Attributes :" << endl; 00326 for (ConfigurationInterface::AttributeNames::iterator it= atts.begin(); it != atts.end(); ++it) 00327 output << " " << *it << " : " << c->provides()->getValue(*it)->getDataSource() << endl; 00328 } 00329 00330 vector<string> ports = c->ports()->getPortNames(); 00331 if ( !ports.empty() ) { 00332 output << "Ports :" << endl; 00333 for (vector<string>::iterator it= ports.begin(); it != ports.end(); ++it) { 00334 output << " " << *it << " : "; 00335 if (c->ports()->getPort(*it)->connected() ) 00336 output << "(connected)" << endl; 00337 else 00338 output << "(not connected)" << endl; 00339 } 00340 } 00341 return true; 00342 } 00343 00344 bool ReportingComponent::reportComponent( const std::string& component ) { 00345 Logger::In in("ReportingComponent"); 00346 // Users may add own data sources, so avoid duplicates 00347 //std::vector<std::string> sources = comp->data()->getNames(); 00348 TaskContext* comp = this->getPeer(component); 00349 if ( !comp ) { 00350 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00351 return false; 00352 } 00353 if ( !report_data.value().findValue<string>(component) ) 00354 report_data.value().ownProperty( new Property<string>("Component","",component) ); 00355 Ports ports = comp->ports()->getPorts(); 00356 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) { 00357 log(Debug) << "Checking port " << (*it)->getName()<<"."<<endlog(); 00358 this->reportPort( component, (*it)->getName() ); 00359 } 00360 return true; 00361 } 00362 00363 00364 bool ReportingComponent::unreportComponent( const std::string& component ) { 00365 TaskContext* comp = this->getPeer(component); 00366 if ( !comp ) { 00367 log(Error) << "Could not unreport Component " << component <<" : no such peer."<<endlog(); 00368 return false; 00369 } 00370 Ports ports = comp->ports()->getPorts(); 00371 for (Ports::iterator it = ports.begin(); it != ports.end() ; ++it) { 00372 this->unreportDataSource( component + "." + (*it)->getName() ); 00373 unreportPort(component, (*it)->getName() ); 00374 } 00375 base::PropertyBase* pb = report_data.value().findValue<string>(component); 00376 if (pb) 00377 report_data.value().removeProperty( pb );// pb is deleted by bag 00378 return true; 00379 } 00380 00381 // report a specific connection. 00382 bool ReportingComponent::reportPort(const std::string& component, const std::string& port ) { 00383 Logger::In in("ReportingComponent"); 00384 TaskContext* comp = this->getPeer(component); 00385 if ( this->ports()->getPort(component +"_"+port) ) { 00386 log(Warning) <<"Already reporting "<<component<<"."<<port<<": removing old port first."<<endlog(); 00387 this->unreportPort(component,port); 00388 } 00389 if ( !comp ) { 00390 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00391 return false; 00392 } 00393 std::vector<std::string> strs; 00394 boost::split(strs, port, boost::is_any_of(".")); 00395 00396 // strs could be empty because of a bug in Boost 1.44 (see https://svn.boost.org/trac/boost/ticket/4751) 00397 if (strs.empty()) return false; 00398 00399 Service::shared_ptr service=comp->provides(); 00400 while ( strs.size() != 1 && service) { 00401 service = service->getService( strs.front() ); 00402 if (service) 00403 strs.erase( strs.begin() ); 00404 } 00405 if (!service) { 00406 log(Error) <<"No such service: '"<< strs.front() <<"' while looking for port '"<< port<<"'"<<endlog(); 00407 return 0; 00408 } 00409 base::PortInterface* porti = 0; 00410 porti = service->getPort(strs.front()); 00411 if ( !porti ) { 00412 log(Error) << "Could not report Port " << port 00413 <<" : no such port on Component "<<component<<"."<<endlog(); 00414 return false; 00415 } 00416 00417 base::InputPortInterface* ipi = dynamic_cast<base::InputPortInterface*>(porti); 00418 if (ipi) { 00419 log(Error) << "Can not report InputPort "<< porti->getName() <<" of Component " << component <<endlog(); 00420 return false; 00421 } 00422 // create new port temporarily 00423 // this port is only created with the purpose of 00424 // creating a connection object. 00425 base::PortInterface* ourport = porti->antiClone(); 00426 assert(ourport); 00427 ourport->setName(component + "_" + port); 00428 ipi = dynamic_cast<base::InputPortInterface*> (ourport); 00429 assert(ipi); 00430 00431 if (report_policy.type == ConnPolicy::DATA ) { 00432 log(Info) << "Not buffering of data flow connections. You may miss samples." <<endlog(); 00433 } else { 00434 log(Info) << "Buffering ports with size "<< report_policy.size << ", as set in ReportPolicy property." <<endlog(); 00435 } 00436 00437 this->ports()->addEventPort( *ipi ); 00438 if (porti->connectTo(ourport, report_policy ) == false) 00439 { 00440 log(Error) << "Could not connect to OutputPort " << porti->getName() << endlog(); 00441 this->ports()->removePort(ourport->getName()); 00442 delete ourport; // XXX/TODO We're leaking ourport ! 00443 return false; 00444 } 00445 00446 if (this->reportDataSource(component + "." + port, "Port", 00447 ipi->getDataSource(),ipi, true) == false) 00448 { 00449 log(Error) << "Failed reporting port " << port << endlog(); 00450 this->ports()->removePort(ourport->getName()); 00451 delete ourport; 00452 return false; 00453 } 00454 00455 log(Info) << "Monitoring OutputPort " << port << " : ok." << endlog(); 00456 // Add port to ReportData properties if component nor port are listed yet. 00457 if ( !report_data.value().findValue<string>(component) && !report_data.value().findValue<string>( component+"."+port) ) 00458 report_data.value().ownProperty(new Property<string>("Port","",component+"."+port)); 00459 return true; 00460 } 00461 00462 bool ReportingComponent::unreportPort(const std::string& component, const std::string& port ) { 00463 base::PortInterface* ourport = this->ports()->getPort(component + "_" + port); 00464 if ( this->unreportDataSource( component + "." + port ) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+port))) { 00465 this->ports()->removePort(ourport->getName()); 00466 delete ourport; // also deletes datasource. 00467 return true; 00468 } 00469 return false; 00470 } 00471 00472 // report a specific datasource, property,... 00473 bool ReportingComponent::reportData(const std::string& component,const std::string& dataname) 00474 { 00475 Logger::In in("ReportingComponent"); 00476 TaskContext* comp = this->getPeer(component); 00477 if ( !comp ) { 00478 log(Error) << "Could not report Component " << component <<" : no such peer."<<endlog(); 00479 return false; 00480 } 00481 // Is it an attribute ? 00482 if ( comp->provides()->getValue( dataname ) ) { 00483 if (this->reportDataSource( component + "." + dataname, "Data", 00484 comp->provides()->getValue( dataname )->getDataSource(), 0, false ) == false) { 00485 log(Error) << "Failed reporting data " << dataname <<endlog(); 00486 return false; 00487 } 00488 } 00489 00490 // Is it a property ? 00491 if ( comp->properties() && comp->properties()->find( dataname ) ) { 00492 if (this->reportDataSource( component + "." + dataname, "Data", 00493 comp->properties()->find( dataname )->getDataSource(), 0, false ) == false) { 00494 log(Error) << "Failed reporting data " << dataname <<endlog(); 00495 return false; 00496 } 00497 } 00498 // Ok. we passed. 00499 // Add port to ReportData properties if data not listed yet. 00500 if ( !report_data.value().findValue<string>( component+"."+dataname) ) 00501 report_data.value().ownProperty(new Property<string>("Data","",component+"."+dataname)); 00502 return true; 00503 } 00504 00505 bool ReportingComponent::unreportData(const std::string& component,const std::string& datasource) { 00506 return this->unreportDataSource( component +"." + datasource) && report_data.value().removeProperty( report_data.value().findValue<string>(component+"."+datasource)); 00507 } 00508 00509 bool ReportingComponent::reportDataSource(std::string tag, std::string type, base::DataSourceBase::shared_ptr orig, base::InputPortInterface* ipi, bool track) 00510 { 00511 // check for duplicates: 00512 for (Reports::iterator it = root.begin(); 00513 it != root.end(); ++it) 00514 if ( it->get<T_QualName>() == tag ) { 00515 return true; 00516 } 00517 00518 // creates a copy of the data and an update command to 00519 // update the copy from the original. 00520 base::DataSourceBase::shared_ptr clone = orig->getTypeInfo()->buildValue(); 00521 if ( !clone ) { 00522 log(Error) << "Could not report '"<< tag <<"' : unknown type." << endlog(); 00523 return false; 00524 } 00525 PropertyBase* prop = 0; 00526 root.push_back( boost::make_tuple( tag, orig, type, prop, ipi, false, track ) ); 00527 return true; 00528 } 00529 00530 bool ReportingComponent::unreportDataSource(std::string tag) 00531 { 00532 for (Reports::iterator it = root.begin(); 00533 it != root.end(); ++it) 00534 if ( it->get<T_QualName>() == tag ) { 00535 root.erase(it); 00536 return true; 00537 } 00538 return false; 00539 } 00540 00541 bool ReportingComponent::startHook() { 00542 Logger::In in("ReportingComponent"); 00543 if (marshallers.begin() == marshallers.end()) { 00544 log(Error) << "Need at least one marshaller to write reports." <<endlog(); 00545 return false; 00546 } 00547 00548 if(synchronize_with_logging.get()) 00549 starttime = Logger::Instance()->getReferenceTime(); 00550 else 00551 starttime = os::TimeService::Instance()->getTicks(); 00552 00553 // Get initial data samples 00554 this->copydata(); 00555 this->makeReport2(); 00556 00557 // write headers 00558 if (writeHeader.get()) { 00559 // call all header marshallers. 00560 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00561 it->first->serialize( report ); 00562 it->first->flush(); 00563 } 00564 } 00565 00566 // write initial values with all value marshallers (uses the forcing above) 00567 if ( getActivity()->isPeriodic() ) { 00568 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00569 it->second->serialize( report ); 00570 it->second->flush(); 00571 } 00572 } 00573 00574 // Turn off port triggering in snapshot mode, and vice versa. 00575 // Also clears any old data in the buffers 00576 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) 00577 if ( it->get<T_Port>() ) { 00578 #ifndef ORO_SIGNALLING_PORTS 00579 it->get<T_Port>()->signalInterface( !insnapshot.get() ); 00580 #endif 00581 it->get<T_Port>()->clear(); 00582 } 00583 00584 00585 snapshotted = false; 00586 return true; 00587 } 00588 00589 void ReportingComponent::snapshot() { 00590 // this function always copies and reports all data It's run in ownthread, so updateHook will be run later. 00591 if ( getActivity()->isPeriodic() ) 00592 return; 00593 snapshotted = true; 00594 updateHook(); 00595 } 00596 00597 bool ReportingComponent::copydata() { 00598 timestamp = os::TimeService::Instance()->secondsSince( starttime ); 00599 00600 // result will become true if more data is to be read. 00601 bool result = false; 00602 // This evaluates the InputPortDataSource evaluate() returns true upon new data. 00603 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) { 00604 it->get<T_NewData>() = (it->get<T_PortDS>())->evaluate(); // stores 'NewData' flag. 00605 // if its a property/attr, get<T_NewData> will always be true, so we override (clear) with get<T_Tracked>. 00606 result = result || ( it->get<T_NewData>() && it->get<T_Tracked>() ); 00607 } 00608 return result; 00609 } 00610 00611 void ReportingComponent::makeReport2() 00612 { 00613 // Uses the port DS itself to make the report. 00614 assert( report.empty() ); 00615 // For the timestamp, we need to add a new property object: 00616 report.add( timestamp.getTypeInfo()->buildProperty( timestamp.getName(), "", timestamp.getDataSource() ) ); 00617 DataSource<bool>::shared_ptr checker; 00618 for(Reports::iterator it = root.begin(); it != root.end(); ++it ) { 00619 Property<PropertyBag>* subbag = new Property<PropertyBag>( it->get<T_QualName>(), ""); 00620 if ( decompose.get() && memberDecomposition( it->get<T_PortDS>(), subbag->value(), checker ) ) { 00621 report.add( subbag ); 00622 it->get<T_Property>() = subbag; 00623 } else { 00624 // property or simple value port... 00625 base::DataSourceBase::shared_ptr converted = it->get<T_PortDS>()->getTypeInfo()->convertType( it->get<T_PortDS>() ); 00626 if ( converted && converted != it->get<T_PortDS>() ) { 00627 // converted contains another type. 00628 PropertyBase* convProp = converted->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", converted); 00629 it->get<T_Property>() = convProp; 00630 report.add(convProp); 00631 } else { 00632 PropertyBase* origProp = it->get<T_PortDS>()->getTypeInfo()->buildProperty(it->get<T_QualName>(), "", it->get<T_PortDS>()); 00633 it->get<T_Property>() = origProp; 00634 report.add(origProp); 00635 } 00636 delete subbag; 00637 } 00638 00639 } 00640 mchecker = checker; 00641 } 00642 00643 void ReportingComponent::cleanReport() 00644 { 00645 // Only clones were added to result, so delete them. 00646 deletePropertyBag( report ); 00647 } 00648 00649 void ReportingComponent::updateHook() { 00650 //If not periodic and insnapshot is true, only continue if snapshot is called. 00651 if( !getActivity()->isPeriodic() && insnapshot.get() && !snapshotted) 00652 return; 00653 else 00654 snapshotted = false; 00655 00656 // if any data sequence got resized, we rebuild the whole bunch. 00657 // otherwise, we need to track every individual array (not impossible though, but still needs an upstream concept). 00658 if ( mchecker && mchecker->get() == false ) { 00659 cleanReport(); 00660 makeReport2(); 00661 } else 00662 copydata(); 00663 00664 do { 00665 // Step 3: print out the result 00666 // write out to all marshallers 00667 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00668 if ( onlyNewData ) { 00669 // Serialize only changed ports: 00670 it->second->serialize( *report.begin() ); // TimeStamp. 00671 for (Reports::const_iterator i = root.begin(); 00672 i != root.end(); 00673 i++ ) 00674 { 00675 if ( i->get<T_NewData>() ) 00676 it->second->serialize( i->get<T_Property>() ); 00677 } 00678 } else { 00679 // pass on all ports to the marshaller 00680 it->second->serialize( report ); 00681 } 00682 it->second->flush(); 00683 } 00684 } while( !getActivity()->isPeriodic() && !insnapshot.get() && copydata() ); // repeat if necessary. In periodic mode we always only sample once. 00685 } 00686 00687 void ReportingComponent::stopHook() { 00688 // tell body marshallers that serialization is done. 00689 for(Marshallers::iterator it=marshallers.begin(); it != marshallers.end(); ++it) { 00690 it->second->flush(); 00691 } 00692 cleanReport(); 00693 } 00694 00695 }