Larry Osterman's WebLog

Confessions of an Old Fogey

So you need a worker thread pool...

  • Comments 19

And, for whatever reason, NT’s built-in thread pool API doesn’t work for you.

Most people would write something like the following (error checking removed to reduce typing (and increase clarity)):

class WorkItem
{
    LIST_ENTRY m_listEntry;
        :
        :
};

class WorkerThreadPool
{
    HANDLE m_heventThreadPool;
    CRITICAL_SECTION m_critsWorkItemQueue;
    LIST_ENTRY m_workItemQueue;

    void QueueWorkItem(WorkItem *pWorkItem)
    {
        //
        //   Insert the work item onto the work item queue.
        //
        EnterCriticalSection(&m_critsWorkItemQueue);
        InsertTailList(&m_workItemQueue, &pWorkItem->m_listEntry);
        LeaveCriticalSection(&m_critsWorkItemQueue);
        //
        //   Kick the worker thread pool
        //
        SetEvent(m_heventThreadPool);
    }
    void WorkItemThread()
    {
        while (1)
        {
            //
            // Wait until we’ve got work to do
            //
            WaitForSingleObject(m_heventThreadPool, INFINITE);
            //
            //  Remove the first item from the queue (if any).
            //
            WorkItem *workItem = NULL;
            EnterCriticalSection(&m_critsWorkItemQueue);
            if (!IsListEmpty(&m_workItemQueue))
            {
                PLIST_ENTRY listEntry = RemoveHeadList(&m_workItemQueue);
                workItem = CONTAINING_RECORD(listEntry, WorkItem, m_listEntry);
            }
            LeaveCriticalSection(&m_critsWorkItemQueue);
            //
            // Process the work item if there is one.
            //
            if (workItem != NULL)
            {
                <Process Work Item>
            }
        }
    }
};

I’m sure there are gobs of bugs here, but you get the idea.  Ok, what’s wrong with this code?  Well, it turns out that there’s a MASSIVE scalability problem in this logic.  The problem is the m_critsWorkItemQueue critical section.  It turns out that this code is vulnerable to a condition called “lock convoys” (also known as the “boxcar” problem).  Basically, the problem occurs when more than one thread is waiting on the m_heventThreadPool event.  What happens when QueueWorkItem calls SetEvent on the thread pool event?  All the threads in the thread pool immediately wake up and block on the work queue critical section.  One of the threads will “win” and acquire the critical section, pull the work item off the queue, and release the critical section.  All the other threads will then wake up, one will successfully acquire the critical section, and all the others will go back to sleep.  The one that woke up will see there’s no work to do and will go back to waiting on the event.  This will continue until all the worker threads have made it past the critical section.

Essentially this is the same situation that you get when you have a bunch of boxcars in a trainyard.  The engine at the front of the cars starts to pull.  The first car moves a little bit, then it stops because the slack between its rear hitch and the front hitch of the second car is removed.  And then the second car moves a bit, then IT stops because the slack between its rear hitch and the front hitch of the 3rd car is removed.  And so forth – each boxcar moves a little bit and then stops.  And that’s just what happens to your threads.  You spend all your valuable CPU time executing context switches between the various threads and none of the CPU time is spent actually processing work items.

Now there are lots of band-aids that can be applied to this mechanism to make it smoother.  For example, the m_heventThreadPool event could be an auto-reset event, which means that only one thread would wake up for each work item.  But that’s only a temporary solution - if you get a flurry of requests queued to the work pool, you can still get multiple worker threads waking up simultaneously.

But the good news is that there’s an easier way altogether.  You can use NT’s built-in completion port logic to manage your work queues.  It turns out that NT exposes a really nifty API called PostQueuedCompletionStatus that essentially lets NT manage your worker thread queue for you!

To use NT’s completion ports, you create the port with CreateIoCompletionPort, remove items from the completion port with GetQueuedCompletionStatus and add items (as mentioned above) with PostQueuedCompletionStatus.

PostQueuedCompletionStatus takes 3 user-specified parameters: one that can be used to hold a 32-bit integer (dwNumberOfBytesTransferred) and two that can be used to hold pointers (dwCompletionKey and lpOverlapped).  The contents of these parameters can be ANY value; the API blindly passes them through to GetQueuedCompletionStatus.

So, using NT’s completion ports, the worker thread class above becomes:

class WorkItem
{
        :
        :
};

class WorkerThreadPool
{
    HANDLE m_hcompletionPort;

    void QueueWorkItem(WorkItem *pWorkItem)
    {
        PostQueuedCompletionStatus(m_hcompletionPort, 0, (ULONG_PTR)pWorkItem, NULL);
    }

    void WorkItemThread()
    {
        while (1)
        {
            DWORD numberOfBytes;
            ULONG_PTR completionKey;
            LPOVERLAPPED lpOverlapped;
            GetQueuedCompletionStatus(m_hcompletionPort, &numberOfBytes, &completionKey, &lpOverlapped, INFINITE);
            WorkItem *pWorkItem = (WorkItem *)completionKey;
            //
            // Process the work item if there is one.
            //
            if (pWorkItem != NULL)
            {
                <Process Work Item>
            }
        }
    }
};

Much simpler.  And as an added bonus, since NT’s managing the actual work queue in the kernel, it allows NT to eliminate the lock convoy in the first example.

 

[Insert std disclaimer: This posting is provided "AS IS" with no warranties, and confers no rights]

  • Larry, what do you think about using the APC mechanism to implement this? Each thread has an APC queue, you could queue APCs to each thread in your pool in a round robin fashion, or any other load balancing mechanism you wish, with the QueueUserAPC function. That should eliminate the lock convoys problem. Then have each thread use SleepEx to wait in an alertable wait state so it could execute the APCs when it gets some, and once it finished executing all the APCs go back to sleep. The OS manages the FIFO APC queue for you, so you don't have to worry about it either.
  • Grrgh. I had written an answer to you but somehow it got lost.

    Let's try again.

    That's a fascinating idea that I hadn't even considered before.

    In my model, you have a pool of idempotent worker threads, which pick work up as needed and execute on it.

    In your model, you have effectively two pools of threads - those that are idle and those that are doing work.

    The only negative to your idea is that you need to know which thread is receiving the APC - you can't ever queue an APC to a thread that's actually doing work because (a) the thread might not perform an alertable wait for a long time, and (b) the thread MIGHT enter an alertable wait mid-work and accidentally pick up your second APC and start executing it before it had finished its first work item - this could cause deadlocks or memory corruption.

    To track this, you need to know which threads are processing work requests and which threads are idle, and only queue the requests to the idle threads.

    It's more overhead than my solution, and the code to manage the thread state is also potentially subject to lock convoys, but...

    I'm not saying it's a bad idea, it's not, but there is some subtlety in the solution that might bite you on the foot in the future if you're not careful.
  • You don't mention that the completion port API also tries to limit the number of threads running simultaneously; if there's work to be done, but more than a given number of worker threads (associated with this port) are already running, NT won't release another thread. IIRC, this limit is 2x number of (virtual?) CPUs in the system.

    Conversely, when a thread associated with a completion port blocks (e.g. to perform synchronous I/O), NT looks to see if it can release another thread to keep the CPU busy. If there are fewer than the above limit threads running, there are threads blocked on the completion port, and there's work queued, NT will release a thread to replace the one that blocked.

    The net effect is to prevent context thrashing - where the scheduler spends more time trying to divide CPU time fairly among all the threads than the threads actually manage to get work done.

    At least, that's my recollection from Richter (Programming Applications for Windows, 4th ed, or it might be in Programming Server-Side Applications).
  • You're totally right Mike, I didn't include that (mostly for lack of time).

    See, there are LOTS of reasons to use completion ports for your work queues.
  • So... is there a way to query the number of items currently waiting in the queue?
  • Nope.

    The thing to keep in mind is that the number of items in the queue is meaningless. The microsecond after the value's loaded into the variable that holds the count, it's no longer valid.

    You can't make decisions based on the value because by the time you get the value it's not valid. So the NT guys don't give you the ability to screw it up.
  • I'm actually trying to use the ATL::CThreadPool class (which uses I/O completion ports) to queue up a long list of tasks (~6000) that take quite some time to finish - the whole process takes around 24 hours. I am looking for a way to tell when the queue has emptied in order to move to the next phase of my application. Am I barking up the wrong tree?
  • Hmm... That's a good question...

    Are you doing this on an MP machine? Are the tasks disk intensive? Since the CPU can only do one thing at a time, I'm wondering if queuing to the ATL::CThreadPool is the best idea.

    But anyway, since I don't know what you're doing, I can't speak to that.

    In your case, I'd just keep a counter - when you queue an item, increment it (with InterlockedIncrement), when you've processed a work item, you decrement it (InterlockedDecrement). When the count goes to 0, you're done.

    The general case can't work because NT can't know why you want the count of items, but in your specific case, since you know you won't be queuing new items, you can make it work.

    One gotcha - the count might go to 0 before you've finished queueing the items, especially on an MP machine, so you shouldn't check the count until you've queued all the tasks.
  • Another way would be to post a specific item that marks the end of the adding of items to the queue. Just create this special item before going MT, then in each worker thread, test the item you just retrieved from the queue against the special item, and if they're equal, just exit.
  • It seems everybody loves the thread pool. The default scheduler in Windows Workflow uses the thread pool,...