Asynchronous programming in Rust
Prerequisites
- All of the previous chapter
TcpStreamandTcpListenerfs::read_to_string- async syntax is presented in this chapter, but you can also check out this
Two chapters ago (in Rust's Many Pointers), the task was to write a multi-threaded calculator application, one that was rather contrived but served to illustrate the issue of sharing data across thread boundary.
However, threads are expensive, it's not a good idea to spin up many threads, since it takes up resources and context-switching takes time. Writing multi-threaded sync code is best suited to a small number of expensive computational tasks, or alternatively, if you are writing low-level OS code which allows you to schedule tasks precisely and gives you a great amount of control. Using threads directly might also be preferred in real-time computing.
For many applications, we need a handy way to avoid spawning many threads. A thread-pool alleviates the issue partially, but you still spin-up threads, and it's not easy to manage a multi-threaded application. Furthermore, what if you only have one thread available?
In comes concurrent programming. Concurrent programming is a general term for an approach which allows having more than one task in progress at once. The difference between concurrent, parallel and distributed programming is that in concurrent, tasks can all run on one thread, and a mechanism exists for switching between them, in parallel programming, tasks run simultaneously, whereas distributed programming uses multiple processes, often each running on a separate machine.
There is a number of mechanisms that facilitate concurrent programming (apart from using threads), for example event-driven programming, co-routines, or actor architecture. It is possible to utilize all of these in Rust by way of specialized libraries, however, Rust has native support for only one approach -> asynchronous programming.
If you've been involved with Rust materials, you might have seen the keywords async/await
mentioned, these are the ones we use for asynchronous programming.
Futures and promises
The Rust's async models revolves around the abstract concept of a *Future, also called a promise. You might have heard about the Promise type from Javascript.
A Future is the promise that at some point in the future, a value of a given type will be available.
In Rust, Future is a trait, there is nothing stopping you from implementing it on
your own custom type, although you are unlikely to want to do that unless you are writing
low-level async libraries. Most of the times, you will use the trait as a handy-dandy way
to abstract from the opaque type Rust generates for each future.
Here is how you can create a future in Rust:
async fn give_me_a_number() -> usize { 20090103 } fn main() { let x = give_me_a_number(); }
The async keyword serves to provide a tiny bit of syntactic sugar, under the hood,
the function definition is transformed to this:
use std::future::Future; fn give_me_a_number() -> impl Future<Output=usize> { async { 20090103 } }
The impl Trait syntax
In the previous example, you see a curious new piece of syntax, called the impl Trait syntax. This may look like another way to do generics, but it is in fact Rust's first foray into the world of existential types. Existential type, alternatively also called "existentially-quantified type" is a type that represents any one type satisfying some property or behavior. When talking Rust, we mean "any one type implementing some trait".
The important thing is we mean one type precisely. This function is not generic over all types
implementing Future, instead, it produces one concrete type implementing Future, we just don't
know, what that type is, since it is a compiler generated opaque type.
In this aspect, we can liken static generics to a universal quantifier (∀) and the expression
"for each type such that", whereas impl Trait is similar to an existential (hence existential types)
quantifier (∃) and the expression "there exists a type such that".
There are at least three common usages of impl Trait:
0. Futures
- Iterators (since iterators create known, but massively compounded types, which can reach thousands of characters very quickly)
- You want to employ as much information hiding in your crate's API as possible
Here's an example with Iterators:
fn double_iter(iter: impl Iterator<Item=usize>) -> impl Iterator<Item=usize> { iter.map(|x| x * 2) } fn main() { println!("{:?}", double_iter((0..=10)).collect::<Vec<_>>()); }
Also notice that I've included the impl Trait syntax in argument position.
This is not an existential type, it is generics. This alternative syntax was
included for parity, and is functionally equivalent except for one thing. It
is not possible to specify the concrete generic type of this parameter by hand,
since there is no named generic argument. In this case, it doesn't matter since
Rust should not have any hardship inferring the types, but keep this limitation
in mind if using it elsewhere.
It is helpful to think that for a function call, existential types are determined by the callee, whereas generics are determined by the caller. This means that the existential type is deduced from the function body.
Back to the Future
Rust futures and async code exhibit some behavior that you might not be used to when coming from other languages.
Rust Futures are inert
Creating a future will not run its code, it is lazy evaluated and the future won't
start until it's first polled, or, in other words, .awaited.
Consider the following example:
async fn foobar() { println!("Back to the future"); } fn main() { println!("Hello"); let x = foobar(); println!("What's your favorite movie?"); }
As you can see, we will never get the answer we so desire. This future was never polled
or awaited, so the code never got executed. We ca fix this easily by using the futures
crate.
async fn foobar() { println!("Back to the future B-)"); } fn main() { println!("Hello"); let x = foobar(); println!("What's your favorite movie?"); futures::executor::block_on(x); }
Now you should see the message. The futures crate provides the most basic tools for
working with asynchronous code, and it is highly recommended you check it out.
It is an official crate, but it is not built in.
In the previous example, we used something called an executor. An executor is a tool for running
asynchronous code. We can't just declare main() as async, since that posits the problem of what
would execute the Future it would become.
Rust does not have a built-in or default executor, and users are encouraged to use different implementations depending on their particular use case, whether it be single-threaded or multi-threaded. This allows for a great degree of flexibility.
Some crates, such as tokio also provide macros in the form of attributes for declaring an async main().
This also results in syntactic sugar, and an executor is spun up behind the scenes, but the specifics of
that are beyond the scope of this text.
The term executor is also sometimes confused with the terms reactor and runtime. A reactor
is a means of providing subscription mechanisms for events like IO, inter-process communication and
timers. Executors only handle scheduling and execution of tasks. The term runtime describes
reactors bundled with executors.
For reactors, you will find them in places where the program is supposed to interact with the outside world
or interact with things which may not be ready yet. In most async libraries, the common reactors are types
for files and file manipulation, and all sorts of sockets. A future to sleep the task may also be considered
a reactor (it reacts to a time duration elapsing).
No built-in runtime
As just mentioned, Rust does not come with any built-in runtime. Most commonly used are:
tokiofutures(very primitive)async_stdsmolbastion(facilitates distributed programming)
Mixing executors
The gist of this is - don't do it. It might be tempting when you run into a situation like this:
- Your program is async, using a serious executor like Tokio
- You are in a blocking concept, unavoidably
- You really need to await a future inside this blocking context
What you might think of in this case is using something like futures::executors::block_on. This might
work, but usually doesn't:
- Async runtimes don't necessarily have cross-compatible types, and might panic when used with a different executor
- The runtime will mess up the amount of threads your program has and likely will block the thread you are currently on.
- If your async action depends on input from other async tasks, it will therefore never advance
So just don't do it.
Rust async is zero-cost
Rust async does not have very many requirements and fairly efficient code is generated (although async binaries tend to be larger than non-async). In fact, you don't even need dynamic dispatch or heap allocations, so Rust async can be used in embedded environments without a hitch.
Async workings
As mentioned earlier, the impl Future syntax conceals a type generated by the compiler.
Understanding this is key to writing the most efficient async code.
Consider the following example:
#![allow(unused)] fn main() { async fn foo() {} async fn bar() {} async fn run_both() { let x = 2; dbg!(x); println!("starting foo"); foo().await; println!("starting bar"); let y = 3; println!("{:?}", y); bar().await; println!("{:?}", x); println!("both functions should be over by now"); } }
Rust will take this, and for each of those functions, it will generate a type resembling
an enum (actually, we can think of it exactly as an enum). It has a number of states,
and these are delimited by the usages of the .await keyboard, which yields back to the
runtime and gives opportunity to other async tasks to run. The executor will then poll
the futures until execution can be resumed.
We can visually separate these blocks:
#![allow(unused)] fn main() { async fn run_both() { { let x = 2; dbg!(x); println!("starting foo"); foo().await; } { println!("starting bar"); let y = 3; println!("{:?}", y); bar().await; } { println!("{:?}", x); println!("both functions should be over by now"); } } }
Of course, this pseudo-code wouldn't compile, since we are referencing a variable from the first block in the last black. But this brings us to an important issue. How do we deal with it in the compiler generated code? Rust variables are on the stack by default, and of course, stack is not preserved between continuations of the future.
Well, Rust solves this by checking which variables can't be just on the stack (as they are referenced
across the .await boundary), and it moves them into the future. In our case, y can be on the
stack willy-nilly, but x cannot, we reference it in the last block.
Important!
If you use very large variables across
.awaitboundary, the underlying type of the future can grow greatly. Keep in mind that a future also stores all of the underlying futures, further increasing the total size of the type. Improperly written async code can grow into gargantuan proportions in the department of type-size and if you have the unfortunate idea to store futures in a memory-limited environment, such as embedded devices, you might run into issues with memory capacity.
We can also illustrate the waiting mechanism of a future to make it clearer without any syntax sugar:
#![allow(unused)] fn main() { fn run_both() -> impl Future<Output = ()> { async { let first = foo(); while !foo.is_ready() { yield_now(); foo.try_complete(); } let second = bar(); while !bar.is_ready() { yield_now(); bar.try_complete(); } } } }
Once again, this is pseudo-code and is of course not runnable.
Storing futures
Two paragraphs ago, I said "don't do it", so now, let me tell you how to do it. It is not possible to
just store futures as they are, since the type is opaque and unknown, the compiler generates it. We only
know that it implements Future<Output=T>, we don't even know if the type is Sized. This means that
we can't store it directly, but we need to use a trait object.
Apart from using & and &mut, as the usual suspects, we can also use Box, Rc or Arc. However,
a special care must be taken, as we need to pin the value.
use std::pin::Pin; use std::future::Future; async fn print_async(string: &'static str) { println!("{}", string); } let boxed: Pin<Box<dyn Future<Output=()>>> = Box::pin(print_async("hello from box")); futures::executor::block_on(boxed);
It is usually not necessary to type out the type at all, this is for illustrative purposes. These boxes are
stored on the heap like any other box, the Pin signifies that a value cannot be moved in memory. Pinning is
an important concept in Rust, but going into detail is beyond the scope of this chapter.
Common futures' operations
There is a couple things that you can do with Futures that are more common with asynchronous programming than with the synchronous approach.
canceling a future
Sometimes, you might need to cancel a future, a different part of your program has determined that you no
longer need to finish a certain computation, and doing it would either waste resources, fail, or produce
irrelevant results. In Rust, cancelling a future is really simple - just Drop it. You can either let it go
out of scope, or explicitly drop it with std::mem::drop(), which is in Rust 2021 automatically imported.
select and select_biased
Select and its biased counterpart are common future operations. A select! is a control structure similar
to match, however, instead of matching a value on a pattern, we are "matching" on the first future that
resolves.
futures::executor::block_on(async { use futures::future::FutureExt; use futures::select; async fn async_identity_fn(arg: usize) -> usize { arg } let res = select! { a_res = async_identity_fn(62).fuse() => a_res + 1, b_res = async_identity_fn(13).fuse() => b_res, }; assert!(res == 63 || res == 13); println!("{}", res); });
Note: The futures crate implementation of select! requires futures to be fuse futures, which is to
say that after being completed, they will never be polled again. In practice, you may want to fuse your futures
outside of select. Check the relevant page in futures doc
A select is handy when you want to select from a number of possible event. Consider the following scenario:
- Your app reads data from the network
- Your app accepts input from the command-line
In this situation, using async is ideal and it can help you avoid constructing a multi-threaded headache. Simply use async primitives for reading stdin() and reading sockets, and in your event loop, spin on a select between futures from reading stdin and reading the socket.
If the term select is confusing for you, you can also think of it as a race, whoever is ready first, wins.
The difference between select and select_biased is that if multiple futures are ready, select chooses pseudo-randomly,
whereas select_biased takes the first one by order of declaration. Generally, you might want to prefer the former over the
latter, as it is entirely possible for one operation to completely starve the rest by just being faster.
join
This operation is very similar to its multi-threaded namesake. Just like we can join multiple threads and wait for them
to finish, the join! macro from the futures library allows us to wait for multiple futures at once, returning their
results together.
The implementation from this library is variadic and can take any number of parameters, returning the results of the futures passed into it as a tuple. Here is an example borrowed from the library's documentation:
futures::executor::block_on(async { use futures::join; let a = async { 1 }; let b = async { 2 }; assert_eq!(join!(a, b), (1, 2)); let c = async { 3 }; let d = async { 4 }; let e = async { 5 }; assert_eq!(join!(c, d, e), (3, 4, 5)); });
stream_select!
This macro combines several streams, provided all of which produce values of the same type. Much like
select!, if multiple streams are ready at once, one is selected pseudo-randomly to prevent streams
being starved
Here is an example of how to use stream_select!:
futures::executor::block_on(async { use futures::{stream, StreamExt, stream_select}; let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse(); let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); for _ in 0..10 { match endless_numbers.next().await { Some(1) => println!("Got a 1"), Some(2) => println!("Got a 2"), Some(3) => println!("Got a 3"), _ => unreachable!(), } } });
From this example, you can see how the algorithm chooses futures in this case, illustrating its pseudo-random nature.
NOTE: You can think of streams as async iterators. They are commonly seen across Rust async code.
Finally, let's look into some cans of worms.
Mutex in async
Usage of mutexes and other safe synchronization mechanisms is quite common in Rust, however, special care
needs to be taken when using std or similar implementation of Mutex (as opposed to eg. tokio implementation,
which is async).
Synchronous Mutexes or RwLocks should never preserve locks across .await boundary. This is because you
might be cooperating with another future that needs this lock, perhaps creating a non-obvious deadlock. Considering
the random nature of choosing the next future, the deadlock bug might not always occur, or may even occur very
infrequently depending on how your application is constructed, making it very difficult to debug.
For example, this is a likely source of deadlock:
#![allow(unused)] fn main() { loop { let x = some_mutex.lock(); async_read_to_string("file").await; *x += 1; } }
Thread locality/confinement
When what you are awaiting isn't ready, the future yields, returning control back to the executor, allowing it
to poll another future. While some executors are single-threaded, many are multi-threaded. Rust makes no guarantees
about futures (or other types) being confined to one thread so long as the appropriate selection of Send & Sync
traits is implemented . Generally, a fast way to not have that is to have any sort of Cell type involved, Rc,
or raw pointers involved.
Because thread confinement is not guaranteed for Rust futures (and frankly, if we are after performance, is downright undesirable), it is unwise to behave as if you had it. Namely, don't use thread_local storage, it will end poorly, and can lead to another Heisenbug, which will likely once again be a pain to debug.
This also means that you should make use you don't use libraries which depend on thread_local storage. For example, some logging frameworks might do that.
Send and sync futures
Futures desugar into an enum whose variants carry local variables that are used across .await points. Otherwise,
pretty much the same rules apply as for types that you might write by hand. In other words, a future is Sync and Send
if its contained types are:
- A type is Send if it is safe to send it to another thread.
- A type is Sync if it is safe to share between threads (T is Sync if and only if &T is Send).
If you are dealing with troublesome types that do not have this feature, make sure to either contain them in something
that is Send + Sync, or that you do not carry them across .await points.
Async traits
The white whale at the end of the road. If you are already well versed with the nuances of traits, you might have already arrived at the question of "How do futures mesh together with traits?". The simple answer is: quite poorly.
As a naive attempt, we can try something like this:
use std::future::Future; trait MyTrait { fn my_function() -> impl Future<Output = u8>; // alternatively written as // async my_function() -> u8; }
If you try to run this example, you will find that Rust complains that impl Trait is not allowed here.
When we introduced existential types earlier, we posited a simplification in saying that the underlying type is determined by the function body rather than the context of the caller. This is not possible here, it has no body in the trait definition.
If we consider implementing this trait, it is also a no-go, since each of the implementations will create
a different Future-implementing type, and that is likely to have a different size, and won't mesh together
with the other implementations.
Furthermore, for this same reason, you can't use the trait for dynamic dispatch either, because the size needs to be known.
The solution is provided by the async-trait crate. By using the attribute provided by this crate (needs to
be both on trait definition and each implementation), you can write async traits like this:
use async_trait::async_trait; #[async_trait] trait MyTrait { async fn my_function() -> u8; }
This looks just like what we need.
Behind the scenes, the macro rewrites the code to this:
use std::pin::Pin; use std::future::Future; trait MyTrait { fn my_function() -> Pin<Box<dyn Future<Output = u8>>>; }
Because it's a trait object behind a pointer, it suddenly has a known size - the size of the pointer, so it's now completely valid. It's also no longer using an existential type, but that's fine.
However, this adds a level of indirection, requires a heap allocation and because it uses a trait object, it hampers
compiler optimizations. Therefore, it is most suitable to be somewhere at the top of your stack (ie. you have trait Service,
implementors of which comprise your application on the top level), which enables your code to be still well-optimized by the
compiler, as opposed to having traits for many menial things to be async. However, your mileage may vary and the performance
cost might not be significant enough for your use case to avoid async traits entirely.
For the sake of completeness, here's a matching impl for the previously mentioned trait:
use async_trait::async_trait; #[async_trait] trait MyTrait { async fn my_function() -> u8; } struct MyStruct; #[async_trait] impl MyTrait for MyStruct { async fn my_function() -> u8 { // the answer to life, death, universe and everything 42 } }
The Task: Making an HTTP server concurrent
For this chapter's project, we are going to do a bit of an compare-and-contrast. If you've went through the Rust book, it has you create a single-threaded HTTP server, then turn it into a multi-threaded one, and finally have you implement some nice to haves on it.
The relevant chapters are these:
- https://doc.rust-lang.org/book/ch20-01-single-threaded.html
- https://doc.rust-lang.org/book/ch20-02-multithreaded.html
Your task is to take the single-threaded implementation, posted here for ergonomics:
use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; fn main() { // Listen for incoming TCP connections on localhost port 7878 let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); // Block forever, handling each request that arrives at this IP address for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // Read the first 1024 bytes of data from the stream let mut buffer = [0; 1024]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; // Respond with greetings or a 404, // depending on the data in the request let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); // Write response back to the stream, // and flush the stream to ensure the response is sent back to the client let response = format!("{status_line}{contents}"); stream.write_all(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
404.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
And turn it into a concurrent one by using the following crates:
- The
smolruntime, and you can usesmol-potatfor runtime initialization by letting you declareasync fn main() - The
futures, which is included insmolasfutures-lite
1. Introducing the async
Start by creating a crate with Cargo and pasting the files above into it. You should verify that you've done it correctly by compiling and running the server, for example with:
cargo run --release
And then cURLing it, simply this should work:
╭[RAM 27%] bos ~ tax:
╰ 23:04 lh-thinkpad magnusi » curl localhost:7878
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
Alternatively, opening it from the browser should also work.
Next, add in the crates above as dependencies, and transform main into an async function, modify handle_connection to be naively async.
2. Streams
If you test your application now, it is quite likely that it will work, but that you can still grind it to a halt with a single Slowloris-like request, that just takes a long time to transmit and complete.
This is because the for loop is not yet async, and is thus it can still block.
Replace TcpListener with its async equivalent provided by smol.
Then, check out the StreamExt trait,
which should allow you to make handling connections concurrent.
You can also use the ever-handy spawn function.
3. Testing and making it parallel
Now, even if you were to get a slowloris like request, your server should not cease responding even though its single-threaded.
You can try verifying that's the case my using netcat to open a TCP connection manually and just slowly typing in your request,
but never finishing it (remember that an HTTP request is terminated by two empty lines, or, in other words, "\r\n\r\n").
If you've used smol-potat, or even if you didn't, it should be now trivial to spin up multiple threads and make your
application both concurrent and multi-threaded. Check out the documentation on how to do just that.
4. End product
In the end you should be left with a well prepared project, that has the following:
- full functionality
- documented code (where applicable)
- clean git history that does not contain fix-ups, merge commits or malformed/misformatted commits