Data aggregation / time average

Postby shtrom » Wed Feb 24, 2010 5:48 am

Hello,

I needed to write a component that outputs the average of its input at a fixed sampling period. Since the averaging runs independently of the rate of incoming data, I used a separate thread to do the averaging and the output.

Below is a simplified snippet that works with my user-defined type PCapStream.

Code: Select all
void MAPSPCapStats::Birth()
{
        packetSum = 0;
        pSamplingRate = GetFloatProperty("SamplingRate");
        if (pSamplingRate <= 0) {
                ReportError("SamplingRate must be positive");
                CommitSuicide();
        }
        CreateThread((MAPSThreadFunction) &MAPSPCapStats::AggregateLoop);
}

void MAPSPCapStats::Core()
{
        MAPSIOElt *iElt;

        iElt = StartReading(Input("iPCapStream"));
        if (iElt==NULL)
                return;
        PCapStream &iPkt=*static_cast<PCapStream *>(iElt->Data());

        mutex.Lock();
        packetSum++;
        mutex.Release();
}

void MAPSPCapStats::AggregateLoop()
{
        MAPSIOElt *oElt;
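        // Sampling period in microseconds, assuming SamplingRate is given in Hz
        float period = 1000000. / pSamplingRate;
        MAPSFloat avgPPS = 0;

        while(!IsDying()) {
                mutex.Lock();
                // Packets per second = packets counted in one period * periods per second
                avgPPS = pSamplingRate * packetSum;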
                packetSum = 0;
                mutex.Release();

                oElt = StartWriting(Output("oPackets"));
                if (oElt == NULL) {
                        ReportError("Can't allocate I/O element for writing");
                } else {
                        oElt->Float() = avgPPS;
                        StopWriting(oElt);
                }
                Rest(period);
        }
}   


Is this a correct approach? Is there a better one?
shtrom
 

Re: Data aggregation / time average

Postby support@intempora.com » Wed Mar 03, 2010 2:04 pm

Hello,

Your approach looks fine.

A few minor points, for information:

In Birth:
Instead of
Code: Select all
ReportError("SamplingRate must be positive");
CommitSuicide();

you can call the Error function:
Code: Select all
Error("SamplingRate must be positive");

Unlike ReportError, Error both reports the error and throws an exception, which is caught by the RTMaps engine. The engine then shuts down the component and calls Death() automatically. The result is the same as with your two calls, but in a single line.
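The control flow described above can be sketched in plain C++, outside RTMaps. This is only an illustration under stated assumptions: MockComponent, RunBirth and the log vector are hypothetical names, not part of the RTMaps SDK, and std::runtime_error stands in for whatever exception type the engine actually uses.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a component; this is NOT the RTMaps API.
struct MockComponent {
    std::vector<std::string> log;  // records what happened, in order
    double samplingRate;

    explicit MockComponent(double rate) : samplingRate(rate) {}

    // Reports the error and throws in a single call,
    // mimicking the behaviour described for Error().
    [[noreturn]] void Error(const std::string& msg) {
        log.push_back("error: " + msg);
        throw std::runtime_error(msg);
    }

    void Birth() {
        if (samplingRate <= 0)
            Error("SamplingRate must be positive");
        // Never reached when Error() was called above.
        log.push_back("thread started");
    }

    void Death() { log.push_back("death"); }
};

// Hypothetical engine side: catches the exception and shuts the component down.
// Returns true if Birth() completed normally.
bool RunBirth(MockComponent& c) {
    try {
        c.Birth();
        return true;
    } catch (const std::runtime_error&) {
        c.Death();
        return false;
    }
}
```

Calling RunBirth on a MockComponent built with a rate of 0 leaves two entries in its log, the error report followed by "death", and the line after the check is skipped because the exception unwinds out of Birth().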


In the AggregateLoop() thread:
The Rest function is fine, but if you want an accurate sampling period, it is better to use the Wait function on a date (an "appointment" that is incremented on each loop iteration).
Declare a member:
Code: Select all
MAPSTimestamp m_appointment;

Initialize it in Birth:
Code: Select all
m_appointment = MAPS::CurrentTime();


then in AggregateLoop():
Code: Select all
...
while (!IsDying()) {
    ...
    m_appointment += period;
    Wait(m_appointment);
}

This way you pause until a given date rather than for a given duration. The loop period is then the specified "period", rather than "period" plus the time taken to execute the body of the while loop.
Just a detail...
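
The difference between the two pausing styles can be sketched in standard C++, outside RTMaps. This is only an illustration: RunWithRest, RunWithAppointment and the Clock alias are hypothetical names, and std::this_thread::sleep_for and sleep_until stand in for Rest and Wait.

```cpp
#include <chrono>
#include <thread>

using Clock = std::chrono::steady_clock;

// Calls `work` n times and pauses for a fixed duration after each call
// (the Rest(period) style). Returns the total elapsed time.
template <typename Work>
Clock::duration RunWithRest(int n, Clock::duration period, Work work) {
    const auto start = Clock::now();
    for (int i = 0; i < n; ++i) {
        work();
        std::this_thread::sleep_for(period);  // period is added on top of the work time
    }
    return Clock::now() - start;
}

// Calls `work` n times and pauses until an absolute deadline that advances
// by one period per iteration (the Wait(m_appointment) style).
// Returns the total elapsed time.
template <typename Work>
Clock::duration RunWithAppointment(int n, Clock::duration period, Work work) {
    const auto start = Clock::now();
    auto appointment = start;
    for (int i = 0; i < n; ++i) {
        work();
        appointment += period;                       // next date, independent of the work time
        std::this_thread::sleep_until(appointment);  // absorbs the time spent in work()
    }
    return Clock::now() - start;
}
```

With a 5 ms period and a work function that takes about 2 ms, five iterations of RunWithRest take at least 35 ms, whereas RunWithAppointment targets 25 ms as long as the work fits within one period.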

Best regards,
support@intempora.com

