Simple TCP client using non-periodic component

Rationale

Problem: You want a component that connects to a remote TCP server and reads data from it (this example could just as easily write instead of read). The component will block for varying amounts of time while reading.

Solution: Use a non-periodic component. This example outlines one way to structure the component so that it copes with blocking reads while remaining responsive to other components, able to run a state machine, etc.

Assumptions

  • Uses Qt sockets to avoid operating-system intricacies and differences when using raw sockets. The code can easily be modified to use socket(), connect(), read(), etc. instead. It is the structure of the solution that we are interested in.
  • The build directory is within the source directory. This helps with dynamic library loading.
  • Does not attempt reconnection if unable to connect on the first attempt.
  • Non-robust error handling.
  • Does not validate property values (a robust component would check within a configureHook() that, e.g., the timeouts are not negative).
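
As a rough illustration of the validation mentioned in the last assumption, the sketch below shows checks that a configureHook() could perform. It is plain C++ rather than RTT code: the ClientSettings struct and the validateSettings() and toMilliseconds() helpers are hypothetical names that do not appear in the attached files. The port range is taken from the HostPort property description, and the timeout rule is the "not negative" rule stated above.

```cpp
#include <cassert>
#include <string>

// Hypothetical plain struct mirroring the four properties of the component.
struct ClientSettings
{
    std::string hostName;
    int         hostPort;
    int         connectionTimeout;   // seconds
    int         readTimeout;         // seconds
};

// Hypothetical helper: returns an empty string when the settings are usable,
// otherwise a description of the first problem found.
std::string validateSettings(const ClientSettings& s)
{
    if (s.hostName.empty())
        return "HostName must not be empty";
    if (s.hostPort < 1024 || s.hostPort > 65535)
        return "HostPort must be within 1024-65535 inclusive";
    if (s.connectionTimeout < 0)
        return "ConnectionTimeout must not be negative";
    if (s.readTimeout < 0)
        return "ReadTimeout must not be negative";
    return "";
}

// Converts a validated timeout in seconds to the milliseconds that the
// socket wait calls expect.
int toMilliseconds(int seconds)
{
    assert(seconds >= 0);   // callers are expected to validate first
    return 1000 * seconds;
}
```

A configureHook() built on this idea would log the returned message and return false when it is not empty, so that a misconfigured component never reaches startHook().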

Files

SimpleNonPeriodicClient.cpp

SimpleNonPeriodicClient.hpp

SimpleNonPeriodicClient.xml

SimpleNonPeriodicClient.cpf

Buildable tarball

The .cpf file has a .txt extension simply to keep the wiki happy. To use the file, rename it to SimpleNonPeriodicClient.cpf.

Component definition

This is the class definition

class SimpleNonPeriodicClient : public RTT::TaskContext
{
protected:
    // DATA INTERFACE
 
    // *** OUTPUTS ***
 
    /// the last read data
    RTT::WriteDataPort<std::string>            lastRead_port;
 
    /// the number of items successfully read
    RTT::Attribute<int>                        countRead_attr;
 
    // *** CONFIGURATION ***
 
    // name to listen for incoming connections on, either FQDN or IPv4 address
    RTT::Property<std::string>                hostName_prop;
    // port to listen on
    RTT::Property<int>                        hostPort_prop;
    // timeout in seconds, when waiting for connection
    RTT::Property<int>                        connectionTimeout_prop;
    // timeout in seconds, when waiting to read
    RTT::Property<int>                        readTimeout_prop;
 
public:
    SimpleNonPeriodicClient(std::string name);
    virtual ~SimpleNonPeriodicClient();
 
protected:
    /// reset count and lastRead, attempt to connect to remote
    virtual bool startHook();
    /// attempt to read and process one packet
    virtual void updateHook();
    /// close the socket and cleanup
    virtual void stopHook();
    /// cause updateHook() to return
    virtual bool breakUpdateHook();
 
    /// Socket used to connect to remote host
    QTcpSocket*    socket;
    /// Flag indicating to updateHook() that we want to quit
    bool        quit;
};

The component has a series of properties specifying the remote host and port to connect to, as well as timeout parameters. It also uses an RTT Attribute to count the number of successful reads that have occurred, and stores the last read data as a string in an RTT data port.

Component implementation

#include "SimpleNonPeriodicClient.hpp"
#include <rtt/Logger.hpp>
#include <ocl/ComponentLoader.hpp>
 
#include <QTcpSocket>

The class definition is included, as well as the RTT logger and, importantly, the OCL component loader that turns this class into a deployable component in a shared library.

Most importantly, all Qt-related headers come after all Orocos headers. This is required because Qt defines certain words as macros (e.g. "slots", "emit"), which cause compilation errors when those words appear in our code or in Orocos code.

SimpleNonPeriodicClient::SimpleNonPeriodicClient(std::string name) :
        RTT::TaskContext(name),
        lastRead_port("lastRead", ""),
        countRead_attr("countRead", 0),
        hostName_prop("HostName", 
                      "Name to listen for incoming connections on (FQDN or IPv4)", ""),
        hostPort_prop("HostPort", 
                      "Port to listen on (1024-65535 inclusive)", 0),
        connectionTimeout_prop("ConnectionTimeout", 
                               "Timeout in seconds, when waiting for connection", 0),
        readTimeout_prop("ReadTimeout", 
                         "Timeout in seconds, when waiting for read", 0),
        socket(new QTcpSocket), 
        quit(false)
{
    ports()->addPort(&lastRead_port);
 
    attributes()->addAttribute(&countRead_attr);
 
    properties()->addProperty(&hostName_prop);
    properties()->addProperty(&hostPort_prop);
    properties()->addProperty(&connectionTimeout_prop);
    properties()->addProperty(&readTimeout_prop);
}

The constructor simply sets up the data interface elements (i.e. the port, attribute and properties) and gives them appropriate initial values. Note that some of these initial values are illegal, which would aid any validation code in a configureHook() (which has not been done in this example).

SimpleNonPeriodicClient::~SimpleNonPeriodicClient()
{
    delete socket;
}

The destructor cleans up by deleting the socket we allocated in the constructor.

Now to the meat of it

bool SimpleNonPeriodicClient::startHook()
{
    bool        rc                    = false;        // prove otherwise
    std::string    hostName            = hostName_prop.rvalue();
    int            hostPort            = hostPort_prop.rvalue();
    int         connectionTimeout    = connectionTimeout_prop.rvalue();
 
    quit = false;
 
    // attempt to connect to remote host/port
    log(Info) << "Connecting to " << hostName << ":" << hostPort << endlog();
    socket->connectToHost(hostName.c_str(), hostPort);
    if (socket->waitForConnected(1000 * connectionTimeout))    // to milliseconds
    {
        log(Info) << "Connected" << endlog();
        rc = true;
    }
    else
    {    
        log(Error) << "Error connecting: " << socket->error() << ", " 
                   << socket->errorString().toStdString() << endlog();
        // as we now return false, this component will fail to start.
    }
 
    return rc;
}

The startHook() uses the properties loaded from the SimpleNonPeriodicClient.cpf file to attempt to connect to the remote host. If the remote port is not ready, the attempted connection will time out.

If the connection is not established, then startHook() returns false, which prevents the component from actually being started. No reconnection is attempted (see Assumptions above).

void SimpleNonPeriodicClient::updateHook()
{
    // wait for some data to arrive, timing out if necessary
    int     readTimeout        = readTimeout_prop.rvalue();
    log(Debug) << "Waiting for data with timeout=" << readTimeout << " seconds" << endlog();
    if (!socket->waitForReadyRead(1000 * readTimeout))
    {
        log(Error) << "Error waiting for data: " << socket->error() << ", " 
                   << socket->errorString().toStdString() 
                   << ". Num bytes = " 
                   << socket->bytesAvailable() << endlog();
        log(Error) << "Disconnecting" << endlog();
        // disconnect socket, and do NOT call this function again
        // ie no engine()->getActivity()->trigger()
        socket->disconnectFromHost();
        return;        
    }
 
    // read and print whatever data is available, but stop if instructed
    // to quit
    while (!quit && (0 < socket->bytesAvailable()))
    {
#define    BUFSIZE        10
        char            str[BUFSIZE + 1];    // +1 for terminator
        qint64            numRead;
 
        // read at most BUFSIZE bytes; the explicit qint64 keeps both
        // arguments of qMin() the same type
        numRead = socket->read(&str[0], 
                               qMin<qint64>(BUFSIZE, socket->bytesAvailable()));
        if (0 < numRead)
        {
            str[numRead] = '\0';        // terminate directly after the bytes read
            log(Info) << "Got " << numRead << " bytes : '" << &str[0] << "'" << endlog();
            countRead_attr.set(countRead_attr.get() + 1);
            lastRead_port.Set(&str[0]);
        }
        else if (numRead < 0)
        {
            break;                      // read error: leave the loop
        }
    }
 
    // if not quitting then trigger another immediate call to this function, to
    // get the next batch of data
    if (!quit)
    {
        engine()->getActivity()->trigger();
    }
}

The updateHook() function waits until data is available and then reads it at most BUFSIZE characters at a time. If it times out waiting for data, it logs an error and disconnects the socket. This is not a robust approach; a real component would deal with this differently.

As data may be continually arriving and/or we may get more than BUFSIZE characters at a time, the while loop may iterate several times. The quit flag indicates that the user wants to stop the component and that we should stop reading characters.
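
The chunked read can be tried out without Qt. The sketch below is a stand-alone approximation, not the component's code: FakeSource is a hypothetical stand-in for the socket that offers only bytesAvailable() and read(), and readInChunks() is a hypothetical helper that mirrors the while loop. The point it illustrates is that each chunk is terminated directly after the bytes actually read, so a short final chunk does not carry stale characters.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for the socket: a byte queue with the same
// bytesAvailable()/read() shape that updateHook() relies on.
class FakeSource
{
public:
    explicit FakeSource(const std::string& data) : data_(data), pos_(0) {}

    std::size_t bytesAvailable() const { return data_.size() - pos_; }

    // Copies at most maxSize bytes into buf and returns the number copied.
    std::size_t read(char* buf, std::size_t maxSize)
    {
        const std::size_t n = std::min(maxSize, bytesAvailable());
        std::memcpy(buf, data_.data() + pos_, n);
        pos_ += n;
        return n;
    }

private:
    std::string data_;
    std::size_t pos_;
};

// Drains the source in chunks of at most bufSize bytes and returns each
// chunk as a string.
std::vector<std::string> readInChunks(FakeSource& source, std::size_t bufSize)
{
    assert(bufSize > 0);
    std::vector<std::string> chunks;
    std::vector<char> buf(bufSize + 1);          // +1 for terminator
    while (source.bytesAvailable() > 0)
    {
        const std::size_t numRead = source.read(&buf[0], bufSize);
        buf[numRead] = '\0';                     // terminate after the bytes read
        chunks.push_back(std::string(&buf[0]));
    }
    return chunks;
}
```

With a buffer size of 10, the 25-character input "The quick brown fox jumps" comes back as two full chunks followed by a 5-character chunk.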

Of particular note is the last line

engine()->getActivity()->trigger();

This causes updateHook() to be called again immediately by the execution engine. Essentially, this makes the non-periodic component act as a periodic component with a varying period. Of course, trigger() is not called if the component is being stopped (i.e. quit == true).
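
The re-triggering idea can be modelled in a few lines of plain C++. This is a toy model only and makes no claim about how the RTT execution engine is implemented: ToyActivity is a hypothetical class in which each trigger leads to one call of the update function, and the update function may ask for another call by triggering again.

```cpp
#include <cassert>
#include <functional>

// Toy model only; this is NOT the RTT execution engine. It mimics one idea:
// update() runs once per trigger, and update() may request another run by
// calling trigger() itself.
class ToyActivity
{
public:
    explicit ToyActivity(std::function<void(ToyActivity&)> update)
        : update_(update), pending_(false)
    {
        assert(static_cast<bool>(update_));   // an update function is required
    }

    void trigger() { pending_ = true; }

    // Runs update() for as long as a trigger is pending and returns the
    // number of times update() was called.
    int run()
    {
        int calls = 0;
        while (pending_)
        {
            pending_ = false;   // consume the trigger before running
            update_(*this);
            ++calls;
        }
        return calls;
    }

private:
    std::function<void(ToyActivity&)> update_;
    bool pending_;
};
```

An update function that triggers itself until its work is done keeps the loop alive; as soon as it returns without triggering, the activity goes idle, which is what happens in updateHook() once quit is set.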

void SimpleNonPeriodicClient::stopHook()
{
    if (socket->isValid() &&
        (QAbstractSocket::ConnectedState == socket->state()))
    {
        log(Info) << "Disconnecting" << endlog();
        socket->disconnectFromHost();
    }
}

The stopHook() simply disconnects the socket if it is currently connected.

bool SimpleNonPeriodicClient::breakUpdateHook()
{
    quit = true;
    return true;
}

The breakUpdateHook() is very important, as it is the only way to inform a blocked updateHook() that it is time to return and quit. In this example we set the quit flag and return true. The quit flag will be picked up by updateHook() when it finishes waiting for data (in socket->waitForReadyRead()). Returning true from breakUpdateHook() tells the execution engine that we successfully told updateHook() to return, and that it should wait (one second, hardcoded) for updateHook() to complete and return. If we returned false, then stop() would also return false.

We could have also done something like socket->abort() to forcibly terminate any blocked socket->waitForReadyRead() calls.

When using system calls (e.g. read()) instead of Qt classes, you could attempt to send a signal to interrupt the system call. However, this might not have the desired effect when the component is deployed; the reader is advised to be careful here.
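
One detail to consider when adapting this pattern: the quit flag is set by whoever calls stop and read by the blocked update loop. The sketch below assumes those are two different threads and uses std::atomic<bool> (C++11) for the flag rather than the plain bool of the attached code. Worker and runAndStop() are hypothetical names, and the 1 ms sleep merely stands in for a short blocking wait on the socket.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical worker, standing in for a component whose update loop runs in
// its own thread while another thread asks it to stop.
class Worker
{
public:
    Worker() : quit_(false), iterations_(0) {}

    // Plays the role of updateHook(): loops until asked to quit.
    void run()
    {
        while (!quit_.load())
        {
            // stands in for a short blocking wait on the socket
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++iterations_;
        }
    }

    // Plays the role of breakUpdateHook(): called from another thread.
    bool requestStop()
    {
        quit_.store(true);
        return true;
    }

    int iterations() const { return iterations_; }

private:
    std::atomic<bool> quit_;   // written by one thread, read by another
    int iterations_;           // only read by others after the thread is joined
};

// Starts the worker, lets it run briefly, stops it, and waits for it to end.
int runAndStop(Worker& worker)
{
    std::thread t(&Worker::run, &worker);
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    const bool accepted = worker.requestStop();
    assert(accepted);
    t.join();                  // returns only once run() has seen the flag
    return worker.iterations();
}
```

The join() in runAndStop() corresponds loosely to the engine waiting for updateHook() to return after breakUpdateHook() has reported success.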

ORO_CREATE_COMPONENT(SimpleNonPeriodicClient)

This line of code creates a deployable component for the SimpleNonPeriodicClient class that the deployer can load from a shared library.

To build

In a shell

cd /path/to/SimpleNonPeriodicClient
mkdir build
cd build
cmake .. -DOROCOS_TARGET=macosx
make

For other operating systems, substitute the appropriate value for "macosx" when setting OROCOS_TARGET (e.g. "gnulinux").

Tested in Mac OS X Leopard 10.5.7, and Ubuntu Jaunty Linux.

To run

Start one shell and run netcat to act as the server (NB 50001 is the HostPort value from your SimpleNonPeriodicClient.cpf file)

nc -l 50001

Start a second shell and deploy the SimpleNonPeriodicClient component

cd /path/to/SimpleNonPeriodicClient/build
deployer-macosx -s ../SimpleNonPeriodicClient.xml

Now type in the first shell. When you hit enter, netcat will send the data, and the SimpleNonPeriodicClient component will print it in chunks of up to N characters (where N is the size of the buffer in updateHook()).

Points to note:

  1. The SimpleNonPeriodicClient component will time out if you do not hit enter within ReadTimeout seconds (as specified in the SimpleNonPeriodicClient.cpf file).
  2. Setting the ORO_LOGLEVEL environment variable to 5 or 6, or running the deployer with the -linfo or -ldebug options, will generate additional debugging statements.
  3. The component will take up to ReadTimeout seconds to respond to the user typing quit in the deployer, as breakUpdateHook() does not forcibly exit the socket->waitForReadyRead() call.
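
One possible way to shorten the stop delay described in point 3 is to split the long wait into shorter slices and check the quit flag between them. The sketch below is an assumption-laden illustration, not part of the attached code: waitInSlices() and the WaitResult enum are hypothetical, and the waitSlice callback stands in for a call such as socket->waitForReadyRead(ms). A real Qt version would also need to tell a slice that merely timed out apart from a genuine socket error.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>

enum WaitResult { DataReady, TimedOut, QuitRequested };

// Waits up to totalMs for data, but in slices of at most sliceMs so that the
// quit flag is re-checked between slices. waitSlice(ms) is a hypothetical
// callback that returns true when data became ready within ms milliseconds.
WaitResult waitInSlices(int totalMs, int sliceMs,
                        const std::function<bool(int)>& waitSlice,
                        const std::function<bool()>& quitRequested)
{
    assert(totalMs >= 0 && sliceMs > 0);
    int remainingMs = totalMs;
    while (remainingMs > 0)
    {
        if (quitRequested())
            return QuitRequested;
        const int thisSlice = std::min(sliceMs, remainingMs);
        if (waitSlice(thisSlice))
            return DataReady;
        remainingMs -= thisSlice;
    }
    return quitRequested() ? QuitRequested : TimedOut;
}
```

With a 5 second ReadTimeout and 100 ms slices, a stop request would be noticed within roughly one slice instead of up to the full timeout, at the cost of waking up more often.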

Attachment                              Size
SimpleNonPeriodicClient.cpp             7.42 KB
SimpleNonPeriodicClient.hpp             3.11 KB
SimpleNonPeriodicClient.xml             1 KB
SimpleNonPeriodicClient-cpf.txt         748 bytes
SimpleNonPeriodicClient.tar_.bz2        7.72 KB

Sample output

The netcat shell, with the text the user typed in.

nc -l 50001 
The quick brown fox jumps
over the lazy dog. 

The deployer shell, showing the text read in chunks, as well as the updated port and attribute within the component.

deployer-macosx -s ../SimpleNonPeriodicClient.xml -linfo
0.009 [ Info   ][deployer-macosx::main()] No plugins present in /usr/lib/rtt/macosx/plugins
0.009 [ Info   ][DeploymentComponent::loadComponents] Loading '../SimpleNonPeriodicClient.xml'.
0.010 [ Info   ][DeploymentComponent::loadComponents] Validating new configuration...
0.011 [ Info   ][DeploymentComponent::loadLibrary] Storing orocos-rtt
0.011 [ Info   ][DeploymentComponent::loadLibrary] Loaded shared library 'liborocos-rtt-macosx.dylib'
0.054 [ Info   ][DeploymentComponent::loadLibrary] Loaded multi component library 'libSimpleNonPeriodicClient.dylib'
0.054 [ Warning][DeploymentComponent::loadLibrary] Component type name SimpleNonPeriodicClient already used: overriding.
0.054 [ Info   ][DeploymentComponent::loadLibrary] Loaded component type 'SimpleNonPeriodicClient'
0.055 [ Info   ][DeploymentComponent::loadLibrary] Storing SimpleNonPeriodicClient
0.058 [ Info   ][DeploymentComponent::loadComponent] Adding SimpleNonPeriodicClient as new peer:  OK.
0.058 [ Warning][SingleThread] Forcing priority (0) of thread to 0.
0.058 [ Info   ][NonPeriodicActivity] SingleThread created with priority 0 and period 0.
0.058 [ Info   ][NonPeriodicActivity] Scheduler type was set to `4'.
0.059 [ Info   ][PropertyLoader:configure] Configuring TaskContext 'SimpleNonPeriodicClient' with '../SimpleNonPeriodicClient.cpf'.
0.059 [ Info   ][DeploymentComponent::configureComponents] Configured Properties of SimpleNonPeriodicClient from ../SimpleNonPeriodicClient.cpf
0.059 [ Info   ][DeploymentComponent::configureComponents] Re-setting activity of SimpleNonPeriodicClient
0.059 [ Info   ][DeploymentComponent::configureComponents] Configuration successful.
0.060 [ Info   ][DeploymentComponent::startComponents] Connecting to 127.0.0.1:50001
0.064 [ Info   ][DeploymentComponent::startComponents] Connected
0.065 [ Info   ][DeploymentComponent::startComponents] Startup successful.
0.065 [ Info   ][deployer-macosx::main()] Successfully loaded, configured and started components from ../SimpleNonPeriodicClient.xml
   Switched to : Deployer
0.066 [ Info   ][SimpleNonPeriodicClient] Entering Task Deployer
 
  This console reader allows you to browse and manipulate TaskContexts.
  You can type in a command, event, method, expression or change variables.
  (type 'help' for instructions)
    TAB completion and HISTORY is available ('bash' like)
 
 In Task Deployer[S]. (Status of last Command : none )
 (type 'ls' for context info) :4.816 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'The quick '
4.816 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'brown fox '
7.448 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : 'jumps
over'
7.448 [ Info   ][SimpleNonPeriodicClient] Got 10 bytes : ' the lazy '
12.448 [ ERROR  ][SimpleNonPeriodicClient] Error waiting for data: 5, Network operation timed out. Num bytes = 5
12.448 [ ERROR  ][SimpleNonPeriodicClient] Disconnecting
 
 
 In Task Deployer[S]. (Status of last Command : none )
 (type 'ls' for context info) :ls SimpleNonPeriodicClient
 
 Listing TaskContext SimpleNonPeriodicClient :
 
 Configuration Properties: 
     string HostName       = 127.0.0.1            (Name to listen for incoming connections on (FQDN or IPv4))
        int HostPort       = 50001                (Port to listen on (1024-65535 inclusive))
        int ConnectionTimeout = 5                    (Timeout in seconds, when waiting for connection)
        int ReadTimeout    = 5                    (Timeout in seconds, when waiting for read)
 
 Execution Interface:
  Attributes   : 
        int countRead      = 4                   
 
  Methods      : activate cleanup configure error getErrorCount getPeriod getWarningCount inFatalError inRunTimeError inRunTimeWarning isActive isConfigured isRunning resetError start stop trigger update warning 
  Commands     : (none)
  Events       : (none)
 
 Data Flow Ports: 
  W(U)      string lastRead       =  the lazy 
 
 Task Objects: 
  this           ( The interface of this TaskContext. ) 
  scripting      ( Access to the Scripting interface. Use this object in order to load or query programs or state machines. ) 
  engine         ( Access to the Execution Engine. Use this object in order to address programs or state machines which may or may not be loaded. ) 
  marshalling    ( Read and write Properties to a file. ) 
  lastRead       ( (No description set for this Port) ) 
 
 Peers        : (none)
 
 In Task Deployer[S]. (Status of last Command : none )
 (type 'ls' for context info) :quit
 
18.089 [ Info   ][DeploymentComponent::stopComponents] Stopped SimpleNonPeriodicClient
18.089 [ Info   ][DeploymentComponent::cleanupComponents] Cleaned up SimpleNonPeriodicClient
18.090 [ Info   ][DeploymentComponent::startComponents] Disconnected and destroyed SimpleNonPeriodicClient
18.090 [ Info   ][DeploymentComponent::startComponents] Kick-out successful.
18.091 [ Info   ][Logger] Orocos Logging Deactivated.