Using Tokio
Tokio is a mature asynchronous runtime for Rust. It provides the building blocks for writing high-performance networking applications.
Essentials
Add Tokio to your project's dependencies in Cargo.toml.
# Later, pare down the features from "full" to only what's needed
tokio = { version = "1", features = ["full"] }
Don't use Tokio for
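- CPU-bound applications where you're trying to run expensive computations in parallel to improve performance. Tokio is designed for IO-bound applications, where the slowest part of the application is waiting for IO operations to complete, e.g., network requests.
- Reading lots of files. Operating systems generally don't provide asynchronous file APIs, so Tokio offers no advantage here over an ordinary thread pool.
- Sending a single web request. Tokio helps when you need to do many things at once; for a single request, the blocking version of the library is simpler.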
Lessons Learned from Tokio Tutorials
Hello Tokio
Create a new Rust program (binary)
# Either command works here (cargo init can also initialize an existing directory)
$ cargo new your-project
$ cargo init your-project
Created binary (application) `your-project` package
Label the main function with #[tokio::main] to run it on the Tokio runtime:
#[tokio::main]
async fn main() {
    // code
}
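At compile time, the macro expands this into roughly the following:
fn main() {
    // block_on takes &self in Tokio 1.x, so the runtime does not need to be mutable.
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // code
    })
}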
Spawning
Setting up a server: you can accept inbound TCP connections with tokio::net::TcpListener and process each socket concurrently with the following logic (where process is implemented elsewhere and does whatever you please with the socket).
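// Runs inside an async fn (e.g. under #[tokio::main]); needs `use tokio::net::TcpListener;`
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
    // The second tuple element is the peer's address, unused here.
    let (socket, _) = listener.accept().await.unwrap();
    // Each connection gets its own task, so a slow client doesn't block the accept loop.
    tokio::spawn(async move {
        process(socket).await;
    });
}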
The call to tokio::spawn creates a Tokio task, which the Tokio docs describe as an asynchronous green thread. (Aside: a green thread is created and scheduled by the user-space runtime rather than by the kernel. Green threads are typically less performant than OS threads for CPU-bound work, but a runtime can juggle far more concurrent I/O-bound operations with them.)
tokio::spawn returns a JoinHandle, which can be awaited to get the result of the completed task, see https://tokio.rs/tokio/tutorial/spawning.
Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.
Bounds
- A task must have a 'static lifetime. This means that the spawned task must not contain references to data owned outside the task. That's why the move keyword was used in the example above. Otherwise, the data must be shared using synchronization primitives such as Arc.
- A task must implement the Send trait, because this allows Tokio to move tasks between threads while they are suspended at an .await. For more information see https://tokio.rs/tokio/tutorial/spawning.
Shared State
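Be aware of the differences between std::sync::Mutex and tokio::sync::Mutex, see https://tokio.rs/tokio/tutorial/shared-state. As a rule of thumb, std::sync::Mutex is fine as long as the lock is not held across an .await, while tokio::sync::Mutex is meant for locks that must be held across one.
If lock contention becomes a bottleneck, sharding the data across several mutexes can be a good way to add throughput.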
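A rough sketch of sharding, assuming a key-value store split across a fixed number of independently locked maps. The names ShardedDb, new_sharded_db, shard_index, insert, and get are made up for this example. It uses std::sync::Mutex because no lock is held across an .await.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

// Hypothetical type alias: a fixed set of independently locked maps.
type ShardedDb = Arc<Vec<Mutex<HashMap<String, u64>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut shards = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        shards.push(Mutex::new(HashMap::new()));
    }
    Arc::new(shards)
}

// Pick a shard by hashing the key, so a given key always maps to the same lock.
fn shard_index(key: &str, num_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_shards
}

fn insert(db: &ShardedDb, key: &str, value: u64) {
    // Only the shard that owns this key is locked; the others stay available.
    let mut shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.insert(key.to_string(), value);
}

fn get(db: &ShardedDb, key: &str) -> Option<u64> {
    let shard = db[shard_index(key, db.len())].lock().unwrap();
    shard.get(key).copied()
}

fn main() {
    let db = new_sharded_db(8);
    insert(&db, "hello", 1);
    insert(&db, "world", 2);
    println!("hello -> {:?}", get(&db, "hello"));
}
```

Because ShardedDb is an Arc, it can be cloned into each spawned task; two tasks only contend when their keys hash to the same shard.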
Channels
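The snippet below is a good starting point for tokio::sync::mpsc::channel, a multi-producer, single-consumer channel.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel that buffers up to 32 messages.
    let (tx, mut rx) = mpsc::channel(32);
    // Each producer task needs its own sender handle.
    let tx2 = tx.clone();

    tokio::spawn(async move {
        // send() returns a Result; it fails only if the receiver has been dropped.
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    // recv() returns None once every sender has been dropped, which ends the loop.
    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}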
I/O
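To do file I/O, use a type such as tokio::fs::File, which implements the AsyncRead and AsyncWrite traits. The read and write methods themselves come from the AsyncReadExt and AsyncWriteExt extension traits, so you rarely need to implement AsyncRead or AsyncWrite yourself.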
Framing
Framing is the process of taking a byte stream and converting it into a stream of frames.
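As a sketch of the idea, the example below frames a byte stream with a 4-byte big-endian length prefix. This format is an assumption chosen for brevity (the Tokio tutorial frames the Redis protocol instead), and encode_frame and try_decode_frame are hypothetical helpers that use only the standard library.

```rust
// Encode one frame: a 4-byte big-endian length prefix followed by the payload.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    out.extend_from_slice(payload);
    out
}

// Try to pull one complete frame off the front of `buffer`.
// Returns None when more bytes are needed; otherwise removes the frame's
// bytes from the buffer and returns the payload.
fn try_decode_frame(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buffer.len() < 4 {
        return None;
    }
    let len = u32::from_be_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]) as usize;
    if buffer.len() < 4 + len {
        return None;
    }
    let payload = buffer[4..4 + len].to_vec();
    buffer.drain(..4 + len);
    Some(payload)
}

fn main() {
    // Simulate a byte stream that delivers two frames in arbitrary 3-byte chunks.
    let mut stream = encode_frame(b"hello");
    stream.extend(encode_frame(b"world"));

    let mut buffer = Vec::new();
    for chunk in stream.chunks(3) {
        buffer.extend_from_slice(chunk);
        // A single read may complete zero, one, or several frames.
        while let Some(frame) = try_decode_frame(&mut buffer) {
            println!("frame: {}", String::from_utf8_lossy(&frame));
        }
    }
}
```

The key point is that chunk boundaries on the wire do not line up with frame boundaries, so the decoder has to buffer bytes until a whole frame is available.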
[! Tip] Use Bytes (from the bytes crate) when sending messages over a network. You can use &[u8] or Vec<u8>, but Bytes is a reference-counted byte buffer that is cheap to clone and adds a little extra functionality and safety.