Fearless concurrency with hazard pointers

For those who don't know, I am working on a file system, TFS, which employs various concurrent structures to improve performance. Whenever you do this kind of advanced concurrency, you will meet the ABA problem, roughly describable as "what if another thread runs the destructor on a value you are reading?"

This blog post investigates what the problem is and how it can be solved. It presents an optimized form of hazard pointers as well as an implementation thereof.

The problem

The problem is a lot harder than it seems at first, because you can't always replace the pointer atomically. In particular, you will often have several threads reading a single value. If one thread destroys it while other threads are reading it, you get safety issues.

To illustrate, we implement a very simple form of Software Transactional Memory (STM). The idea is that we read a pointer, dereference it, apply some function to the data, and then try to update the pointer. We repeat these steps until it succeeds:

use std::sync::atomic::{self, AtomicPtr};

/// A software transactional memory container.
pub struct Stm<T> {
    /// The inner data.
    inner: AtomicPtr<T>,
}

impl<T> Stm<T> {
    /// Create a new STM container.
    pub fn new(data: T) -> Stm<T> {
        Stm {
            inner: AtomicPtr::new(Box::into_raw(Box::new(data))),
        }
    }

    /// Update the data.
    ///
    /// This applies closure `f` to the data of `self`. If the data isn't updated in the meantime,
    /// the change will be applied. Otherwise, the closure is reevaluated.
    pub fn update<F>(&self, f: F)
    where
        F: Fn(&T) -> T,
        T: 'static,
    {
        loop {
            // Read a snapshot of the current data.
            let snapshot = self.inner.load(atomic::Ordering::Acquire);
            // Evaluate the closure on the snapshot. Dereferencing the raw pointer needs `unsafe`.
            let ret = Box::into_raw(Box::new(f(unsafe { &*snapshot })));

            // If the snapshot pointer is still the same, update the data to the closure output.
            if self
                .inner
                .compare_exchange(snapshot, ret, atomic::Ordering::Release, atomic::Ordering::Relaxed)
                .is_ok()
            {
                // Drop the now-replaced snapshot.
                unsafe { drop(Box::from_raw(snapshot)) };
                break;
            }

            // The CAS failed: `ret` was never published, so free it before retrying.
            unsafe { drop(Box::from_raw(ret)) };
        }
    }
}

There is a critical bug in the above code. Look closely: imagine that thread A reads a snapshot. Then thread B reads the same pointer, runs the closure, does the CAS, and destroys the old snapshot. Thread A is still reading that snapshot, so it hits a use-after-free:

THREAD  STATEMENT                                                   NOTES
A       let snapshot = self.inner.load(atomic::Ordering::Acquire);  Reading pointer A
B       let snapshot = self.inner.load(atomic::Ordering::Acquire);  Reading pointer A
B       let ret = Box::into_raw(Box::new(f(*snapshot)));            Dereferencing A
B       ...                                                         Skipping some lines
B       drop(Box::from_raw(snapshot))                               Dropping A
A       let ret = Box::into_raw(Box::new(f(*snapshot)));            Dereferencing A, which is dropped

If you remove the line with drop(), the code is safe, but it leaks memory, which is obviously unwanted, especially if you update the STM millions of times.

This roughly describes the ABA problem, a problem which you will run into if you implement virtually any non-trivial concurrent/lock-less data structure. It is unavoidable, and thus it is important to have a strong memory reclamation system at hand.

Concurrent memory reclamation

"Concurrent memory reclamation systems" names the class of systems which solve this problem. Often, they manage a queue of garbage objects together with a mechanism that determines which garbage can be destroyed. Since this problem is core to concurrent programming, several such systems exist.

In some languages (e.g. Java), they are built into the language itself, in the form of garbage collectors.

This blog post describes a runtime-less (in the sense that it doesn't require the language to implement a runtime) solution.

Deferred destruction

The first mechanism we will explore is deferred destruction. The idea is that we have a queue of garbage to be destroyed, which is shared among all threads. The queue is a simple MPSC channel, where the sender is shared among all the threads and the receiver is protected behind a mutex. Various messages (including requests to add garbage to be destroyed) can then be passed to the garbage collector.

The idea behind deferring destructors is that we get to choose under what conditions an object can be destroyed. If an object is being read, the garbage collector will skip it until the next collection cycle.

The garbage collection is simply run randomly according to some probability distribution (to avoid more communication than necessary). If another thread is GC-ing, we skip the cycle.

A timeline

The diagram above shows a timeline where three threads read values (represented by black lines), then a thread adds the pointer to the garbage (represented by the trashcan), and finally the pointer is destroyed during garbage collection (represented by the recycle symbol).

A very important rule must be followed when adding garbage: after the garbage is added (the destruction is planned), there must be no way of starting new readers. In the diagram, this means that no black line can start after the trashcan (but keep in mind that existing lines can continue, like the one in thread B above the trashcan).

If this rule were not followed, we could create new readers after the destructor had run, thus causing use-after-free. The enforcement ultimately lies with the programmer, but, as we will get back to, we provide various APIs to do all this safely.
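To make the queue described in this section a bit more concrete, here is a minimal sketch using only the standard library. The names Deferred, Collector and try_collect are made up for illustration and are not conc's API. The in_use callback is a stand-in for the hazard check covered later, and the random scheduling of collection cycles is left out.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Mutex, TryLockError};

/// Garbage whose destruction has been deferred (hypothetical representation).
pub struct Deferred {
    /// Address of the object, used only to ask "is this still being read?".
    pub addr: usize,
    /// The destructor to run once the object is no longer in use.
    pub destroy: Box<dyn FnOnce() + Send>,
}

/// Everything that is only touched while holding the collector lock.
struct Inner {
    /// Receiving end of the MPSC channel.
    receiver: Receiver<Deferred>,
    /// Garbage that was skipped in an earlier cycle.
    pending: Vec<Deferred>,
}

/// The garbage collector: one receiver, protected behind a mutex.
pub struct Collector {
    inner: Mutex<Inner>,
}

impl Collector {
    /// Create the collector and the sender that threads clone to submit garbage.
    pub fn new() -> (Sender<Deferred>, Collector) {
        let (sender, receiver) = channel();
        let inner = Inner { receiver, pending: Vec::new() };
        (sender, Collector { inner: Mutex::new(inner) })
    }

    /// Run one collection cycle.
    ///
    /// Returns `None` if another thread is already collecting (the cycle is
    /// skipped), otherwise the number of objects that were destroyed.
    pub fn try_collect(&self, in_use: impl Fn(usize) -> bool) -> Option<usize> {
        let mut guard = match self.inner.try_lock() {
            Ok(guard) => guard,
            Err(TryLockError::WouldBlock) => return None,
            Err(TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
        };
        let inner: &mut Inner = &mut guard;

        // Move everything that has been sent so far into the pending list.
        while let Ok(garbage) = inner.receiver.try_recv() {
            inner.pending.push(garbage);
        }

        let mut destroyed = 0;
        let mut kept = Vec::new();
        for garbage in inner.pending.drain(..) {
            if in_use(garbage.addr) {
                // Still being read: skip it until the next cycle.
                kept.push(garbage);
            } else {
                (garbage.destroy)();
                destroyed += 1;
            }
        }
        inner.pending = kept;
        Some(destroyed)
    }
}
```

Using try_lock rather than lock is what makes a thread skip the cycle when another thread is already collecting.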

Garbage

To be able to have a queue of garbage, we need a way of representing garbage. There are two components in our representation:

  1. A pointer to the object it represents.
  2. A function pointer to the destructor, which takes the pointer as argument.

Destroying this garbage is as simple as it seems: you take the pointer and give it as argument to the function pointer.
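As a rough illustration of this two-component representation, a type-erased garbage item could look like the sketch below. The field names and the from_box() helper are assumptions made for this example, not necessarily how conc lays it out.

```rust
/// A piece of garbage: a type-erased pointer plus the destructor to run on it.
pub struct Garbage {
    /// Pointer to the object this garbage represents.
    ptr: *const u8,
    /// Destructor, taking the pointer as argument.
    dtor: unsafe fn(*const u8),
}

// Assumption of this sketch: garbage is handed to other threads, and the
// constructors only accept objects for which that is sound.
unsafe impl Send for Garbage {}

impl Garbage {
    /// Create garbage from a raw pointer and a destructor.
    ///
    /// Safety: calling `dtor(ptr)` once must be sound at any later point,
    /// from any thread.
    pub unsafe fn new(ptr: *const u8, dtor: unsafe fn(*const u8)) -> Garbage {
        Garbage { ptr, dtor }
    }

    /// Hypothetical helper: turn a box into garbage that frees the box.
    pub fn from_box<T: Send>(boxed: Box<T>) -> Garbage {
        unsafe fn drop_box<T>(ptr: *const u8) {
            // SAFETY: `ptr` came from `Box::into_raw` on a `Box<T>` below.
            unsafe { drop(Box::from_raw(ptr as *mut T)) };
        }

        Garbage {
            ptr: Box::into_raw(boxed) as *const u8,
            dtor: drop_box::<T>,
        }
    }

    /// The pointer this garbage represents (compared against hazards).
    pub fn ptr(&self) -> *const u8 {
        self.ptr
    }

    /// Destroy the garbage by passing the pointer to the destructor.
    pub fn destroy(self) {
        // SAFETY: guaranteed by the constructors; `self` is consumed, so the
        // destructor runs at most once.
        unsafe { (self.dtor)(self.ptr) }
    }
}
```

Erasing the type this way lets a single queue hold garbage of any type, at the cost of one indirect call per destroyed object.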

Hazards

A hazard pointer is supposed to tell the garbage collector that the destruction shall be delayed as the pointer is currently in use.

Hazard pointers are shared between two ends, together making a hazard pair. One side (the writer side) sets the state and "controls" the pointer, while the other side (the reader side) belongs to the garbage collector, which uses the information given through the hazard pointer to determine what objects may be destroyed.

A hazard pair is nothing but a shared pointer to a heap allocated state. A hazard pair has four possible states:

A diagram showing the hazard types.

For now, ignore the "thread-local" column; we'll get back to that later.

Each state represents the hazard's "message" to the garbage collector:

  1. Blocked: the hazard will eventually go into another state, but it doesn't know which one yet. When the reader reads this state, it is instructed to re-read the hazard until it is in another state. This state is necessary while reading the pointer that is to be protected, since a garbage collection between reading the pointer and setting the hazard would invalidate the pointer.
  2. Protect(x): protect the pointer x from being garbage collected. For example, if the writer side sets the hazard state to Protect(ptr), the garbage collector reads all the hazards (it owns the reader side of all the hazards) and determines what garbage can't be destroyed yet (that is, the garbage which is protected by hazards) and what can.
  3. Free: the hazard is inactive and can be reused later on (the importance of this state is covered later).
  4. Dead: the thread that used the hazard is gone and the hazard won't be used anymore. This marks that it lacks a writer, and thus the reader can safely deallocate the hazard.
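The four states and the way the collector reacts to them can be written down as a small piece of pure logic. This is only a sketch of the rules listed above: the Verdict type and the judge() function are names invented for this example, and a real collector would re-read a blocked hazard instead of returning.

```rust
/// The message a hazard sends to the garbage collector.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// The writer is about to pick a state; the reader must re-read.
    Blocked,
    /// The given pointer must not be destroyed.
    Protect(*const u8),
    /// The hazard is inactive, but may be reused.
    Free,
    /// The writer is gone; the reader may deallocate the hazard.
    Dead,
}

/// What the collector concludes about one piece of garbage (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Verdict {
    /// Some hazard is blocked: read the hazards again before deciding.
    Retry,
    /// Some hazard protects the garbage: keep it until the next cycle.
    Keep,
    /// No hazard protects the garbage: it can be destroyed.
    Destroy,
}

/// Decide the fate of `garbage` from a snapshot of all hazard states.
pub fn judge(hazards: &[State], garbage: *const u8) -> Verdict {
    let mut verdict = Verdict::Destroy;
    for state in hazards {
        match *state {
            // A blocked hazard might be about to protect this very pointer.
            State::Blocked => return Verdict::Retry,
            State::Protect(ptr) if ptr == garbage => verdict = Verdict::Keep,
            // Other protections, free hazards and dead hazards don't matter.
            State::Protect(_) | State::Free | State::Dead => {}
        }
    }
    verdict
}
```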

Thread-local caching of garbage

Since passing messages between threads is expensive, it makes sense to bundle them up to reduce the overhead. What we do is to let each thread hold a queue of garbage:

Garbage stores

Since only the global garbage collector can actually destroy garbage (as it is the only one that has the necessary information), we must propagate garbage through it eventually. The behavior is simple: when a thread-local garbage queue becomes too long, the garbage is exported to the global queue, which means it can eventually be garbage collected.

(to be clear: the garbage collector doesn't represent a thread, but a global state which can be accessed through locking a mutex)
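A minimal sketch of this batching could look as follows. The threshold of 64, the usize stand-in for a garbage item, and the use of a plain mutex-protected vector as the global queue (instead of the channel described earlier) are all simplifying assumptions of this example.

```rust
use std::cell::RefCell;
use std::sync::Mutex;

/// Stand-in for a real garbage item (pointer plus destructor).
pub type Garbage = usize;

/// Hypothetical threshold at which a thread exports its local garbage.
const MAX_LOCAL_GARBAGE: usize = 64;

/// The global garbage, reachable only by locking a mutex.
static GLOBAL_GARBAGE: Mutex<Vec<Garbage>> = Mutex::new(Vec::new());

thread_local! {
    /// Garbage that this thread has not exported yet.
    static LOCAL_GARBAGE: RefCell<Vec<Garbage>> = RefCell::new(Vec::new());
}

/// Queue garbage locally; export the whole batch once the queue is too long.
pub fn add_garbage(garbage: Garbage) {
    LOCAL_GARBAGE.with(|local| {
        let mut local = local.borrow_mut();
        local.push(garbage);
        if local.len() >= MAX_LOCAL_GARBAGE {
            export(&mut local);
        }
    });
}

/// Force an export of this thread's garbage, whatever its length.
pub fn export_garbage() {
    LOCAL_GARBAGE.with(|local| export(&mut local.borrow_mut()));
}

/// Move all local garbage into the global queue with a single lock.
fn export(local: &mut Vec<Garbage>) {
    let mut global = GLOBAL_GARBAGE.lock().unwrap();
    global.append(local);
}

/// Number of garbage items currently visible to the collector.
pub fn global_len() -> usize {
    GLOBAL_GARBAGE.lock().unwrap().len()
}
```

With these numbers, a thread touches the shared state once per 64 pieces of garbage rather than once per piece. A real implementation would also have to export whatever is left when the thread exits.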

Thread-local caching of hazards

Since hazard pair creation requires sending a message over a channel to the global garbage collector, there is an unnecessary overhead in not reusing hazard pairs. To solve this, we let each thread have a thread-local cache of the available recyclable hazards:

The different hazard stores

This is where the Free state comes into the picture: clearly, if we need to reuse hazards, we need an inactive state other than Dead; we need a way to mark that a hazard is not currently in use, but that it might be in the future.

So how do we manage the states of the hazards in the thread-local cache? Clearly, if we let the states stand unchanged, the cache would accumulate hazards that still protect their pointers, making garbage collection of those objects impossible until the thread exits.

We could set the state of the hazard to Free when it is added to the cache. However, that would be expensive and unnecessary. We can soundly store non-free hazards in the cache, as long as we require them to be non-Blocked. However, we still want to solve the unbounded accumulation issue.

The way we do this is by setting a limit on the number of non-free hazards. When this limit is exceeded, the non-free hazards are set to Free. Ideally, this should happen relatively rarely, so most of the time, the hazard will just go straight into reuse.
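The policy can be sketched like this. The Writer type here is a bare stand-in for the writer end of a hazard pair, and the limit of 16 is an arbitrary number picked for the example.

```rust
/// The states a cached hazard may be in (a cached hazard is never blocked).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    Protect(usize),
    Free,
}

/// Stand-in for the writer end of a hazard pair.
#[derive(Debug)]
pub struct Writer {
    pub state: State,
}

/// Hypothetical limit on the number of non-free hazards in the cache.
const MAX_NON_FREE: usize = 16;

/// A thread-local cache of reusable hazards.
#[derive(Default)]
pub struct Cache {
    hazards: Vec<Writer>,
    /// How many of the cached hazards are not in the `Free` state.
    non_free: usize,
}

impl Cache {
    /// Put a hazard back into the cache without touching its state.
    pub fn put(&mut self, hazard: Writer) {
        if hazard.state != State::Free {
            self.non_free += 1;
        }
        self.hazards.push(hazard);

        // Too many stale protections: free them all in one go.
        if self.non_free > MAX_NON_FREE {
            for hazard in &mut self.hazards {
                hazard.state = State::Free;
            }
            self.non_free = 0;
        }
    }

    /// Take a hazard out of the cache for reuse, if there is one.
    pub fn take(&mut self) -> Option<Writer> {
        let hazard = self.hazards.pop()?;
        if hazard.state != State::Free {
            self.non_free -= 1;
        }
        Some(hazard)
    }

    /// Number of cached hazards that still protect something.
    pub fn non_free(&self) -> usize {
        self.non_free
    }
}
```

In the common case, put() and take() only push and pop; the loop that frees hazards runs once per MAX_NON_FREE stale entries at most.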

If we briefly revisit the earlier diagram, we will note how the different hazard variants each have their own places where they belong. For example, Blocked can only be on user-side, because otherwise it would cause deadlock, whereas Protect can appear in both user-side and thread-local cache:

A diagram showing the hazard types.

conc and API design

What I described above is implemented in my crate conc, but very little of it is actually exposed API-wise. It is merely the internal mechanics.

The API exposed is much simpler, but as an abstraction it comes at no extra cost. You can add garbage through a simple method add_garbage(), which takes the two components of garbage described above (the pointer and the destructor), and the hazard pointers are exposed in the form of a wrapper type called Guard<T>.

Guard acts like a pointer which simultaneously holds the writer end of the hazard that protects the pointer. Creation of this type is done through a method, Guard::new(), which takes a closure. Before it runs the closure, it does a critical thing: it creates a blocked hazard, meaning that you can safely read from e.g. atomic pointers in the closure, without fearing that the garbage collector deallocates the data meanwhile.

Everything else is being handled for you. You can force garbage collection yourself (via conc::gc()), but you don't have to. Caching, hazard writers, everything is built into the system and is handled for you without additional overhead. The API has only two entry points: one for adding garbage and one for protecting garbage.

Atomic<T>

Since it quickly gets messy handling destructors yourself, it also exposes Atomic<T>, a bit of a hybrid between Box<T>, Option<T>, and AtomicPtr<T>. What it does is really simple: It allows you to store a (nullable) pointer to some data, which can be safely read without removing it from the structure. On top of that, it exposes various atomic operations such as compare-and-swap, making it a quite powerful primitive.

It means that a large part of the use cases are actually covered in a very intuitive and simple API, so you can get running without much boilerplate.

sync

sync implements various data structures like stacks and locks. Currently, there are only two. Feel free to contribute more.

Debugging tools

There are a few debugging tools included, which can be enabled by enabling a feature and setting an environment variable (refer to the docs). This allows one to debug a data structure by tracing its calls.

Several debug assertions are built into the library, making it catch several different forms of erroneous usage.

Internal API

The internal API is a whole other story.

Hazards are divided into two parts: Reader and Writer, representing the respective sides of the pair. Each of these implements methods in accordance with its allowed behavior. For example, Reader has destroy() and get(), and Writer has free() and protect(), among others. The hazard pairs share a pointer to a heap-allocated AtomicPtr, but the special states (Blocked, Dead, Free) are implemented in a sneaky way: to avoid colliding with other values, it reserves pointers for these states by having static variables, making it resistant to people using trap pointers like 0x1 or similar.
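The trick with the reserved pointers can be illustrated as follows. This sketch folds the reader and writer sides into a single Hazard type with get() and set() for brevity, and the static names and the choice of SeqCst ordering are assumptions of the example rather than a description of conc's source.

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

// One static per special state. No user pointer can ever equal the address
// of one of these, so they cannot collide with a protected pointer.
static BLOCKED: u8 = 0;
static FREE: u8 = 1;
static DEAD: u8 = 2;

/// The address reserved by one of the marker statics.
fn sentinel(marker: &'static u8) -> *mut u8 {
    marker as *const u8 as *mut u8
}

/// The decoded state of a hazard.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    Blocked,
    Protect(*const u8),
    Free,
    Dead,
}

/// The state shared by both ends of a hazard pair.
pub struct Hazard {
    ptr: AtomicPtr<u8>,
}

impl Hazard {
    /// Create a hazard in the blocked state.
    pub fn new() -> Hazard {
        Hazard { ptr: AtomicPtr::new(sentinel(&BLOCKED)) }
    }

    /// Writer side: encode the state as a single pointer and publish it.
    pub fn set(&self, state: State) {
        let raw = match state {
            State::Blocked => sentinel(&BLOCKED),
            State::Protect(ptr) => ptr as *mut u8,
            State::Free => sentinel(&FREE),
            State::Dead => sentinel(&DEAD),
        };
        self.ptr.store(raw, Ordering::SeqCst);
    }

    /// Reader side: decode the pointer back into a state.
    pub fn get(&self) -> State {
        let raw = self.ptr.load(Ordering::SeqCst);
        if raw == sentinel(&BLOCKED) {
            State::Blocked
        } else if raw == sentinel(&FREE) {
            State::Free
        } else if raw == sentinel(&DEAD) {
            State::Dead
        } else {
            State::Protect(raw as *const u8)
        }
    }
}
```

Because the special values are addresses of real statics, even a null pointer or a trap pointer such as 0x1 decodes as an ordinary Protect, and the whole state still fits in one atomic word.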

The API is divided into two main states: local and global, representing the thread-local state and the process-wide state respectively.

The local state is stored in a thread-local variable, which keeps the non-exported garbage and the available hazards. The garbage is regularly exported to the global state, and a forced export can be done through local::export_garbage().

Secondly, the global state has two parts:

  • The sender part of the message passing channel.
  • The garbo part.

The "garbo" represents the garbage collector and is protected behind a mutex. It stores the other part of the channel, allowing it to receive messages. It also holds the global (exported) garbage and the hazard readers of all the threads.

To know when to garbage collect, global::tick() exists. This is called when new garbage is added, among other things. What it does is essentially to generate a random number and see if it is below a certain limit, in which case it tries to GC. That is, it randomly decides whether or not to garbage collect.
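Below is a small sketch of such a probabilistic tick. It is not conc's implementation: the xorshift generator, the interval of 128, and the use of a remainder test in place of a "below a limit" comparison are choices made for this example, and the try_gc callback stands in for the actual collection attempt.

```rust
use std::cell::Cell;

/// Hypothetical: collect on average once every `GC_INTERVAL` ticks.
const GC_INTERVAL: u64 = 128;

thread_local! {
    /// State of a small per-thread xorshift generator (must be non-zero).
    static RNG_STATE: Cell<u64> = Cell::new(0x9E37_79B9_7F4A_7C15);
}

/// Produce the next pseudo-random number for this thread.
fn next_random() -> u64 {
    RNG_STATE.with(|state| {
        let mut x = state.get();
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        state.set(x);
        x
    })
}

/// The decision rule: collect when the number lands in a 1-in-N bucket.
pub fn should_collect(random: u64) -> bool {
    random % GC_INTERVAL == 0
}

/// Called e.g. whenever garbage is added. Returns whether a collection
/// attempt was made on this tick.
pub fn tick(try_gc: impl FnOnce()) -> bool {
    if should_collect(next_random()) {
        try_gc();
        true
    } else {
        false
    }
}
```

Keeping the generator thread-local means the decision itself needs no communication between threads, which is the point of deciding randomly in the first place.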

Compared to epochs

Epoch-based reclamation or EBR is another popular way of doing concurrent memory reclamation.

The architectural difference is in how destruction is deferred. EBR blocks all the objects from the epoch from being collected, whereas hazard pointers are more fine-grained in their behavior: they do it on an object-by-object basis.

This approach is necessarily slower than EBR as it is described in aturon's blog post. There is however a serious problem with epochs in some cases: memory blowup, which happens when several threads constantly read and write, blocking garbage collection indefinitely and thus causing very high memory usage, if not out-of-memory. Hazard-based designs like this one do not have this issue.

There are some other advantages as well:

  • It is less memory hungry.
  • It is more general-purpose. What makes it more general is the fact that there is no lifetime in the Guard type, which means that it can exist for an arbitrary amount of time, making it usable for more-or-less anything.
  • It is compatible with things like futures, where you can't be bound to a lifetime.
  • The API is simpler from the user's perspective (that is, it is simpler to deal with as a user of a library which depends on conc), as you don't need to add it as a dependency yourself (which you do in crossbeam, where the call site must call epoch::pin()).
    • It implements Guard::map() which allows a library to expose e.g. a particular field of the Guard's content.

Fixed example

I want to round off with a fixed version of the example I started out with. Below is a sound and safe implementation of STM through the conc library:

use std::sync::atomic;

// Assumed import path for the conc types used below.
use conc::{Atomic, Guard};

/// A software transactional memory container.
pub struct Stm<T> {
    /// The inner data.
    inner: Atomic<T>,
}

impl<T> Stm<T> {
    /// Create a new STM container.
    pub fn new(data: Option<Box<T>>) -> Stm<T> {
        Stm {
            inner: Atomic::new(data),
        }
    }

    /// Update the data.
    ///
    /// This applies closure `f` to the data of `self`. If the data isn't updated in the meantime,
    /// the change will be applied. Otherwise, the closure is reevaluated.
    pub fn update<F>(&self, f: F)
    where
        F: Fn(Option<Guard<T>>) -> Option<Box<T>>,
        T: 'static,
    {
        loop {
            // Read a snapshot of the current data.
            let snapshot = self.inner.load(atomic::Ordering::Acquire);
            // Construct a pointer from this guard.
            let snapshot_ptr = snapshot.as_ref().map(Guard::as_ptr);
            // Evaluate the closure on the snapshot.
            let ret = f(snapshot);

            // If the snapshot pointer is still the same, update the data to the closure output.
            if self.inner.compare_and_store(snapshot_ptr, ret, atomic::Ordering::Release).is_ok() {
                break;
            }
        }
    }
}

Note how there is not a line of unsafe code in the above code block, which is a neat consequence of the API choices of conc.

