1

I hope you all had nice holidays.

This question is related to my earlier question: std::condition_variable - Wait for several threads to notify observer

I'm trying to implement a threadpool based on my own mutable thread implementation below:

class MutableThread
{
private:
    // Worker thread; started in the constructors and joined in the destructor.
    std::thread m_Thread;
    // Pending task handed over by Start(); an empty std::function means no task is queued.
    std::function<void()> m_Function;
    // Keep-running flag: set to true by the constructors, cleared by the destructor.
    bool m_bRun;
    // Guards m_Function (taken in Start() and in Execute()).
    std::mutex m_LockMutex;
    // NOTE(review): second mutex, apparently meant for waiting on m_CV - confirm it is still required.
    std::mutex m_WaitMutex;
    // Notified by Start() when a task is queued and by the destructor on shutdown.
    std::condition_variable m_CV;
    // Optional observer; when non-null, Execute() calls Signal(this) on it after a task has run.
    IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr;

private:
    void Execute()
    {
        while (m_bRun)
        {
            {
                std::unique_lock<std::mutex> wait(m_WaitMutex);
                m_CV.wait(wait);
            }               

            std::lock_guard<std::mutex> lock(m_LockMutex);
            if (m_bRun && m_Function)
            {
                m_Function();
                m_Function = std::function<void()>();

                if (m_Observer != nullptr)
                {
                    m_Observer->Signal(this);
                }
            }
        }
    }

public:
    HDEBUGNAME(TEXT("MutableThread"));

    MutableThread(const MutableThread& thread) = delete;

    MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer)
    {
        m_Observer = _Observer;
        m_bRun = true;
        m_Thread = std::thread(&MutableThread::Execute, this);
    }

    MutableThread()
    {
        m_Observer = nullptr;
        m_bRun = true;
        m_Thread = std::thread(&MutableThread::Execute, this);
    }       

    ~MutableThread()
    {
        m_bRun = false;

        m_CV.notify_one();

        try
        {
            if (m_Thread.joinable())
                m_Thread.join();
        }
        catch (std::system_error& ex)
        {
            HWARNINGD(TEXT("%s"), ex.what());
        }                           
    }

    /// Queues a task for the worker thread.
    /// @param f  Callable to run on the worker.
    /// @return true if the task was queued and the worker notified;
    ///         false if another task is still pending (f is not stored).
    inline bool Start(const std::function<void()>& f)
    {
        std::lock_guard<std::mutex> guard(m_LockMutex);

        const bool bAlreadyQueued = static_cast<bool>(m_Function);
        if (!bAlreadyQueued)
        {
            m_Function = f;

            // Wake the worker so it picks the task up.
            m_CV.notify_one();
        }

        return !bAlreadyQueued;
    }

The IAsyncTemplateObserver simply derives from my IAsyncObserver class posted in the earlier question and adds a virtual function:

// Observer interface parameterised on the type of the object that reports back.
// Extends IAsyncObserver with a single pure-virtual notification hook.
template <typename T>
class IAsyncTemplateObserver : public IAsyncObserver
{
public:
    // Called with the object that is signalling; the reaction is defined by the implementer.
    virtual void Signal(T* _Obj) = 0;
};

What I want to do is signal the ThreadPool that the function has finished executing, so that a new task can be assigned to the mutable thread:

class MutableThread;

// A unit of work for the pool: a callable plus the priority used to order it.
struct Task
{
    std::function<void()> m_Function;   // work to execute
    uint32_t m_uPriority;               // larger value = served earlier by the pool

    // Stores a copy of the callable together with its priority.
    Task(const std::function<void()>& _Function, uint32_t _uPriority)
        : m_Function(_Function)
        , m_uPriority(_uPriority)
    {
    }
};

// Orders tasks by priority alone: lhs sorts before rhs when its priority is lower.
inline bool operator<(const Task& lhs, const Task& rhs)
{
    return rhs.m_uPriority > lhs.m_uPriority;
}

class ThreadPool : public IAsyncTemplateObserver<MutableThread>
{
private:
    // Workers that currently have no task assigned.
    std::list<MutableThread* > m_FreeThreads;
    // Workers that were handed a task and have not yet reported back through Signal().
    std::list<MutableThread* > m_UsedThreads;

    // Queued tasks ordered by operator<(Task, Task); the last element has the highest priority.
    // NOTE(review): operator< compares m_uPriority only, so std::set treats two tasks with the
    // same priority as duplicates and insert() keeps just the first one - std::multiset would
    // retain both.
    std::set<Task> m_Tasks;

    // Locked by Grow(), AddTask(), Signal(), WaitForAllThreads() and AssignThreads()
    // while they access the containers above.
    std::mutex m_LockMutex;     
public:

    // Creates an empty pool without any worker threads; call Grow() to add some.
    ThreadPool()
    {
        // Automatic sizing from the hardware thread count is currently disabled:
        //Grow(std::thread::hardware_concurrency() - 1);
    }

    // Creates a pool and immediately adds n worker threads via Grow().
    ThreadPool(size_t n)
    {
        Grow(n);
    }

    /// Destroys every worker owned by the pool; tasks still queued are discarded.
    ///
    /// Workers may still be finishing a task and will then call Signal(), which
    /// edits m_UsedThreads / m_FreeThreads. The lists are therefore detached
    /// under the lock first and the workers deleted afterwards WITHOUT the lock:
    /// iterating the live lists would race with Signal(), and holding the lock
    /// while a worker is joined would deadlock against its Signal() call.
    ~ThreadPool()
    {
        std::list<MutableThread* > doomed;
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);

            // Nothing may be dispatched to a worker that is about to be deleted.
            m_Tasks.clear();

            // Same deletion order as before: used workers first, then free ones.
            doomed.splice(doomed.end(), m_UsedThreads);
            doomed.splice(doomed.end(), m_FreeThreads);
        }

        for (MutableThread* pThread : doomed)
        {
            HSAFE_DELETE(pThread);
        }
    }

    inline void Grow(size_t n)
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        for (size_t i = 0; i < n; i++)
        {
            m_FreeThreads.push_back(new MutableThread(this));
        }
    }

    /// Queues a task and then tries to hand work to an idle worker.
    /// @param _Task  Task to enqueue (copied into the queue).
    inline void AddTask(const Task& _Task)
    {
        std::unique_lock<std::mutex> guard(m_LockMutex);
        m_Tasks.insert(_Task);

        // AssignThreads() takes m_LockMutex itself, so release it first.
        guard.unlock();

        AssignThreads();
    }

    // Completion callback: _pThread reports that it has finished its task.
    // MutableThread::Execute() invokes this on the worker's own thread.
    //
    // NOTE(review): _pThread is pushed to the back of m_FreeThreads and AssignThreads()
    // takes its worker from the back, so Start() can be invoked on the signalling
    // worker itself. That is only safe if the worker does not hold its own
    // m_LockMutex while calling Signal().
    virtual void Signal(MutableThread* _pThread)
    {
        {
            // Move the worker from the used list back to the free list.
            std::lock_guard<std::mutex> lock(m_LockMutex);
            m_UsedThreads.remove(_pThread);
            m_FreeThreads.push_back(_pThread);
        }

        // Lock released above because AssignThreads() acquires m_LockMutex itself.
        AssignThreads();

        // Not declared in this class - presumably inherited from IAsyncObserver and
        // meant to wake WaitForAllThreads(); confirm against the base class.
        NotifyOne();
    }

    /// Blocks until no worker is marked as used and the task queue is empty.
    ///
    /// The pool state is sampled under m_LockMutex; while work is outstanding
    /// the caller sleeps on m_ObserverCV and re-checks after every wake-up.
    /// NOTE(review): the state check and the wait use different mutexes, so a
    /// notification sent between the two is not observed - confirm how
    /// NotifyOne() synchronises with m_ObserverMutex.
    inline void WaitForAllThreads()
    {
        for (;;)
        {
            bool bIdle = false;
            {
                std::lock_guard<std::mutex> guard(m_LockMutex);
                bIdle = m_UsedThreads.empty() && m_Tasks.empty();
            }

            if (bIdle)
                return;

            std::unique_lock<std::mutex> observerLock(m_ObserverMutex);
            m_ObserverCV.wait(observerLock);
        }
    }

private:

    /// Hands queued tasks to idle workers, highest priority first, until either
    /// the free list or the task queue is exhausted.
    ///
    /// A task is removed from the queue only after the worker accepted it, so a
    /// rejected Start() can no longer silently drop the task.
    inline void AssignThreads()
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        while (!m_FreeThreads.empty() && !m_Tasks.empty())
        {
            // Take an idle worker and park it in the used list.
            MutableThread* pThread = m_FreeThreads.back();
            m_FreeThreads.pop_back();
            m_UsedThreads.push_back(pThread);

            // The last entry has the highest priority.
            auto it = m_Tasks.end();
            --it;

            if (pThread->Start(it->m_Function))
            {
                m_Tasks.erase(it);
            }
            // else: the worker still has a pending task. It stays in the used
            // list (it will call Signal() when done) and the task remains
            // queued for the next free worker.
        }
    }

The AddTask function is called several times by the same thread, but when a mutable thread signals the thread pool (via m_Observer->Signal(this)), the application freezes at the lock_guard in the AssignThreads() function. The strange thing is that, unlike a normal deadlock, all call-stack views in Visual Studio are empty as soon as I try to step over the line with the lock_guard.

Can anyone explain this behaviour? Is there any major design flaw or just a simple mix up?

Thanks for your help!

Greetings, Fabian

Edit: I've added a minimal visual studio solution that reproduces the problem: ThreadPoolTest.zip

Community
  • 1
  • 1
Fabian
  • 152
  • 2
  • 14
  • I don't have the answer to your question, but if you are working on a windows system you should try profiling calls to std::async with no thread pool. In my experience it's a lot faster as windows seems to be really good at optimizing the threads. – Nicolas Holthaus Dec 29 '14 at 15:24
  • That seems like a good idea, I'll check it out. – Fabian Dec 29 '14 at 16:56

1 Answer

0

Thanks to a friend, I was able to fix the problem by moving the call m_Observer->Signal(this) outside of the lock_guard scope in the MutableThread::Execute() function. Secondly I removed the lock_guard in the AssignThreads() function and moved its call into the scope of the lock_guard in the Signal()/AddTask function. Not really related but still a flaw: all condition_variables.wait() calls are now in a while(m_bNotified == false) loop.

Fabian
  • 152
  • 2
  • 14