In my last post, I was looking for someone to tell me about a race condition in the cancel() code path, but to my embarrassment, Krishnan Varadarajan, one of the many talented developers on the ConcRT team, pointed out a bad race on the run/wait code path!

In the original code, the task counter could be decremented to zero as the last task finished, just before a new run() call incremented the count and reset the event. If the calling code then followed with a call to wait(), that wait() would return as soon as the worker thread called event.set(), even though the newly scheduled task had not yet run. Ouch!

My issue was that I was trying too hard to avoid using locks in my code, something that should always raise a red flag. Lock-free code is great if you can pull it off, but it almost always introduces complexity beyond its worth, and it is almost always incorrect.

The thing we typically worry about when we dabble in lock-free code is scalability, but lock-based code scales fine as long as you don’t hold the locks for very long (holding them for long is what creates contention).

The serial task group exists in order to avoid taking locks for the duration of each scheduled task, not for a few individual instructions, as I was trying to do.

To address this, I decided to write a simple counting event class, which sets the event whenever the count it holds reaches zero, and resets it whenever it is no longer zero. We can use this in the revised version of the serial_task_group by changing run(), wait(), cancel() and _Task_proc():

template <class Func>
void run(const Func& fn)
{
    // Prefix increment: 'tasks' holds the new count, so a value of 1
    // means this is the first outstanding task and a worker is needed.
    __int64 tasks = ++taskCount;
    _Schedule_task(fn, tasks == 1);
}

void wait()
{
    taskCount.wait();
}

void cancel()
{
    InterlockedIncrement(&this->canceling);
    taskCount.wait();
    InterlockedDecrement(&this->canceling);
}
template <class Func>
static void __cdecl _Task_proc(void* data)
{
    [ No changes before the loop ]

    // Loop until there is no more work to perform.
    do
    {
        [ No changes until the bottom of the loop ]

        // Prefix decrement: 'tasks' holds the new count; exit when it hits zero.
        tasks = --pTaskGroup->taskCount;
    } while (tasks > 0);
}

I also removed the use of QueueUserWorkItem and replaced it with calls to the ConcRT scheduler. This allows the serial task group to be used with any scheduler: its tasks are serialized by the serial task group, but still share resources with the tasks of another scheduler, such as the one running the task that called run() in the first place.
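For readers who want to see the overall shape without ConcRT at hand, here is a portable sketch of how a serial task group can hang together, with a std::thread standing in for the scheduler and my own (hypothetical) names throughout. It is an illustration of the pattern, not the actual implementation:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Sketch: tasks run one at a time, in FIFO order, on a single worker that
// is spawned lazily; wait() blocks until every queued task has completed.
class serial_task_group_sketch
{
    std::queue<std::function<void()>> queue_;
    long count_ = 0;                 // outstanding (queued or running) tasks
    bool worker_running_ = false;
    std::mutex mtx_;
    std::condition_variable idle_;   // signaled when count_ drops to zero

    void drain()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (queue_.empty())
                {
                    worker_running_ = false;
                    return;
                }
                task = std::move(queue_.front());
                queue_.pop();
            }
            task();                  // run outside the lock
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (--count_ == 0)
                    idle_.notify_all();
            }
        }
    }

public:
    template <class Func>
    void run(const Func& fn)
    {
        bool spawn = false;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(fn);
            ++count_;
            if (!worker_running_)
                worker_running_ = spawn = true;
        }
        if (spawn)
            std::thread([this] { drain(); }).detach();
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        idle_.wait(lock, [this] { return count_ == 0; });
    }
};
```

Note that the lock is only held for a few instructions around the queue and counter, never for the duration of a task, which is the property the serial task group exists to preserve.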

Now, a variant of the serial task group is one that executes its tasks on the UI thread of some window in your application. In fact, it’s a trivial modification to the serial_task_group, and the source is available here.

It will get a little hacky, since we need to define a new window message and handle it in our window procedures:

#define WM_TASK (WM_USER + 1)

The actual message value would have to be modified to not collide with any other user-defined messages that may appear in your specific application, of course. Alternatively, you could use RegisterWindowMessage() to get a unique message identifier at runtime, then pass that in to the task group constructor.

First, we need a window handle in order to schedule tasks, so the task group constructor takes both a handle and a message identifier:

ui_task_group(HWND hWnd, int msg) : canceling(0), hWnd(hWnd), msg(msg)      
{            
}

Second, the code to schedule a task on a scheduler is replaced by a call to PostMessage:

// Request a new worker thread if necessary.            
if (createWorker)                
    PostMessage(hWnd, msg, (WPARAM)(LPTHREAD_START_ROUTINE)_Task_proc<Func>, (LPARAM)&queue);

Then, as a last step, add this case to the big switch-statement in all message-handling procedures you have in your code (so that the task group can be used with any window you need it to):

case WM_TASK:
    {
        LPTHREAD_START_ROUTINE task = (LPTHREAD_START_ROUTINE)wParam;
        task((LPVOID)lParam);
    }
    break;

With this, you have a task group that will move all its tasks onto a UI thread, with support for wait and cancellation using APIs that look just like the task groups defined in PPL. Here’s an example:

    ui_task_group tg(hWnd, WM_TASK);

    for (int i = 0; i < 5; i++)
    {
        tg.run( [=] { ... do something on the UI thread ...} );
    }
    tg.wait();

wait() is just as dangerous here as it is with the serial task group: if you call wait() from the thread that owns the window handle, which is also where every task scheduled by the group runs, the program will promptly lock up. So don’t.