Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock

In my last post, we looked at building an AsyncLock in terms of an AsyncSemaphore.  In this post, we’ll build a more advanced construct: an asynchronous reader/writer lock.

An asynchronous reader/writer lock is more complicated than any of the previous coordination primitives we’ve created.  It also involves more policy, meaning there are more decisions to be made about how exactly the type should behave.  For the purposes of this example, I’ve made a few decisions.  First, writers take precedence over readers.  This means that regardless of the order in which read or write requests arrive, if a writer is waiting, it will get priority over any number of waiting readers, even if it arrived later than those readers.  Second, I’ve decided not to throttle readers, meaning that all waiting readers will be released as soon as there are no writers outstanding or waiting.  Both of those points could be debated based on the intended usage of the type, so you might choose to modify the implementation based on your needs.

Here’s the shape of the type we’ll build:

public class AsyncReaderWriterLock
{
    public AsyncReaderWriterLock();

    public Task<Releaser> ReaderLockAsync();
    public Task<Releaser> WriterLockAsync();

    public struct Releaser : IDisposable
    {
        public void Dispose();
    }
}

As with the AsyncLock, we’ll utilize a disposable Releaser to make it easy to use this type in a scoped manner, e.g.

private readonly AsyncReaderWriterLock m_lock = new AsyncReaderWriterLock();

using(var releaser = await m_lock.ReaderLockAsync())
{
    … // protected code here
}

This Releaser is almost identical to that used in AsyncLock, except that we’re using the same type to represent both readers and writers, and since we need to behave differently based on which kind of lock is being released, I’ve parameterized the Releaser accordingly:

public struct Releaser : IDisposable
{
    private readonly AsyncReaderWriterLock m_toRelease;
    private readonly bool m_writer;

    internal Releaser(AsyncReaderWriterLock toRelease, bool writer)
    {
        m_toRelease = toRelease;
        m_writer = writer;
    }

    public void Dispose()
    {
        if (m_toRelease != null)
        {
            if (m_writer) m_toRelease.WriterRelease();
            else m_toRelease.ReaderRelease();
        }
    }
}

In terms of member variables, I need several more for this type than I’ve needed for the other data structures previously discussed.  First, we will have fast paths in this type, so I want to cache a Task<Releaser> for reader waits that complete immediately, and another for writer waits that complete immediately.

private readonly Task<Releaser> m_readerReleaser;
private readonly Task<Releaser> m_writerReleaser;

These members will be initialized in the constructor:

public AsyncReaderWriterLock()
{
    m_readerReleaser = Task.FromResult(new Releaser(this, false));
    m_writerReleaser = Task.FromResult(new Releaser(this, true));
}
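The reason these two Releaser tasks can be cached at construction time is that Task.FromResult produces an already-completed task, so every fast-path acquisition can return the very same instance without a per-call allocation.  Here's a minimal standalone sketch of that pattern (the names are illustrative, not part of the lock):

```csharp
using System;
using System.Threading.Tasks;

class FromResultDemo
{
    // Cache one completed task and hand out the same instance on every
    // fast-path hit, avoiding an allocation per call.
    private static readonly Task<int> s_cached = Task.FromResult(42);

    static Task<int> FastPathAsync() { return s_cached; }

    static void Main()
    {
        var t1 = FastPathAsync();
        var t2 = FastPathAsync();
        Console.WriteLine(t1.IsCompleted);          // True: already completed
        Console.WriteLine(ReferenceEquals(t1, t2)); // True: same cached instance
    }
}
```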

Next, I need to maintain a queue of writer waiters, one TaskCompletionSource<Releaser> for each, since I need to be able to wake them individually.  I also need a TaskCompletionSource<Releaser> for my readers; however, for the readers, per our previously discussed design, when it’s time to allow a reader to run, I can allow them all to run, and therefore I just need a single TaskCompletionSource<Releaser> that all of the readers in a given group will wait on.  However, since I’m maintaining a single TaskCompletionSource<Releaser> for all readers, I also need to maintain a count of how many readers are waiting, so that when I eventually wake them all, I can keep track of all of their releases and know when there are no more outstanding readers.

private readonly Queue<TaskCompletionSource<Releaser>> m_waitingWriters =
    new Queue<TaskCompletionSource<Releaser>>();
private TaskCompletionSource<Releaser> m_waitingReader =
    new TaskCompletionSource<Releaser>();
private int m_readersWaiting;

Finally, I need a variable to maintain the current status of the lock.  This will be an integer, where the value of 0 means that no one has acquired the lock, a value of –1 means that a writer has acquired the lock, and a positive value means that one or more readers have acquired the lock, where the positive value indicates how many.

private int m_status;
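To restate that encoding concretely, here's a small, purely illustrative helper (not part of the lock) that maps an m_status value back to its meaning:

```csharp
using System;

class StatusDemo
{
    // Hypothetical helper restating the m_status convention from the post:
    // 0 = no one holds the lock, -1 = a writer holds it, n > 0 = n readers hold it.
    static string DescribeStatus(int status)
    {
        if (status == 0) return "free";
        if (status == -1) return "writer held";
        return status + " reader(s) held";
    }

    static void Main()
    {
        Console.WriteLine(DescribeStatus(0));
        Console.WriteLine(DescribeStatus(-1));
        Console.WriteLine(DescribeStatus(3));
    }
}
```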

We now have four methods to implement: ReaderLockAsync, ReaderRelease, WriterLockAsync, and WriterRelease.

ReaderLockAsync is used when a new reader wants in.  After acquiring the lock on m_waitingWriters (which we’ll use across all four of these methods to ensure data consistency), we need to determine whether the reader should be allowed in immediately or should be forced to wait.  Based on the policy described earlier, if there are currently no writers active or waiting, then this reader can be allowed in immediately; in that case, we increment the status (which would have either been 0, meaning no activity on the lock, or positive, meaning there are currently readers) and we return the cached reader releaser.  If, however, there was an active or waiting writer, then we need to force the reader to wait, which we do by incrementing the count of the number of readers waiting, and return the m_waitingReader task (or, rather, a continuation off of the reader task, ensuring that all awaiters will be able to run concurrently rather than getting serialized).

public Task<Releaser> ReaderLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status >= 0 && m_waitingWriters.Count == 0)
        {
            ++m_status;
            return m_readerReleaser;
        }
        else
        {
            ++m_readersWaiting;
            return m_waitingReader.Task.ContinueWith(t => t.Result);
        }
    }
}
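A note on that `ContinueWith(t => t.Result)`: each call creates a distinct continuation task over the same shared source, so when the source completes, each waiting reader completes via its own task rather than all of them being chained onto one.  A small standalone sketch of that shape (this is the mechanism, not the lock itself):

```csharp
using System;
using System.Threading.Tasks;

class ContinuationDemo
{
    static void Main()
    {
        var tcs = new TaskCompletionSource<int>();

        // Two consumers share the same source task, but each gets its own
        // continuation task, so their completions aren't serialized onto one task.
        Task<int> a = tcs.Task.ContinueWith(t => t.Result);
        Task<int> b = tcs.Task.ContinueWith(t => t.Result);

        Console.WriteLine(ReferenceEquals(a, b)); // False: distinct continuation tasks

        tcs.SetResult(7);
        Console.WriteLine(a.Result + b.Result);   // 14: both observe the same result
    }
}
```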

WriterLockAsync is used when a new writer wants in.  As with ReaderLockAsync, there are two cases to deal with: when the writer can be allowed in immediately, and when the writer must be forced to wait.  The only time a writer can be allowed in immediately is when the lock is currently not being used at all; since a writer must be exclusive, it can’t run when there are active readers or an active writer.  So, if m_status is 0, we change the status to indicate that there’s now an active writer, and we return the cached writer releaser.  Otherwise, we create a new TaskCompletionSource<Releaser> for this writer, queue it, and return its Task.

public Task<Releaser> WriterLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status == 0)
        {
            m_status = -1;
            return m_writerReleaser;
        }
        else
        {
            var waiter = new TaskCompletionSource<Releaser>();
            m_waitingWriters.Enqueue(waiter);
            return waiter.Task;
        }
    }
}

Now we need to write the release functions, which are called when an active reader or writer completes its work and wants to release its hold on the lock.  ReaderRelease needs to decrement the count of active readers, and then check the current state of the lock.  If it was the last active reader and there are now writers waiting, then it needs to wake one of those writers and mark that the lock now has an active writer.  We don’t need to check for any pending readers; if there are any writers, then they’d take priority anyway, and if there aren’t any pending writers, then any readers that had arrived would have been allowed in immediately.

private void ReaderRelease()
{
    TaskCompletionSource<Releaser> toWake = null;

    lock (m_waitingWriters)
    {
        --m_status;
        if (m_status == 0 && m_waitingWriters.Count > 0)
        {
            m_status = -1;
            toWake = m_waitingWriters.Dequeue();
        }
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, true));
}

Finally, we need our WriterRelease method.  When a writer completes, if there are any pending writers waiting to get in, we simply dequeue and complete one of their tasks (we don’t need to update the lock’s status, since there will still be a single active writer, with one having completed and a new one having taken its place).  If there aren’t any writers, but there are readers waiting, then we can complete the single task on which all of those readers are waiting; in that case, we also need to create a new single task for all subsequent readers to wait on, and we need to update our status accordingly to now indicate how many active readers there are.  If there weren’t any writers or readers waiting, then we can simply reset the lock’s status.

private void WriterRelease()
{
    TaskCompletionSource<Releaser> toWake = null;
    bool toWakeIsWriter = false;

    lock (m_waitingWriters)
    {
        if (m_waitingWriters.Count > 0)
        {
            toWake = m_waitingWriters.Dequeue();
            toWakeIsWriter = true;
        }
        else if (m_readersWaiting > 0)
        {
            toWake = m_waitingReader;
            m_status = m_readersWaiting;
            m_readersWaiting = 0;
            m_waitingReader = new TaskCompletionSource<Releaser>();
        }
        else m_status = 0;
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, toWakeIsWriter));
}

That’s it.  Now, a production implementation of such a lock would likely want to be better instrumented, throw exceptions for erroneous usage (e.g. releasing when there wasn’t anything to be released), and so forth, but this should give you a basic sense of how such an asynchronous reader/writer lock could be implemented.
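For example, one such hardening step would be to validate state on release and throw on misuse.  Here's a minimal standalone sketch of that kind of guard (the class and exception message are illustrative, not the post's implementation):

```csharp
using System;

// Hedged sketch: guarding a reader release against erroneous usage, as the
// production-hardening note above suggests. The m_status convention matches
// the post: 0 = free, -1 = writer held, n > 0 = n readers held.
public class GuardExample
{
    private int m_status;
    private readonly object m_gate = new object();

    public void ReaderAcquireForDemo() { lock (m_gate) { ++m_status; } }

    public void ReaderRelease()
    {
        lock (m_gate)
        {
            if (m_status <= 0)
                throw new InvalidOperationException(
                    "ReaderRelease called without a matching reader acquisition.");
            --m_status;
        }
    }

    public static void Main()
    {
        var g = new GuardExample();
        g.ReaderAcquireForDemo();
        g.ReaderRelease();      // fine: matched release
        try
        {
            g.ReaderRelease();  // erroneous: nothing to release
            Console.WriteLine("no exception");
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("caught misuse");
        }
    }
}
```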

Before I conclude, it’s worth highlighting that .NET 4.5 includes a related type: ConcurrentExclusiveSchedulerPair.  I briefly discussed this type when describing what was new for parallelism in .NET 4.5 Developer Preview, but in short, it provides reader/writer-like scheduling for tasks (and it’s robust and has been well-tested, unlike the code in this post).  Hanging off of an instance of ConcurrentExclusiveSchedulerPair are two TaskScheduler instances: ConcurrentScheduler and ExclusiveScheduler. These two schedulers collude to ensure that an “exclusive” (or writer) task may only run when no other task associated with the schedulers is running, and that one or more “concurrent” (or reader) tasks may run concurrently as long as there are no exclusive tasks.  The type includes more advanced capabilities than what I’ve implemented in this post, for example being able to throttle readers (whereas in my AsyncReaderWriterLock in this post, all readers are allowed in as long as there are no writers).

You can use ConcurrentExclusiveSchedulerPair to build solutions similar to those you might use AsyncReaderWriterLock for.  For example, instead of wrapping the protected code with a block that on entrance awaits AsyncReaderWriterLock.WriterLockAsync and then on exit calls the returned Releaser’s Dispose, you could instead await a task queued to the ConcurrentExclusiveSchedulerPair’s ExclusiveScheduler.  ConcurrentExclusiveSchedulerPair also works well with systems layered on top of TPL and TaskScheduler, like TPL Dataflow.  You can, for example, create multiple ActionBlocks that all target the same ConcurrentScheduler instance configured with a maximum concurrency level, and then all of those blocks will collude to ensure they don’t go above that maximum.
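To make the ExclusiveScheduler idea concrete, here's a small sketch that queues many tasks to the exclusive scheduler; because they're guaranteed to run one at a time, an unguarded increment of shared state is safe (the specific counter scenario is mine, chosen only to demonstrate the exclusivity):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CespDemo
{
    static void Main()
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        int counter = 0;

        // Tasks queued to ExclusiveScheduler run one at a time, so these
        // unguarded increments can't race with each other.
        var tasks = new Task[100];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () => { counter++; },
                CancellationToken.None,
                TaskCreationOptions.None,
                pair.ExclusiveScheduler);
        }
        Task.WaitAll(tasks);
        Console.WriteLine(counter); // 100: no lost updates under the exclusive scheduler
    }
}
```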

However, there is a key behavior aspect of ConcurrentExclusiveSchedulerPair to keep in mind: it works at the level of a Task’s execution, and this may or may not be what you want.  If you write code like the following:

Task.Factory.StartNew(async () =>
{
    … // code #1
    await SomethingAsync();
    … // code #2
}, CancellationToken.None, TaskCreationOptions.None, myExclusiveScheduler);

that could either end up resulting in one Task queued to the scheduler (if the Task returned from SomethingAsync was complete by the time we awaited it), or it could result in two Tasks queued to the scheduler (if the Task returned from SomethingAsync wasn’t yet complete by the time we awaited it).  If it results in two tasks, then the atomicity provided by the exclusive scheduler applies to each task individually, not across them… in effect, the exclusive lock can be released while awaiting.  For certain scenarios, this is exactly what you want, in particular cases where you’re using the atomicity to provide consistency while accessing in-memory data structures and where you ensure that await calls do not come in the middle of such modifications.  To achieve the same behavior with AsyncReaderWriterLock, you’d need to do something like the following:

Task t = null;
using(var releaser = await m_lock.WriterLockAsync())
{
    … // code #1
    t = SomethingAsync();
}
await t;
using (var releaser = await m_lock.WriterLockAsync())
{
    … // code #2
}

That concludes my short series on building async coordination primitives.  I hope you enjoyed it.

  • Hey, there are missing await keywords here too. The same typo as in the previous post. Unfortunately the Task class implements IDisposable, so the compiler interprets this as valid code. Would it be a reliable solution to wrap the returned Task instances with lightweight objects that has only a GetAwaiter method?

  • Thanks, Tamas.  I'd typed that example at the end directly into Live Writer, so no compiler was involved.  I've fixed the typo.

    Regarding wrapping Task, you could do that if you wanted to.  If you remembered to do it, it would help flag missing awaits, but then again, if you remember to do it, you might as well remember to use awaits ;)  I expect an FxCop rule or some similar analysis tool would be more reliable.

  • Great series, thanks.  Are there plans to include any of these types natively in .NET 4.5?  (Perhaps other than AsyncReaderWriterLock, which seems redundant compared to ConcurrentExclusiveSchedulerPair.)

  • Hi Dave-

    I'm glad you enjoyed the posts.  Regarding .NET 4.5 plans, stay tuned :)

  • Great series! Very interesting.

    I was wondering about memory barriers.

    In these sample posts, you're using lock during each acquire/release - which includes a memory barrier - so a volatile qualifier wouldn't be necessary for any data protected by AsyncReaderWriterLock (I think).

    Question: what memory barrier guarantees does ConcurrentExclusiveSchedulerPair make?

  • Hi Stephen-

    I'm glad you like the series.

    CESP doesn't itself need to make any such guarantees, because it only schedules Tasks, and tasks themselves have their own guarantees.  Any data published prior to a Task getting scheduled will be visible to that Task's body, and any modifications by a Task will be visible to anyone joining with the Task when it completes.

  • Hi Stephen,

    I have a very general question regarding async context 'behind' await - and was not able to clear this from existing documents.

    In order to have lock-free code I use the actor model and single-threaded synchronization contexts (like WinForms or Stephen Cleary's Nito library).

    In this case, it seems that when continuing after 'await' it is guaranteed to be on the same thread as before; therefore no locks are needed to protect data structures accessed before and after await.

    But what thread is used inside the async method2 called through 'await'?

    Do I have to protect shared data structures when accessing them from, e.g., the UI thread as well as from a task generated by the compiler through await?

    In my opinion this would be *VERY* dangerous.

    But on the other hand, how can these auto-generated tasks be scheduled onto other threads and CPUs in order to speed up long-running tasks?

    e.g:

    List<string> list; // shared, not thread-safe data

    async Task method1()
    {
        list.Add("Start");
        await method2();
        list.Add("Stop"); // ok, same thread (when NOT using the default thread pool synchronization context)
    }

    async Task method2()
    {
        for (int i = 0; i < 1000; i++)
        {
            list.Add("Doing " + i); // ???? what thread ????
            DoSomethingComputeIntense();
        }
    }

  • Hi Stefan-

    Asynchronous methods are invoked synchronously.  When you call method2, you're calling it on the current thread.  If method2 never awaits something that's not yet completed, method2 will run to completion on the current thread.  Since you have no awaits in your method2, method2 will run entirely synchronously on the current thread.

    If you want to run code on a thread other than the current one, you can use Task.Run, e.g.

       ... // on UI thread

       await Task.Run(() =>
       {
           ... // on ThreadPool thread
       });

       ... // back on UI thread

    I hope that helps.

  • Sorry, my example was not so good; let me formulate my question differently:

    1) Is it true that I will never run on another thread unless I use "await Task.Run(...)" or explicitly start a thread?

    Reason:

    - the start of an async function runs synchronously

    - the completion after await runs on the same thread as before the await, since I use a single-threaded synchronization context.

    2) Is it true that, using this programming model, the non-interruptible (atomic) code portions span from one await to the next, regardless of how deeply nested the async functions are?

    Many thanks for your response!

  • Hi Stefan-

    Regarding (1), no, that's not true.  There are two initial cases to think through here: has the task being awaited completed or not when you await it.  If it has completed, then the await is effectively a nop, and the current thread just continues running what comes after the await.  The second case of the task not yet being complete is the more interesting one, and again there are two cases here, based on whether or not there's a custom SynchronizationContext or TaskScheduler on the current thread.  If there is such a context/scheduler set, then the code that comes after the await will be scheduled back to that context/scheduler when the awaited task completes (e.g. if you were running on the UI thread of a WPF app when you awaited the task, then when the task completes, the continuation will be queued back to the Dispatcher for the UI thread).  If there is no custom context, then there's nowhere to marshal the continuation back to, and as such, it'll typically just run on whatever thread is completing the task... that's likely to be a thread pool thread.

    Now, for your specific example under "Reason", yes, that's true.  Since you were using a single-threaded synchronization context, all of the awaits will return back to that same thread, so unless you explicitly change that, you'll only execute on that one thread.  Such "explicit" means could include using Task.Run(...) to put a delegate onto a ThreadPool thread.  It could also include using "await task.ConfigureAwait(false);" to ignore the current context if there is one.

    Regarding (2), yes, that's true.  The only points at which an async method will potentially yield is at an await site within the method.

  • Now I got it - many thanks for your detailed answer!

    Coming back to async coordination primitives ...

    I understand the possibility to do such things and it is instructive - but they look very scary to me.

    C# async/await seems to be a concept to bring asynchronous multitasked applications to mainstream.

    Wouldn't it be nice to have a small set of rules that enables 90% of programmers to write deadlock- and race-condition-free multitasked applications without ever thinking about locks of whatever kind?

    I think an actor based model could come near to that goal and the rules would have to include ...

    - Always have a single-threaded synchronization context, and be warned when falling back to the default multithreaded thread pool context.

    - Data belongs to a sync context; do something against foreign threads trying to access your data.

    - Guarantee that data sent to another sync context (e.g. posted messages, posted lambdas+parameters...) is not accessible by the originating sync context anymore.

    It would be interesting to hear from you what you think about an actor-based model and what you would do to enforce such guiding rules.

  • Sorry for being a bit off topic here.

    I recently updated AsyncWcfLib (sourceforge.net/.../asyncwcflib) to support async-await and an actor based programming model (sourceforge.net/.../index.php).

    It is a library implementation of the ideas I mentioned in my last post.

    The library user can write lockfree code even for a multithreaded and asynchronous application.

    But I could not make it completely foolproof. The user has to be careful not to fall back onto a thread pool thread, and he may not modify an already-sent message. I'm doing some thread-safety checking at runtime (like WinForms or WPF) to fight against these programming errors.

  • Any chance of an awaitable (global) Mutex, or is that not possible?

  • @ Richard Szalay: By "global", you mean across Windows processes?

  • I do, though since Mutexes have thread identity I ended up using them synchronously on a background thread. (Semaphores aren't available on WP7)
