If you follow the samples for Parallel Programming with the .Net Framework, you may have come across the ParallelExtensionsExtras and the Additional TaskSchedulers. Although these samples cover a broad set of requirements I recently came across another that could be satisfied with the creation of a new custom task scheduler.
In the current samples there is a custom task scheduler that supports a Maximum Degree of Parallelism (MDOP). However, what if one wanted to adjust the MDOP based on the task execution times.
As an example consider calling a web service, or performing any data processing, from a message queue. I recently came across this requirement, where if the response times drop then the MDOP also needed to drop. One could take the approach of a service that manages its threads but a custom task scheduler enables a far greater set of usage scenarios.
The full code can be located here: Execution Time Based Heuristic Custom Task Scheduler
As a starting point here is the code from the PfX Team covering a custom task scheduler that allows one to specify a fixed MDOP:
To use the task scheduler one just has to create a task scheduler instance, with the required MDOP, and then create a Factory with the newly created scheduler:
Alternatively to use the scheduler in Parallel loops, one will again create the task scheduler and use this when defining the ParallelOptions class for the parallel loop:
All easy stuff. So onto the new task scheduler.
In meeting the requirement to vary the MDOP based on execution times there are three main differences one needs to accommodate. Firstly, one needs a function that can be used for determining the MDOP based on average task execution times. Secondly, one needs to record the actual task execution times in such a manner that the average can be determined and passed to the MDOP calculator function. Lastly, the MDOP needs to be re-evaluated, and the number of executing threads adjusted accordingly.
To support the requirement for determining the MDOP the scheduler class constructor now supports the following prototype:
public ResponseLimitedConcurrencyTaskScheduler(Func<int, int> funcMdop, ResponseLimitedConcurrencyTaskConfiguration configuration)
This MDOP function is designed to accept an average execution time and return the desired MDOP. To calculate the average execution times a ConcurrentStack<int> is used to record the task execution times. On a timed schedule the top configured items from the stack are averaged, with this value being used in the MDOP calculation.
Once the MDOP has been calculated it is used in the NotifyThreadPoolOfPendingWork() call to ensure the correct MDOP is used when processing threads.
So without further adieu here is the full code listing for the scheduler:
To use the task scheduler one just has to create an instance, with the required MDOP function, and then as before create a Factory with the newly created scheduler:
The only difference in this code is the construction of the MDOP function. To assist in creating this function the code comes with a few starters for 10. Firstly a step up and step down function, and secondly a linearly increasing and decreasing function. The step up and down functions takes a series of steps definitions outlining the step changes in MDOP. The linear increasing and decreasing functions take an upper and lower limit with a rate of change.
There is one final item needed to meet the requirements that initially drove the custom task scheduler solution. How do you use the scheduler to process a queue of actions?
The downloadable code also contains a class that can be used to ensure the task scheduler instance always has an active processing queue of a specified Action:
If needed this class can be used to construct a Windows Service that processes items from a queue, and where the MDOP of the scheduler varies based on the workload. It is for this reason the code contains Start() and Stop() function calls. To use the Action Loader one just has to provide the necessary Action and Scheduler to the class instance:
So going back to my original requirements, it became easy to create a Windows Service where the Action is to read from a processing queue and call a web service; where the MDOP is reduced when the service response times increase.
There are a variety of possible approaches one could take for Action processing, where the number of processing threads is varied based on task execution times. I chose to explore the custom task Scheduler approach as it offered the greatest amount of flexibility and reuse for a variety of other possible scenarios.
As a final reminder the code download can be located here.
Written by Carl Nolan