ParallelExtensionsExtras Tour - #7 - Additional TaskSchedulers


(The full set of ParallelExtensionsExtras Tour posts is available here.)

In our last two ParallelExtensionsExtras blog tour posts, we discussed two TaskScheduler implementations in ParallelExtensionsExtras: StaTaskScheduler and ConcurrentExclusiveInterleave.  These are just two of the more than ten schedulers in ParallelExtensionsExtras; rather than explore the rest in depth, in this post we’ll provide a short description of the other key schedulers included.

QueuedTaskScheduler

Found in the QueuedTaskScheduler.cs file, QueuedTaskScheduler provides a wealth of functionality all wrapped up into a single scheduler type.  It supports:

  • Priorities.  You create a queue off of the scheduler with a particular priority, and that queue is itself a TaskScheduler.  Any tasks you schedule to that queue are tagged with its priority, and the parent scheduler will service tasks in priority order.  e.g.

QueuedTaskScheduler qts = new QueuedTaskScheduler();
TaskScheduler pri0 = qts.CreateQueue(priority:0);
TaskScheduler pri1 = qts.CreateQueue(priority:1);

Any tasks scheduled to pri0 will get priority over tasks scheduled to pri1, even if scheduled after.
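Continuing that sketch, tasks can be started against those queues like any other scheduler (the Console.WriteLine payloads here are purely illustrative):

Task.Factory.StartNew(() => Console.WriteLine("low-priority work"),
    CancellationToken.None, TaskCreationOptions.None, pri1);
Task.Factory.StartNew(() => Console.WriteLine("high-priority work"),
    CancellationToken.None, TaskCreationOptions.None, pri0);

If both tasks are still queued when a worker becomes available, the pri0 task will be executed first, even though it was scheduled second.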

  • Fairness.  It’s often the case that you have batches of work and want those batches to be treated fairly with respect to each other.  That way, if a large batch arrives and then a small batch arrives, the small batch doesn’t have to wait for the entire large batch to complete; instead, tasks from both the large and small batches are scheduled fairly, round-robining between them.  The queues created on a QueuedTaskScheduler are scheduled in just such a manner, also taking priorities into account.  e.g.

QueuedTaskScheduler qts = new QueuedTaskScheduler();
TaskScheduler pri0a = qts.CreateQueue(priority:0);
TaskScheduler pri0b = qts.CreateQueue(priority:0);
TaskScheduler pri1 = qts.CreateQueue(priority:1);

Tasks scheduled to both pri0a and pri0b will take priority over tasks scheduled to pri1.  However, tasks scheduled to pri0a and pri0b will be round-robin’d between, as they exist at the same priority level.

  • Concurrency Levels.  In a large system, you may want to control how much parallelism is afforded to different parts of the system.  With parallel loops and PLINQ queries, you can control this on a per-loop or per-query basis, but out-of-the-box there’s no way to control it across loops, and there’s no built-in way to control it for tasks.  By scheduling all related work to a TaskScheduler that enforces a maximum concurrency level, that functionality is gained.

var qts = new QueuedTaskScheduler(
    TaskScheduler.Default, maxConcurrencyLevel:4);
var options = new ParallelOptions { TaskScheduler = qts };

Task.Factory.StartNew(() =>
{
    Parallel.For(0, 100, options, i=> { ... });
}, CancellationToken.None, TaskCreationOptions.None, qts);

Task.Factory.StartNew(() =>
{
    Parallel.For(0, 100, options, i=> { ... });
}, CancellationToken.None, TaskCreationOptions.None, qts); 

Both tasks and the parallel loops they contain will be limited to a maximum concurrency level of four.

  • Thread control.  The priorities, fairness, and concurrency level control all apply when QueuedTaskScheduler is used on top of another TaskScheduler as well as when used with dedicated threads for the scheduler.  However, QueuedTaskScheduler also provides very low-level control over the threads utilized by the scheduler when dedicated threads are requested.  Here’s the relevant constructor, which provides insight into the various knobs provided:

public QueuedTaskScheduler(
    int threadCount,
    string threadName = "",
    bool useForegroundThreads = false,
    ThreadPriority threadPriority = ThreadPriority.Normal,
    ApartmentState threadApartmentState = ApartmentState.MTA,
    int threadMaxStackSize = 0,
    Action threadInit = null,
    Action threadFinally = null);
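As a sketch, a scheduler backed by four dedicated STA background threads might be created as follows (the thread name here is, of course, arbitrary):

var staScheduler = new QueuedTaskScheduler(
    threadCount: 4,
    threadName: "MyDedicatedWorkers",
    threadPriority: ThreadPriority.BelowNormal,
    threadApartmentState: ApartmentState.STA);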

IOTaskScheduler

While we often refer to “the” .NET thread pool, the ThreadPool abstraction in .NET is actually built on top of two pools, one referred to as the worker pool and one referred to as the I/O pool.  The former is what’s targeted by the QueueUserWorkItem method as well as by the default TaskScheduler, while the latter is targeted by the UnsafeQueueNativeOverlapped method, and is frequently used for work in Windows Communication Foundation and Windows Workflow Foundation.  The IOTaskScheduler scheduler in the IOTaskScheduler.cs file runs tasks on this I/O thread pool via the UnsafeQueueNativeOverlapped method.
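Usage mirrors that of any other scheduler; for example (the file path and I/O-bound delegate here are purely illustrative):

var ioScheduler = new IOTaskScheduler();
Task.Factory.StartNew(
    () => File.ReadAllBytes(@"C:\data\sample.bin"), // illustrative I/O-bound work
    CancellationToken.None, TaskCreationOptions.None, ioScheduler);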

WorkStealingTaskScheduler

In .NET 4, the ThreadPool was augmented with “work-stealing” algorithms.  However, the ThreadPool does not allow you to create instances of pools, only to share the global pool.  If you desire individual pool instances with a dedicated set of threads, and if you also want work-stealing, the WorkStealingTaskScheduler class in the WorkStealingTaskScheduler.cs file is your friend.
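A minimal sketch of creating such a dedicated pool (the concurrency level of 4 is arbitrary):

var stealingScheduler = new WorkStealingTaskScheduler(4);
var factory = new TaskFactory(stealingScheduler);
factory.StartNew(() => Console.WriteLine("runs on the dedicated work-stealing pool"));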

SynchronizationContextTaskScheduler

The TaskScheduler.FromCurrentSynchronizationContext static method returns a TaskScheduler instance that wraps the SynchronizationContext instance returned from SynchronizationContext.Current.  In rare circumstances, however, you might want to wrap an arbitrary SynchronizationContext that is not set as the current context.  SynchronizationContextTaskScheduler.cs contains a scheduler that does exactly that.  When a Task is queued to this scheduler, it’s added to an internal concurrent queue, and a delegate is then posted to the target SynchronizationContext, which was passed into the scheduler’s constructor:

protected override void QueueTask(Task task)
{
    _tasks.Enqueue(task);
    _context.Post(delegate
    {
        Task nextTask;
        if (_tasks.TryDequeue(out nextTask)) TryExecuteTask(nextTask);
    }, null);
}

We could have skipped storing the task, and instead passed it directly to the delegate via a closure.  We opted for the shown approach, however, in order to enable debugger integration, so that the GetScheduledTasks method can return the contents of the queue:

protected override IEnumerable<Task> GetScheduledTasks()
{
    return _tasks.ToArray();
}

ThreadPerTaskScheduler

In rare cases, it’s desirable to dedicate a thread to each individual task, rather than having tasks share a pool of threads.  For this, the ThreadPerTaskScheduler.cs file contains a very simple ThreadPerTaskScheduler class that does exactly what its name implies, spinning up a dedicated thread for each task that gets queued to the scheduler.  Here is its QueueTask method:

protected override void QueueTask(Task task)
{
    new Thread(() => TryExecuteTask(task)) { IsBackground = true }.Start();
}

CurrentThreadTaskScheduler

Largely for testing purposes, a synchronous scheduler is provided in CurrentThreadTaskScheduler.cs that runs all tasks on the current thread when scheduling is requested.  The implementation is trivial. Here is its QueueTask method:

protected override void QueueTask(Task task)
{
    TryExecuteTask(task);
}

LimitedConcurrencyLevelTaskScheduler, OrderedTaskScheduler, and ReprioritizableTaskScheduler

The LimitedConcurrencyLevelTaskScheduler.cs, OrderedTaskScheduler.cs, and ReprioritizableTaskScheduler.cs files contain schedulers that are largely limited variants of the QueuedTaskScheduler class.  LimitedConcurrencyLevelTaskScheduler sits on top of the ThreadPool and supports a user-provided maximum degree of concurrency.  OrderedTaskScheduler derives from LimitedConcurrencyLevelTaskScheduler and simply sets that maximum degree of concurrency to one, which guarantees that tasks are processed in the order that they were scheduled.  ReprioritizableTaskScheduler is a simple first-in-first-out scheduler on top of the ThreadPool, except that it supports moving tasks around in the queue after they’ve been queued; this effectively enables prioritizing and deprioritizing previously queued tasks.
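As a sketch of reprioritization (assuming the Prioritize and Deprioritize methods exposed by the sample’s ReprioritizableTaskScheduler):

var rts = new ReprioritizableTaskScheduler();
var factory = new TaskFactory(rts);
Task first = factory.StartNew(() => Console.WriteLine("queued first"));
Task second = factory.StartNew(() => Console.WriteLine("queued second"));
rts.Prioritize(second); // if still queued, move it to the front of the line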

Comments
  • In LimitedConcurrencyLevelTaskScheduler, or the QueuedTaskScheduler with maxConcurrency specified, is there a way to block the Add?

    I have a producer/consumer scenario where the reader produces much faster than the writer, which writes to a database and is slower.

    Using either of the above schedulers allows more tasks than intended to be queued, and then the process waits a long time for all the writes to finish.

    I had a simple implementation that blocked on a semaphore both inside the WriterWorker and in the wrapper that submits the WriterWorker to the TaskScheduler.

    That was working fine, except for a lot of code.

    What we need is a maxConcurrency with limited Queue depth. This will then block the "Work Submitting" thread from proceeding further.

    It would also be nice to expose the queue depth or "in-flight" task count.

    The semaphore solution is surprisingly faster than the custom task schedulers I used.

  • Hi Rajesh-

    Sure.  If you take a look at a bunch of the schedulers in the Parallel Extensions Extras, you'll see that they use a BlockingCollection to store the queued tasks, where the QueueTask method takes the provided task and adds it to the BlockingCollection using Add, and then where the consumer threads pull from the BlockingCollection using Take.  BlockingCollection is, by default, unbounded, but when you instantiate the BlockingCollection you can configure it to have an upper bound; if a producer tries to add to the BlockingCollection when the collection is full, the producer will block until room is available.  If you were using Add inside of your QueueTask method, this means that the user's code calling Task.Factory.StartNew will block until there's room left in the scheduler.

    There are of course other ways to implement this, such as by using a semaphore (SemaphoreSlim or Semaphore) to throttle.  Internally, this is what BlockingCollection does, using a SemaphoreSlim.
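    As a sketch of that idea (the bound of 16 is arbitrary, and this is a fragment of a hypothetical scheduler, not code from the samples), a bounded scheduler's queue and QueueTask might look like:

    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>(16);

    protected override void QueueTask(Task task)
    {
        // Blocks callers of Task.Factory.StartNew once 16 tasks are pending.
        _tasks.Add(task);
    }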

  • Thanks Toub.

    In LimitedConcurrencyLevelTaskScheduler I do not see the BlockingCollection being used, nor does the ctor allow any other params.  It is using LinkedList<Task>.

    Also, the ctor of QueuedTaskScheduler does not support any params to configure the BlockingCollection.

    I could replicate the QueuedTaskScheduler to create a BlockingQueuedTaskScheduler, but that defeats the purpose if there are any enhancements/bugfixes...

    I did try my custom solution with SemaphoreSlim that blocks the producer while adding to the queue.

    I would appreciate if you create an appropriate ctor and open up this functionality on QueuedTaskScheduler.

    Also is the LimitedConcurrencyLevelTaskScheduler a "light-weight" alternative to QueuedTaskScheduler?

    Thanks.

  • Hi Rajesh-

    Right, you would need to add this support manually by augmenting the samples with additional configuration parameters.  Both QueuedTaskScheduler and StaTaskScheduler in the samples currently use a BlockingCollection.  For the most part, these are just samples showing how you can achieve various things, and they're meant to be augmented as you see fit.  The LimitedConcurrencyLevelTaskScheduler is just an example of a scheduler with a small amount of functionality, specifically geared towards limiting the concurrency level.  You could think of it as a subset of QueuedTaskScheduler.

  • LimitedConcurrencyLevelTaskScheduler: Dynamically modifying the max degree?

    I would like to add a setter for MaximumConcurrencyLevel.

    I can see that increasing the value would work as-is, in a way.  It would not immediately go wider on existing queued tasks, but adding new tasks would add threads.  Could the setter simply call NotifyThreadPoolOfPendingWork() once for each increase in level?  Would the setter then need to be wrapped in lock(_tasks)?

    Reducing is a different story.  Could TryDequeue() be modified to just return false if _delegatesQueuedOrRunning >= _maxDegreeOfParallelism?

    Thanks!

  • @Beau Skinner:

    Untested, but you could try something like this:

    public sealed override int MaximumConcurrencyLevel
    {
        get { lock (_tasks) return _maxDegreeOfParallelism; }
        set
        {
            if (value < 1) throw new ArgumentOutOfRangeException("MaximumConcurrencyLevel");
            lock (_tasks)
            {
                if (value == _maxDegreeOfParallelism) return;
                int oldDegree = _maxDegreeOfParallelism;
                _maxDegreeOfParallelism = value;
                if (value > oldDegree && _tasks.Count > 0)
                {
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }
    }

    That should address an increase.  To more aggressively handle a decrease, you could also modify the NotifyThreadPoolOfPendingWork method: the line that checks "if (_tasks.Count == 0)" to see whether it should exit could be augmented to something like "if (_tasks.Count == 0 || _delegatesQueuedOrRunning > _maxDegreeOfParallelism)".

  • Thanks for the fast and detailed response Stephen!

    I'm wondering why the last bit shouldn't be more like:

    int numNewDelegatesToQueue = Math.Max(0,
        Math.Min(_maxDegreeOfParallelism - _delegatesQueuedOrRunning, _tasks.Count));
    for (int iNewDelegate = 0; iNewDelegate < numNewDelegatesToQueue; ++iNewDelegate)
    {
        ++_delegatesQueuedOrRunning;
        NotifyThreadPoolOfPendingWork();
    }

    Is there something I'm not understanding here?

  • It should be ;-) i.e. queuing N rather than 1... like I said, untested ;-)

  • Details:

    For each instance of QueuedTaskScheduler, the "Current Queue Length" under the Performance Monitor rises by the requested/initialized thread count and stays at least at that level.

    Question:

    Does anyone know if this is an issue to worry about, or if there is a fix?
