It has recently come to my attention that there is unrest regarding our inclusion of a CopyDirectory activity without a CopyFile activity. Therefore, I decided to take this opportunity to introduce everyone to the AsyncCodeActivity base class and how you can use it with a real-world example: a cancelable file copy operation. As you may recall from my previous workflow posts, CodeActivity does not provide a mechanism for cancelation, while AsyncCodeActivity does. If you are not familiar with asynchronous programming in .NET then you should probably read the following article to brush up before continuing.

.NET API Overview

The .NET API for System.IO.File does not provide a way to asynchronously copy a file, so we will need to provide our own mechanism using the asynchronous APIs provided on System.IO.FileStream (BeginRead/BeginWrite and EndRead/EndWrite respectively). I have provided the documentation for the aforementioned APIs below to aid in development of our activity.

  • public IAsyncResult BeginRead(Byte[] array, Int32 offset, Int32 numBytes, AsyncCallback userCallback, Object stateObject)
  • public Int32 EndRead(IAsyncResult asyncResult)
  • public IAsyncResult BeginWrite(Byte[] array, Int32 offset, Int32 numBytes, AsyncCallback userCallback, Object stateObject)
  • public void EndWrite(IAsyncResult asyncResult)

Just so we’re all on the same page, the parameters serve the following purpose:

  • array – The array of bytes that should be written to the file stream or a user allocated buffer in which to place bytes read from the file stream
  • offset – The offset in the array at which reading or writing should begin
  • numBytes – The number of bytes from the offset that should be read or written

Also, it’s important to mention that EndRead returns the number of bytes that were actually read from the stream, which may be equal to or less than the numBytes value passed to BeginRead. A short read does not by itself mean you have reached the end of the file; a return value of 0 does. Now that we have this basic introduction out of the way, let’s see how this all fits into the AsyncCodeActivity framework.
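To make the EndRead contract concrete, here is a minimal sketch that reads a file in fixed-size chunks and stops only when EndRead returns 0. The EndReadDemo class and its CountBytes method are hypothetical names used purely for illustration. Calling EndRead right after BeginRead simply blocks until the read completes, which keeps the example short but is not how the activity itself will use these APIs.

```csharp
using System;
using System.IO;

// Hypothetical helper for illustration only.
static class EndReadDemo
{
    // Reads the file in chunks of bufferSize bytes and returns the total number of bytes seen.
    public static Int64 CountBytes(String path, Int32 bufferSize)
    {
        Byte[] buffer = new Byte[bufferSize];
        Int64 total = 0;

        using (FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, true))
        {
            while (true)
            {
                // No callback or state: we wait for the result inline instead.
                IAsyncResult pending = stream.BeginRead(buffer, 0, buffer.Length, null, null);

                // EndRead blocks until the pending read has completed.
                Int32 bytesRead = stream.EndRead(pending);
                if (bytesRead == 0)
                {
                    // Zero bytes is the end-of-stream signal; a short read is not.
                    break;
                }

                total += bytesRead;
            }
        }

        return total;
    }
}
```

For a 10-byte file and a 4-byte buffer, the loop would typically see reads of 4, 4 and 2 bytes before the final read of 0 ends it.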

AsyncCodeActivity API Overview

The workflow framework provides the base classes, AsyncCodeActivity/AsyncCodeActivity<T>, which we will be using in this post. These classes expose three important methods to override, which I have broken down below.

  1. IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, Object asyncState)
    • The synchronous entry point at which time all values required for asynchronous execution should be extracted using the provided context and stored into a custom result or state.
  2. void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    • The synchronous exit point at which time all values for the workflow, such as OutArgument<T> and InOutArgument<T>, should be set using the provided context.
  3. void Cancel(AsyncCodeActivityContext context)
    • The synchronous entry point invoked from workflow to begin the cancelation process.

In the next section we will be diving a little deeper into how each of these may be implemented in a real world activity.

Writing the CopyFile Activity

The first thing we should do when designing our custom activity is to determine the type of flexibility we would like to provide to consumers of our activity. At the very least, copying a file will require a Source and a Target. This is where we will start, but we will continue evaluating other values we may like to promote to configurable properties. So, to begin we have the following activity outline:

public sealed class CopyFile : AsyncCodeActivity
{
    [RequiredArgument]
    public InArgument<String> Source
    {
        get;
        set;
    }

    [RequiredArgument]
    public InArgument<String> Target
    {
        get;
        set;
    }

    protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, Object state)
    {
        throw new NotImplementedException();
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        base.Cancel(context);
    }

    protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        throw new NotImplementedException();
    }
}

Note: For the remainder of this section we will be investigating the implementation of the activity and simply refer to the individual method we are currently writing.

The first caveat I would like to point out is that the BeginRead API takes a buffer and an offset/length to determine how much data to read. Since there is no possible way we can know up-front how much data we will be streaming from the file, we will most likely be making multiple calls back into BeginRead each time we have copied the pending buffer into the target location. On top of this, when the callback provided to BeginExecute is invoked the workflow framework queues an operation on the workflow execution thread to invoke the EndExecute method; once this method completes the activity has completed execution. With this knowledge, we know that we will need to provide our own callback(s) to BeginRead/BeginWrite and only invoke the callback provided by the workflow framework once the entire copy operation has completed.

The other important caveat you may have noticed is that there is no way to cancel an ongoing call to BeginRead/BeginWrite, as the built-in IAsyncResult interface does not provide a mechanism to get this feedback to the asynchronous operation. So, in order to properly support cancellation we should just read/write in a small enough buffer such that we can check a flag on our asynchronous copy state and stop the process once the ongoing read or write operation completes.

So, in summary, we will follow a pattern similar to the following:

  1. Set up our async state in BeginExecute and start the first read operation by invoking FileStream.BeginRead
  2. When the first read operation completes, if we have not been cancelled then begin the first write operation using the buffer from (1)
  3. When the write operation completes, if we have not been cancelled begin the next read operation if we are not at the end of the file. If we are at the end of the file, invoke the callback provided by the workflow framework to trigger the EndExecute method.

I realize that this was a lot of information to ingest but hopefully it will start making sense as we start writing the method implementations.

Asynchronous State

The first thing we will need to define is our asynchronous state, which I have included for reference below. As it turns out, all we need is the original callback and state, a buffer, and the source and target file streams opened with the appropriate permissions and flags (some of the fields required by IAsyncResult have been omitted for space).

sealed class CopyFileAsyncResult : IAsyncResult
{
    public Exception Exception { get; set; }
    public Boolean Canceled { get; set; }
    public Byte[] Buffer { get; set; }
    public FileStream ReadStream { get; set; }
    public FileStream WriteStream { get; set; }
}
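Since the listing above leaves out the IAsyncResult members, here is one way the complete class might look. This is a sketch under my own assumptions rather than the exact code behind the activity: the ManualResetEvent-typed AsyncWaitHandle (which lets the completion code call Set), the settable AsyncCallback and AsyncState properties, and the volatile backing field for Canceled are all choices made for illustration.

```csharp
using System;
using System.IO;
using System.Threading;

sealed class CopyFileAsyncResult : IAsyncResult
{
    // Signaled once the copy has finished, failed, or been canceled.
    private readonly ManualResetEvent m_completed = new ManualResetEvent(false);

    // Written by the workflow thread in Cancel and read by the I/O callbacks.
    private volatile bool m_canceled;

    public Exception Exception { get; set; }
    public Byte[] Buffer { get; set; }
    public FileStream ReadStream { get; set; }
    public FileStream WriteStream { get; set; }

    public Boolean Canceled
    {
        get { return m_canceled; }
        set { m_canceled = value; }
    }

    // The callback and state handed to BeginExecute by the workflow runtime.
    public AsyncCallback AsyncCallback { get; set; }
    public Object AsyncState { get; set; }

    // Exposed with its concrete type so the completion code can call Set().
    public ManualResetEvent AsyncWaitHandle
    {
        get { return m_completed; }
    }

    // The interface only needs the WaitHandle base type.
    WaitHandle IAsyncResult.AsyncWaitHandle
    {
        get { return m_completed; }
    }

    // This sketch always reports asynchronous completion.
    public Boolean CompletedSynchronously
    {
        get { return false; }
    }

    // Derived from the event; only meaningful until the wait handle is disposed.
    public Boolean IsCompleted
    {
        get { return m_completed.WaitOne(0); }
    }
}
```

The explicit interface implementation keeps IAsyncResult consumers working against a plain WaitHandle, while code that holds a CopyFileAsyncResult reference sees the ManualResetEvent directly.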

Implementation

Now that we have defined the data we will need to retain for our operation, we can start implementing the logic of our activity. Using steps 1-3 above, we come to something similar to the following:

protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, Object state)
{
    String source = Path.GetFullPath(Source.Get(context));
    String target = Path.GetFullPath(Target.Get(context));

    Int32 bufferSize = BufferSize.Get(context);

    CopyFileAsyncResult copyFileResult = new CopyFileAsyncResult
    {
        AsyncState = state,
        AsyncCallback = callback,
        Buffer = new Byte[bufferSize],
        ReadStream = new FileStream(source, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, true),
        WriteStream = new FileStream(target, FileMode.CreateNew, FileAccess.Write, FileShare.None, bufferSize, true),
    };

    context.UserState = copyFileResult;
    copyFileResult.ReadStream.BeginRead(copyFileResult.Buffer,
                                        0,
                                        copyFileResult.Buffer.Length,
                                        new AsyncCallback(OnEndRead),
                                        copyFileResult);

    return copyFileResult;
}

protected override void Cancel(AsyncCodeActivityContext context)
{
    ((CopyFileAsyncResult)context.UserState).Canceled = true;
}

protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
    CopyFileAsyncResult copyFileResult = result as CopyFileAsyncResult;
    if (copyFileResult == null)
    {
        return;
    }

    if (copyFileResult.AsyncWaitHandle != null)
    {
        ((IDisposable)copyFileResult.AsyncWaitHandle).Dispose();
    }

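    if (copyFileResult.Exception != null)
    {
        throw copyFileResult.Exception;
    }

    // Canceled is only set from Cancel, so a cancellation request is pending here.
    // Marking the context reports the activity as Canceled instead of Closed.
    if (copyFileResult.Canceled)
    {
        context.MarkCanceled();
    }
}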

static void OnEndRead(IAsyncResult result)
{
    CopyFileAsyncResult copyFileResult = result.AsyncState as CopyFileAsyncResult;
    if (copyFileResult == null)
    {
        return;
    }

    Int32 bytesRead = 0;

    try
    {
        bytesRead = copyFileResult.ReadStream.EndRead(result);
    }
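    catch (Exception ex)
    {
        copyFileResult.Exception = ex;
        OnCopyCompleted(copyFileResult);

        // Do not fall through; bytesRead is still 0 and would complete the operation a second time.
        return;
    }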

    if (copyFileResult.Canceled || bytesRead == 0)
    {
        OnCopyCompleted(copyFileResult);
    }
    else
    {
        // Start the next write operation using the current write offset and the bytes read.
        copyFileResult.WriteStream.BeginWrite(copyFileResult.Buffer,
                                              0,
                                              bytesRead,
                                              new AsyncCallback(OnEndWrite),
                                              copyFileResult);
    }
}

static void OnEndWrite(IAsyncResult result)
{
    CopyFileAsyncResult copyFileResult = result.AsyncState as CopyFileAsyncResult;
    if (copyFileResult == null)
    {
        return;
    }

    try
    {
        copyFileResult.WriteStream.EndWrite(result);

        if (copyFileResult.Canceled ||
            (copyFileResult.WriteStream.Length == copyFileResult.ReadStream.Length &&
             copyFileResult.ReadStream.Position == copyFileResult.ReadStream.Length))
        {
            OnCopyCompleted(copyFileResult);
        }
        else 
        {
            copyFileResult.ReadStream.BeginRead(copyFileResult.Buffer, 
                                                0,
                                                copyFileResult.Buffer.Length, 
                                                new AsyncCallback(OnEndRead), 
                                                copyFileResult);
        }
    }
    catch (Exception ex)
    {
        copyFileResult.Exception = ex;
        OnCopyCompleted(copyFileResult);
    }
}

static void OnCopyCompleted(CopyFileAsyncResult copyFileResult)
{
    copyFileResult.ReadStream.Dispose();
    copyFileResult.WriteStream.Dispose();

    copyFileResult.AsyncWaitHandle.Set();

    if (copyFileResult.AsyncCallback != null)
    {
        copyFileResult.AsyncCallback(copyFileResult);
    }
}

As you can see from the code above, essentially we loop back and forth between OnEndRead and OnEndWrite allowing the framework to handle threading for us. We determine the process is completed in a few ways – an exception is thrown, a cancellation request is made, or the read stream and write stream have equal lengths and our position in the read stream is at the end (as seen in OnEndWrite). Asynchronous APIs should not throw exceptions on the background thread, including callbacks, since an unhandled exception on a background thread may have the unintended consequences of shutting down the application domain or the entire process. What should occur instead, as you can see by the try/catch blocks in OnEndRead and OnEndWrite, is that an exception simply ends the read/write loop and completes the operation. Once the EndExecute method is invoked by the framework we check for an exception that was saved in our state, and if one exists we throw it on the foreground thread provided by the workflow runtime. Something else that’s important to point out is that we store our state into the AsyncCodeActivityContext.UserState property, which is how we retrieve it when the activity is canceled (see the Cancel method above).
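If you would like to experiment with the read/write loop without hosting a workflow, the following stripped-down sketch follows the same three steps using only System.IO. ChunkedCopy and all of its members are hypothetical names that do not appear in the activity. It also differs from the activity in two deliberate ways: it overwrites an existing target (FileMode.Create) to make repeated runs easier, and it runs its completion routine outside of the try block so that a failure while closing the streams cannot complete the operation twice.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical stand-alone helper; not part of the CopyFile activity.
sealed class ChunkedCopy
{
    private readonly ManualResetEvent m_done = new ManualResetEvent(false);
    private readonly Byte[] m_buffer;
    private readonly FileStream m_source;
    private readonly FileStream m_target;
    private volatile bool m_canceled;

    public Exception Error { get; private set; }

    private ChunkedCopy(String source, String target, Int32 bufferSize)
    {
        m_buffer = new Byte[bufferSize];
        m_source = new FileStream(source, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, true);
        try
        {
            m_target = new FileStream(target, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize, true);
        }
        catch
        {
            m_source.Dispose();
            throw;
        }
    }

    // Step 1: set up the state and start the first read.
    public static ChunkedCopy Start(String source, String target, Int32 bufferSize)
    {
        ChunkedCopy copy = new ChunkedCopy(source, target, bufferSize);
        copy.Run(copy.BeginNextRead);
        return copy;
    }

    public void Cancel() { m_canceled = true; }

    public void Wait() { m_done.WaitOne(); }

    // Runs one step; an exception, or a step that returns true, ends the copy.
    private void Run(Func<Boolean> step)
    {
        Boolean finished;
        try { finished = step(); }
        catch (Exception ex) { Error = ex; finished = true; }

        if (finished) { Complete(); }
    }

    private Boolean BeginNextRead()
    {
        m_source.BeginRead(m_buffer, 0, m_buffer.Length, OnEndRead, null);
        return false;
    }

    // Step 2: when a read completes, write what was read unless canceled or at the end.
    private void OnEndRead(IAsyncResult result)
    {
        Run(delegate
        {
            Int32 bytesRead = m_source.EndRead(result);
            if (m_canceled || bytesRead == 0) { return true; }
            m_target.BeginWrite(m_buffer, 0, bytesRead, OnEndWrite, null);
            return false;
        });
    }

    // Step 3: when a write completes, start the next read unless canceled.
    private void OnEndWrite(IAsyncResult result)
    {
        Run(delegate
        {
            m_target.EndWrite(result);
            return m_canceled || BeginNextRead();
        });
    }

    private void Complete()
    {
        // The nested using statements dispose both streams even if one Dispose throws.
        try { using (m_source) using (m_target) { } }
        catch (Exception ex) { if (Error == null) { Error = ex; } }
        finally { m_done.Set(); }
    }
}
```

A caller would invoke ChunkedCopy.Start(source, target, 4096), optionally call Cancel from another thread, then call Wait and inspect Error. Note that this sketch relies on a read of 0 bytes to detect the end of the file rather than comparing stream lengths, and that a canceled copy leaves a partial target file behind.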

Those of you paying close attention may have noticed the introduction of an InArgument<Int32> BufferSize, which is used in the BeginExecute method to construct our in-memory buffer. Although I didn’t cover this initially, I wanted to provide this functionality as a configurable property so we do not lose flexibility provided by the standard file copy APIs.

Conclusion

Although this article is quite wordy, hopefully it introduced the workflow APIs and provided an understandable and realistic example of how to utilize asynchronous APIs and cancellation in the workflow framework. Since this activity is asynchronous, it may be used as a branch of a Parallel or the body of a ParallelForEach<T> for copying multiple files in parallel to more efficiently saturate the network when high latency is involved. In fact, I plan on illustrating how you might use this activity to build a more efficient CopyDirectory than what we ship in the box for copying a configurable number of files in parallel. However, this will involve a more complex implementation since the standard ParallelForEach<T> provides no mechanism for capping the parallel branches at a maximum number, and providing this functionality requires us to derive from NativeActivity.

I hope this article serves as a starting point for the development of more complex activities, perhaps using this as a building block to develop that parallel directory copy I referred to above!