Monday, 21 April 2014

Notes On Concurrent Ring Buffer Queue Mechanics

{This post is part of a long-running series on lock-free queues; check out the full index to get more context here}
Having recently implemented an MPMC queue for JAQ (my concurrent queue collection) I have been bitten several times by the quirks of the underlying data structure and its behaviour. This is an attempt at capturing some of the lessons learnt. I call these aspects or observations 'mechanics' because that's how they feel to me (you could call them Betty). So here goes...

What is a Ring Buffer?

A ring buffer is sometimes called a circular array or circular buffer (here's wiki) and is a pre-allocated, finite chunk of memory that is accessed in a circular fashion to give the illusion of an infinite one. Element N+1 of a ring buffer of size N is the first element again:

So the first mechanical aspect to observe here is how the circularity is achieved. I'm using a modulo operator above, but you could use a bitwise AND instead if the queue size is a power of 2 (i%(2^k) == i&((2^k) - 1)). Importantly, circularity implies 2 different indexes can mean the same element.
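To make that concrete, here is a tiny sketch (helper names are mine, not from the post) of the two wrapping flavours:

```java
// Hypothetical helpers showing the two ways to wrap a monotonically
// growing index onto a buffer: modulo works for any capacity, the mask
// form only for power-of-2 capacities but avoids the costly division.
public class CircularIndex {
    public static int wrapModulo(long index, int capacity) {
        return (int) (index % capacity);
    }

    public static int wrapMask(long index, int powerOf2Capacity) {
        return (int) (index & (powerOf2Capacity - 1));
    }
}
```

Note how index 9 on a buffer of 8 lands on element 1 either way: two different indexes, same element.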
Trivial stuff, I know, moving right along.

A Ring Buffer Based Bounded Queue: Wrap and Bounds

Ring buffers are used to implement all sorts of things, but in this particular case let us consider a bounded queue. In a bounded queue we are using the underlying buffer to achieve re-use, but we no longer pretend the capacity is infinite. We need to detect the queue being full/empty and track the next index to be offered/polled. For the non-thread-safe case we can meet these requirements thus:
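The original snippet is not reproduced here; a minimal sketch of the counter-based, single-threaded version (names are mine) might look like:

```java
// A sketch of a non-thread-safe bounded queue: head/tail counters grow
// monotonically, their difference detects full/empty, and each index is
// wrapped onto the buffer with a modulo.
public class SingleThreadedQueue<E> {
    private final E[] buffer;
    private long head; // next index to poll
    private long tail; // next index to offer

    @SuppressWarnings("unchecked")
    public SingleThreadedQueue(int capacity) {
        buffer = (E[]) new Object[capacity];
    }

    public boolean offer(E e) {
        if (tail - head == buffer.length) {
            return false; // full: producer caught up with consumer
        }
        buffer[(int) (tail % buffer.length)] = e;
        tail++;
        return true;
    }

    public E poll() {
        if (head == tail) {
            return null; // empty: consumer caught up with producer
        }
        int index = (int) (head % buffer.length);
        E e = buffer[index];
        buffer[index] = null; // let the element be collected
        head++;
        return e;
    }
}
```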

As an alternative we can use the data in the queue to detect the full/empty states:
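A sketch of that alternative (again, names are mine; the original gist is not reproduced here): instead of comparing counters, we probe the slot itself — a non-null slot at tail means full, a null slot at head means empty.

```java
// A sketch of the data-probing variant of the single-threaded bounded
// queue: the element itself tells us whether the slot is usable.
public class NullCheckQueue<E> {
    private final E[] buffer;
    private long head;
    private long tail;

    @SuppressWarnings("unchecked")
    public NullCheckQueue(int capacity) {
        buffer = (E[]) new Object[capacity];
    }

    public boolean offer(E e) {
        int index = (int) (tail % buffer.length);
        if (buffer[index] != null) {
            return false; // full: about to wrap onto an unconsumed element
        }
        buffer[index] = e;
        tail++;
        return true;
    }

    public E poll() {
        int index = (int) (head % buffer.length);
        E e = buffer[index];
        if (e == null) {
            return null; // empty
        }
        buffer[index] = null;
        head++;
        return e;
    }
}
```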

The next mechanical aspect we can notice above is that for a queue to work we need to stop the producer from wrapping around and overwriting unread elements. Similarly we need the consumer to observe the queue is empty and stop consuming. The consumer is logically 'chasing' the producer, but due to the circular nature of the data structure the producer is also chasing the consumer. When they catch up to each other we hit the full/empty state.

The Single Producer/Consumer case: Visibility and Continuity

In the SPSC (single producer/consumer threads, no more) case life remains remarkably similar to the non-thread-safe case. Sure, we can optimize memory layout and so on, but inherently the code above works with very minor changes. You will notice in the following code that I'm using a lovely Hungarian notation of my own design; let's get the full notation out of the way:
  • lv - load volatile (LoadLoad barrier)
  • lp - load plain
  • sv - store volatile (StoreLoad barrier)
  • so - store ordered (StoreStore barrier), like lazySet
  • sp - store plain
  • cas - compare and swap
lv/so for the counters could be implemented using an AtomicLong, an AtomicFieldUpdater or Unsafe. Here goes:
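The original snippet is not reproduced here; a sketch of the counter-based SPSC queue, using AtomicLong to stand in for the lv/so accesses (the real JAQ code uses padding against false sharing and Unsafe — this is illustrative only):

```java
import java.util.concurrent.atomic.AtomicLong;

// A sketch of the counter-based SPSC bounded queue. get() plays the role
// of lv (volatile load) and lazySet() the role of so (ordered store).
public class SpscCounterQueue<E> {
    private final E[] buffer;
    private final AtomicLong head = new AtomicLong(); // consumer index
    private final AtomicLong tail = new AtomicLong(); // producer index

    @SuppressWarnings("unchecked")
    public SpscCounterQueue(int capacity) {
        buffer = (E[]) new Object[capacity];
    }

    public boolean offer(E e) {
        long t = tail.get(); // our own counter: a plain load would do
        if (t - head.get() == buffer.length) {
            return false; // full
        }
        buffer[(int) (t % buffer.length)] = e; // sp: plain store of the element
        tail.lazySet(t + 1); // so: element becomes visible with the counter
        return true;
    }

    public E poll() {
        long h = head.get();
        if (h == tail.get()) { // lv: tail visibility implies element visibility
            return null; // empty
        }
        int index = (int) (h % buffer.length);
        E e = buffer[index];
        buffer[index] = null;
        head.lazySet(h + 1); // so: frees the slot for the producer
        return e;
    }
}
```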
And the alternative (lvElement/soElement could be implemented using Unsafe or by replacing the original buffer with an AtomicReferenceArray):
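A sketch of that element-visibility alternative using AtomicReferenceArray for lvElement/soElement, including the probe-ahead offer: if the slot LOOK_AHEAD positions forward is null, continuity guarantees everything before it is null too. The LOOK_AHEAD constant and field names are my illustration (tailIsClearUntil echoes the name discussed in the comments below), not the post's original code:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// A sketch of the SPSC queue that detects full/empty via the elements.
// AtomicReferenceArray.get is lvElement, lazySet is soElement.
public class SpscElementQueue<E> {
    private static final int LOOK_AHEAD = 4;
    private final AtomicReferenceArray<E> buffer;
    private long head; // only touched by the consumer
    private long tail; // only touched by the producer
    private long tailIsClearUntil; // producer-local cache of the probe result

    public SpscElementQueue(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity);
    }

    public boolean offer(E e) {
        if (tail >= tailIsClearUntil) {
            // probe ahead: one volatile read can cover the next LOOK_AHEAD offers
            if (buffer.get((int) ((tail + LOOK_AHEAD) % buffer.length())) != null) {
                return false; // report full, trading some capacity for speed
            }
            tailIsClearUntil = tail + LOOK_AHEAD;
        }
        buffer.lazySet((int) (tail % buffer.length()), e); // soElement: publication
        tail++;
        return true;
    }

    public E poll() {
        int index = (int) (head % buffer.length());
        E e = buffer.get(index); // lvElement
        if (e == null) {
            return null; // empty
        }
        buffer.lazySet(index, null); // soElement: ordered nulling keeps continuity
        head++;
        return e;
    }
}
```

Note the trade-off raised in the comments: when the probe fails the offer reports full even though earlier slots may already be clear, giving up some capacity for the speedup.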

All we needed to add are appropriate memory barriers to enforce visibility and ordering. The mechanical observation to be made regarding the barriers is that correct publication is a product of ordered visibility. This is very prominent in the counter-based approach, where the visibility of the counter implies the visibility of the data in the queue. The second approach is slightly more subtle, but the ordered write and volatile read guarantee correct publication of the element contents. This is an important property of concurrent queues: elements must not be made visible to other threads before the preceding writes to their contents.
I've added an optimization on the alternative offer that highlights a mechanical property of the SPSC and non-thread-safe cases. The offer is probing ahead on the buffer: if the "tail + look-ahead constant" element is null we can deduce that all the elements up to it are also null and write without checking them. This property of the queue is the continuity of data existence. We expect the ring buffer to be split into an all-empty section and an all-full section, and we expect no 'holes' in either.

The MPSC case: Atomicity and Holes in the fabric of Existence

So now we have multiple threads hitting the queue. We need to ensure the blighters don't over-write the same slot and so we must increment tail atomically before writing to the queue:
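The original snippet is not reproduced here; a sketch of the idea (names are mine): producers claim a slot with a CAS on tail before the element write, and the single consumer must therefore tolerate a claimed-but-not-yet-written slot.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// A sketch of an MPSC bounded queue: the tail counter moves (atomically)
// before the data is written, so tail no longer indicates producer progress.
public class MpscQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final AtomicLong tail = new AtomicLong(); // shared producer index
    private final AtomicLong head = new AtomicLong(); // single consumer index

    public MpscQueue(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity);
    }

    public boolean offer(E e) {
        long t;
        do {
            t = tail.get();
            if (t - head.get() == buffer.length()) {
                return false; // full
            }
        } while (!tail.compareAndSet(t, t + 1)); // claim the slot atomically
        buffer.lazySet((int) (t % buffer.length()), e); // soElement, after the claim
        return true;
    }

    public E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // empty
        }
        int index = (int) (h % buffer.length());
        E e;
        while ((e = buffer.get(index)) == null) {
            // tail moved but the element is not visible yet: busy-spin
        }
        buffer.lazySet(index, null);
        head.lazySet(h + 1);
        return e;
    }
}
```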

An interesting thing happens on the consumer side here. We can no longer rely on the tail counter for data visibility because the increment is done before the data is written. We must wait for the data to become visible and can't just assume it is there as we did before. This highlights the fact that tail is no longer indicative of producer progress.
The alternative poll method in this case cuts straight to the issue:
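A sketch of that alternative poll (the surrounding class is my illustration, only the poll shape matters): it does not consult tail at all. A null slot simply means "nothing to take", whether the queue is truly empty or a producer has claimed the slot but not yet written it.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// A sketch of the MPSC queue with the element-based alternative poll.
public class MpscAltPollQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final AtomicLong tail = new AtomicLong();
    private final AtomicLong head = new AtomicLong();

    public MpscAltPollQueue(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity);
    }

    public boolean offer(E e) {
        long t;
        do {
            t = tail.get();
            if (t - head.get() == buffer.length()) {
                return false; // full
            }
        } while (!tail.compareAndSet(t, t + 1));
        buffer.lazySet((int) (t % buffer.length()), e);
        return true;
    }

    public E poll() {
        long h = head.get();
        int index = (int) (h % buffer.length());
        E e = buffer.get(index); // lvElement
        if (e == null) {
            return null; // empty, or a producer is mid-write: caller retries
        }
        buffer.lazySet(index, null);
        head.lazySet(h + 1);
        return e;
    }
}
```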

I will not show the code for the SPMC case, it is very similar, but one point is worth examining. For SPMC the offer method can no longer employ the probe-ahead optimization shown above. That is because the continuity property no longer holds (a real shame, I liked it a lot). Consider 2 consumer threads where one has claimed a slot (the head increment is visible) but stalled before nulling the data, while the other charges ahead of it. The stalled slot is not null and remains so until the thread resumes. This means the empty section of the queue now has a hole (a non-null element) in it, making the probe-ahead optimization void. If we were to keep the optimization the producer would assume the coast is clear and might overwrite an element in the queue before it is consumed.
For both MPSC/SPMC and also MPMC we can therefore observe that counter increment atomicity does not imply queue write atomicity. We can also see that this scheme has no fairness of counter acquisition or slot use so it is possible to have many producers/consumers stuck while others make progress. For example, given 3 producers A, B and C we can have the queue fill up such that the slots are claimed to the effect of: [ABBBBBBBCAAAAACCCABABACCCC...] or any such random layout based on the whims of the scheduler and CAS contention.

The MPMC case: What Goes Around

So finally all hell breaks loose and you have multiple producers and consumers all going ape pulling and pushing. What can we do? I ended up going with the solution put forward by Mr. D. Vyukov after I implemented a few flawed variations myself (an amusing story to be told some other time). His solution is in C and benefits from the memory layout afforded by languages with struct support. I had to mutate the algorithm (any bugs are my own) to use 2 arrays instead of one struct array, but otherwise the algorithm is very similar:
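The post's snippet is not reproduced here; a sketch of the two-array variant (my illustration of the Vyukov-style algorithm summarized below, using AtomicLongArray for the per-slot sequences and AtomicReferenceArray for the elements):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

// A sketch of a Vyukov-style bounded MPMC queue: each slot carries a
// sequence that tells producers/consumers whether the slot is theirs,
// independently of head/tail and of data existence.
public class MpmcQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final AtomicLongArray sequence;
    private final AtomicLong tail = new AtomicLong();
    private final AtomicLong head = new AtomicLong();

    public MpmcQueue(int capacity) {
        buffer = new AtomicReferenceArray<E>(capacity);
        sequence = new AtomicLongArray(capacity);
        for (int i = 0; i < capacity; i++) {
            sequence.set(i, i); // slot i is first writable when tail == i
        }
    }

    public boolean offer(E e) {
        while (true) {
            long t = tail.get();
            int index = (int) (t % buffer.length());
            long seq = sequence.get(index);
            if (seq == t) { // slot is free for this tail value
                if (tail.compareAndSet(t, t + 1)) {
                    buffer.lazySet(index, e);
                    sequence.lazySet(index, t + 1); // publish to consumers
                    return true;
                }
            } else if (seq < t) {
                return false; // full: slot not yet consumed from the last lap
            }
            // else another producer won; re-read tail and retry
        }
    }

    public E poll() {
        while (true) {
            long h = head.get();
            int index = (int) (h % buffer.length());
            long seq = sequence.get(index);
            if (seq == h + 1) { // slot holds data for this head value
                if (head.compareAndSet(h, h + 1)) {
                    E e = buffer.get(index);
                    buffer.lazySet(index, null);
                    sequence.lazySet(index, h + buffer.length()); // free for next lap
                    return e;
                }
            } else if (seq < h + 1) {
                return null; // empty
            }
            // else another consumer won; retry
        }
    }
}
```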

So... what just happened?
  • We're setting up each slot with a sequence
  • A producer can only write to the slot if the sequence matches tail and they won the CAS
  • A consumer can only read the slot if the sequence is head + 1 and they won the CAS
  • Once a producer writes a slot he sets its sequence to tail + 1
  • Once a consumer reads a slot she sets the sequence to head + buffer.length
Why can't we rely on head/tail anymore? Well... the head/tail values were half useless before, as pointed out in the MPSC section, because they reflect the most advanced consumer/producer and cannot indicate the data state in the queue.
Can't we use the null/not-null check like before? Mmm... this one is a bugger. The surprising problem here is producers catching up with other producers after wrapping, and consumers doing the same to other consumers. Imagine a short queue, 2 slots only, and 2 producer threads. Thread 1 wins the CAS and stalls before writing slot 1; thread 2 fills the second slot and comes round to hit slot 1 again, wins the CAS, and either writes over thread 1 or gets written over by thread 1. They can both see the slot as empty when they get there.
A solution relying on counters exists which employs a second CAS on the data, but:

  1. It is slower, which is to be expected when you use 2 CASes instead of one
  2. It runs the risk of threads getting stuck to be freed only when the other threads come full circle again. Think of a producer hitting another producer on the wrap as discussed before and then one wins the CAS on data and the other is left to spin until the slot is null again. This should be extremely rare (very hard to produce in testing, possible by debugging to the right points), but is not a risk I am comfortable with.

I'm hoping to give concrete examples of broken code in a further post, but for now you can imagine, or dig through the commit history of JAQ for some examples.
The sequence array is doubling our memory requirement (tripling it for 32bit/compressed oops). We might be able to get by with an int array instead. The solution works great in terms of performance, but that is another story (expect followup post).
The important observation here on the mechanical side is that for MPMC both head and tail values are no longer reliable means of detecting wrap and as such we have to detect wrap by means other than head/tail counters and data existence.

Summary

  • Circular/ring arrays/buffers give the illusion of an infinite array but are actually finite.
  • Bounded queues built on ring buffers must detect queue full/empty states and track head/tail positions.
  • Ring buffers exhibit continuity of existence for the full/empty sections ONLY in the SPSC or single threaded case.
  • MPSC/SPMC/MPMC queues lose continuity, can have holes.
  • Counter increment atomicity does not imply write atomicity.
  • MP means tail is no longer a reliable means of ensuring the next poll is possible.
  • MC means head is no longer a reliable means of ensuring the next offer is possible.
  • MPMC implementations must contend with producer/producer and consumer/consumer collisions on wrap.
I'm publishing this post in a bit of a rush, so please comment on any inaccuracies/issues/uncertainties and I shall do my best to patch/explain if needed. Many thanks go out to Martin Thompson, Georges Gomez, Peter Hughes and anybody else who's bored of hearing my rambles on concurrent queues.

17 comments:

  1. I think I'm misunderstanding something fundamental about the SpscBoundedQueue (4th snippet). How are there visibility guarantees when the reader and writer never access the same volatile vars?

    ReplyDelete
    Replies
    1. The 2nd SPSC version employs a volatile array element read (lvElement) and an ordered array element store (soElement).
      These are not easily available in java and require you either:
      1. Use an AtomicReferenceArray (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicReferenceArray.html)
      2. Use Unsafe to achieve the same effect, which is exactly what AtomicReferenceArray does if you look at the code.
      The full code of a queue using this approach can be found here: https://github.com/nitsanw/JAQ-InABox/blob/master/jaq-inabox/src/main/java/io/jaq/spsc/FFBufferWithOfferBatch.java
      Thanks for reading :-), do ask if anything else is unclear.

      Delete
    2. Ah-ha! Must've had my eyes crossed; I completely missed that the element was also volatile. Thanks :)

      Delete
    3. s/also//

      My takeaway from this is to not attempt concurrent programming before morning coffee.

      Delete
  2. Regarding tailIsClearUntil, is it not possible that we're skipping over empty positions, in particular when we're wrapping around?

    ReplyDelete
    Replies
    1. In the SPSC case the null elements are continuous, so:
      buffer[i] is null && buffer[i + K] is null => buffer[j] is null for all i<=j<=i+K
      Because the nulling of elements (in poll()) is ordered and done by a single thread.
      How do you expect this to fail?

      Delete
    2. Imagine you have a 6-element array with a look ahead of 4. You start at index zero and look ahead to index 4. This holds true. The producer puts items until it reaches index 4 and the consumer consumes index 0, writing null. At index 4, the producer looks ahead at index 2, which is non-null, and returns false, despite index 0 being free. What am I missing?

      Delete
    3. :-) you are not missing anything, but the side effect is reduced capacity, not a correctness issue. it's a tradeoff between space and speed. We will be giving up on up to LOOK_AHEAD - 1 capacity for the speedup.
      This has a further side effect which can be positive when the lost capacity is more than a cache line. As the producer avoids writing all the way up to the consumer they will not be contending on the cache line when the queue is near full.
      Thank you for reading the code so closely, I appreciate the feedback.

      Delete
    4. That makes sense. Thanks for the explanation!

      Delete
  3. In MpscBoundedQueue::poll it seems more than one thread can return the same element. Also, if sufficient time passes before a thread executes soHead(currentHead + 1) to allow the queue to wrap around, we can lose an element.

    ReplyDelete
    Replies
    1. MPSC -> Single consumer, only one thread.
      The consumer is making the slots visible after they are null, which implies producer cannot overwrite an unconsumed slot. As the head value is indicative (it is indeed where the consumer is) and since the consumer cannot move past a null slot the producer store must be visible for the head to progress past the slot. So producers cannot wrap past each other as the producer write must be seen by the consumer to allow the next producer to wrap back to the same element.
      Not sure it's a clear enough explanation... keep pushing if it still doesn't make sense.

      Delete
    2. Oh, sorry, got confused there, thought it was multiple consumer! :-)

      Delete
  4. Thanks! As always very interesting.

    True, I always assumed ring buffers were only for SPSC and never thought to check ;-)
    But SPSC can avoid CAS and use only lazySet.

    ReplyDelete
  5. Hello. I'm currently working on performance improvements for the RxJava library and we have two places where a faster MPSC/MPMC behavior would come in handy.

    One place is where event delivery to an observer needs to be serialized, i.e., when multiple threads produce events, they have to be delivered one after the other. Currently, we use a synchronized block with queue/drain logic, i.e., when one thread wins the right of emission, it emits the values of the others too. The current implementation when run on a single thread is 10x slower than directly emitting the same event.

    The second place where some better queueing could come in handy are some worker threads, each having separate queues (due to the requirement that tasks from the same source should be executed on the same thread, so no thread hopping). The implementation is backed by ScheduledThreadPoolExecutor and throughput barely reaches 10Mops/s on an older i7 920. An extra nice-to-have feature would be that workers could take/steal work from each other if such work hasn't been pinned to a particular thread yet.

    In addition, both places would need to grow as necessary because a producer might be on the same thread as the consumer.

    Do you have any tips?

    ReplyDelete
    Replies
    1. Currently JAQ has MPMC and MPSC covered, but both implementations are bounded, so this is probably not helpful. There is an unbounded SPSC and I plan to implement linked-list and linked-buffer versions for SPSC and other variants. If this is an area you would like to collaborate in I'd be happy to assist.
      If I understand correctly, the system you describe is for generic 'Actors' passing messages to each other. If the actors can be assumed non-blocking I believe the best result would be achieved by balancing the tasks/actors on top of a fixed size thread pool and routing messages via SPSC queues. This is not a trivial exercise and I have no proof to back my opinion here, so that is all it is for now.

      Delete
  6. I am quite confused about single producer vs. multi-producer.
    Is a single producer one type of business logic, such as one class with its methods, used by one thread only?
    Is a multi-producer one type of business logic used by multiple threads, or two/three types of business logic in different classes used by many threads, where each thread executes only one class?

    Also, what is the main difference between using a linked list and using this ring buffer queue? The differences I know of are bounded/unbounded and blocking the request.
    But why would we create a bounded queue when we know we have to process all requests?

    ReplyDelete
    Replies
    1. The single/multi refers to the threads, not the call sites(i.e. methods/classes calling a method). A single producer thread may still offer into a queue from any number of classes/methods and still satisfy the single producer criteria. The restriction is in place to allow certain assumptions to be made about concurrent access to the queue internal data structure.
      Using a linked list will typically mean an unbounded queue, but there are counter examples. Akka for instance has a bounded linked MPSC queue implementation. An unbounded SPSC/MPSC queue is included in JCTools.
      A linked list based queue may be lock free or blocking, there is no need for it to be one or the other. ConcurrentLinkedQueue is a lock free linked list based queue for example. The JCTools linked queues are also lock free.
      One can use a series of linked bounded queues to construct an unbounded queue thus mixing the 2 approaches.
      Creating a bounded queue does not contradict processing all the requests, I'm not sure I understand that last question...

      Delete