In my last couple of posts I have lead you through some basic introduction to workflow as well as a real world example of how to implement a cancelable CopyFile activity. The next thing I would like to do is dive into a much more involved example utilizing the previous activity. The purpose of this post is to introduce the class NativeActivity, since we will need to implement a custom activity for driving the consumers. We will also be working toward using our producer-consumer activity to eventually develop a parallelizable CopyDirectory activity, which I plan to cover in a subsequent posts.

Producer-Consumer

The first thing I want to do is step back and determine how we might go about modeling this as a reusable model that may be extended through composition. Our basic structure will be something along the lines of the following:

Parallel

  • Producer
    • While continue-producing
      • Invoke producer to generate items
      • For each of the items produced, notify the consumer that a new item is available
    • Notify the consumer to shut down
  • Consumer
    • Wait for next item
    • If consumer is available
      • Then
        • Invoke consumer to handle item
      • Else
        • Queue the item and wait for a consumer to complete

Typically the coordination between the producer and the consumer would be handled through mutexes, monitors, or some other cross-thread synchronization mechanism. The nice thing about workflow, however, is that we do not need to worry about this synchronization between threads as long as we make sure that all access to the common objects are handled on the workflow thread. If you were worried by my earlier posts that a single thread is dedicated to running a workflow, perhaps now it will make more sense. What this allows us to do is model complex, multi-threaded operations without having to deal with thread synchronization issues! However, there is one critical piece of the puzzle that is not already done for us – modeling the consumer scheduling behavior across threads. In order to run the consumer scheduling in parallel to the producer we will need to dive into the wonderful world of NativeActivity and Bookmark.

Activity classes which derive from NativeActivity are typically attempting to provide some sort of custom scheduling logic that is not available through composition of existing framework activities. In our scenario, for instance, we need an activity that can:

  • Wait for items to be pushed into a queue (this is what we need the bookmark for)
  • Schedule some maximum number of handlers in parallel and ensure that maximal use of these handlers is obtained

The remainder of the post will be dedicated to walking through how to design this activity, using NativeActivity.

What is a Bookmark?

A bookmark, for lack of a better definition, is exactly what it sounds like. For those of you unfamiliar with the previous workflow framework, a bookmark is essentially like a piece of paper you put inside of a book to mark your place. When an activity creates a bookmark, it is creating a named “resumption point” so it may go dormant and other activities may “wake it up” by resuming the bookmark. If you are familiar with Workflow from the .NET 3.5 Framework, a bookmark provides similar functionality to a queue. A bookmark is how we will handle making the consumer scheduling thread go idle while waiting for items in the queue.

Implementing the ParallelItemScheduler

The parallel item scheduler is a generic and reusable class we will be developing to add to our slowly growing library of custom activities. This will be the class (well, the main class) that I will be discussing which derives from NativeActivity. As we already mentioned, the purpose of this activity is to invoke a user-defined callback for each item in the queue, with up to N running simultaneously. We will need some mechanism for pushing items from the producer into the consumer, which means we will need some sort of a queue (represented by a bookmark). We should also allow users of our activity to control how many consumers may run in parallel. Using this information we can come up with a stub for our activity.

public sealed class ParallelItemScheduler<T> : NativeActivity
{
    public ParallelItemScheduler()
    {
        m_maxConcurrency = new InArgument<Int32>(new VisualBasicValue<Int32>("1"));
    }

    public ActivityAction<T> Body
    {
        get;
        set;
    }

    public InArgument<Int32> MaxConcurrency
    {
        get
        {
            return m_maxConcurrency;
        }
        set
        {
            m_maxConcurrency = value;
            m_maxConcurrencySet = true;
        }
    }

    [RequiredArgument]
    [DefaultValue(null)]
    public InArgument<String> QueueName
    {
        get;
        set;
    }

    protected override Boolean CanInduceIdle
    {
        get
        {
            return true;
        }
    }

    [EditorBrowsable(EditorBrowsableState.Never)]
    public Boolean ShouldSerializeMaxConcurrency()
    {
        return m_maxConcurrencySet;
    }

    protected override void CacheMetadata(NativeActivityMetadata metadata)
    {
        base.CacheMetadata(metadata);
        metadata.AddImplementationVariable(m_itemQueue);
        metadata.AddImplementationVariable(m_hasCompleted);
        metadata.AddImplementationVariable(m_concurrencyCount);
    }

    protected override void Execute(NativeActivityContext context)
    {
        context.CreateBookmark(QueueName.Get(context), new BookmarkCallback(OnItemAdded), BookmarkOptions.MultipleResume);

        m_concurrencyCount.Set(context, 0);
        m_itemQueue.Set(context, new Queue<T>());
    }

    void OnItemAdded(NativeActivityContext context, Bookmark bookMark, Object data)
    {
        throw new NotImplementedException();
    }

    void OnChildCompleted(NativeActivityContext context, ActivityInstance activity)
    {
        throw new NotImplementedException();
    }

    public sealed class Enqueue : NativeActivity { … }

    public sealed class Shutdown : NativeActivity { … }

    private static Object s_shutdownItem = new Object();

    private Boolean m_maxConcurrencySet;
    private InArgument<Int32> m_maxConcurrency;
    private Variable<Queue<T>> m_itemQueue = new Variable<Queue<T>>("itemQueue");
    private Variable<Boolean> m_hasCompleted = new Variable<Boolean>("hasCompleted");
    private Variable<Int32> m_concurrencyCount = new Variable<Int32>("concurrencyCount");
}

Before I progress any more I would like to step back and break down some new pieces of this activity definition.

  1. ActivityAction<T> is the workflow version of the framework delegate type Action<T>. This type allows you provide a pluggable mechanism for a single-argument callback, just like the framework counterpart (there are also ActivityAction<T1, T2…> types up to 16 arguments or so).
  2. MaxConcurrency and its use of the Boolean field. Since the type of the property is InArgument<Int32> you cannot use the DefaultValueAttribute to specify the default and have the serializer automatically pick it up. Instead, if you want to provide a default value that should not be serialized to XAML then you need to do it the XAML way by keeping track of whether or not the property has been set and providing a method called ShouldSerializeXXXX (where XXXX is the name of the property) that returns a value indicating whether or not to serialize the property.
  3. CacheMetadata and why it exists. Typically when designing activities you do not need to provide your own implementation since the default behavior uses reflection to discover the arguments, variables, activities, etc., of your activity. However, in certain scenarios like the one we are dealing with, you may have the requirement to create private variables for data storage (workflow calls these implementation variables). Since the reflection logic only picks up public properties of certain types we need to manually inform the runtime of these variables so we can use them. Why do we need variables? If you recall my previous post about data flow, C# fields and workflow variables are not the same – one is activity definition storage, the other is activity instance storage. We need the latter, since there may be multiple of this type of activity running within a workflow or the same definition may be shared across multiple workflow instances.
  4. CanInduceIdle is a necessary override if you are writing an activity that can cause the workflow scheduler to become idle (this allows the runtime to perform certain validations to ensure correctness when composing activities together). In this case, since we are using a bookmark to “sleep,” we have the potential to induce an idle event on the workflow instance so we need to return true here.
  5. The Execute method of our activity has an easy task. It simply needs to create the bookmark used to notify it of items (sort of like a mutex being signaled) and set up some initial state for the private variables. You’ll notice that we pass BookmarkOptions.MultipleResume when creating it, which means that we expect to have the bookmark resumed more than one time and we will handle cleaning it up on shutdown. As there is a bookmark active for our activity the workflow runtime is smart enough to leave this activity running even when the Execute method returns, so by doing this we have effectively gone to sleep waiting for input.

The basic logic of OnItemAdded, which is invoked when an item is placed into our bookmark queue, is to store the item into our implementation variable and then determine if we have any available slots for scheduling. While we are not at our maximum consumer count we should schedule a new consumer for each item that exists in the queue. Once we have finished scheduling items, we store the current concurrency count and return. It should be noted that the call to NativeActivityContext.ScheduleAction here does not actually begin execution of the action – it merely places an entry into the workflow instance’s scheduler queue and returns. The only other thing we can do in this method is remove the bookmark if the producer has signaled completion by queuing our “shutdown item.”

void OnItemAdded(NativeActivityContext context, Bookmark bookMark, Object data)
{
    if (m_hasCompleted.Get(context))
    {
        return;
    }

    Int32 concurrencyCount = m_concurrencyCount.Get(context);
    Int32 maxConcurrentCount = MaxConcurrency.Get(context);

    if (data is T)
    {
        Queue<T> itemQueue = m_itemQueue.Get(context);
        itemQueue.Enqueue((T)data);

        while (itemQueue.Count > 0 && concurrencyCount < maxConcurrentCount)
        {
            concurrencyCount++;
            context.ScheduleAction<T>(Body, itemQueue.Dequeue(), new CompletionCallback(OnChildCompleted));
        }

        m_concurrencyCount.Set(context, concurrencyCount);
    }
    else if (Object.ReferenceEquals(data, s_shutdownItem))
    {
        context.RemoveBookmark(QueueName.Get(context));
    }
}

The last order of business for this particular activity is the OnChildCompleted callback which is hooked above. This method is invoked when a child completes execution, either successfully or via cancelation. Much like the previous method this also handles consumer scheduling. As consumers complete we need to see if there are any more items which may be scheduled, and if there are we should schedule the new item using the slot of the consumer that just finished. The major difference between this method and OnItemAdded is the handling of activity cancelation. If a child activity is canceled and we have a pending cancellation request then we mark ourselves complete by storing it into our private variable and then mark the activity itself canceled via NativeActivityContext.MarkCanceled. Once we have done this once, as other children are canceled or complete they will see that the activity has already completed and immediately return.

void OnChildCompleted(NativeActivityContext context, ActivityInstance activity)
{
    if (m_hasCompleted.Get(context))
    {
        return;
    }

    if (activity.State != ActivityInstanceState.Closed && context.IsCancellationRequested)
    {
        m_hasCompleted.Set(context, true);
        context.MarkCanceled();
    }
    else
    {
        Int32 concurrencyCount = m_concurrencyCount.Get(context);
        Int32 maxConcurrencyCount = this.MaxConcurrency.Get(context);

        // Subtract one since an activity just finished
        concurrencyCount--;

        Queue<T> itemQueue = m_itemQueue.Get(context);
        if (concurrencyCount < maxConcurrencyCount && itemQueue.Count > 0)
        {
            concurrencyCount++;
            context.ScheduleAction<T>(Body, itemQueue.Dequeue(), new CompletionCallback(OnChildCompleted));
        }

        m_concurrencyCount.Set(context, concurrencyCount);
    }
}

The final classes that have not been shown here are ParallelItemScheduler<T>.Enqueue and ParallelItemScheduler<T>.Shutdown. The former simply resumes the bookmark with the specified data and the latter queues the item stored in s_shutdownItem so the call to Object.ReferenceEquals will succeed and remove the bookmark.

Conclusion

Today we have learned how to write custom scheduling logic by extending NativeActivity. We designed a custom class, ParallelItemScheduler<T>, that may be used in a workflow to model a consumer scheduler which schedules N concurrent consumers. The consumer is an attachable ActivityAction<T> class that is pluggable by consumers of the activity, and it is generic so highly reusable in a variety of scenarios. Although I would have liked to cover it all in a single post, this one is already becoming quite long and I would like to provide the ability to allow this information to soak in before continuing onto the next piece. Once we design our ProducerConsumer<T> activity you will hopefully gain a better understanding of how this activity is intended to be used. Until then, enjoy your newfound knowledge of the workflow framework and happy coding!