In my previous post I introduced you to NativeActivity and walked through the design and implementation of a class named ParallelItemScheduler<T>. In part 2 of this series we will be utilizing the aforementioned activity to build a new composed activity named ProducerConsumer<T>. Once we have completed this task we will have a framework for building our final activity, CopyDirectory.

Building Activities Through Composition

Unlike the previous activity, the ProducerConsumer<T> activity may be built using composition. This means that we will be using Activity as a sub-class and providing an implementation using initializer syntax and numerous built-in and custom activities that we have been working on. Before we start designing our activity, however, lets go back to the producer-consumer diagram from the previous post to make sure we get it right.

Parallel

  • Producer
    • While continue-producing
      • Invoke producer to generate items
      • For each of the items produced, notify the consumer that a new item is available
    • Notify the consumer to shut down
  • Consumer
    • Wait for next item
    • If consumer is available
      • Then
        • Invoke consumer to handle item
      • Else
        • Queue the item and wait for a consumer to complete

The pieces in bold are the configurable properties, so lets break down what we will need to provide and what workflow classes will support our implementation.

  • InArgument<Int32> ConsumerCount
    • Allows users to specify how many concurrent consumers should be scheduled for handling items
  • ActivityAction<T> Consumer
    • Allows users to specify custom consumer logic for processing items produced by the producer
  • Activity<Boolean> ContinueProducing
    • Allows users to specify a custom condition to determine when to stop running the producer loop
  • ActivityFunc<ICollection<T>> Producer
    • Specifies custom producer logic for feeding items into the consumer

What is ActivityFunc<T>?

Much like ActivityAction<T> is synonymous to the delegate type Action<T>, ActivityFunc<T> is synonymous to the delegate type Func<T>. Providing a hook for an ActivityFunc<> class provides a customizable hook for users of the activity to provide custom logic that returns a value and may or may not take parameters. In our particular scenario, we need to provide a custom mechanism for producing items to queue up for our consumer and do not require any arguments, so we use ActivityFunc<ICollection<T>> as our producer definition. As you will see in the implementation that follows, you can schedule ActivityAction/ActivityFunc objects using the InvokeAction/InvokeFunc activities in composition or by using the methods NativeActivityContext.ScheduleAction/NativeActivityContext.ScheduleFunc if you are scheduling from a native activity.

Implementing the ProducerConsumer

When using composition there is one major difference to how you build the activity vs. another type of activity. Rather than providing an Execute method to determine the logic, you provide what is called the activity Implementation, which is defined as:

protected virtual Func<Activity> Implementation { get; set; }

We developed a pattern internally that will be shown below, but essentially every activity that uses Activity as a base class has a private method called CreateBody() that returns the logic as workflow generated with initializer syntax. In the constructor we simply set the implementation to the result of an anonymous delegate that invokes this method. Keeping this in mind I have pasted the implementation of the activity below, with an explanation that follows.

public sealed class ProducerConsumer<T> : Activity
{
    public ProducerConsumer()
    {
        base.Implementation = () => CreateBody();
    }

    public InArgument<Int32> ConsumerCount
    {
        get;
        set;
    }

    public ActivityAction<T> Consumer
    {
        get;
        set;
    }

    public Activity<Boolean> ContinueProducing
    {
        get;
        set;
    }

    public ActivityFunc<ICollection<T>> Producer
    {
        get;
        set;
    }

    protected override void CacheMetadata(ActivityMetadata metadata)
    {
        base.CacheMetadata(metadata);

        if (Consumer == null)
        {
            metadata.AddValidationError(new ValidationError("Consumer must be set", false, "Consumer"));
        }

        if (ContinueProducing == null)
        {
            metadata.AddValidationError(new ValidationError("ContinueProducing condition must be set", false, "ContinueProducing"));
        }

        if (Producer == null)
        {
            metadata.AddValidationError(new ValidationError("Producer must be set", false, "Producer"));
        }
    }

    Activity CreateBody()
    {
        Variable<String> queueName = new Variable<String>();
        DelegateInArgument<T> itemToProduce = new DelegateInArgument<T>();
        Variable<ICollection<T>> itemsProduced = new Variable<ICollection<T>>();

        return new Sequence
        {
            Variables = 
            {
                queueName,
            },
            Activities =
            {
                new Assign<String>
                {
                    Value = new InArgument<String>(env => Guid.NewGuid().ToString("D")),
                    To = new OutArgument<String>(queueName),
                },
                new Parallel
                {
                    Branches = 
                    {
                        new ParallelItemScheduler<T>
                        {
                            QueueName = new InArgument<String>(queueName),
                            MaxConcurrency = new InArgument<Int32>(env => ConsumerCount.Get(env)),
                            Body = this.Consumer,
                        },
                        new TryCatch
                        {
                            Try = new While
                            {
                                Condition = this.ContinueProducing,
                                Body = new Sequence
                                {
                                    Variables =
                                    {
                                        itemsProduced,
                                    },
                                    Activities =
                                    {
                                        new InvokeFunc<ICollection<T>>
                                        {
                                            Func = this.Producer,
                                            Result = new OutArgument<ICollection<T>>(itemsProduced),
                                        },
                                        new If
                                        {
                                            Condition = new InArgument<Boolean>(env => itemsProduced.Get(env) != null && itemsProduced.Get(env).Count > 0),
                                            Then = new ForEach<T>
                                            {
                                                Values = new InArgument<IEnumerable<T>>(itemsProduced),
                                                Body = new ActivityAction<T>
                                                {
                                                    Argument = itemToProduce,
                                                    Handler = new ParallelItemScheduler<T>.Enqueue
                                                    {
                                                        QueueName = new InArgument<String>(queueName),
                                                        Value = new InArgument<T>(itemToProduce),
                                                    },
                                                },
                                            },
                                        },
                                    },
                                },
                            },
                            Finally = new ParallelItemScheduler<T>.Shutdown
                            {
                                QueueName = new InArgument<String>(queueName),
                            },
                        },
                    },
                },
            },
        };
    }
}

As you can see above, for the most part the workflow framework provides the building blocks for our implementation. If you compare the outline for the activity above with the construction of the activity logic, you should see that it lines up almost exactly. What we have done is create a parallel activity with two branches, one is the producer and the other is the consumer. The consumer thread simply waits for input and then schedules the provided consumer each time an available slot is available. The producer thread loops and continually calls the provided callback, pushing the produced items into the consumer for scheduling. Once the producer is finished, we have the entire block wrapped in a TryCatch so we can ensure to invoke the ParallelItemScheduler<T>.Shutdown activity.

For all intents and purposes, the queue name (really the bookmark name) simply has to be unique. In an attempt to keep things simple I have chosen to create a queue name by generating a new GUID. There are many other ways to generate queue names, but this was the easiest way to ensure there are not any conflicts. One important thing about the queue, however, is that the consumer is first in the parallel and the producer is second. This activity may not work the other way around since the consumer is what creates the bookmark that the producer will be resuming it. As it turns out, the built-in Parallel activity is not strictly parallel. What I mean by that is since the workflow scheduler works off of a single thread, the ability of concurrent work to occur is completely dependent on the child activities that it is invoking. If we did not design our ParallelItemScheduler<T> to use the bookmark and go idle, for instance, then the producer would never run since the scheduler would never have a chance to execute it while it’s blocked on the aforementioned activity. Using this knowledge, we can guarantee that the first branch in the parallel will invoke its first child BEFORE the second child is invoked, and so on down the list in sequential order. The parallelism occurs when an activity goes idle using bookmarks or AsyncCodeActivity, which allows the scheduler to move onto the next item in the queue while execution of the previous activity continues off of the workflow thread.

Note: See Nate Talbert's post on the WF 3.5 ParallelActivity and how it works for a more complete explanation of how parallelism works in a workflow instance.

Validation Errors

The previous post introduced one of the mechanisms for CacheMetadata, and this one introduces another use for overriding this method. Workflow validation occurs against the workflow definition and not against the runtime inputs, so all you can validate are things like the activity hierarchy, check for the existence of properties, and other things that belong to the activity definition and are not runtime-specific. What I have done here is add validation errors to the workflow if any properties required for property execution are not set. You can test the behavior in this scenario by trying to run this activity without one, two, or all three of the required properties set.

Conclusion

This part of the 3 part series introduced you to activity composition, activity functions, and basic activity validation. At this point in time we have all of the building blocks necessary to build more activities that follow the model of producer-consumer. As you probably guessed, the CopyDirectory activity that I will be going over in part 3 is one such activity. Given the proper inputs a directory copy operation that walks the hierarchy while simultaneously copying discovered files N at a time can provide large performance gains when network latency is large. Since we’re using workflow, we have not and will not need to make any use of synchronization primitives or do anything to handle cancelation of the operation! Hopefully, if not yet, by the next post you will start seeing the benefits of writing workflow in small and reusable modules rather than trying to implement everything in code.