Double Buffers - Concurrent Updates Part 2
Let’s make a production-grade multi-threaded double buffer.
This is a part of a series of blogs, you can see the full list here.
Previously we implemented a sound double buffer, but it didn’t have the performance characteristics that we set out to implement. In particular, readers didn’t scale independently of writers, and could be blocked indefinitely by writers. To fix this, we will adapt the synchronization strategy used by evmap, made by @jonhoo. This isn’t a 1-to-1 implementation of evmap’s synchronization strategy, but it is heavily inspired by it, and mostly differs only in the details of the implementation.
Before jumping into the code, let’s get the high-level ideas down first.
evmap synchronization strategy theory
The double buffer will store a flag: AtomicBool, which signals which buffer is currently active.
Readers
Each reader of the double buffer will manage a shared counter (Arc<AtomicUsize>) called an epoch.
This epoch carries two pieces of information:
- Whether the reader is currently reading from the buffer
- How many times the reader has read from the buffer (their count)
When a reader starts reading from the buffer it will:
- increment the count
- load the flag to see which buffer it’s pointing into
When a reader finishes reading from the buffer it will:
- increment the count
- notify the writer that this reader has finished reading
When a reader’s count is odd, it is currently reading; when it is even, it is inert.
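To make this concrete, here is a minimal sketch of the reader side of this protocol. The names, types, and Orderings here are my own assumptions for illustration, not evmap’s actual API:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

// hypothetical sketch: one epoch per reader, shared with the writer
struct ReaderEpoch {
    epoch: Arc<AtomicUsize>,
    flag: Arc<AtomicBool>,
}

impl ReaderEpoch {
    fn start_read(&self) -> bool {
        // increment the count: the epoch becomes odd, marking us as reading
        self.epoch.fetch_add(1, Ordering::AcqRel);
        // load the flag to see which buffer we should read from
        self.flag.load(Ordering::Acquire)
    }

    fn finish_read(&self) {
        // increment the count again: the epoch becomes even (inert), and the
        // changed count notifies the writer that this read has finished
        self.epoch.fetch_add(1, Ordering::Release);
    }
}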
Writers
In the shared double buffer, we will hold a list of these shared counters, so the writer may access the counts as well.
Swapping will consist of two phases: a setup and an iterative phase.
During the setup phase, the writer will:
- swap the buffers
- garbage collect any readers which are no longer alive
- see which readers are still reading from the buffer, and store their count in a second buffer
During the iterative phase, the writer will iterate over all readers which were reading during the setup phase (in the second buffer):
- if their count is the same as in the second buffer, then they are still reading from the buffer. Restart the loop, and wait until a reader finishes reading.
- if their count is different from before, then they are no longer reading from the buffer.
The iterative phase ends when the writer has seen that every reader was, at some point during the iterative phase, no longer reading from the buffer.
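To see how the two phases fit together, here is a rough sketch of the writer’s swap under this strategy, where the pending list plays the role of the “second buffer” from above. Again, the epochs list, the spin-waiting, and the Orderings are illustrative assumptions, not the real implementation:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

struct WriterEpochs {
    flag: Arc<AtomicBool>,
    // the writer's copy of every reader's shared epoch
    epochs: Vec<Arc<AtomicUsize>>,
}

impl WriterEpochs {
    fn swap(&mut self) {
        // setup phase: swap the buffers
        let which = self.flag.load(Ordering::Relaxed);
        self.flag.store(!which, Ordering::Release);

        // garbage collect any readers which are no longer alive
        self.epochs.retain(|epoch| Arc::strong_count(epoch) > 1);

        // record the count of every reader that is mid-read (odd count)
        let mut pending: Vec<(usize, usize)> = self
            .epochs
            .iter()
            .enumerate()
            .filter_map(|(i, epoch)| {
                let count = epoch.load(Ordering::Acquire);
                (count % 2 == 1).then_some((i, count))
            })
            .collect();

        // iterative phase: a reader is done once its count moves past the
        // recorded value; keep re-checking the rest until none remain
        while !pending.is_empty() {
            pending.retain(|&(i, count)| self.epochs[i].load(Ordering::Acquire) == count);
            core::hint::spin_loop();
        }
    }
}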
Before going into the implementation proper, this description should be specific enough to write your own implementation of this strategy, so try to do it! Then check back to see how it compares with what I describe below. What’s different, and what’s the same? What assumptions did you make? Can you prove the correctness of your implementation? What Orderings did you use?
The implementation
Now let’s see how this looks in practice! In our previous section we used the double buffer below, but it is clearly not thread-safe. So we need to update it to reach our goal of a concurrent hash map.
use std::cell::{Cell, UnsafeCell};
use std::rc::Rc;

struct DoubleBuffer<T> {
    which: Cell<bool>,
    active_readers: Cell<u32>,
    // we use UnsafeCell here because we are formally sharing the data, even though all accesses are disjoint
    data: [UnsafeCell<T>; 2],
}

pub struct WriteHandle<T> {
    buffer: Rc<DoubleBuffer<T>>,
}

pub struct ReadHandle<T> {
    buffer: Rc<DoubleBuffer<T>>,
}
So how do we fix this? We could try naively switching to atomics. Let’s try it out and see how it goes.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

struct DoubleBuffer<T> {
    which: AtomicBool,
    active_readers: [AtomicU32; 2],
    // we use UnsafeCell here because we are formally sharing the data, even though all accesses are disjoint
    data: [UnsafeCell<T>; 2],
}

pub struct WriteHandle<T> {
    buffer: Arc<DoubleBuffer<T>>,
}

pub struct ReadHandle<T> {
    buffer: Arc<DoubleBuffer<T>>,
}
We did need to make one more change: active_readers is now two counters instead of one. This allows us to track how many readers are in each buffer. Then we can simply increment the counter for the buffer we are looking at.
impl<T> WriteHandle<T> {
    // -- snip --

    pub fn try_swap_buffers(&mut self) -> bool {
        let next_which = !self.buffer.which.load(Ordering::Relaxed);

        if self.buffer.active_readers[next_which as usize].load(Ordering::Acquire) == 0 {
            self.buffer.which.store(next_which, Ordering::Release);
            true
        } else {
            false
        }
    }

    pub fn swap_buffers(&mut self) {
        assert!(self.try_swap_buffers(), "Tried to swap buffers while there was an active reader")
    }

    // -- snip --
}
impl<T> ReadHandle<T> {
    pub fn read_buffer(&self) -> ReadGuard<'_, T> {
        let which = self.buffer.which.load(Ordering::Acquire);
        // readers read from data[!which], so they are counted in active_readers[!which]
        let old_readers = self.buffer.active_readers[(!which) as usize].fetch_add(1, Ordering::Release);

        if old_readers == u32::MAX {
            panic!("Tried to read too many times")
        }

        ReadGuard {
            which,
            buffer: &self.buffer,
        }
    }
}
The ReadGuard has the same representation as last time, but we will need to update the Drop impl to account for the two counters and the atomics.
use std::ops::Deref;

struct ReadGuard<'a, T> {
    which: bool,
    buffer: &'a DoubleBuffer<T>,
}

impl<T> Deref for ReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        let buffer = &self.buffer.data[(!self.which) as usize];
        // SAFETY: We ensure that the writer isn't allowed to access this data
        unsafe { &*buffer.get() }
    }
}

// this has changed
impl<T> Drop for ReadGuard<'_, T> {
    fn drop(&mut self) {
        let readers = &self.buffer.active_readers[(!self.which) as usize];
        readers.fetch_sub(1, Ordering::Release);
    }
}
And again, if we try to swap the two buffers while there is an active reader, we get a panic. Panicking is worse for a multi-threaded double buffer, but at least this is thread-safe. Right?
let mut writer = WriteHandle::new([0, 1]);
let reader = writer.read_handle();
let a = reader.read_buffer();
writer.swap_buffers();
let b = writer.write_buffer();
assert_eq!(*a, *b); // now unreachable
thread 'main' panicked at src/main.rs:42:9:
Tried to swap buffers while there was an active reader
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Thread Safety
// -- snip --
// in ReadHandle::read_buffer
let which = self.buffer.which.load(Ordering::Acquire);
let old_readers = self.buffer.active_readers[(!which) as usize].fetch_add(1, Ordering::Release);

if old_readers == u32::MAX {
    panic!("Tried to read too many times")
}
// -- snip --

// -- snip --
// in WriteHandle::try_swap_buffers
let next_which = !self.buffer.which.load(Ordering::Relaxed);

if self.buffer.active_readers[next_which as usize].load(Ordering::Acquire) == 0 {
    // point A
    self.buffer.which.store(next_which, Ordering::Release);
    true
} else {
    false
}
// -- snip --
Above is how we just defined reading and swapping buffers. But this does not uphold the guarantees that we want. In particular, if ReadHandle::read_buffer is running on Thread 1 and WriteHandle::try_swap_buffers is running on Thread 2, we could have the following execution:
- run WriteHandle::try_swap_buffers in Thread 2 until point A
- run ReadHandle::read_buffer to completion in Thread 1
- finish WriteHandle::try_swap_buffers in Thread 2
This will lead to both the ReadHandle in Thread 1 and the WriteHandle in Thread 2 pointing to the same buffer. So our API can expose an aliased &mut _, which makes this API unsound.
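Traced through concretely with the indexing convention from the code above (readers read data[!which], the writer writes data[which]), assuming which starts as false:

// Thread 2 (try_swap_buffers)         | Thread 1 (read_buffer)
// which == false, next_which == true  |
// active_readers[1] == 0   (point A)  |
//                                     | loads which == false (the store below isn't visible yet)
//                                     | active_readers[1] += 1
//                                     | the guard will deref data[1]
// which.store(true)                   |
// returns true                        |
// the writer now writes data[1]       | while the reader is still reading data[1]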
So it looks like naively translating our single-threaded code to a multi-threaded implementation doesn’t work. In general, single-threaded algorithms fundamentally rely on being single-threaded, and so cannot be naively translated into multi-threaded implementations.
A Sound Multi-threaded Implementation
So to fix this, we will need to introduce a loop and a lock.
impl<T> WriteHandle<T> {
    // -- snip --

    pub fn try_swap_buffers(&mut self) -> bool {
        let next_which = !self.buffer.which.load(Ordering::Relaxed);

        // before swapping, lock the reader count of the next buffer.
        // If there are already any readers, then we will fail to lock
        if self.buffer.active_readers[next_which as usize]
            .compare_exchange(0, u32::MAX, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            // under the lock, swap the buffers
            self.buffer.which.store(next_which, Ordering::Release);
            // release the lock
            self.buffer.active_readers[next_which as usize].store(0, Ordering::Release);
            true
        } else {
            false
        }
    }

    pub fn swap_buffers(&mut self) {
        assert!(self.try_swap_buffers(), "Tried to swap buffers while there was an active reader")
    }

    // -- snip --
}

impl<T> ReadHandle<T> {
    pub fn read_buffer(&self) -> ReadGuard<'_, T> {
        let mut which = self.buffer.which.load(Ordering::Acquire);
        let mut reader_count = &self.buffer.active_readers[(!which) as usize];
        let mut num_readers = reader_count.load(Ordering::Acquire);

        loop {
            // try to increment the counter
            let Some(next_num_readers) = num_readers.checked_add(1) else {
                // if it is locked by the `WriteHandle`, then refresh
                // all pointers, and point into the next buffer.
                which = self.buffer.which.load(Ordering::Acquire);
                reader_count = &self.buffer.active_readers[(!which) as usize];
                num_readers = reader_count.load(Ordering::Acquire);
                core::hint::spin_loop();
                continue;
            };

            // try to update `reader_count`
            match reader_count.compare_exchange_weak(
                num_readers,
                next_num_readers,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Err(current) => num_readers = current,
                Ok(_) => {
                    return ReadGuard {
                        which,
                        buffer: &self.buffer,
                    }
                }
            }

            core::hint::spin_loop();
        }
    }
}
The writer locks the active_readers counter for the next buffer, then does the swap. This ensures that all readers racing with WriteHandle::try_swap_buffers are aware that a buffer swap is taking place, and can cooperate (by reloading which buffer they should point to). Now, this implementation is still flawed: it ties read scaling back to the writer, and if a writer is constantly swapping, then readers will be unable to make progress. So this is not ideal. But it is sound, which is far better than we can say about the initial implementation.
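As a quick sanity check, here is how the earlier panic example behaves now (reusing the WriteHandle::new and read_handle constructors from before):

let mut writer = WriteHandle::new([0, 1]);
let reader = writer.read_handle();

let a = reader.read_buffer();
// the reader holds a count in its buffer, so the writer's lock CAS fails
assert!(!writer.try_swap_buffers());
drop(a);
// with no active readers, the writer can lock, swap, and release
assert!(writer.try_swap_buffers());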