Windows Azure Storage Client Library: Parallel Single Blob Upload Race Condition Can Throw an Unhandled Exception

Windows Azure Storage Client Library: Parallel Single Blob Upload Race Condition Can Throw an Unhandled Exception

  • Comments 6

Update 11/06/11:  The bug is fixed in the Windows Azure SDK September release.

There is a race condition in the current Windows Azure Storage Client Library that could potentially throw an unhandled exception under certain circumstances. Essentially the way the parallel upload executes is by dispatching up to N (N= CloudBlobClient.ParallelOperationThreadCount) number of simultaneous block uploads at a time and waiting on one of them to return via WaitHandle.WaitAny (Note: CloudBlobClient.ParallelOperationThreadCount is initialized by default to be the number of logical processors on the machine, meaning an XL VM will be initialized to 8). Once an operation returns it will attempt to kick off more operations until it satisfies the desired parallelism or there is no more data to write. This loop continues until all data is written and a subsequent PutBlockList operation is performed.

The bug is that there is a race condition in the parallel upload feature resulting in the termination of this loop before it gets to the PutBlockList. The net result is that some blocks will be added to a blobs uncommitted block list, but the exception will prevent the PutBlockList operation. Subsequently it will appear to the client as if the blob exists on the service with a size of 0 bytes. However, if you retrieve the block list you will be able to see the blocks that were uploaded to the uncommitted block list.

Mitigations

When looking at performance, it is important to distinguish between throughput and latency. If your scenario requires a low latency for a single blob upload, then the parallel upload feature is designed to meet this need. To get around the above issue, which should be a rare occurrence you could catch the exception and retry the operation using the current Storage Client Library. Alternatively the following code can be used to perform the necessary PutBlock / PutBlockList operations to perform the parallel blob upload to work around this issue:

///Joe Giardino, Microsoft 2011 /// <summary> /// Extension class to provide ParallelUpload on CloudBlockBlobs. /// </summary> public static class ParallelUploadExtensions {
 /// <summary> /// Performs a parallel upload operation on a block blob using the associated serviceclient configuration  /// </summary> /// <param name="blobRef">The reference to the blob.</param> /// <param name="sourceStream">The source data to upload.</param> /// <param name="options">BlobRequestOptions to use for each upload, can be null.</param> /// <summary> /// Performs a parallel upload operation on a block blob using the associated serviceclient configuration  /// </summary> /// <param name="blobRef">The reference to the blob.</param> /// <param name="sourceStream">The source data to upload.</param> /// <param name="blockIdSequenceNumber">The intial block ID, each subsequent block will increment of this value </param> /// <param name="options">BlobRequestOptions to use for each upload, can be null.</param>  public static void ParallelUpload(this CloudBlockBlob blobRef, Stream sourceStream, long blockIdSequenceNumber, BlobRequestOptions options)
 {
    // Parameter Validation & Locals if (null == blobRef.ServiceClient)
    {
        throw new ArgumentException("Blob Reference must have a valid service client associated with it");
    }

    if (sourceStream.Length - sourceStream.Position == 0)
    {
        throw new ArgumentException("Cannot upload empty stream.");
    }

    if (null == options)
    {
        options = new BlobRequestOptions()
        {
            Timeout = blobRef.ServiceClient.Timeout,
            RetryPolicy = RetryPolicies.RetryExponential(RetryPolicies.DefaultClientRetryCount, RetryPolicies.DefaultClientBackoff)
        };
    }

    bool moreToUpload = true;
    List<IAsyncResult> asyncResults = new List<IAsyncResult>();
    List<string> blockList = new List<string>();

    using (MD5 fullBlobMD5 = MD5.Create())
    {
        do {
            int currentPendingTasks = asyncResults.Count;

            for (int i = currentPendingTasks; i < blobRef.ServiceClient.ParallelOperationThreadCount && moreToUpload; i++)
            {
                // Step 1: Create block streams in a serial order as stream can only be read sequentially string blockId = null;

                // Dispense Block Stream int blockSize = (int)blobRef.ServiceClient.WriteBlockSizeInBytes;
                int totalCopied = 0, numRead = 0;
                MemoryStream blockAsStream = null;
                blockIdSequenceNumber++;

                int blockBufferSize = (int)Math.Min(blockSize, sourceStream.Length - sourceStream.Position);
                byte[] buffer = new byte[blockBufferSize];
                blockAsStream = new MemoryStream(buffer);

                do {
                    numRead = sourceStream.Read(buffer, totalCopied, blockBufferSize - totalCopied);
                    totalCopied += numRead;
                }
                while (numRead != 0 && totalCopied < blockBufferSize);


                // Update Running MD5 Hashes fullBlobMD5.TransformBlock(buffer, 0, totalCopied, null, 0);          
                blockId = GenerateBase64BlockID(blockIdSequenceNumber);
                
               // Step 2: Fire off consumer tasks that may finish on other threads blockList.Add(blockId);
                IAsyncResult asyncresult = blobRef.BeginPutBlock(blockId, blockAsStream, null, options, null, blockAsStream);
                asyncResults.Add(asyncresult);

                if (sourceStream.Length == sourceStream.Position)
                {
                    // No more upload tasks moreToUpload = false;
                }
            }

            // Step 3: Wait for 1 or more put blocks to finish and finish operations if (asyncResults.Count > 0)
            {
                int waitTimeout = options.Timeout.HasValue ? (int)Math.Ceiling(options.Timeout.Value.TotalMilliseconds) : Timeout.Infinite;
                int waitResult = WaitHandle.WaitAny(asyncResults.Select(result => result.AsyncWaitHandle).ToArray(), waitTimeout);

                if (waitResult == WaitHandle.WaitTimeout)
                {
                    throw new TimeoutException(String.Format("ParallelUpload Failed with timeout = {0}", options.Timeout.Value));
                }

                // Optimize away any other completed operations for (int index = 0; index < asyncResults.Count; index++)
                {
                    IAsyncResult result = asyncResults[index];
                    if (result.IsCompleted)
                    {
                        // Dispose of memory stream (result.AsyncState as IDisposable).Dispose();
                        asyncResults.RemoveAt(index);
                        blobRef.EndPutBlock(result);
                        index--;
                    }
                }
            }
        }
        while (moreToUpload || asyncResults.Count != 0);

        // Step 4: Calculate MD5 and do a PutBlockList to commit the blob fullBlobMD5.TransformFinalBlock(new byte[0], 0, 0);
        byte[] blobHashBytes = fullBlobMD5.Hash;
        string blobHash = Convert.ToBase64String(blobHashBytes);
        blobRef.Properties.ContentMD5 = blobHash;
        blobRef.PutBlockList(blockList, options);
    }
 }

 /// <summary> /// Generates a unique Base64 encoded blockID  /// </summary> /// <param name="seqNo">The blocks sequence number in the given upload operation.</param> /// <returns></returns>  private static string GenerateBase64BlockID(long seqNo)
 {
    // 9 bytes needed since base64 encoding requires 6 bits per character (6*12 = 8*9) byte[] tempArray = new byte[9];

    for (int m = 0; m < 9; m++)
    {
        tempArray[8 - m] = (byte)((seqNo >> (8 * m)) & 0xFF);
    }

    Convert.ToBase64String(tempArray);

    return Convert.ToBase64String(tempArray);
 } 
}

Note: In order to prevent potential block collisions when uploading to a pre-existing blob, use a non-constant blockIdSequenceNumber. To generate a random starting ID you can use the following code.

Random rand = new Random();
long blockIdSequenceNumber = (long)rand.Next() << 32;
blockIdSequenceNumber += rand.Next();

Instead of uploading a single blob in parallel, if your target scenario is uploading many blobs you may consider enforcing parallelism at the application layer. This can be achieved by performing a number of simultaneous uploads on N blobs while setting CloudBlobClient.ParallelOperationThreadCount = 1 (which will cause the Storage Client Library to not utilize the parallel upload feature). When uploading many blobs simultaneously, applications should be aware that the largest blob may take longer than the smaller blobs and start uploading the larger blob first. In addition, if the application is waiting on all blobs to be uploaded before continuing, then the last blob to complete may be the critical path and parallelizing its upload could reduce the overall latency.

Lastly, it is important to understand the implications of using the parallel single blob upload feature at the same time as parallelizing multiple blob uploads at the application layer. If your scenario initiates 30 simultaneous blob uploads using the parallel single blob upload feature, the default CloudBlobClient settings will cause the Storage Client Library to use potentially 240 simultaneous put block operations (8 x30) on a machine with 8 logical processors. In general it is recommended to use the number of logical processors to determine parallelism, in this case setting CloudBlobClient.ParallelOperationThreadCount = 1 should not adversely affect your overall throughput as the Storage Client Library will be performing 30 operations (in this case a put block) simultaneously. Additionally, an excessively large number of concurrent operations will have an adverse effect on overall system performance due to ThreadPool demands as well as frequent context switches. In general if your application is already providing parallelism you may consider avoiding the parallel upload feature altogether by setting CloudBlobClient.ParallelOperationThreadCount = 1.

This race condition in parallel single blob upload will be addressed in a future release of the SDK. Please feel free to leave comments or questions,

Joe Giardino

Leave a Comment
  • Please add 1 and 7 and type the answer here:
  • Post
  • Cool, this appears to be what I reported in <a href="social.msdn.microsoft.com/.../fc346fb5-3ce5-4f90-a158-a772aa923142">this thread</a>. Good to see things getting fixed :)

  • @Porges, your link is broken. The problem you posted in the forum is exactly the same issue, which prompted us to do this post.  Thanks for finding it and posting it.

    social.msdn.microsoft.com/.../fc346fb5-3ce5-4f90-a158-a772aa923142

  • Does the MD5 hash stuff provide any value if we're using HTTPS?

  • @Tofu - The transactinal layer MD5 is redundant when using HTTPS as HTTPS provides Transaction Layer Security, i.e. specifying x-ms-range-get-content-md5: true on a range get.

    However there are scenarios where the application level MD5 could still be helpful to you such as versioning, CMS, etc. It is important to understand that this application layer MD5 is treated as a property of the blob and is not explicitly calculated and validated on the service (except in the case of a single put blob operation) so design your application accordingly.

    Additionally Blob Metadata is available for more gerneric scenarios where you can annotate the data in some other meaningful way.

    This thread seems relevant to the MD5 posting @ blogs.msdn.com/.../windows-azure-blob-md5-overview.aspx

  • Does SDK 1.5 have fix for the issue in storage client library?

  • @Carl, yes it does.

Page 1 of 1 (6 items)