If you follow the samples for Parallel Programming with the .Net Framework, you may have come across the ParallelExtensionsExtras and the Additional TaskSchedulers. Although these samples cover a broad set of requirements I recently came across another that could be satisfied with the creation of a new custom task scheduler.

In the current samples there is a custom task scheduler that supports a Maximum Degree of Parallelism (MDOP). However, what if one wanted to adjust the MDOP based on the task execution times.

As an example consider calling a web service, or performing any data processing, from a message queue. I recently came across this requirement, where if the response times drop then the MDOP also needed to drop. One could take the approach of a service that manages its threads but a custom task scheduler enables a far greater set of usage scenarios.

The full code can be located here: Execution Time Based Heuristic Custom Task Scheduler

LimitedConcurrencyLevelTaskScheduler Class

As a starting point here is the code from the PfX Team covering a custom task scheduler that allows one to specify a fixed MDOP:

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6.  
  7. namespace MSDN.Schedulers
  8. {
  9.     /// <summary>Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool.</summary>
  10.     public sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
  11.     {
  12.         /// <summary>Values defining the current state of the scheduler.</summary>
  13.         public struct ExecutionState
  14.         {
  15.             /// <summary>Current level of concurrency.</summary>
  16.             public int CurrentConcurrency { get; internal set; }
  17.  
  18.             /// <summary>Current count of awaiting task.</summary>
  19.             public int AwaitingTasks { get; internal set; }
  20.         }
  21.  
  22.         /// <summary>Whether the current thread is processing work items.</summary>
  23.         [ThreadStatic]
  24.         private static bool CurrentThreadIsProcessingItems;
  25.  
  26.         /// <summary>The list of tasks to be executed.</summary>
  27.         private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // protected by lock(tasks)
  28.  
  29.         /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
  30.         private readonly int maxDegreeOfParallelism;
  31.  
  32.         /// <summary>Whether the scheduler is currently processing work items.</summary>
  33.         private int delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
  34.  
  35.         /// <summary>Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the specified degree of parallelism.</summary>
  36.         /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
  37.         public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
  38.         {
  39.             if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
  40.             this.maxDegreeOfParallelism = maxDegreeOfParallelism;
  41.         }
  42.  
  43.         /// <summary>Queues a task to the scheduler.</summary>
  44.         /// <param name="task">The task to be queued.</param>
  45.         protected override void QueueTask(Task task)
  46.         {
  47.             // Add the task to the list of tasks to be processed.  If there aren't enough
  48.             // delegates currently queued or running to process tasks, schedule another.
  49.             lock (this.tasks)
  50.             {
  51.                 this.tasks.AddLast(task);
  52.                 if (this.delegatesQueuedOrRunning < this.maxDegreeOfParallelism)
  53.                 {
  54.                     ++this.delegatesQueuedOrRunning;
  55.                     NotifyThreadPoolOfPendingWork();
  56.                 }
  57.             }
  58.         }
  59.  
  60.         /// <summary>Informs the ThreadPool that there's work to be executed for this scheduler.</summary>
  61.         private void NotifyThreadPoolOfPendingWork()
  62.         {
  63.             ThreadPool.UnsafeQueueUserWorkItem(_ =>
  64.             {
  65.                 // Note that the current thread is now processing work items.
  66.                 // This is necessary to enable inlining of tasks into this thread.
  67.                 CurrentThreadIsProcessingItems = true;
  68.                 try
  69.                 {
  70.                     // Process all available items in the queue.
  71.                     while (true)
  72.                     {
  73.                         Task item;
  74.                         lock (this.tasks)
  75.                         {
  76.                             // When there are no more items to be processed,
  77.                             // note that we're done processing, and get out.
  78.                             if (this.tasks.Count == 0)
  79.                             {
  80.                                 --this.delegatesQueuedOrRunning;
  81.                                 break;
  82.                             }
  83.  
  84.                             // Get the next item from the queue
  85.                             item = this.tasks.First.Value;
  86.                             this.tasks.RemoveFirst();
  87.                         }
  88.  
  89.                         // Execute the task we pulled out of the queue
  90.                         base.TryExecuteTask(item);
  91.                     }
  92.                 }
  93.                 // We're done processing items on the current thread
  94.                 finally
  95.                 {
  96.                     CurrentThreadIsProcessingItems = false;
  97.                 }
  98.             }, null);
  99.         }
  100.  
  101.         /// <summary>Attempts to execute the specified task on the current thread.</summary>
  102.         /// <param name="task">The task to be executed.</param>
  103.         /// <param name="taskWasPreviouslyQueued">Indicator for whether the task was previously dequeued.</param>
  104.         /// <returns>Whether the task could be executed on the current thread.</returns>
  105.         protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  106.         {
  107.             // If this thread isn't already processing a task, we don't support inlining
  108.             if (!CurrentThreadIsProcessingItems) return false;
  109.  
  110.             // If the task was previously queued, remove it from the queue
  111.             if (taskWasPreviouslyQueued) TryDequeue(task);
  112.  
  113.             // Try to run the task.
  114.             return base.TryExecuteTask(task);
  115.         }
  116.  
  117.         /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
  118.         /// <param name="task">The task to be removed.</param>
  119.         /// <returns>Whether the task could be found and removed.</returns>
  120.         protected override bool TryDequeue(Task task)
  121.         {
  122.             lock (this.tasks)
  123.             {
  124.                 return this.tasks.Remove(task);
  125.             }
  126.         }
  127.  
  128.         /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
  129.         /// <returns>An enumerable of the tasks currently scheduled.</returns>
  130.         protected override IEnumerable<Task> GetScheduledTasks()
  131.         {
  132.             bool lockTaken = false;
  133.  
  134.             try
  135.             {
  136.                 Monitor.TryEnter(this.tasks, ref lockTaken);
  137.                 if (lockTaken) return this.tasks.ToArray();
  138.                 else throw new NotSupportedException();
  139.             }
  140.             finally
  141.             {
  142.                 if (lockTaken) Monitor.Exit(this.tasks);
  143.             }
  144.         }
  145.  
  146.         /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
  147.         public override int MaximumConcurrencyLevel
  148.         {
  149.             get
  150.             {
  151.                 return this.maxDegreeOfParallelism;
  152.             }
  153.         }
  154.  
  155.         /// <summary>Gets the current Execution State of the scheduler.</summary>
  156.         public ExecutionState SchedulerExecutionState
  157.         {
  158.             get
  159.             {
  160.                 ExecutionState state = new ExecutionState();
  161.                 state.CurrentConcurrency = this.delegatesQueuedOrRunning;
  162.                 state.AwaitingTasks = this.tasks.Count;
  163.  
  164.                 return state;
  165.             }
  166.         }
  167.  
  168.         /// <summary>Gets the current Processing state of the scheduler.</summary>
  169.         public bool IsProcessing
  170.         {
  171.             get
  172.             {
  173.                 return (this.tasks.Count > 0 || this.delegatesQueuedOrRunning > 0);
  174.             }
  175.         }
  176.     }
  177. }

To use the task scheduler one just has to create a task scheduler instance, with the required MDOP, and then create a Factory with the newly created scheduler:

LimitedConcurrencyLevelTaskScheduler lclts = new LimitedConcurrencyLevelTaskScheduler(5);
TaskFactory factory = new TaskFactory(lclts);

factory.StartNew(() =>
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
        }
    }
);

Alternatively to use the scheduler in Parallel loops, one will again create the task scheduler and use this when defining the ParallelOptions class for the parallel loop:

LimitedConcurrencyLevelTaskScheduler lclts = new LimitedConcurrencyLevelTaskScheduler(5);
ParallelOptions options = new ParallelOptions();
options.TaskScheduler = lclts;

Parallel.For(
        0,
        50,
        options,
        (i) =>
        {                        
            Thread.Sleep(100);
            Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
    );

All easy stuff. So onto the new task scheduler.

ResponseLimitedConcurrencyTaskScheduler Class

In meeting the requirement to vary the MDOP based on execution times there are three main differences one needs to accommodate. Firstly, one needs a function that can be used for determining the MDOP based on average task execution times. Secondly, one needs to record the actual task execution times in such a manner that the average can be determined and passed to the MDOP calculator function. Lastly, the MDOP needs to be re-evaluated, and the number of executing threads adjusted accordingly.

To support the requirement for determining the MDOP the scheduler class constructor now supports the following prototype:

public ResponseLimitedConcurrencyTaskScheduler(Func<int, int> funcMdop,
ResponseLimitedConcurrencyTaskConfiguration configuration)

This MDOP function is designed to accept an average execution time and return the desired MDOP. To calculate the average execution times a ConcurrentStack<int> is used to record the task execution times. On a timed schedule the top configured items from the stack are averaged, with this value being used in the MDOP calculation.

Once the MDOP has been calculated it is used in the NotifyThreadPoolOfPendingWork() call to ensure the correct MDOP is used when processing threads.

So without further adieu here is the full code listing for the scheduler:

  1. namespace MSDN.Schedulers
  2. {
  3.     using System;
  4.     using System.Collections.Concurrent;
  5.     using System.Collections.Generic;
  6.     using System.Diagnostics;
  7.     using System.Linq;
  8.     using System.Threading;
  9.     using System.Threading.Tasks;
  10.  
  11.     /// <summary>Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool.</summary>
  12.     public sealed class ResponseLimitedConcurrencyTaskScheduler : TaskScheduler, IDisposable
  13.     {
  14.         /// <summary>Values defining the current state of the scheduler.</summary>
  15.         public struct ExecutionState
  16.         {
  17.             /// <summary>Gets the current Max Degree of Parallelism.</summary>
  18.             public int MaxDegreeOfParallelism { get; internal set; }
  19.  
  20.             /// <summary>Gets the current level of concurrency.</summary>
  21.             public int CurrentConcurrency { get; internal set; }
  22.  
  23.             /// <summary>Gets the current count of awaiting task.</summary>
  24.             public int AwaitingTasks { get; internal set; }
  25.  
  26.             /// <summary>Gets the last sampled average of the execution times.</summary>
  27.             public int LastSampledAverage { get; internal set; }
  28.         }
  29.  
  30.         /// <summary>Whether the current thread is processing work items.</summary>
  31.         [ThreadStatic]
  32.         private static bool CurrentThreadIsProcessingItems;
  33.  
  34.         /// <summary>Scheduler running configuration.</summary>
  35.         private readonly ResponseLimitedConcurrencyTaskConfiguration configuration;
  36.  
  37.         /// <summary>The list of tasks to be executed.</summary>
  38.         private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // protected by lock(_tasks)
  39.  
  40.         /// <summary>The maximum concurrency level allowed by this scheduler for the current response times.</summary>
  41.         private volatile int maxDegreeOfParallelism;
  42.  
  43.         /// <summary>Last sampled average of the execution times.</summary>
  44.         private volatile int lastSampledAverage;
  45.  
  46.         /// <summary>Whether the scheduler is currently processing work items.</summary>
  47.         private int delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
  48.  
  49.         /// <summary>Stack for holding execution times for calculating MDOP </summary>
  50.         private ConcurrentStack<int> timings = new ConcurrentStack<int>();
  51.  
  52.         /// <summary>Object to ensure MDOP is only set on a single thread.</summary>
  53.         private object timingsSync = new object();
  54.  
  55.         /// <summary>Function determining the MDOP, where a zero average gives the starting MDOP.</summary>
  56.         private Func<int, int> funcMdop;
  57.  
  58.         /// <summary>The timer for refreshing the scheduler MDOP.</summary>
  59.         private Timer refreshTimer;
  60.  
  61.         /// <summary>Indicator for whether Dispose has been called.</summary>
  62.         private bool disposed = false;
  63.  
  64.         /// <summary> Initializes a new instance of the ResponseLimitedConcurrencyTaskScheduler class with the specified MDOP function. </summary>
  65.         /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism function provider for this scheduler.</param>
  66.         /// <param name="configuration">Scheduler configuration.</param>
  67.         public ResponseLimitedConcurrencyTaskScheduler(Func<int, int> funcMdop, ResponseLimitedConcurrencyTaskConfiguration configuration)
  68.         {
  69.             // Finally ensure zero average gives a starting MDOP
  70.             if (funcMdop(0) < 0) { throw new ArgumentOutOfRangeException("funcMdop", "Calculated Zero Max Degree of Parallelism cannot be negative."); }
  71.  
  72.             this.funcMdop = funcMdop;
  73.             this.configuration = configuration;
  74.  
  75.             // Get the default MDOP
  76.             int mdop = this.funcMdop(0);
  77.             this.maxDegreeOfParallelism = mdop == 0 ? this.configuration.DefaultDegreeOfParallelism : mdop;
  78.             this.lastSampledAverage = 0;
  79.  
  80.             // Setup the refresh of the MDOP
  81.             int rate = this.configuration.SamplingRateMilliseconds;
  82.             this.refreshTimer = new Timer(state => this.SetMdop(), null, rate + 1000, rate);
  83.         }
  84.  
  85.         /// <summary>Initializes a new instance of the ResponseLimitedConcurrencyTaskScheduler class with the specified MDOP function.</summary>
  86.         /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism function provider for this scheduler.</param>
  87.         public ResponseLimitedConcurrencyTaskScheduler(Func<int, int> funcMdop)
  88.             : this(funcMdop, new ResponseLimitedConcurrencyTaskConfiguration())
  89.         {
  90.         }
  91.  
  92.         /// <summary> Define the MDOP based on the current state of the execution times. </summary>
  93.         private void SetMdop()
  94.         {
  95.             if (this.timings != null && this.timings.Count > this.configuration.MinimumSampleCount)
  96.             {
  97.                 lock (this.timingsSync)
  98.                 {
  99.                     if (this.timings != null && this.timings.Count > this.configuration.MinimumSampleCount)
  100.                     {
  101.                         int[] latestTimes = new int[this.configuration.AverageSampleCount];
  102.                         int count = this.timings.TryPopRange(latestTimes);
  103.  
  104.                         if (count > this.configuration.MinimumSampleCount)
  105.                         {
  106.                             this.timings.Clear();
  107.                             int average = (int)latestTimes.Take(count).Average();
  108.                             int mdop = this.funcMdop(average);
  109.  
  110.                             this.lastSampledAverage = average;
  111.                             if (mdop > 0) { this.maxDegreeOfParallelism = mdop <= 0 ? this.configuration.DefaultDegreeOfParallelism : mdop; }
  112.                         }
  113.                     }
  114.                 }
  115.             }
  116.         }
  117.  
  118.         /// <summary>Attempts to execute the specified task recording execution time.</summary>
  119.         /// <param name="task">The task to be executed.</param>
  120.         /// <returns>Whether the task could be executed on the current thread.</returns>
  121.         private bool TryExecuteTaskInternal(Task task)
  122.         {
  123.             Stopwatch sw = Stopwatch.StartNew();
  124.  
  125.             bool result = base.TryExecuteTask(task);
  126.  
  127.             sw.Stop();
  128.             TimeSpan timing = sw.Elapsed;
  129.  
  130.             // Ensure queue is not growing too much before adding timing
  131.             if (this.timings.Count < this.configuration.MaximumSampleCount) { this.timings.Push((int)timing.TotalMilliseconds); }
  132.  
  133.             return result;
  134.         }
  135.  
  136.         /// <summary>Queues a task to the scheduler.</summary>
  137.         /// <param name="task">The task to be queued.</param>
  138.         protected override void QueueTask(Task task)
  139.         {
  140.             // Add the task to the list of tasks to be processed.  If there aren't enough
  141.             // delegates currently queued or running to process tasks, schedule another.
  142.             lock (this.tasks)
  143.             {
  144.                 this.tasks.AddLast(task);
  145.                 if (this.delegatesQueuedOrRunning < this.maxDegreeOfParallelism)
  146.                 {
  147.                     ++this.delegatesQueuedOrRunning;
  148.                     this.NotifyThreadPoolOfPendingWork();
  149.                 }
  150.             }
  151.         }
  152.  
  153.         /// <summary> Informs the ThreadPool that there's work to be executed for this scheduler. </summary>
  154.         private void NotifyThreadPoolOfPendingWork()
  155.         {
  156.             ThreadPool.UnsafeQueueUserWorkItem(_ =>
  157.             {
  158.                 // Note that the current thread is now processing work items.
  159.                 // This is necessary to enable inlining of tasks into this thread.
  160.                 CurrentThreadIsProcessingItems = true;
  161.                 try
  162.                 {
  163.                     // Process all available items in the queue.
  164.                     while (true)
  165.                     {
  166.                         Task item;
  167.                         lock (this.tasks)
  168.                         {
  169.                             // When there are no more items to be processed,
  170.                             // or if running with too much concurrency,
  171.                             // note that we're done processing, and get out.
  172.                             if (this.tasks.Count == 0 || this.delegatesQueuedOrRunning > this.maxDegreeOfParallelism)
  173.                             {
  174.                                 --this.delegatesQueuedOrRunning;
  175.                                 break;
  176.                             }
  177.  
  178.                             // Get the next item from the queue
  179.                             item = this.tasks.First.Value;
  180.                             this.tasks.RemoveFirst();
  181.                         }
  182.  
  183.                         // Execute the task we pulled out of the queue
  184.                         this.TryExecuteTaskInternal(item);
  185.                     }
  186.                 }
  187.  
  188.                 // We're done processing items on the current thread
  189.                 finally
  190.                 {
  191.                     CurrentThreadIsProcessingItems = false;
  192.                 }
  193.             }, null);
  194.         }
  195.  
  196.         /// <summary>Attempts to execute the specified task on the current thread.</summary>
  197.         /// <param name="task">The task to be executed.</param>
  198.         /// <param name="taskWasPreviouslyQueued">Indicator for whether the task was previously dequeued.</param>
  199.         /// <returns>Whether the task could be executed on the current thread.</returns>
  200.         protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  201.         {
  202.             // If this thread isn't already processing a task, we don't support inlining
  203.             if (!CurrentThreadIsProcessingItems) { return false; }
  204.  
  205.             // If the task was previously queued, remove it from the queue
  206.             if (taskWasPreviouslyQueued) this.TryDequeue(task);
  207.  
  208.             // Try to run the task.
  209.             return this.TryExecuteTaskInternal(task);
  210.         }
  211.  
  212.         /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
  213.         /// <param name="task">The task to be removed.</param>
  214.         /// <returns>Whether the task could be found and removed.</returns>
  215.         protected override bool TryDequeue(Task task)
  216.         {
  217.             lock (this.tasks)
  218.             {
  219.                 return this.tasks.Remove(task);
  220.             }
  221.         }
  222.  
  223.         /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
  224.         public override int MaximumConcurrencyLevel
  225.         {
  226.             get
  227.             {
  228.                 return this.maxDegreeOfParallelism;
  229.             }
  230.         }
  231.  
  232.         /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
  233.         /// <returns>An enumerable of the tasks currently scheduled.</returns>
  234.         protected override IEnumerable<Task> GetScheduledTasks()
  235.         {
  236.             bool lockTaken = false;
  237.             try
  238.             {
  239.                 Monitor.TryEnter(this.tasks, ref lockTaken);
  240.  
  241.                 if (lockTaken)
  242.                 {
  243.                     return this.tasks.ToArray();
  244.                 }
  245.                 else
  246.                 {
  247.                     throw new NotSupportedException();
  248.                 }
  249.             }
  250.             finally
  251.             {
  252.                 if (lockTaken) { Monitor.Exit(this.tasks); }
  253.             }
  254.         }
  255.  
  256.         /// <summary>Gets the current Execution State of the scheduler.</summary>
  257.         public ExecutionState SchedulerExecutionState
  258.         {
  259.             get
  260.             {
  261.                 ExecutionState state = new ExecutionState();
  262.                 state.LastSampledAverage = this.lastSampledAverage;
  263.                 state.MaxDegreeOfParallelism = this.maxDegreeOfParallelism;
  264.                 state.CurrentConcurrency = this.delegatesQueuedOrRunning;
  265.                 state.AwaitingTasks = this.tasks.Count;
  266.  
  267.                 return state;
  268.             }
  269.         }
  270.  
  271.         /// <summary>Gets the current Configuration of the scheduler.</summary>
  272.         public ResponseLimitedConcurrencyTaskConfiguration SchedulerTaskConfiguration
  273.         {
  274.             get
  275.             {
  276.                 return this.configuration;
  277.             }
  278.         }
  279.  
  280.         /// <summary>Gets a value indicating whether the scheduler is processing.</summary>
  281.         public bool IsProcessing
  282.         {
  283.             get
  284.             {
  285.                 return this.tasks.Count > 0 || this.delegatesQueuedOrRunning > 0;
  286.             }
  287.         }
  288.  
  289.         /// <summary>Method to control the disposing of resources. </summary>
  290.         /// <param name="disposing">Indicator for whether Dispose has been called directly.</param>
  291.         private void Dispose(bool disposing)
  292.         {
  293.             if (!this.disposed)
  294.             {
  295.                 if (disposing)
  296.                 {
  297.                     if (this.refreshTimer != null)
  298.                     {
  299.                         this.refreshTimer.Dispose();
  300.                     }
  301.                 }
  302.  
  303.                 // Indicate that the instance has been disposed.
  304.                 this.refreshTimer = null;
  305.                 this.disposed = true;
  306.             }
  307.         }
  308.  
  309.         /// <summary>IDisposable Dispose method.</summary>
  310.         public void Dispose()
  311.         {
  312.             this.Dispose(true);
  313.             GC.SuppressFinalize(this);
  314.         }
  315.     }
  316. }

To use the task scheduler one just has to create an instance, with the required MDOP function, and then as before create a Factory with the newly created scheduler:

var config = new List<ResponseLimitedConcurrencyStepDefinition>();
config.Add(new ResponseLimitedConcurrencyStepDefinition(5, 500));
config.Add(new ResponseLimitedConcurrencyStepDefinition(3, 5000));
config.Add(new ResponseLimitedConcurrencyStepDefinition(2, Int32.MaxValue));
Func<int, int> mdopFunc = ResponseLimitedConcurrencyFunctionFactory.StepDecrement(config);

ResponseLimitedConcurrencyTaskScheduler rlcts = new ResponseLimitedConcurrencyTaskScheduler(mdopFunc);
TaskFactory factory = new TaskFactory(rlcts);

factory.StartNew(() =>
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("{0} on thread {1}", i, Thread.CurrentThread.ManagedThreadId);
        }
    }
);

The only difference in this code is the construction of the MDOP function. To assist in creating this function the code comes with a few starters for 10. Firstly a step up and step down function, and secondly a linearly increasing and decreasing function. The step up and down functions takes a series of steps definitions outlining the step changes in MDOP. The linear increasing and decreasing functions take an upper and lower limit with a rate of change.

There is one final item needed to meet the requirements that initially drove the custom task scheduler solution. How do you use the scheduler to process a queue of actions?

ResponseLimitedConcurrencyActionLoader Class

The downloadable code also contains a class that can be used to ensure the task scheduler instance always has an active processing queue of a specified Action:

  1. namespace MSDN.Schedulers
  2. {
  3.     using System;
  4.     using System.Collections.Concurrent;
  5.     using System.Collections.Generic;
  6.     using System.Diagnostics;
  7.     using System.Linq;
  8.     using System.Threading;
  9.     using System.Threading.Tasks;
  10.  
  11.     /// <summary>A class to load a ResponseLimitedConcurrencyTaskScheduler with a single Action.</summary>
  12.     public class ResponseLimitedConcurrencyActionLoader
  13.     {
  14.         /// <summary>Indicator for whether to continue loading.</summary>
  15.         private volatile bool continueProcessing = false;
  16.  
  17.         /// <summary>The ResponseLimitedConcurrencyTaskScheduler reference.</summary>
  18.         private ResponseLimitedConcurrencyTaskScheduler scheduler;
  19.  
  20.         /// <summary>The factory to be used for loading the action.</summary>
  21.         private TaskFactory factory;
  22.  
  23.         /// <summary>The Task used to load the data on a background thread.</summary>
  24.         private Task loaderTask;
  25.  
  26.         /// <summary>The interval (in milliseconds) for which to review the outstanding tasks.</summary>
  27.         private int baseInterval;
  28.  
  29.         /// <summary>Initializes a new instance of the ResponseLimitedConcurrencyActionLoader class.</summary>
  30.         /// <param name="scheduler">The scheduler to load.</param>
  31.         /// <param name="interval">The interval (in milliseconds) for which to perform a load check.</param>
  32.         public ResponseLimitedConcurrencyActionLoader(ResponseLimitedConcurrencyTaskScheduler scheduler, int interval)
  33.         {
  34.             this.scheduler = scheduler;
  35.             this.factory = new TaskFactory(scheduler);
  36.             this.baseInterval = interval;
  37.         }
  38.  
  39.         /// <summary>Initializes a new instance of the ResponseLimitedConcurrencyActionLoader class.</summary>
  40.         /// <param name="scheduler">The scheduler to load.</param>
  41.         /// <param name="interval">The interval (in milliseconds) for which to perform a load check.</param>
  42.         public ResponseLimitedConcurrencyActionLoader(ResponseLimitedConcurrencyTaskScheduler scheduler)
  43.             : this(scheduler, 1000)
  44.         {
  45.         }
  46.  
  47.         /// <summary>Indicators whether the scheduler or loader is currently busy.</summary>
  48.         public bool IsProcessing()
  49.         {
  50.             return this.continueProcessing || this.scheduler.IsProcessing;
  51.         }
  52.  
  53.         /// <summary>Performs a load of the specified action until stop is called.</summary>
  54.         /// <param name="action">The action to execute.</param>
  55.         public void Start(Action action)
  56.         {
  57.             // Ensure loading is currently not active
  58.             if (this.IsProcessing()) { throw new InvalidOperationException("Loader is currently active."); }
  59.  
  60.             // Note now running
  61.             this.continueProcessing = true;
  62.  
  63.             // Define base configuration values
  64.             var configuration = this.scheduler.SchedulerTaskConfiguration;
  65.             int minSampleRate = Math.Min(configuration.MinimumSampleCount * 3, configuration.AverageSampleCount * 2);
  66.             int interval = Math.Min(configuration.SamplingRateMilliseconds, this.baseInterval);
  67.  
  68.             // Load actions into scheduler until processing needs to stop
  69.             this.loaderTask = Task.Factory.StartNew(() =>
  70.                 {
  71.                     while (this.continueProcessing)
  72.                     {
  73.                         // Calculate tasks that should be pending
  74.                         int actualNeeded;
  75.                         if (this.scheduler.SchedulerExecutionState.LastSampledAverage == 0)
  76.                         {
  77.                             double rate = ((double)this.scheduler.SchedulerExecutionState.MaxDegreeOfParallelism * 1000.0) / (double)this.scheduler.SchedulerExecutionState.LastSampledAverage;
  78.                             int minNeeded = Math.Max((int)((rate * (double)interval * 2.0) / 1000.0), minSampleRate);
  79.                             actualNeeded = minNeeded - this.scheduler.SchedulerExecutionState.AwaitingTasks;
  80.                         }
  81.                         else
  82.                         {
  83.                             actualNeeded = minSampleRate;
  84.                         }
  85.  
  86.                         // Load any needed tasks
  87.                         if (actualNeeded > 0)
  88.                         {
  89.                             for (int idx = 0; idx < actualNeeded; idx++)
  90.                             {
  91.                                 factory.StartNew(action);
  92.                             }
  93.                         }
  94.  
  95.                         // Wait for inteval before looping back around
  96.                         Thread.Sleep(interval);
  97.                     }
  98.                 });
  99.         }
  100.  
  101.         /// <summary>Tell the loader to stop processing.</summary>
  102.         public void Stop()
  103.         {
  104.             this.continueProcessing = false;
  105.             if (this.loaderTask != null) { this.loaderTask.Wait(); }
  106.         }
  107.  
  108.         /// <summary>Tell the loader to stop processing and block until all actions complete.</summary>
  109.         public void StopAndWait()
  110.         {
  111.             this.Stop();
  112.             while (this.scheduler.IsProcessing) { Thread.Sleep(250); }
  113.         }
  114.     }
  115. }

If needed this class can be used to construct a Windows Service that processes items from a queue, and where the MDOP of the scheduler varies based on the workload. It is for this reason the code contains Start() and Stop() function calls. To use the Action Loader one just has to provide the necessary Action and Scheduler to the class instance:

Func<int, int> funcMdop = ResponseLimitedConcurrencyFunctionFactory.LinearDecreasing(5, 1, 2);

ResponseLimitedConcurrencyTaskScheduler rlcts = new ResponseLimitedConcurrencyTaskScheduler(funcMdop);
ResponseLimitedConcurrencyActionLoader loader = new ResponseLimitedConcurrencyActionLoader(rlcts);

Action action = () =>
        {
            Thread.Sleep(10);
            Console.WriteLine("Finish Thread={0}", Thread.CurrentThread.ManagedThreadId);
        };

loader.Start(action);
Thread.Sleep(5000);
loader.StopAndWait();

So going back to my original requirements, it became easy to create a Windows Service where the Action is to read from a processing queue and call a web service; where the MDOP is reduced when the service response times increase.

Summary

There are a variety of possible approaches one could take for Action processing, where the number of processing threads is varied based on task execution times. I chose to explore the custom task Scheduler approach as it offered the greatest amount of flexibility and reuse for a variety of other possible scenarios.

As a final reminder the code download can be located here.