ThreadPool throttle
As promised, here are the details on how we limit the number of threads being used to perform a background task. The code below was written by Brian Harry. With this class, we can queue up as many requests as we want, but at most maxThreads of them will be active at any given time. So long as maxThreads is smaller than the size of the system thread pool, threads remain available for high-priority tasks such as serving ASP.NET pages or web requests. Given that there is only one thread pool per process in .NET, this class provides a good substitute for isolated thread pools.
using System;
using System.Collections.Generic;
using System.Threading;
namespace ThreadPoolThrottle
{
/// <summary>
/// Limits how many work items submitted through one instance are outstanding on the
/// shared .NET thread pool at any one time. Callers may submit as many requests as
/// they like; at most <c>maxThreads</c> of them hold a slot at once, and the rest
/// either block the submitting thread (<see cref="QueueUserWorkItemWithWait"/>) or are
/// parked in an internal queue (<see cref="QueueUserWorkItem"/>).
/// </summary>
class ThreadPoolThrottle
{
    /// <summary>
    /// Initialize the throttler to the specified maximum number of concurrent work items.
    /// </summary>
    /// <param name="maxThreads">The slot limit for this instance; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxThreads"/> is less than 1.
    /// </exception>
    public ThreadPoolThrottle(int maxThreads)
    {
        if (maxThreads < 1)
        {
            // With a limit below one no slot can ever be granted: the waiting overload
            // would block forever and the queuing overload would park items that
            // nothing ever dequeues.
            throw new ArgumentOutOfRangeException(
                "maxThreads", maxThreads, "maxThreads must be at least 1.");
        }

        m_maxThreads = maxThreads;
        m_callback = new WaitCallback(ProcessWorkItem);
    }

    /// <summary>
    /// The thread priority at which thread pool threads dispatched to handle work items
    /// for this throttle class will run. The priority is reset to
    /// <see cref="ThreadPriority.Normal"/> after each work item returns or throws.
    /// </summary>
    public ThreadPriority Priority
    {
        get { return m_priority; }
        set { m_priority = value; }
    }

    /// <summary>
    /// Queue a work item to the threadpool while obeying the thread limit passed in the
    /// constructor. If this instance of the ThreadPoolThrottle already has submitted
    /// its limit of simultaneous tasks to the threadpool, block the calling thread until
    /// an existing work item completes and hands its slot over, then submit the request.
    /// </summary>
    /// <param name="callBack">The callback to be called on the threadpool thread.</param>
    /// <param name="state">A state object to be supplied on the callback.</param>
    /// <returns>
    /// True if the request was successfully queued and false if not. Note that when
    /// the thread pool refuses the item, the slot reserved for it is not given back.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    public bool QueueUserWorkItemWithWait(WaitCallback callBack, Object state)
    {
        if (callBack == null)
        {
            // Fail on the caller's thread instead of on a pool thread later.
            throw new ArgumentNullException("callBack");
        }

        // Loop in case a race makes us see no room without the lock and then find
        // room once we hold it.
        for (;;)
        {
            if (TryAcquireSlot())
            {
                return ThreadPool.UnsafeQueueUserWorkItem(
                    m_callback, new ThrottleCallbackState(callBack, state));
            }

            lock (m_queue)
            {
                // Re-check under the lock; if room appeared, go round and retry.
                if (m_currentThreads >= m_maxThreads)
                {
                    // Register as a waiter and sleep until a finishing work item hands
                    // us its slot. The wait is guarded by an explicit hand-off count so
                    // that every hand-off is consumed exactly once: the releasing thread
                    // decrements m_waiting itself, so two work items finishing back to
                    // back can no longer both "wake" the same single waiter and leak a
                    // slot.
                    ++m_waiting;
                    while (m_handoffs == 0)
                    {
                        Monitor.Wait(m_queue);
                    }
                    --m_handoffs;

                    // The thread that handed us the slot did not decrement
                    // m_currentThreads, so we must not increment it.
                    return ThreadPool.UnsafeQueueUserWorkItem(
                        m_callback, new ThrottleCallbackState(callBack, state));
                }
            }
        }
    }

    /// <summary>
    /// Queue a work item to the threadpool while obeying the thread limit passed in the
    /// constructor. If this instance of the ThreadPoolThrottle already has submitted
    /// its limit of simultaneous tasks to the threadpool, queue this work item and return.
    /// </summary>
    /// <param name="callBack">The callback to be called on the threadpool thread.</param>
    /// <param name="state">A state object to be supplied on the callback.</param>
    /// <returns>
    /// True if the request was successfully queued and false if not. Note that when
    /// the thread pool refuses the item, the slot reserved for it is not given back.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    public bool QueueUserWorkItem(WaitCallback callBack, Object state)
    {
        if (callBack == null)
        {
            // Fail on the caller's thread instead of on a pool thread later.
            throw new ArgumentNullException("callBack");
        }

        // Loop in case a race makes us see no room without the lock and then find
        // room once we hold it.
        for (;;)
        {
            if (TryAcquireSlot())
            {
                return ThreadPool.UnsafeQueueUserWorkItem(
                    m_callback, new ThrottleCallbackState(callBack, state));
            }

            lock (m_queue)
            {
                // Re-check under the lock; if room appeared, go round and retry.
                if (m_currentThreads >= m_maxThreads)
                {
                    try
                    {
                        // Park the work item; a finishing work item will dequeue it.
                        m_queue.Enqueue(new ThrottleCallbackState(callBack, state));
                        return true;
                    }
                    catch
                    {
                        // Deliberate best effort: report failure to park the item
                        // through the return value rather than by throwing.
                        return false;
                    }
                }
            }
        }
    }

    /// <summary>
    /// An approximate count of the number of queued work items that are being processed or
    /// waiting to be processed. The fields are read without taking the lock, so the
    /// value may be momentarily stale.
    /// </summary>
    public int UncompletedWorkItems
    {
        get { return m_currentThreads + m_waiting + m_queue.Count; }
    }

    // Lock-free attempt to reserve one of the m_maxThreads slots. Returns true when the
    // caller now owns a slot. On a lost race the speculative increment is undone and
    // false is returned so the caller can fall back to the locked path.
    private bool TryAcquireSlot()
    {
        // Lightweight check to see if there is room in our limit at all.
        if (m_currentThreads < m_maxThreads)
        {
            if (Interlocked.Increment(ref m_currentThreads) <= m_maxThreads)
            {
                return true;
            }

            // Another thread took the last slot first; give ours back.
            Interlocked.Decrement(ref m_currentThreads);
        }

        return false;
    }

    // Callback function from the thread pool to process a work item. We need this so that
    // we can keep track of the number that are currently being processed and limit it to
    // the requested number.
    private void ProcessWorkItem(Object state)
    {
        ThrottleCallbackState callbackState = (ThrottleCallbackState)state;

        // Normally runs once. It repeats only when the next queued item cannot be
        // handed to the thread pool, in which case that item is run on this thread.
        for (;;)
        {
            // Call the queued callback and pass its state.
            try
            {
                if (m_priority != ThreadPriority.Normal)
                {
                    Thread.CurrentThread.Priority = m_priority;
                }
                callbackState.m_callback(callbackState.m_state);
            }
            finally
            {
                // Never return a pool thread with a non-default priority.
                if (Thread.CurrentThread.Priority != ThreadPriority.Normal)
                {
                    Thread.CurrentThread.Priority = ThreadPriority.Normal;
                }
            }

            lock (m_queue)
            {
                if (m_waiting > 0)
                {
                    // Hand our slot to a blocked caller. The waiter is taken off the
                    // books here, under the lock, so that another work item finishing
                    // before the waiter wakes cannot hand a second slot to it. Since
                    // the slot changes owner, m_currentThreads is not decremented.
                    --m_waiting;
                    ++m_handoffs;
                    Monitor.Pulse(m_queue);
                    return;
                }

                if (m_queue.Count == 0)
                {
                    // There are no work items waiting to process, so reduce the
                    // number of working threads.
                    Interlocked.Decrement(ref m_currentThreads);
                    return;
                }

                // Dequeue a parked item and queue it back to the thread pool. Since it
                // inherits our slot, we don't need to decrement m_currentThreads.
                callbackState = m_queue.Dequeue();
                if (ThreadPool.UnsafeQueueUserWorkItem(m_callback, callbackState))
                {
                    return;
                }
            }

            // We weren't able to queue the next item. Rather than drop it on the floor
            // or let threads drain out when they shouldn't, loop and run it manually.
        }
    }

    // Maximum number of slots this instance may hold at once.
    private readonly int m_maxThreads;

    // Number of slots currently held (running, queued to the pool, or handed to a waiter).
    private int m_currentThreads;

    private ThreadPriority m_priority = ThreadPriority.Normal;

    // Cached delegate for ProcessWorkItem, passed to the thread pool for every item.
    private readonly WaitCallback m_callback;

    // Callers blocked in QueueUserWorkItemWithWait that have not been given a slot yet.
    // Guarded by the lock on m_queue.
    private int m_waiting;

    // Slots handed over by finished work items but not yet picked up by a blocked
    // caller. Guarded by the lock on m_queue.
    private int m_handoffs;

    // Parked work items; this object also serves as the monitor for all locked state.
    private readonly Queue<ThrottleCallbackState> m_queue = new Queue<ThrottleCallbackState>(32);

    // An internal class to track the user's callback and state.
    class ThrottleCallbackState
    {
        public ThrottleCallbackState(WaitCallback callback, Object state)
        {
            m_callback = callback;
            m_state = state;
        }

        internal WaitCallback m_callback;
        internal Object m_state;
    }
}
}