Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

Better and general answer is out of OpenCV scope and would requires many pages to investigate different scenario and intra thread events and communication.

Short answer Use a thread safe queue for your logical data.

If you want to use simple implementation:

  • Think in term of application then find the best compromise between safety, performance and code clarity.
  • Which is your logical data ? Create a class for your logical data. If the class has cv::Mats or object pointer member it should have a copy constructor (and =operator) overload.
  • Use a thread safe queue for this class. You can find a lot of example of general purpose thread safe circular buffer.

below is mine:

class MyData implements your logical data. class VerySimpleThreadSafeFIFOBuffer as name says is a generic thread safe FIFO. Follow test functions for how to use.

Compare GetDataMemoryCount() and GetMatMemoryCount() with GetItemCount() to see that memory recycling is effective. One test:

Queued Item:496
Queue Max Size: 15
Unique Data Allocation: 32
Unique Mat Allocation: 28

This is an example. It should work but things could be done better and with many improvements. BTW it could be useful in many case

#include <queue>
#include <thread>
#include <mutex>

class MyData
{
public:
    //we use a struct to have simple copy without forget some members
    typedef struct MyDataFlatMembers {
        int64 timestamp = 0;
        int64 frameNum = 0;
        //...
    };

    MyDataFlatMembers vars;
    Mat ir, depth, bgr;

    MyData() { }

    /** @brief class destructor.(USED BY queue::Pop)
     delete object pointer here if you have
     */
    ~MyData() { }

    /** @brief overload for the copy constructor (USED BY buffer::Push)*/
    MyData(const MyData& src) {
        Clone(src);//call clone Mat
    }

    /** @brief Assignment (=) Operator overloading (USED BY from buffer::Pop)
    Because buffer::Pop calls queue:pop who calls MyData destructor we will lost all members
    pointers.
    We are safe with cv::Mat, clone is not needed because of internal cv::Mat memory counter.
    @warning  This operator is not needed ONLY IF object members pointers are only cv::Mat
    in case we have some member pointer object we need to clone them
    */
    MyData& operator=(const MyData&src)
    {
        if (this == &src) return *this;
        Copy(src);
        return *this;
    }

    // this is just to collect stats
    unsigned char* GetMatPointer(){ return ir.data; }

private:
    /** @brief Copy all members (for Mat member =operator will be used)
    USED BY =operator
    */
    void Copy(const MyData& src)
    {
        vars = src.vars; //Copy all flat members using struct copy
        ir = src.ir;
        depth = src.depth;
        bgr = src.bgr;
    }

    /** @brief Copy all members (for Mat clone will be used)
    USED BY copy constructor
    */
    void Clone(const MyData& src)
    {
        vars = src.vars;//Copy all flat members using struct copy
        src.ir.copyTo(ir);
        src.depth.copyTo(depth);
        src.bgr.copyTo(bgr);
    }
};

/** @brief a Very Simple ThreadSafe FIFO Buffer
std::queue with cv::Mat is like a circular buffer.
Even if you store N frames and create N x Mat in the queue
only few blocks will be really allocated thanks to std::queue and Mat memory recycling
compare GetDataMemoryCount() and GetMatMemoryCount() with GetItemCount()
@warning THIS IS AN EXAMPLE, MANY IMPROVEMENT CAN BE DONE
*/
template <class T>
class VerySimpleThreadSafeFIFOBuffer
{
public:
    /* _maxSize=0 :size is unlimited (up to available memory) */
    VerySimpleThreadSafeFIFOBuffer(size_t _maxSize = 20) :
        maxBufSize(0), itemCount(0), maxSize(_maxSize)
    {}
    ~VerySimpleThreadSafeFIFOBuffer()
    {
        //calls destructor for all elements if any
        //if the elements are pointers, the pointed to objects are not destroyed.
        while (!m_buffer.empty())
            m_buffer.pop(); //you never should be here
    }

    /** @brief  Add an item to the queue
     class T should have a copy constructor overload
     */
    bool Push(const T &item)
    {
        //mutex is automatically released when
        //lock goes out of scope
        lock_guard<mutex> lock(m_queueMtx);

        size_t size = m_buffer.size();
        if (maxSize > 0 && size > maxSize)
            return false;

        m_buffer.push(item);//calls T::copy constructor
#ifdef _DEBUG
        //collect some stats
        itemCount++;
        maxBufSize = max(size, maxBufSize);
        MyData *dataPtr = &m_buffer.back();
        unsigned char *matPtr = m_buffer.back().GetMatPointer();
        dataMemoryCounter[dataPtr]++;
        matMemoryCounter[matPtr]++;
#endif
        return true;
    }

    /** @brief Get oldest queued item
     class T should have a =operator overload
     */
    bool Pop(T &item)
    {
        lock_guard<mutex> lock(m_queueMtx);
        if (m_buffer.empty())
            return false;
        item = m_buffer.front(); //calls T::=operator 
        m_buffer.pop();
        return true;
    }

    size_t Size() {
        lock_guard<mutex> lock(m_queueMtx);
        return m_buffer.size();
    }
#ifdef _DEBUG
    size_t GetItemCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return itemCount;
    }
    size_t GetBufSizeMax(){
        lock_guard<mutex> lock(m_queueMtx);
        return maxBufSize;
    }
    size_t GetDataMemoryCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return dataMemoryCounter.size();
    }
    size_t GetMatMemoryCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return matMemoryCounter.size();
    }
#endif
private:
    queue<T> m_buffer;
    mutex m_queueMtx;
    size_t maxBufSize, maxSize;
    size_t itemCount;
    map<void*, int> dataMemoryCounter;
    map<void*, int> matMemoryCounter;
};

the test application

#include <atomic>
// GLOBAL VARS
VerySimpleThreadSafeFIFOBuffer<MyData> theBuffer(40);
atomic<bool> grabbing, processing;
void GrabberThread_Test();
void ProcessorThread_Test();
void ProcessDataInThread_Test(MyData &data);

// THE GRABBER THREAD
void GrabberThread_Test()
{
    processing.store(true);    //ensure to start the processing

    VideoCapture capIr, capDepth;
    MyData grabData;
    capIr.open(0);
    //capDepth.open(1);
    //cap...
    int frameCount = 0;
    int defaultDelay = 100;
    int delay = defaultDelay;
    while (grabbing.load() == true) //this is lock free
    {
        //WORK WITH grabData OBJECT
        capIr >> grabData.ir;
        //capDepth >> grabData.depth
        //...

        if (grabData.ir.empty()) continue;
        frameCount++;
        grabData.vars.timestamp = getTickCount();
        grabData.vars.frameNum = frameCount;
        //PUSH THE grabData OBJECT TO THE END OF THE QUEUE
        if (!theBuffer.Push(grabData))
        {
            cout << "GrabberThread_Test: " << endl
                << "The queue is full! Grabber is going to sleep a bit"<<endl
                << "HINT: increase queue size or improve processor performance or reduce FPS" 
                << endl;
            this_thread::sleep_for(chrono::milliseconds(delay));
            delay += 80;
        }
        else
        {
            if (delay>defaultDelay)
                cout << "GrabberThread_Test: " << "Wake up !" << endl;
            delay = 100;
        }
    }
    processing.store(false);    //!!!!!!stop processing here
}

//THE PROCESSOR THREAD
void ProcessorThread_Test()
{
    MyData procData;
    Mat ir, depth;
    while (processing.load() == true) //this is lock free
    {
        //this is same of procData="front of the queue" and "remove front"
        //cv::Mats in procData are still valid because of cv::Mat internal counter
        if (theBuffer.Pop(procData) == false) continue;
        ProcessDataInThread_Test(procData);
    }
    size_t todo = theBuffer.Size();
    cout << endl << "ProcessorThread: Flushing buffer: "
        << todo << " frames..." << endl;
    while (theBuffer.Pop(procData))
    {
        ProcessDataInThread_Test(procData);
        cout << "todo: " << todo-- << endl;
    }
    cout << "ProcessorThread: done!" << endl;
}

//THE PROCESSING FUNCTION CALLED BY PROCESSING THREAD
void ProcessDataInThread_Test(MyData &data)
{
    int size = 1;
    Point anchor(size, size);
    Size sz(2 * size + 1, 2 * size + 1);
    Mat element = getStructuringElement(MORPH_ELLIPSE, sz, anchor);
    morphologyEx(data.ir, data.ir, MORPH_GRADIENT, element, anchor);
    putText(data.ir, to_string(data.vars.frameNum),
        Point(10, 10), 1, 1, Scalar(0, 255, 0));
    imshow("IR", data.ir);
    //imshow("DEPTH", data.depth);
    //..
    waitKey(1);
}

//MAIN THREAD
int MainQueueGrabbingThread() {

    grabbing.store(true);               // set the grabbing control variable
    processing.store(true);             // ensure the processing will start
    thread grab(GrabberThread_Test);    // start the grabbing task
    thread proc(ProcessorThread_Test);  // start the the processing task

    //your own GUI
    cout << endl << "Press Enter to stop grabbing...";
    cin.get();

    // terminate all
    grabbing.store(false);    // stop the grab loop 
    processing.store(false);  // ensure to stop the processing
    grab.join();              // wait for the grab thread
    proc.join();              // wait for the process thread
#ifdef _DEBUG
    //get some stats on the buffer
    cout << "Number Queued items: " << theBuffer.GetItemCount() << endl;
    cout << "Queue Max Size: " << theBuffer.GetBufSizeMax() << endl;
    cout << "Number of Data Object in memory: " << theBuffer.GetDataMemoryCount() << endl;
    cout << "Number of Mat Object in memory: " << theBuffer.GetMatMemoryCount() << endl;
    cout << endl << "Press Enter to close !";
    cin.get();
#endif
    return 0;
}

Better and general answer is out of OpenCV scope and would requires many pages to investigate different scenario and intra thread events and communication.

Short answer Use a thread safe queue for your logical data.

If you want to use simple implementation:

  • Think in term of application then find the best compromise between safety, performance and code clarity.
  • Which is your logical data ? Create a class for your logical data. If the class has cv::Mats or object pointer member it should have a copy constructor (and =operator) overload.
  • Use a thread safe queue for this class. You can find a lot of example of general purpose thread safe circular buffer.

below is mine:mine.

class MyData implements your logical data. class VerySimpleThreadSafeFIFOBuffer as name says is a generic thread safe FIFO. Follow test functions for how to use.

Compare GetDataMemoryCount() and GetMatMemoryCount() with GetItemCount() to see that memory recycling is effective. One test:

Queued Item:496
Queue Max Size: 15
Unique Data Allocation: 32
Unique Mat Allocation: 28

EDIT: Answer to some comments:

The implementation of VerySimpleThreadSafeFIFOBuffer does assume nothing about consumers and producer, it just needs that the item class must have a deep copy constructor and =operator overloading. It means that you can use VerySimpleThreadSafeFIFOBuffer in multiple producer/consumers. It means also that you can't use VerySimpleThreadSafeFIFOBuffer<cv::Mat> because cv::Mat copy constructor copies the header but not the image (isn't a deep copy).

Under some circumstance, like single producer single consumer, you can write your own queue without using locks. This will reduce lock overhead and deadlock risk.

About multiple producer/consumer you have to take care to synchronization and consider that, if you have multiple consumers each consumer pops an item from the queue. You can't use simple queue with a thread that writes and an thread that processes. This is because they will pop different frames. In this case you need a queue with a release mechanism and need to know when all consumers have completed their task. As alternative you can have a single consumer that pops the item from the queue than starts N threads that use the same item in read only mode.

Threads are useful to do things during idle time, like processing while waiting next frame.

If you grab at 25fps (40ms) only few ms are used to get the frame from the cam, let say 2ms. In this case you have 38ms of idle time that could be used to do some processing. Also your processing must be shorter than 38ms. You can use a queue to get safe from odd long time processing (you could also calculate needed queue length).

If your application doesn't have idle time, using threads you will introduce additional complexity and overtime without real performance gain.

Really, with multicore/hyperthreading CPUs, single thread application has bad performance because it uses just 1 core. Multithreading allocates threads over your multiple core processor providing really better performance. But if you use OpenCV+TBB, a lot of functions have internal threading optimization (eg. parallel_for) than you get this kind of boosting even if you use single thread application. Check here for some comparison.

My advice is always the same: use multi threading if it's really needed and ONLY if you have a strong background on the subject.

Here is the code for the queue. It's an example. It example and it should work but things work: Things could be done better and with many improvements. BTW improvements but it could be useful in many casecase.

#include <queue>
#include <thread>
#include <mutex>

class MyData
{
public:
    //we use a struct to have simple copy without forget some members
    typedef struct MyDataFlatMembers {
        int64 timestamp = 0;
        int64 frameNum = 0;
        //...
    };

    MyDataFlatMembers vars;
    Mat ir, depth, bgr;

    MyData() { }

    /** @brief class destructor.(USED BY queue::Pop)
     delete object pointer here if you have
     */
    ~MyData() { }

    /** @brief overload for the copy constructor (USED BY buffer::Push)*/
    MyData(const MyData& src) {
        Clone(src);//call clone Mat
    }

    /** @brief Assignment (=) Operator overloading (USED BY from buffer::Pop)
    Because buffer::Pop calls queue:pop who calls MyData destructor we will lost all members
    pointers.
    We are safe with cv::Mat, clone is not needed because of internal cv::Mat memory counter.
    @warning  This operator is not needed ONLY IF object members pointers are only cv::Mat
    in case we have some member pointer object we need to clone them
    */
    MyData& operator=(const MyData&src)
    {
        if (this == &src) return *this;
        Copy(src);
        return *this;
    }

    // this is just to collect stats
    unsigned char* GetMatPointer(){ return ir.data; }

private:
    /** @brief Copy all members (for Mat member =operator will be used)
    USED BY =operator
    */
    void Copy(const MyData& src)
    {
        vars = src.vars; //Copy all flat members using struct copy
        ir = src.ir;
        depth = src.depth;
        bgr = src.bgr;
    }

    /** @brief Copy all members (for Mat clone will be used)
    USED BY copy constructor
    */
    void Clone(const MyData& src)
    {
        vars = src.vars;//Copy all flat members using struct copy
        src.ir.copyTo(ir);
        src.depth.copyTo(depth);
        src.bgr.copyTo(bgr);
    }
};

/** @brief a Very Simple ThreadSafe FIFO Buffer
std::queue with cv::Mat is like a circular buffer.
Even if you store N frames and create N x Mat in the queue
only few blocks will be really allocated thanks to std::queue and Mat memory recycling
compare GetDataMemoryCount() and GetMatMemoryCount() with GetItemCount()
@warning THIS IS AN EXAMPLE, MANY IMPROVEMENT CAN BE DONE
*/
template <class T>
class VerySimpleThreadSafeFIFOBuffer
{
public:
    /* _maxSize=0 :size is unlimited (up to available memory) */
    VerySimpleThreadSafeFIFOBuffer(size_t _maxSize = 20) :
        maxBufSize(0), itemCount(0), maxSize(_maxSize)
    {}
    ~VerySimpleThreadSafeFIFOBuffer()
    {
        //calls destructor for all elements if any
        //if the elements are pointers, the pointed to objects are not destroyed.
        while (!m_buffer.empty())
            m_buffer.pop(); //you never should be here
    }

    /** @brief  Add an item to the queue
     class T should have a copy constructor overload
     */
    bool Push(const T &item)
    {
        //mutex is automatically released when
        //lock goes out of scope
        lock_guard<mutex> lock(m_queueMtx);

        size_t size = m_buffer.size();
        if (maxSize > 0 && size > maxSize)
            return false;

        m_buffer.push(item);//calls T::copy constructor
#ifdef _DEBUG
        //collect some stats
        itemCount++;
        maxBufSize = max(size, maxBufSize);
        MyData *dataPtr = &m_buffer.back();
        unsigned char *matPtr = m_buffer.back().GetMatPointer();
        dataMemoryCounter[dataPtr]++;
        matMemoryCounter[matPtr]++;
#endif
        return true;
    }

    /** @brief Get oldest queued item
     class T should have a =operator overload
     */
    bool Pop(T &item)
    {
        lock_guard<mutex> lock(m_queueMtx);
        if (m_buffer.empty())
            return false;
        item = m_buffer.front(); //calls T::=operator 
        m_buffer.pop();
        return true;
    }

    size_t Size() {
        lock_guard<mutex> lock(m_queueMtx);
        return m_buffer.size();
    }
#ifdef _DEBUG
    size_t GetItemCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return itemCount;
    }
    size_t GetBufSizeMax(){
        lock_guard<mutex> lock(m_queueMtx);
        return maxBufSize;
    }
    size_t GetDataMemoryCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return dataMemoryCounter.size();
    }
    size_t GetMatMemoryCount(){
        lock_guard<mutex> lock(m_queueMtx);
        return matMemoryCounter.size();
    }
#endif
private:
    queue<T> m_buffer;
    mutex m_queueMtx;
    size_t maxBufSize, maxSize;
    size_t itemCount;
    map<void*, int> dataMemoryCounter;
    map<void*, int> matMemoryCounter;
};

the test application

#include <atomic>
// GLOBAL VARS
VerySimpleThreadSafeFIFOBuffer<MyData> theBuffer(40);
atomic<bool> grabbing, processing;
void GrabberThread_Test();
void ProcessorThread_Test();
void ProcessDataInThread_Test(MyData &data);

// THE GRABBER THREAD
void GrabberThread_Test()
{
    processing.store(true);    //ensure to start the processing

    VideoCapture capIr, capDepth;
    MyData grabData;
    capIr.open(0);
    //capDepth.open(1);
    //cap...
    int frameCount = 0;
    int defaultDelay = 100;
    int delay = defaultDelay;
    while (grabbing.load() == true) //this is lock free
    {
        //WORK WITH grabData OBJECT
        capIr >> grabData.ir;
        //capDepth >> grabData.depth
        //...

        if (grabData.ir.empty()) continue;
        frameCount++;
        grabData.vars.timestamp = getTickCount();
        grabData.vars.frameNum = frameCount;
        //PUSH THE grabData OBJECT TO THE END OF THE QUEUE
        if (!theBuffer.Push(grabData))
        {
            cout << "GrabberThread_Test: " << endl
                << "The queue is full! Grabber is going to sleep a bit"<<endl
                << "HINT: increase queue size or improve processor performance or reduce FPS" 
                << endl;
            this_thread::sleep_for(chrono::milliseconds(delay));
            delay += 80;
        }
        else
        {
            if (delay>defaultDelay)
                cout << "GrabberThread_Test: " << "Wake up !" << endl;
            delay = 100;
        }
    }
    processing.store(false);    //!!!!!!stop processing here
}

//THE PROCESSOR THREAD
void ProcessorThread_Test()
{
    MyData procData;
    Mat ir, depth;
    while (processing.load() == true) //this is lock free
    {
        //this is same of procData="front of the queue" and "remove front"
        //cv::Mats in procData are still valid because of cv::Mat internal counter
        if (theBuffer.Pop(procData) == false) continue;
        ProcessDataInThread_Test(procData);
    }
    size_t todo = theBuffer.Size();
    cout << endl << "ProcessorThread: Flushing buffer: "
        << todo << " frames..." << endl;
    while (theBuffer.Pop(procData))
    {
        ProcessDataInThread_Test(procData);
        cout << "todo: " << todo-- << endl;
    }
    cout << "ProcessorThread: done!" << endl;
}

//THE PROCESSING FUNCTION CALLED BY PROCESSING THREAD
void ProcessDataInThread_Test(MyData &data)
{
    int size = 1;
    Point anchor(size, size);
    Size sz(2 * size + 1, 2 * size + 1);
    Mat element = getStructuringElement(MORPH_ELLIPSE, sz, anchor);
    morphologyEx(data.ir, data.ir, MORPH_GRADIENT, element, anchor);
    putText(data.ir, to_string(data.vars.frameNum),
        Point(10, 10), 1, 1, Scalar(0, 255, 0));
    imshow("IR", data.ir);
    //imshow("DEPTH", data.depth);
    //..
    waitKey(1);
}

//MAIN THREAD
int MainQueueGrabbingThread() {

    grabbing.store(true);               // set the grabbing control variable
    processing.store(true);             // ensure the processing will start
    thread grab(GrabberThread_Test);    // start the grabbing task
    thread proc(ProcessorThread_Test);  // start the the processing task

    //your own GUI
    cout << endl << "Press Enter to stop grabbing...";
    cin.get();

    // terminate all
    grabbing.store(false);    // stop the grab loop 
    processing.store(false);  // ensure to stop the processing
    grab.join();              // wait for the grab thread
    proc.join();              // wait for the process thread
#ifdef _DEBUG
    //get some stats on the buffer
    cout << "Number Queued items: " << theBuffer.GetItemCount() << endl;
    cout << "Queue Max Size: " << theBuffer.GetBufSizeMax() << endl;
    cout << "Number of Data Object in memory: " << theBuffer.GetDataMemoryCount() << endl;
    cout << "Number of Mat Object in memory: " << theBuffer.GetMatMemoryCount() << endl;
    cout << endl << "Press Enter to close !";
#endif
    cin.get();
#endif
    return 0;
}