Concurrent and Parallel
One of the most difficult classes of problems to debug, or rather to handle correctly in the first place, is concurrent and parallel programming. Rust touts fearless concurrency as one of its main selling points, so let's see how concurrency is handled in Rust, and which commonly used third-party libraries make it easier.
The considerations of sharing state across threads
Sharing state correctly is a very hard thing to do, and shooting yourself in the foot is incredibly easy.
There are two main types of problems we are likely to encounter:
- data races
- race conditions
Quite often, you might get two for the price of one.
Data race
A data race occurs when you have a program with multiple threads, a shared mutable location, and one thread tries to write to said location while another is reading from it.
Three things can occur:
- The old value is read
- The new value is read
- A corrupted value is read
This is undefined behavior, and it may or may not blow up in your face during the execution of your program. What happens in these situations may also be hardware and OS-dependent and a data race might go undiscovered for a long time simply because it "works for me (tm)".
Safe Rust prevents data races as part of its fearless concurrency promise. This guarantee is a combination of the following features:
- Explicit mutability encoded into the types of references (you know whether a reference is mutable)
- It is forbidden to mutate or use mutable statics in safe Rust
- Whether something can safely be shared across threads is encoded in its type (remember the `Send` + `Sync` traits)
This is pretty strict, but if we really need global mutable state, we have a couple of options, which we will discuss later.
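To see this guarantee in action, here is a minimal sketch of the kind of unsynchronised sharing that safe Rust rejects at compile time (this snippet intentionally does not compile):

```rust
use std::thread;

fn main() {
    let mut counter = 0;

    // Rejected: `thread::spawn` requires a `'static` closure, so borrowing
    // the local `counter` across threads does not compile. Adding `move`
    // would not help either: `counter` would be moved into the thread,
    // making the increment below impossible.
    let handle = thread::spawn(|| {
        counter += 1;
    });

    counter += 1;

    handle.join().unwrap();
}
```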
Race conditions
Race conditions are often confused with data races; however, no data corruption occurs here. A race condition means that your program behaves differently depending on the order in which parallel scheduled operations run.
Race conditions are not inherently unsafe and because there is no data corruption, Rust does not outright prevent them. However, the fact that Rust requires you to use explicit synchronisation primitives that you cannot opt out of forces you to write your program mindfully and helps prevent some occurrences of race conditions.
You should try to design your multithreaded programs in such a way that race conditions do not occur. This usually means limiting shared state to what is absolutely necessary, being mindful of how data is shared between threads, and using synchronisation primitives.
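To make this concrete, here is a small illustrative sketch (my own, not from any particular library) of a race condition that compiles without complaint: each thread holds the lock only for the individual steps, so the check and the withdrawal are not atomic as a whole.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let balance = Arc::new(Mutex::new(100));

    let handles: Vec<_> = (0..2)
        .map(|_| {
            let balance = Arc::clone(&balance);
            thread::spawn(move || {
                // Check... (the lock is released at the end of this statement)
                let enough = *balance.lock().unwrap() >= 100;
                if enough {
                    // ...then act: by now the other thread may have withdrawn
                    // as well, because we re-acquire the lock separately.
                    *balance.lock().unwrap() -= 100;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Depending on scheduling, this can print 0 or -100.
    println!("final balance: {}", *balance.lock().unwrap());
}
```

Holding the lock across the whole check-then-act sequence would remove this race condition.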
Threads and tasks
Before we get to synchronisation primitives, it is helpful to compare threads and tasks.
Threads and tasks (such as those from Tokio) look quite similar in their API, except that tasks take `async {}` blocks instead of plain closures.
However, they are orthogonal: an async scheduler may poll the same task on different threads, tasks may be split among threads, or all tasks may run on a single thread.
To broaden our horizons, this chapter mostly uses threads, but you should know about tasks; you will use them in the task at the end of this chapter.
See the async and tokio chapters for more details if you haven't seen them already.
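For a side-by-side feel, here is a hedged sketch comparing the two spawning APIs; the second half assumes the tokio crate with its default multi-threaded runtime enabled.

```rust
use std::thread;

fn main() {
    // An OS thread takes a plain closure...
    let handle = thread::spawn(|| 40 + 2);
    assert_eq!(handle.join().unwrap(), 42);

    // ...while a Tokio task takes a future, typically an async block.
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let task = tokio::spawn(async { 40 + 2 });
        assert_eq!(task.await.unwrap(), 42);
    });
}
```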
Synchronisation primitives in Rust
Back in the chapter about Rust's pointer types, we touched on the topic of `Arc<Mutex<T>>`.
In safe Rust, you cannot mutate statics (the equivalent of global variables) directly,
and you either have to settle with immutable ones, or use a container type providing
interior mutability.
Interior mutability allows mutating data in what should be an immutable context, such as
a static, the insides of an Arc, or behind an immutable reference &.
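For example, a `Mutex` inside an otherwise immutable static gives us safe global mutable state; a minimal sketch:

```rust
use std::sync::Mutex;

// The static itself is immutable, but the Mutex provides interior
// mutability, so the inner value can still be changed safely.
static COUNTER: Mutex<u32> = Mutex::new(0);

fn main() {
    *COUNTER.lock().unwrap() += 1;
    assert_eq!(*COUNTER.lock().unwrap(), 1);
}
```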
Internally, these containers use unsafe {} to bypass mutability and ownership rules,
and it is the responsibility of the author of these types to ensure Rust's safety invariants
remain, well, invariants.
Writing these interior mutability types is not a simple discipline and requires much scrutiny.
In Rust, we encounter two families of types that provide interior mutability:
- thread-unsafe ones based on `UnsafeCell`, such as `Cell` or `RefCell`
- thread-safe ones, such as `Mutex` or `RwLock`
Why would someone use a thread-unsafe interior mutability type? Well, they are much faster, so they may be suitable when your program is single-threaded (or the contained data is not shared across threads) and performance is a major concern.
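As a minimal sketch of the thread-unsafe flavour, `Cell` lets us mutate through a shared reference with no synchronisation cost at all, as long as everything stays on one thread:

```rust
use std::cell::Cell;

fn main() {
    let hits = Cell::new(0u32);

    // `set` and `get` take `&self`, so we can mutate the value even
    // though `hits` is never declared `mut`.
    let record = || hits.set(hits.get() + 1);
    record();
    record();

    assert_eq!(hits.get(), 2);
}
```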
A similar trade-off exists between `Mutex` and `RwLock`: `Mutex` is faster, but `RwLock` is more flexible because it mimics the borrowing rules (any number of read locks, or exactly one write-capable lock), as the sketch below shows.
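A quick sketch of those rules, adapted from the standard library documentation for `RwLock`:

```rust
use std::sync::RwLock;

fn main() {
    let lock = RwLock::new(5);

    // Any number of read locks can be held at the same time.
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1 + *r2, 10);
    } // both read locks are dropped here

    // But only one write lock may be held at a time.
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
    }
}
```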
Check out a little example, kindly borrowed from the Rust standard library documentation:
```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

const N: usize = 10;

fn main() {
    // Spawn a few threads to increment a shared variable (non-atomically), and
    // let the main thread know once all increments are done.
    //
    // Here we're using an Arc to share memory among threads, and the data inside
    // the Arc is protected with a mutex.
    let data = Arc::new(Mutex::new(0));

    let (tx, rx) = channel();
    for _ in 0..N {
        let (data, tx) = (Arc::clone(&data), tx.clone());
        thread::spawn(move || {
            // The shared state can only be accessed once the lock is held.
            // Our non-atomic increment is safe because we're the only thread
            // which can access the shared state when the lock is held.
            //
            // We unwrap() the return value to assert that we are not expecting
            // threads to ever fail while holding the lock.
            let mut data = data.lock().unwrap();
            *data += 1;
            if *data == N {
                tx.send(()).unwrap();
            }
            // the lock is unlocked here when `data` goes out of scope.
        });
    }

    rx.recv().unwrap();
}
```
As you can see, we need the `Arc` to be able to hand the mutex to multiple threads at once, since `Mutex` itself cannot be cloned.
Also notice that we used channels in this example.
Chanel no. 5
Channels are data structures which facilitate sending data between two or more decoupled ends. We usually talk about sender and receiver halves.
There are several types of channels:
- SPSC - single producer, single consumer; these have only one sender part and one receiver part, they are the most primitive
- SPMC - single producer, multiple consumer; these have multiple receiver halves. They are, from my experience, not used often
- MPSC - multiple producer, single consumer; these have multiple sender halves. They are used quite often and built into Rust's standard library.
- MPMC - multiple producer, multiple consumer; most flexible, can have N senders and M receivers
As mentioned, Rust has native support for MPSC channels, found in the `std::sync::mpsc` module.
```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    // Create a shared channel that can be sent along from many threads
    // where tx is the sending half (tx for transmission), and rx is the
    // receiving half (rx for receiving).
    let (tx, rx) = channel();
    for i in 0..10 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }

    for _ in 0..10 {
        let j = rx.recv().unwrap();
        assert!(0 <= j && j < 10);
    }
}
```
Since the Receiver and Sender halves are completely decoupled, channels are a great way of sharing data between threads.
Atomics
Atomic types and operations on them form the backbone of many parallel access options. Atomic operations are operations that appear to the rest of the system to occur instantaneously; the name comes from the Greek word for atom, meaning indivisible.
Rust provides a number of atomic types:
- AtomicBool
- AtomicI8
- AtomicI16
- AtomicI32
- AtomicI64
- AtomicIsize
- AtomicPtr
- AtomicU8
- AtomicU16
- AtomicU32
- AtomicU64
- AtomicUsize
NOTE: Not all of these types are available on all platforms. Some platforms come without hardware support, in which case atomics are implemented at the OS level. This usually incurs a further performance penalty.
A simple spinlock:
```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{hint, thread};

fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));

    let spinlock_clone = Arc::clone(&spinlock);
    let thread = thread::spawn(move || {
        spinlock_clone.store(0, Ordering::SeqCst);
    });

    // Wait for the other thread to release the lock.
    while spinlock.load(Ordering::SeqCst) != 0 {
        hint::spin_loop();
    }

    if let Err(panic) = thread.join() {
        println!("Thread had an error: {panic:?}");
    }
}
```
As you can see from the example, you generally need to specify an explicit atomic memory ordering. Memory orderings specify the way that operations with atomic types should be ordered.
Per the reference for Ordering:
Memory orderings specify the way atomic operations synchronize memory. In its weakest `Ordering::Relaxed`, only the memory directly touched by the operation is synchronized. On the other hand, a store-load pair of `Ordering::SeqCst` operations synchronize other memory while additionally preserving a total order of such operations across all threads.
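To make this more tangible, here is a small sketch (my own, simplified) of a classic release/acquire handoff: the `Release` store publishes every write made before it to the thread that observes the flag with an `Acquire` load.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::{hint, thread};

static DATA: AtomicU32 = AtomicU32::new(0);
static READY: AtomicBool = AtomicBool::new(false);

fn main() {
    let producer = thread::spawn(|| {
        DATA.store(42, Ordering::Relaxed);
        // Release: all writes before this store become visible to any
        // thread that observes `READY == true` with an Acquire load.
        READY.store(true, Ordering::Release);
    });

    // Acquire: once we see `true`, the earlier store to DATA is
    // guaranteed to be visible as well.
    while !READY.load(Ordering::Acquire) {
        hint::spin_loop();
    }
    assert_eq!(DATA.load(Ordering::Relaxed), 42);

    producer.join().unwrap();
}
```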
Methods on atomic types generally take `&self` as a parameter, meaning you can use them in immutable contexts.
Keep in mind that atomics are much slower than their non-atomic counterparts, sometimes several orders of magnitude.
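Since the methods take `&self`, an atomic counter can even live in a plain static without any unsafe code; a minimal sketch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// No `static mut` and no locks needed: `fetch_add` takes `&self`.
static COUNTER: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let handles: Vec<_> = (0..4)
        .map(|_| {
            thread::spawn(|| {
                for _ in 0..1_000 {
                    // Relaxed suffices here: we only care about the final
                    // value after all the threads have been joined.
                    COUNTER.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    assert_eq!(COUNTER.load(Ordering::Relaxed), 4_000);
}
```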
Barrier
A barrier is a synchronisation tool which does not contain any inner type; it merely enables a number of threads to synchronize the beginning of some operation:
```rust
use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let mut handles = Vec::with_capacity(10);
    let barrier = Arc::new(Barrier::new(10));
    for _ in 0..10 {
        let c = Arc::clone(&barrier);
        // The same messages will be printed together.
        // You will NOT see any interleaving.
        handles.push(thread::spawn(move || {
            println!("before wait");
            c.wait();
            println!("after wait");
        }));
    }
    // Wait for the other threads to finish.
    for handle in handles {
        handle.join().unwrap();
    }
}
```
Condvar
Rust also features support for condition variables. Condition variables give you the ability to block a thread in such a way that it does not use the CPU while waiting for an event to occur (a condition to be fulfilled).
Condvars are usually paired with a boolean predicate and a mutex.
```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    // Inside of our lock, spawn a new thread, and then wait for it to start.
    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        // We notify the condvar that the value has changed.
        cvar.notify_one();
    });

    // Wait for the thread to start up.
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
}
```
Note that a condvar should be used with exactly one particular Mutex; using it with more than one can lead to a panic.
Once
This is a more Rust-specific synchronization primitive which can be used to do a one-time global initialization. This is useful for FFI or related functionality.
```rust
use std::sync::Once;

static START: Once = Once::new();

fn main() {
    START.call_once(|| {
        // Run one-time initialization here.
    });
}
```
The closure will run exactly once, regardless of how many times you call `.call_once()`.
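A tiny sketch demonstrating that claim:

```rust
use std::sync::Once;

static START: Once = Once::new();

fn main() {
    let mut runs = 0;
    for _ in 0..3 {
        START.call_once(|| runs += 1);
    }
    // Three calls, but the closure only ran on the first one.
    assert_eq!(runs, 1);
}
```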
Rust libraries
Since parallel and concurrent access is very important, a number of crates in the Rust ecosystem focus on improving and extending the multi-threaded experience.
We have mentioned rayon already, let's focus on crates centered around shared state.
Crossbeam
Crossbeam is a loose collection of tools for concurrent programming; it contains a wide variety of smaller crates.
For example, it contains an implementation of `AtomicCell`, which is very similar to the `Cell` type, except that it is thread-safe. You can check via `AtomicCell::<T>::is_lock_free()` whether the platform you are running on actually supports the necessary atomics to make this type lock-free.
```rust
use crossbeam_utils::atomic::AtomicCell;

fn main() {
    let a = AtomicCell::new(7);
    assert_eq!(a.load(), 7);

    a.store(8);
    assert_eq!(a.load(), 8);
}
```
Atomic loads use the Acquire ordering and atomic stores use the Release ordering.
Flume
While crossbeam does provide its own channel implementation, which is MPMC, the crate flume is the king when it comes to channels. It is blazing fast, MPMC by default, and supports both async and sync contexts, even allowing you to mix and match to your heart's content.
```rust
use std::thread;

fn main() {
    println!("Hello, world!");

    let (tx, rx) = flume::unbounded();

    thread::spawn(move || {
        (0..10).for_each(|i| {
            tx.send(i).unwrap();
        })
    });

    let received: u32 = rx.iter().sum();

    assert_eq!((0..10).sum::<u32>(), received);
}
```
As far as I am concerned, it is magic.
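To show the mixing in practice, here is a hedged sketch that sends from a plain thread and receives in an async context; it assumes both flume and tokio as dependencies.

```rust
use std::thread;

fn main() {
    let (tx, rx) = flume::unbounded();

    // Synchronous sender on a plain OS thread.
    thread::spawn(move || {
        for i in 0..10u32 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Asynchronous receiver on a Tokio runtime.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let sum: u32 = rt.block_on(async {
        let mut sum = 0;
        // `recv_async` returns Err once the channel is closed and empty.
        while let Ok(i) = rx.recv_async().await {
            sum += i;
        }
        sum
    });

    assert_eq!(sum, 45);
}
```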
Parking_lot
This crate does not really bring anything new to the table; it takes existing primitives and remakes them better. parking_lot contains smaller, faster and more flexible implementations of the standard library's synchronization primitives, and is therefore always preferred if you are allowed to use it.
```rust
use parking_lot::{Condvar, Mutex};
use std::sync::Arc;
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = pair.clone();

    // Inside of our lock, spawn a new thread, and then wait for it to start.
    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock();
        *started = true;
        cvar.notify_one();
    });

    // Wait for the thread to start up.
    let (lock, cvar) = &*pair;
    let mut started = lock.lock();
    if !*started {
        cvar.wait(&mut started);
    }
    // Note that we used an if instead of a while loop above. This is only
    // possible because parking_lot's Condvar will never spuriously wake up.
    // This means that wait() will only return after notify_one or notify_all
    // is called.
}
```
Fits like a glove.
In fact, most of the time, introducing parking_lot to existing code requires no big rewrites; just change where you import the synchronization primitives from at the top of your Rust file :D
The Task: Concurrent ping pong
By this point, you should have read the async chapter and should be well familiar with Tokio tasks.
Write a program with three tasks:
- a `Ping` task, which creates a ping every half a second
- a `Pong` task, which responds with a pong
- a display task, which displays these events to the standard output (e.g. with `println!()`)
Your main task is choosing the most appropriate synchronisation primitives.
The output, as displayed by the display thread, should look something like this:
```
Ping
Pong
Ping
Pong
Ping
Pong
```
If your program prints either of those strings twice in a row, you have a race condition.
End product
In the end, you should be left with a well-prepared project that has the following:
- documented code explaining your reasoning where it isn't self-evident
- optionally tests
- and an example or two where applicable
- clean git history that does not contain fix-ups, merge commits or malformed/misformatted commits
Your Rust code should be formatted with rustfmt / cargo fmt and should produce no warnings when built. It should also work on stable Rust and follow the Braiins Standard.