ParallelExtensionsExtras Tour - #12 - AsyncCache


(The full set of ParallelExtensionsExtras Tour posts is available here.)

Caches are ubiquitous in computing, serving as a staple of both hardware architecture and software development.  In software, caches are often implemented as dictionaries, where some data is retrieved or computed based on a key, and then that key and its resulting data/value are added to the dictionary.  Prior to re-retrieving or re-computing the value for a given key, we can first check the dictionary/cache to see whether we’ve already done so, and if we have, we simply copy the element from the dictionary.
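
To make the pattern concrete, here's a minimal single-threaded sketch of such a dictionary-based cache (the names here are hypothetical, for illustration only):

```csharp
using System;
using System.Collections.Generic;

// A minimal single-threaded memoizing cache: compute the value for a key
// once, then serve all subsequent requests for that key from the dictionary.
public class SimpleCache<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> _cache = new Dictionary<TKey, TValue>();
    private readonly Func<TKey, TValue> _valueFactory;

    public SimpleCache(Func<TKey, TValue> valueFactory)
    {
        _valueFactory = valueFactory;
    }

    public TValue GetValue(TKey key)
    {
        TValue value;
        if (!_cache.TryGetValue(key, out value))
        {
            value = _valueFactory(key); // compute once...
            _cache.Add(key, value);     // ...and remember it for next time
        }
        return value;
    }
}
```

As written, this sketch is not safe for concurrent use, which is exactly the problem the rest of this post addresses.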

As we all know, a multithreaded environment brings with it many challenges, and caches are no exception.  Imagine creating a cache to store downloaded web pages.  If multiple threads access the cache at the same time, we not only want to ensure that they don’t corrupt the employed data structures, we also want to ensure that they’re not doing more work than they need to: if two threads need the same page downloaded, it should be downloaded just once, with both threads handed the result.  For this, we need a form of asynchronous cache, one that gives threads back a handle for the thing in the cache they want, a handle that, for example, will then provide them with a callback notification when the download has completed, or that will allow them to wait for the download to complete.

The AsyncCache class in AsyncCache.cs in ParallelExtensionsExtras provides this support, and it may surprise you just how little code is required to do this, taking advantage of the new concurrency support in .NET 4. The type is defined as follows:

public class AsyncCache<TKey, TValue>

and contains two fields.  The first field is a delegate that will be invoked for a key when that key is requested and is not yet in the dictionary; it is this delegate that produces the value for the key:

private readonly Func<TKey, Task<TValue>> _valueFactory;

Note that this isn’t a Func<TKey,TValue>, but rather a Func<TKey,Task<TValue>>.  The function is supplied by the user to the AsyncCache constructor and produces a task that represents the retrieval of the value for a given key.  This task could either be computational in nature (e.g. one created by Task.Factory.StartNew), or it could be async I/O-based, such as a task representing a download from a web site.  Either way, it’s this task that’s stored in the cache in the second field:

private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

As you might have guessed, we’re using a ConcurrentDictionary as the storage for the cache, which ensures that multiple threads may access the cache concurrently without corrupting the internals of the data store.  As noted earlier, the function that generates values for keys returns tasks, but the dictionary’s value type isn’t just Task<TValue>, it’s Lazy<Task<TValue>>.  The addition of the Lazy<> here makes it really easy to ensure that only one task is generated for any one key: GetOrAdd guarantees that only one of several racing Lazy<> instances is ever published for a key, and Lazy<> (with its default thread-safety mode) guarantees that the published instance runs its factory at most once, so the value factory is never invoked twice for the same key.  We can see this by looking at the most important method on AsyncCache:

public Task<TValue> GetValue(TKey key)
{
    var value = new Lazy<Task<TValue>>(() => _valueFactory(key));
    return _map.GetOrAdd(key, value).Value;
}

You’ve now seen almost all of AsyncCache’s implementation… everything else in the type is really secondary (e.g. implementing the ICollection interface).  GetValue simply creates a new Lazy<Task<TValue>> that will run _valueFactory when its Value is accessed.  The method then checks whether the dictionary already has a Lazy<> for this key, adding the one we just created if it didn’t yet have one, and in either case returning the Value of whichever Lazy<> came back.  Accessing the Lazy<Task<TValue>>’s Value gives us the task for this key, and that’s handed back to the caller.  The caller now has a Task<TValue> for the supplied TKey, and as with any other task, the caller can use ContinueWith to be notified when the task has completed, can Wait on the task to block until the task has completed, or can simply use its Result property to get at the data when it’s available (potentially blocking in the process).
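
Putting those pieces together, the core of the type looks roughly like the following sketch (the constructor and its argument check are my assumption; they aren't shown in the post, and the actual ParallelExtensionsExtras code may differ):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class AsyncCache<TKey, TValue>
{
    // Produces the task that retrieves/computes the value for a key.
    private readonly Func<TKey, Task<TValue>> _valueFactory;

    // Maps each key to the lazily-created task for its value.
    private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

    public AsyncCache(Func<TKey, Task<TValue>> valueFactory)
    {
        if (valueFactory == null) throw new ArgumentNullException("valueFactory");
        _valueFactory = valueFactory;
        _map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();
    }

    public Task<TValue> GetValue(TKey key)
    {
        // Even if several threads race here, GetOrAdd publishes only one
        // Lazy<> per key, and that Lazy<> runs its factory at most once,
        // so at most one task is ever created for a given key.
        var value = new Lazy<Task<TValue>>(() => _valueFactory(key));
        return _map.GetOrAdd(key, value).Value;
    }
}
```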

With AsyncCache<TKey,TValue> in place, it’s now straightforward to either use it as is, or to create specialized variants of the cache. For example, in our earlier problem statement we described wanting to be able to cache downloaded web pages.  Here’s the complete implementation of that:

public sealed class HtmlAsyncCache : AsyncCache<Uri, string>
{
    public HtmlAsyncCache() :
        base(uri => new WebClient().DownloadStringTask(uri)) { }
}

The DownloadStringTask extension method on WebClient is another method defined in ParallelExtensionsExtras, and we’ll get to it another day.  Suffice it to say that this method returns a Task<string> that represents the asynchronous downloading of a web page at the specified Uri.  As such, our HtmlAsyncCache is simply a derived AsyncCache<Uri,string>, where the valueFactory calls DownloadStringTask for the supplied key/uri.
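
For a flavor of what such an extension method might look like, here's a sketch that bridges WebClient's event-based asynchronous pattern to a Task<string> via TaskCompletionSource.  This is only my illustrative approximation; the actual ParallelExtensionsExtras implementation is covered in a later post and may differ:

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

public static class WebClientExtensions
{
    // Sketch: wrap WebClient's DownloadStringCompleted event in a task.
    public static Task<string> DownloadStringTask(this WebClient client, Uri uri)
    {
        var tcs = new TaskCompletionSource<string>();
        client.DownloadStringCompleted += (sender, e) =>
        {
            // Propagate failure, cancellation, or the downloaded string
            // into the task that represents the operation.
            if (e.Error != null) tcs.TrySetException(e.Error);
            else if (e.Cancelled) tcs.TrySetCanceled();
            else tcs.TrySetResult(e.Result);
        };
        client.DownloadStringAsync(uri); // kick off the async download
        return tcs.Task;
    }
}
```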

A consumer of this type may request a particular page:

HtmlAsyncCache cache = new HtmlAsyncCache();
...
Task<string> page =
    cache.GetValue(new Uri("http://www.microsoft.com"));

and then either use its value directly, blocking if it’s not yet available:

Console.WriteLine(page.Result);

or ask to be notified when the value is available:

page.ContinueWith(completed =>
    Console.WriteLine(completed.Result));

And if you wanted to download multiple pages and only do something once all of them had arrived, that’s easy as well.  Since our asynchronous operations are represented as tasks, we can use the combinators provided by Task for this purpose, e.g.

var page1 = cache.GetValue(
    new Uri("http://msdn.microsoft.com/pfxteam"));
var page2 = cache.GetValue(
    new Uri("http://msdn.com/concurrency"));
var page3 = cache.GetValue(
    new Uri("http://www.microsoft.com"));

Task.Factory.ContinueWhenAll(
    new[] { page1, page2, page3 }, completedPages =>
{
    ... // use the downloaded pages here
});

(Thanks go to Luca Bolognese for originally supplying the idea for AsyncCache.)

Comments
  • This is really interesting and shows further the creep towards functional designs in C#. I have a similar solution using a provider model, but this is far more reusable, and elegant.

    Btw, if you happen to speak to Joe Duffy, ask him if he's still considering writing about concurrency in UIs. It's still an area that seems to be in darkness.

  • Thanks, Luke.  I'll pass your question along to Joe.

  • Amazing design. I wonder if the concept of using a future (Task<T>) would work in Velocity. Since it uses serialization in some cases, the value would need to be deserialized as a completed task.
