ParallelExtensionsExtras Tour - #5 - StaTaskScheduler

ParallelExtensionsExtras Tour - #5 - StaTaskScheduler

Rate This
  • Comments 13

(The full set of ParallelExtensionsExtras Tour posts is available here.)  

The Task Parallel Library (TPL) supports a wide array of semantics for scheduling tasks, even though it only includes two in the box (one using the ThreadPool, and one using SynchronizationContext, which exists primarily to run tasks on UI threads).  This support comes from the extensibility of the abstract TaskScheduler class, which may be derived from to implement custom scheduling algorithms.  ParallelExtensionsExtras includes a multitude of such derived types, and in our tour through ParallelExtensionsExtras, we’ll walk through a few of them... today, we’ll start with StaTaskScheduler.

In an ideal world, there would be no legacy code; all code would be up-to-date on best practices and would be using the latest and greatest technologies.  In the real world, that’s almost never possible, and we need to be able to accommodate designs of the past.  Single-threaded apartments (STA) from COM represent one such technology we need to be able to integrate with (rather than describing STA here, for more information see http://msdn.microsoft.com/en-us/library/ms680112(VS.85).aspx).  If a components is STA, it must only be used from an STA thread.  However, all of the ThreadPool’s threads are multi-threaded apartment (MTA) threads, and by default Tasks from TPL run on these threads.  The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).  Given that, we can take advantage of the TaskScheduler API to implement an StaTaskScheduler which runs tasks on STA threads.  This then allows you to use Tasks with STA components.  StaTaskScheduler is implemented in the StaTaskScheduler.cs file in ParallelExtensionsExtras.

Our scheduler will derive from TaskScheduler and will also implement IDisposable in order to enable shutdown of the STA threads being used by the scheduler.  It will contain two fields: a List<Thread> for storing the threads created and used by the scheduler, and a BlockingCollection<Task> for storing all tasks scheduled to the scheduler, enabling the scheduler’s threads to consume and process them.

public sealed class StaTaskScheduler : TaskScheduler, IDisposable

{

    private readonly List<Thread> _threads;

    private BlockingCollection<Task> _tasks;

    ...

}

 

When created, the scheduler will instantiate the BlockingCollection<Task> and spin up numberOfThreads threads, where numberOfThreads is provided to the constructor of the type.  Each of the threads is explicitly marked with ApartmentState.STA.  The thread procedure for each thread is a simple dispatch loop that continually takes from the BlockingCollection<Task> (blocking if the collection is empty) and executes the discovered Task.

public StaTaskScheduler(int numberOfThreads)

{

    if (numberOfThreads < 1)

        throw new ArgumentOutOfRangeException("concurrencyLevel");

 

    _tasks = new BlockingCollection<Task>();

 

    _threads = Enumerable.Range(0, numberOfThreads).Select(i =>

               {

                   var thread = new Thread(() =>

                   {

                       foreach (var t in

                           _tasks.GetConsumingEnumerable())

                       {

                           TryExecuteTask(t);

                       }

                   });

                   thread.IsBackground = true;

                   thread.SetApartmentState(ApartmentState.STA);

                   return thread;

               }).ToList();

 

    _threads.ForEach(t => t.Start());

}

 

That’s the most complicated part of the implementation.  The QueueTask method on StaTaskScheduler just takes the Task provided by the TPL infrastructure and stores it into the BlockingCollection<Task> (this will be called, for example, during Task.Factory.StartNew):

protected override void QueueTask(Task task)

{

    _tasks.Add(task);

}

 

The GetScheduledTasks method, which is used by the debugging infrastructure in Visual Studio to feed the Parallel Tasks and Parallel Stacks window, returns a copy of the contents of the BlockingCollection<Task>:

protected override IEnumerable<Task> GetScheduledTasks()

{

    return _tasks.ToArray();

}

 

TryExecuteTaskInline is implemented to allow task inlining (running the Task on the current thread, which might happen in a call to Task.Wait, Task.WaitAll, Task<TResult>.Result, or Task.RunSynchronously), but only if the current thread is an STA thread:

protected override bool TryExecuteTaskInline(

    Task task, bool taskWasPreviouslyQueued)

{

    return

        Thread.CurrentThread.GetApartmentState() ==

            ApartmentState.STA &&

        TryExecuteTask(task);

}

 

MaximumConcurrencyLevel returns the number of threads utilized by the scheduler:

public override int MaximumConcurrencyLevel

{

    get { return _threads.Count; }

}

 

And finally, our Dispose implementation marks the BlockingCollection<Task> as complete for adding (so that as soon as the collection is empty, the threads will exit), waits for all threads to exit, and then cleans up the BlockingCollection<Task>.

public void Dispose()

{

    if (_tasks != null)

    {

        _tasks.CompleteAdding();

 

        foreach (var thread in _threads) thread.Join();

 

        _tasks.Dispose();

        _tasks = null;

    }

}

 

With this in place, we can now use our StaTaskScheduler.  Commonly, an StaTaskScheduler will be instantiated with only one thread, so that the same thread is used to process all tasks queued to the scheduler:

var sta = new StaTaskScheduler(numberOfThreads:1);

...

for(int i=0; i<10; i++)

{

    // All 10 tasks will be executed sequentially

    // due to using only one thread.

    Task.Factory.StartNew(() =>

    {

        ... // run code here that requires an STA thread

    }, CancellationToken.None, TaskCreationOptions.None, sta);

}

 

Another approach, depending on the use case, is to use multiple threads, but to dedicate one STA instance to each thread.  For example, let’s say you have an STA object CoolObject that provides a method DoStuff.  You want to be able to have multiple tasks take advantage of DoStuff concurrently, but that can’t be done on the same object instance, and unfortunately CoolObject is relatively expensive to instantiate.  A solution then is to store one CoolObject instance in thread-local storage for each of the scheduler’s threads, and each task can then access that thread-local copy:

var sta = new StaTaskScheduler(numberOfThreads:4);

var coolObject = new ThreadLocal<CoolObject>(() => new CoolObject());

for(int i=0; i<N; i++)

{

    Task.Factory.StartNew(() =>

    {

        ...

        coolObject.Value.DoStuff();

        ...

    }, CancellationToken.None, TaskCreationOptions.None, sta);

}

 

In general, it’s desirable to avoid STA objects, but with legacy code that’s a luxury we often can’t afford, and it’s great to be able to still use TPL with such objects.
Leave a Comment
  • Please add 1 and 2 and type the answer here:
  • Post
  • Just as a small addition: STA (unfortunately) is not just for legacy code. If you ever try to set up a worker pool that creates XPS from FlowDocuments (or something else that makes sense in the background and relies on WPF), you'll find that WPF only does STA threads.

    Maybe in some future version of Windows that doesn't do clipboard and DDE we'll get rid of STA's :-)

    -- Henkk

  • Thanks for the addition, Henkk.

  • Hi Toub,

      Do you have any sample application that utilizes the above extension to PTL?  Can we have dedicated STA thread to WPF User control?

  • Hi Samir-

    For accessing WPF controls, you need to run the code that accesses them on the same thread that created the WPF control.  For such purposes you can use the built in TaskScheduler that targets a synchronization context, e.g. when running on your UI thread, call TaskScheduler.FromCurrentSynchronizationContext in order to get a TaskScheduler that targets that thread, and any future tasks scheduled to that schedule will be automatically marshaled back to run on the UI thread.

    Regarding sample apps that use this, no, it's a fairly niche thing to need to do.  Typically you'd only do it if you had some component that had to be accessed from an STA thread.

  • I can't thank you enough for your StaTaskScheduler. My C# project creates a handful of COM objects concurrently and when all done a remoting process consumes the objects and does some work. It ran seriously slow using .NET's default thread apartments (MTA).

    Unfortunately simply creating the COM objects using STA threads did not work because the STA thread exits on completion and the COM objects were detached from their RCW resulting in exceptions in the remoting process.

    Your StaTaskScheduler saved me from having to design some esoteric mechanism of keeping the STA threads alive until the COM object's been consumed...

    Many thanks.

  • Hi Wole-

    I'm glad you've found it so useful!  Thanks for letting me know.

  • Per the first comment about WPF using STA threads, can someone please elaborate? I am in the process of writing an in-house Windows Service that needs to print to various printers. I will either need to use the WinForm printing engine or the WPF printing engine to accomplish this (c# person here, not C++ fluent anymore).

    So, do I need to worry about STA vs. MTA for that? What about using Tasks in general in a WPF full program? Do I need to use the STA scheduler regardless? Or, if I am not using DDE or the Clipboard am I "safe".

    Thanks in advance,

    Jim

  • Jim, some components require being run from STA threads.  I believe this includes the XpsDocument class.  By default, Tasks run on the .NET ThreadPool, which are MTA threads, so you wouldn't be able to just fire up a task using the default scheduler in order to utilize STA-bound types.  However, as this blog post highlighted, you can use custom TaskSchedulers to run tasks wherever you want, including on STA threads.  This would then allow you to run code that must be run on STA threads while still using tasks to offload and coordinate your work.  I hope that helps.

  • Hi Toub,  

    Thanks for the reply. So where does one find if a particular class requires STA use over MTA? Or will it just fail spectacularly? You suggested that XpsDocument class requires STA, but I did not see anything on the doc page for that class that mentioned STA.  The only thing I saw was "Thread Safety: Any public static (Shared in Visual Basic) members of this type are thread safe. Any instance members are not guaranteed to be thread safe."  Is this some sort of "insider-speak" for use STA threads only?

    Thanks,

    Jim

  • Hi Jim-

    Unfortunately I've not see this extensively documented.  It's good feedback I'll pass along to the relevant documentation folks, though.  STA vs MTA needs is different than the thread-safety information, which speaks to whether and how static members or instance members on the same instance may be called concurrently or not.

  • Doesn't an STA thread need to have a message pump in order for COM events to be processed?   How would you do this?  Periodically call Application.DoEvents() in the task body?

               convertCtrl.Progress += (sender, e) =>

                  {

                     progress.Report(e.percent);

                  };

               convertCtrl.StartConvert();

               while (convertCtrl.State != ConvertState.Stopped)

               {

                  cancel.ThrowIfCancellationRequested();

                  Application.DoEvents();

               }

    convertCtrl is a COM object that is doing some processing and reporting progress which I then pass on to a Progress<T> object.   This seems to work but is this the correct way to handle this?

    Thanks,

    Bill

  • Thanks Steven, the StaTaskScheduler made it possible for me to serve dynamic XpsDocuments from an ASP.NET MVC page. If anyone else is interested in doing this, be sure to create a singleton instance of StaTaskScheduler. Xps does some weird caching of paginator objects, that require them to be accessed on the same thread that created them.

    /Palle

  • @Palle Due Larsen: Glad to hear it was useful to you :)

Page 1 of 1 (13 items)