Parallel Extensions and I/O


In this post, we’ll investigate some ways that Parallel Extensions can be used to introduce parallelism and asynchrony to I/O scenarios.

Here’s a simple scenario.  I want to retrieve data from a number of web resources.

static string[] Resources = new string[]
{
    "http://www.microsoft.com", "http://www.msdn.com",
    "http://www.msn.com", "http://www.bing.com"
};

Using the WebClient class, I might end up with the following.

var data = new List<byte[]>();
var wc = new WebClient();

foreach (string resource in Resources)
{
    data.Add(wc.DownloadData(resource));
}

// Use the data.

However, these days, downloading data from the web usually utilizes only a small fraction of my available bandwidth.  So there are potential performance gains here, and with TPL’s parallel ForEach loop, they are easily had.

var data = new ConcurrentBag<byte[]>();

Parallel.ForEach(Resources, resource =>
{
    data.Add((new WebClient()).DownloadData(resource));
});

// Use the data.

Note that a WebClient instance does not support multiple pending asynchronous operations (and the class is not thread-safe), so I need a separate instance for each operation.  Also, since the normal BCL collections (List<T>, etc.) are not thread-safe, I need something like ConcurrentBag<T> to store the results.  Of course, storing all the data in a collection assumes that the scenario requires all retrieval operations to complete before processing begins.  If this were not the case, I could start processing each data chunk inside the loop as soon as it was obtained, exploiting more parallelism.  However, for the purposes of this investigation, I wanted to determine the possible performance gains in the absence of CPU-intensive work.
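As a rough illustration of the "process each chunk in the loop" alternative, here is a minimal sketch.  The helper name FetchAndProcess and its two delegate parameters are hypothetical (they are not part of the original sample); they are only there so the download and processing steps can be swapped out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ProcessInLoop
{
    // Hypothetical helper: fetches each resource and processes its bytes in the
    // same loop iteration, so no iteration waits for the other downloads.
    public static ConcurrentDictionary<string, TResult> FetchAndProcess<TResult>(
        IEnumerable<string> resources,
        Func<string, byte[]> fetch,
        Func<byte[], TResult> process)
    {
        var results = new ConcurrentDictionary<string, TResult>();

        Parallel.ForEach(resources, resource =>
        {
            byte[] chunk = fetch(resource);      // I/O-bound step
            results[resource] = process(chunk);  // processing starts right away
        });

        return results;
    }
}
```

A call might look like ProcessInLoop.FetchAndProcess(Resources, r => new WebClient().DownloadData(r), bytes => bytes.Length), which would map each URL to the size of its response.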

As it turns out, the above often yields linear speedup over the sequential version, with some variation due to the inconsistent nature of web site response times.  And it was pretty straightforward.  However, things would have been even easier had I started out with a “LINQ” frame of mind.  First, I can convert my original sequential code to a LINQ query.  Then, I can turn it into PLINQ using the AsParallel method and use WithDegreeOfParallelism to control the number of concurrent retrievals.

var data =
    from resource in Resources
        .AsParallel()
        .WithDegreeOfParallelism(numConcurrentRetrievals)
    select (new WebClient()).DownloadData(resource);

// Sometime later...
foreach (byte[] result in data) { }

(As an aside, it’s worth noting that WithDegreeOfParallelism causes PLINQ to use exactly numConcurrentRetrievals Tasks.  This differs from the MaxDegreeOfParallelism option that I could have used with my previous Parallel.ForEach code, because that option sets the maximum; the actual number of threads still depends on the ThreadPool’s thread-adjusting logic.)
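For comparison, here is a minimal sketch of how the MaxDegreeOfParallelism option might be applied to the earlier Parallel.ForEach loop.  ParallelOptions and its MaxDegreeOfParallelism property are standard TPL members; the helper name DownloadAll and the fetch delegate are hypothetical, introduced only so the download step can be replaced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BoundedDownloads
{
    // Hypothetical helper: runs 'fetch' for each resource with at most
    // 'maxConcurrent' invocations running at the same time.
    public static ConcurrentBag<byte[]> DownloadAll(
        IEnumerable<string> resources,
        int maxConcurrent,
        Func<string, byte[]> fetch)
    {
        var data = new ConcurrentBag<byte[]>();

        // An upper bound only; the loop may use fewer threads than this.
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrent };

        Parallel.ForEach(resources, options, resource =>
        {
            data.Add(fetch(resource));
        });

        return data;
    }
}
```

With WebClient, a call might look like BoundedDownloads.DownloadAll(Resources, 4, r => new WebClient().DownloadData(r)).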

This code offers enhanced readability and makes storing the data easier.  In addition, I can continue on the main thread, as PLINQ queries do not execute until the data they represent is accessed – that is, when MoveNext is called on the relevant enumerator.  However, in this particular case, I don’t want to delay my query’s execution until I need the data; I actually want to execute my query while continuing on the main thread.  To do so, I can wrap my query in a Task and force its immediate execution using ToArray.

var t = Task.Factory.StartNew(() =>
{
    // Parenthesize the query so that ToArray forces the whole query to run,
    // rather than applying to the byte[] returned by each DownloadData call.
    return
        (from resource in Resources
            .AsParallel()
            .WithDegreeOfParallelism(numConcurrentRetrievals)
         select (new WebClient()).DownloadData(resource)).ToArray();
});

// Sometime later...
foreach (byte[] result in t.Result) { }

// OR, use a continuation
t.ContinueWith(dataTask =>
{
    foreach (byte[] result in dataTask.Result) { }
});

Now, I’ve got asynchrony, and I still get similar speedup.  However, there’s still something about this code that is not ideal.  The work (sending off download requests and blocking) requires almost no CPU, but it is being done by ThreadPool threads since I’m using the default scheduler.  Ideally, threads should only be used for CPU-bound work (when there’s actually work to do).  Of course, this probably won’t matter much for most typical client applications, but in scenarios where resources are tight, it could be a serious issue.  Therefore, it’s worth investigating how we might reduce the number of blocked threads, perhaps by not using threads at all where possible.

To achieve this, I’ll be using ideas from a previous post: Tasks and the Event-based Asynchronous Pattern.  That article explained how to create a Task<TResult> from any type that implements the EAP, and it presented an extension method for WebClient (available along with many others in the ParallelExtensionsExtras):

public static Task<byte[]> DownloadDataTask(
    this WebClient webClient, Uri address);

The key point is that this method produces a Task<TResult> by integrating WebClient’s EAP implementation with a TaskCompletionSource<TResult>, and I can use it to rewrite my scenario.
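To make the idea concrete, here is a minimal sketch of how such a wrapper might be written.  It is not the ParallelExtensionsExtras source; the method name DownloadDataAsTask is a hypothetical stand-in chosen to avoid confusion with the real DownloadDataTask.  The WebClient members it relies on (DownloadDataAsync and the DownloadDataCompleted event) are the class's standard EAP surface.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

public static class WebClientEapSketch
{
    // Hypothetical stand-in for DownloadDataTask: bridges WebClient's
    // event-based pattern to a Task<byte[]> via TaskCompletionSource.
    public static Task<byte[]> DownloadDataAsTask(this WebClient webClient, Uri address)
    {
        var tcs = new TaskCompletionSource<byte[]>();

        DownloadDataCompletedEventHandler handler = null;
        handler = (sender, e) =>
        {
            // Unsubscribe so the handler runs only for this operation.
            webClient.DownloadDataCompleted -= handler;

            // Check Error and Cancelled before touching e.Result.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult(e.Result);
        };

        webClient.DownloadDataCompleted += handler;
        try
        {
            // Returns immediately; no thread blocks while the request is pending.
            webClient.DownloadDataAsync(address);
        }
        catch (Exception ex)
        {
            webClient.DownloadDataCompleted -= handler;
            tcs.TrySetException(ex);
        }

        return tcs.Task;
    }
}
```

The returned Task completes when the DownloadDataCompleted event fires, so the caller can wait on it or attach a continuation just as with any other Task<byte[]>.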

var tasks = new Queue<Task<byte[]>>();

foreach (string resource in Resources)
{
    WebClient wc = new WebClient();
    tasks.Enqueue(wc.DownloadDataTask(new Uri(resource)));
}

// Sometime later...
while (tasks.Count > 0)
{
    byte[] result = tasks.Dequeue().Result;
}

// OR, use a continuation.  The non-generic Task.Factory is used here because
// the continuation does not return a value.
Task.Factory.ContinueWhenAll(tasks.ToArray(), dataTasks =>
{
    foreach (var dataTask in dataTasks)
    {
        byte[] result = dataTask.Result;
    }
});

With this, I’ve got a solution that uses parallelism for speed-up, is asynchronous, and does not burn more threads than necessary!

To recap, in this post, we considered a typical I/O scenario.  First, we saw how easy it was to arrive at solutions that are better than the sequential one.  Then, we delved deeper to discover a more complex solution (integrating EAP with Tasks) that offers even more benefits.

Attachment: PFX-and-IO.zip
  • Does your download assumption take into account that the ServicePointManager can be used to increase the number of outstanding web requests?

    ServicePointManager.DefaultConnectionLimit = 50;

    or some similar number

  • Hi Don, yes, that property was used.

  • The PFX team is really on a roll with showing the way to how to create better concurrent code!  As big fans of concurrent code, asynchronicity and RESTful web-services, we're always pushing the envelope ourselves.  In the spirit of open discussion and idea sharing, here is an alternative to concurrent I/O, similar, yet slightly different at the same time: http://blog.developer.mindtouch.com/2009/08/05/async-io-dream-vs-parallel-extensions

    Cheers,

    - Steve

  • Thanks for the comment and for the link, Steve.

  • Great post dashih, you think you could post a sample project?

  • Dan,

    Sorry for the delay.  I have attached my project to the post.

    Thanks.

  • Danish, thanks for posting the project, I appreciate it. I cooked my own example and tried comparing the results of sequential and parallel execution and to my surprise sequential execution is a lot faster!

    There must be some caching going on underneath, especially since I am targeting the same URL. What is your take on this? Thanks!

    Here is my code…

    ConcurrentDictionary<String, String> values = new ConcurrentDictionary<String, String>();
    string[] symbols = { "msft", "orcl", "ibm", "goog" };
    String yahooFinanceUrl = "http://finance.yahoo.com/q?s=";

    Parallel.ForEach(symbols, symbol => {
        values.TryAdd(symbol, readValue(symbol));
    });

  • Dan, when I try the code you provided (modified to make it a complete app), I do see the parallel version running noticeably faster than the sequential.

    Note that when dealing with web requests, which can be very variable in the amount of time they may take due to network latencies and the like, it's probably best to do perf tests with more than just a few URLs, and with multiple iterations in order to get averages.  Also note that there may be throttling happening in the HTTP stack which prevents more than a certain number of requests being made to a given site at a given time.

  • Great article, thanks!

    I would have hoped to find some comment on SQL processing though, as I think a lot of people (like me) are working on SQL data a lot (in fact nearly all my I/O calls are SQL calls (I categorize SQL as I/O now)).

    What I am doing at the moment (pre 4.0) is create a ThreadPool thread to issue some SQL command (reader and datatable) and process the result with a passed delegate in my main thread (like update the WinForms-UI).

    Using the TaskCompletionSource I could imagine a pattern using the Begin/EndExecuteReader like you wrote about in a previous article, but I'm not sure about all issues. Also I want to use DataTables (binding to grids) and there is no async-pattern for the SqlDataAdapter I believe...

    So I was hoping for a good SQL/TPL post that shows possible SQL patterns (not (!) using LINQ to SQL or Entity framework) including usage of DataReader and DataTables. There is not much to be found on the web on this topic (I found "TPL  can't improve upon the interactions with SQL Server" on http://www.hanselman.com/blog/TheWeeklySourceCode51AsynchronousDatabaseAccessAndLINQToSQLFun.aspx, which is not very promising)

    Thanks for any new insight!

  • Hi Dashiel,

    Thanks for your comment.  I'm glad you found the post somewhat helpful, and we'll try to work on another one specific to SQL.  But I'll attempt to address your questions/concerns a bit here.

    Regarding SQL Server processing, it's true that Parallel Extensions will not improve performance by adding parallelism.  That is, the multiple cores on your client machine will not be used to parallelize the processing of the query, because SQL Server will already do this on the server machine.  That said, Parallel Extensions can still help.

    First, it's possible that you can benefit from further parallelism *after* fetching the data from the SQL Server.  For example, once the data is in memory on the client machine, you could use PLINQ on it.

    Second, as you mention, it might be possible to get some performance gain by adding asynchrony to SQL Server interactions, and TPL can help by making things more elegant.  Currently, I think the best you can do is something like:

    DataTable table = new DataTable();
    SqlCommand query = new SqlCommand(...);

    Task<SqlDataReader> queryTask =
        Task.Factory.FromAsync<SqlDataReader>(
            query.BeginExecuteReader,
            query.EndExecuteReader, null);

    queryTask.ContinueWith(completedTask =>
    {
        table.Load(completedTask.Result);
    });

    This pseudo-code uses a FromAsync() API to automatically wrap the Begin/EndExecuteReader calls in a Task, and it schedules a continuation Task to load the results into a DataTable.  Whether or not this speeds things up probably depends on how the Begin/End calls work (I don't know much about that).  If they complete after SQL Server begins sending back results (without waiting until ALL of the data has been received), then this strategy has a chance to yield speedup.

    Lastly, you are right that SqlDataAdapter doesn't have any async APIs, but there's the possibility that they will be added in a future release.  If you could elaborate your scenario, that would be great =).

    Hope this helps,

    Danny

  • Thanks Danny, you confirmed my ideas!

  • How would you determine how long each download operation took?
