Asynchronous programming using the Tokio framework
Prerequisites
- All of the previous chapter
TcpStreamandTcpListenerfs::read_to_string- async syntax
Using async properly requires a large commitment, and usually requires subscribing to a kitchen-sink type of network. This is because of the nature of IO and networking (and possibly other OS events). An proper async application benefits most when async IO is used, and tasks can react properly to updates on sockets and files.
Blocking IO may defeat many of the benefits of having your application be async.
In Rust, there exist several implementations of non-blocking IO, the most prevalent of which is current mio. However, these libraries are usually low-level, and may not even use futures at all, so async frameworks usually wrap these in a more ergonomic API.
The most mature framework for asynchronous programming in Rust is Tokio, and the Tokio stack forms the backbone of many large Rust projects. In Braiins, we use tokio also.
To get familiarized with the basics of Tokio, you should read the tutorial
Tokio vs std vs async_std
You will see that tokio mirrors many of the APIs found in the standard library. If you are not
sure about a particular use case, it is better to just use the analogues provided by tokio, since
with std, you may run into the aforementioned issues with blocking IO.
There also exists some confusion with async_std. This is a library by one
of the former contributors to Tokio and several others. It aims to mirror std API as much as
possible, but it is still a 3rd party async framework much like Tokio (or smol, which we've briefly
encountered in the last chapter).
It is generally a bad idea to import two or more of these libraries and mix their types, and it may lead to un-performant code or even panics at runtime.
AsyncRead and AsyncWrite
The difference from std is not only in types, but also core traits. This is generally
because std traits also only provide a blocking API, and so non-blocking analogues needed to
be created.
The ones that can be considered universal are located in the futures crate. If you want your code to be as compatible with 3rd party libraries as possible and have as broad audience as possible, you should prefer these to other alternatives.
The two most commonly encountered traits are likely to be AsyncRead
and AsyncWrite. If you click
the links, you will see that they are fairly bare-bones, and do not match much of their std::io
counterparts at all.
Ergonomic IO functionality is provided by their extension traits, AsyncReadExt
and AsyncWriteExt respectively.
Here is an example asynchronously reading bytes from two vectors into another vector.
#![allow(unused)] fn main() { use futures::io::{AsyncReadExt, Cursor}; let reader1 = Cursor::new([1, 2, 3, 4]); let reader2 = Cursor::new([5, 6, 7, 8]); let mut reader = reader1.chain(reader2); let mut buffer = Vec::new(); // read the value into a Vec. reader.read_to_end(&mut buffer).await?; assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]); }
A Cursor wraps an in-memory buffer and provides it with IO traits implementations,
it has both a futures variant and
a blocking std variant.
Mixing async libraries and mixing executors
Going back to mixing types and traits, this brings us to the broader topic of mixing asynchronous frameworks in general. Simply put, it should be avoided at all costs, and doing that might have unforeseen consequences.
It is a particularly dangerous idea to mix executors (such as, to call async code inside a non-async function
which is called by an async function, it might be tempting to do something like futures::block_on). While
this may work in some cases, it is likely at the cost of a reduced performance. However, it is important to
note that an executor is a blocking task, so if you are, for example, trying to read from an async channel,
which is being fed with a future on another executor, if both of them are on the same OS thread, you will
dead-lock (since the producer future is never advanced).
Multi-threaded executors also often use settings fine-tuned for the host machine (such as, spawning a worker thread for each CPU core or physical thread), so it is better to only run one executor.
Tokio tasks, spawn() and spawn_blocking()
In the previous chapter, we spoke about how spawning OS threads is unsuitable for workloads involving many small tasks because of the overhead involved in thread creation, and how concurrent programming deal with this issue.
Tokio (and other async frameworks) provide means of dividing workload into logical units that the executors schedule. Within the context of tokio, these are called tasks. If you are familiar with the concept of a green-thread, you can liken a task to green-threads.
In Tokio, this is how you create a task:
#![allow(unused)] fn main() { use tokio::task; task::spawn(async { // perform some work here... }); }
Much like std::thread::spawn, the function task::spawn
returns a JoinHandle, which you can use to either ensure a graceful shutdown (making all tasks have ended by joining
all task handles), join a couple finite tasks, or, if your task produces a tangible value, then to capture that value.
Joining a task is fallible, with the JoinError, which
lets you discern, what went wrong, usually, you can run into cancelled tasks or tasks that panicked.
There exists a similar function called task::spawn_blocking.
This is useful for CPU-intensive computations or blocking operations. While the API is pretty much the same, Tokio will
send this task to a special blocking thread, where blocking is acceptable. You can still join it or await a result value
just the same.
If your code in a task can diverge into a blocking path where you are sure that this will be the case until it finishes or program ends, you can opt to have the async worker thread your task is currently running on be transitioned into a blocking thread, which can save you some performance by avoiding context switches:
#![allow(unused)] fn main() { use tokio::task; let result = task::block_in_place(|| { // do some compute-heavy work or call synchronous code "blocking completed" }); assert_eq!(result, "blocking completed"); }
The Tokio scheduler is cooperative, as opposed to preemptive, such as the one that schedules threads in your operating
system. This means that a task is never preempted (paused forcefully with the scheduler switching to another), so a Tokio
task will run until it yields, which is an indication to the scheduler that it currently cannot continue executing. Tokio
library functions and types generally have periodical explicit yield points to lessen the risk that a task starves another,
however, if you are in a situation where you want to give absolute priority to a task, you may opt-out of cooperative scheduling
with task::unconstrained:
#![allow(unused)] fn main() { use tokio::{task, sync::mpsc}; let fut = async { let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..1000 { let _ = tx.send(()); // This will always be ready. If coop was in effect, this code would be forced to yield // periodically. However, if left unconstrained, then this code will never yield. rx.recv().await; } }; task::unconstrained(fut).await; }
In the unlikely event that you have a future which is not Send, and thus cannot be scheduled on a different OS thread
than the one it has been created on, you can make a task out of it with task::spawn_local():
use std::rc::Rc; use tokio::task; #[tokio::main] async fn main() { let unsend_data = Rc::new("my unsend data..."); let local = task::LocalSet::new(); // Run the local task set. local.run_until(async move { let unsend_data = unsend_data.clone(); task::spawn_local(async move { println!("{}", unsend_data); // ... }).await.unwrap(); }).await; }
Rc cannot be used where Send + Sync is required because it contains non-atomic counters,
and sharing it across threads carries the risk of data races.
Tokio console
Just like on a system with many processes running, it may be useful to expect what are these processes doing, it may also be useful to inspect the asynchronous tasks running in a large application. Tokio in this aspect has a clear edge over most other async frameworks by providing tokio-console.
This allows you to inspect tasks, their state, duration they've been running or idle for, and other information.
Check out this link to see how the console works and how to use it with a Tokio project: https://docs.rs/tokio-console/latest/tokio_console/#using-the-console
Shared state patterns
Concurrent and parallel programs both share the headache of properly sharing state between tasks (and/or threads). Luckily, the solution is similar for both:
- Guarding the shared state with synchronization primitives such as Mutex or RwLock
- Using lock-free data structures
- Spawning a task (thread) to manage the task and using message passing via a mpmc or mpsc channels.
Some considerations must be taken:
- Be wary that you don't dead lock your application with Mutexes and RwLocks
- Channels that have an async API work the best for async applications,
flumeis highly-recommended, as Tokio-provided channels have worse performance
Cancelling tasks and destructors
It can often occur in async applications that a future is no longer needed and waiting on it would be a waste of resources. We spoke about this already in the previous chapter.
You might be asking how to do proper cleanup, when a future is cancelled and does not know it is being cancelled. The answer is easy, and that is to follow the RAII pattern, which is ever-present in Rust.
Simply put relevant bits of your cleanup code into the Drop implementations on your types, and you can be sure that it will run.
Sleeping
Much like you can suspend a thread by making it sleep, the same is available for Tokio tasks. The analogue of std::thread::sleep
is tokio::time::sleep.
use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { sleep(Duration::from_millis(100)).await; println!("100 ms have elapsed"); }
A couple considerations need to be taken:
- Setting a duration of 100 milliseconds doesn't mean your task will continue after 100ms precisely. The executor may decide to poll your future later. Therefore, this shouldn't be used for tasks that require high-resolution timers
- This will not sleep the thread the future was running on, but merely make the future claim to be not ready until the time has advanced enough.
Blocking tasks
One of the specifics of Tokio is that its threadpool supports both asynchronous and synchronous threads. As such, it is possible to move an expensive blocking to another thread and await it from a future that is scheduled by the runtime.
The tool for this is tokio::task::spawn_blocking.
This will run a closure on a thread where blocking is acceptable.
However, keep in mind that it is impossible to cancel a blocking task, when you shut down the executor, it will wait
indefinitely for these tasks to finish. You can use a timeout to not do that. All in all, it is a good idea to not run
indefinite tasks with spawn_blocking
#![allow(unused)] fn main() { use tokio::task; let res = task::spawn_blocking(move || { // do some compute-heavy work or call synchronous code "done computing" }).await?; assert_eq!(res, "done computing"); }
When not to use Tokio
There are a couple domains, where Tokio is not the optimal solution, or simply provides little benefit.
In general, Tokio is best suited for places where you need to do many things at the same time, and inversely, least suitable for tasks, where you need to do one thing at once with optimal performance. This includes many utility type programs, mathy applications, or domains such as text processing.
Speaking of performance, Tokio is unsuited for speeding up CPU-bound tasks by spreading them across multiple threads.
You are better of using rayon or another thread-pool computation library.
Also, you need to evaluate the fact that using async may introduce extra complexity in very simple programs, so you should weigh the benefits of introducing Tokio into your project before jumping in head first.
The Project - A KV store client
For this project, we are going to draw inspiration from the tutorial one provided by Tokio itself, which already serves as a great example:
https://tokio.rs/tokio/tutorial/setup
Your task is to implement the client according to the tutorial. For this project, try to at least get down these two server commands:
GETSET
If the project is too time consuming, it is okay to skip the PUBLISH and SUBSCRIBE commands.
1. The Braiins spin
Your project should be developed in accordance with the Braiins standard, and in accordance with the best developer practices:
- Code should be cleanly organized
- Add logging with tracing to what is happening clear
- Formatted with rustfmt, and producing no lints when ran through clippy
2. The Final product
In the end you should be left with a well prepared project, that has the following:
- full functionality
- documented code (where applicable)
- clean git history that does not contain fix-ups, merge commits or malformed/misformatted commits