
Using Tokio

Tokio is a mature asynchronous runtime for Rust. It provides the building blocks for writing high-performance networking applications.

Essentials

Add Tokio to your project's dependencies in Cargo.toml.

toml
[dependencies]
# Later, pare down the features from "full" to only what's needed
tokio = { version = "1", features = ["full"] }

Don't use Tokio for

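  • CPU-bound applications where you're trying to run expensive computations in parallel to improve performance. Tokio is designed for IO-bound applications, where the slowest part is waiting for IO to complete, e.g., network requests. For parallel computation, a thread pool (e.g., the rayon crate) is a better fit.
  • Reading lots of files. This looks IO-bound, but operating systems generally don't provide asynchronous file APIs, so Tokio gives no benefit over an ordinary thread pool.
  • Sending a single web request. Tokio pays off when you need to do many things at once; for a one-off request, a blocking client is simpler.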
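For the CPU-bound case, plain OS threads are usually enough. The following is a minimal sketch using only the standard library (scoped threads, stable since Rust 1.63); parallel_sum is a hypothetical helper made up for illustration, and a crate such as rayon would normally do this with less code.

```rust
use std::thread;

/// Sums `data` by splitting it into roughly `workers` chunks, one OS thread per chunk.
/// `parallel_sum` is a hypothetical helper, not part of Tokio or std.
fn parallel_sum(data: &[u64], workers: usize) -> u64 {
    let workers = workers.max(1);
    // Ceiling division so every element lands in a chunk; at least 1 so `chunks` accepts it.
    let chunk_len = ((data.len() + workers - 1) / workers).max(1);
    thread::scope(|scope| {
        // Scoped threads may borrow `data` because they are joined before `scope` returns.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let data: Vec<u64> = (1..=1_000).collect();
    println!("sum = {}", parallel_sum(&data, 4));
}
```

No async runtime is involved: each thread keeps a core busy until its chunk is done, which is exactly the workload Tokio's scheduler is not designed for.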

Lessons Learned from Tokio Tutorials

Hello Tokio

Create a new Rust program (binary)

sh
# Either command creates the package; run one, not both
$ cargo new your-project
$ cargo init your-project
     Created binary (application) `your-project` package

Label the main function with #[tokio::main] so that it can be declared async:

rust
#[tokio::main]
async fn main() {
	// code
}

The macro expands this at compile time into roughly:

rust
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        // code
    })
}

Spawning

Setting up a server: you can accept inbound TCP connections with tokio::net::TcpListener and process them concurrently with the following logic (process is implemented elsewhere and does whatever you please with the socket).

rust
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

loop {
    let (socket, _) = listener.accept().await.unwrap();
    tokio::spawn(async move {
        process(socket).await;
    });
}

The call to tokio::spawn creates a Tokio task, which the Tokio documentation describes as an asynchronous green thread. (Aside: a green thread is created and scheduled by the user process, here the Tokio runtime, rather than by the kernel. Green threads offer no advantage for CPU-bound work, but a program can run far more of them concurrently than OS threads, which suits I/O-bound work.)

tokio::spawn returns a JoinHandle, which can be awaited to get the result of the completed task; see https://tokio.rs/tokio/tutorial/spawning.

Tasks in Tokio are very lightweight. Under the hood, they require only a single allocation and 64 bytes of memory. Applications should feel free to spawn thousands, if not millions of tasks.

Bounds
  • A task must have a 'static lifetime, which means the spawned task mustn't contain references to data owned outside the task. That's why the move keyword was used in the above example: it transfers ownership of socket into the task. Data that must be accessed from more than one task has to be shared using synchronization primitives such as Arc.
  • A task must implement the Send trait, because this allows Tokio to move tasks between threads while they are suspended at an .await. In practice, every value held across an .await must itself be Send. For more information see https://tokio.rs/tokio/tutorial/spawning.

Shared State

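Be aware of the differences between std::sync::Mutex and tokio::sync::Mutex: the std mutex blocks the current thread while waiting for the lock and is usually the right choice when contention is low and the lock is not held across an .await, whereas the Tokio mutex waits asynchronously and is meant for locks that must be held across an .await. See https://tokio.rs/tokio/tutorial/shared-state.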

If a single mutex becomes a point of contention, sharding the data across several independently locked mutexes can be a good way to add throughput.
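A minimal sketch of sharding, using std::sync::Mutex and only the standard library. ShardedDb, its methods and the hashing scheme are assumptions made for illustration; a crate such as dashmap offers a production-grade version of the same idea.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

/// Hypothetical sharded key-value store: each shard has its own lock,
/// so operations on keys in different shards don't contend.
#[derive(Clone)]
struct ShardedDb {
    shards: Arc<Vec<Mutex<HashMap<String, u64>>>>,
}

impl ShardedDb {
    fn new(num_shards: usize) -> Self {
        let shards: Vec<_> = (0..num_shards.max(1))
            .map(|_| Mutex::new(HashMap::new()))
            .collect();
        Self { shards: Arc::new(shards) }
    }

    /// Picks the shard responsible for `key` by hashing the key.
    fn shard_for(&self, key: &str) -> &Mutex<HashMap<String, u64>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        &self.shards[(hasher.finish() as usize) % self.shards.len()]
    }

    fn insert(&self, key: &str, value: u64) {
        // The guard is dropped at the end of the statement, so the lock is held only briefly.
        self.shard_for(key).lock().unwrap().insert(key.to_string(), value);
    }

    fn get(&self, key: &str) -> Option<u64> {
        let guard = self.shard_for(key).lock().unwrap();
        guard.get(key).copied()
    }
}

fn main() {
    let db = ShardedDb::new(8);
    db.insert("hello", 1);
    db.insert("world", 2);
    println!("{:?} {:?}", db.get("hello"), db.get("world"));
}
```

Because the struct only wraps an Arc, cloning it is cheap, and each spawned task can take its own clone.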

Channels

The below snippet is a great example of how to get started with tokio::sync::mpsc::channel.

rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        // send only fails if the receiver has been dropped
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

I/O

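To do file or socket I/O, use the methods provided by the AsyncReadExt and AsyncWriteExt extension traits (for example read, read_to_end and write_all). They are available on any type that implements AsyncRead or AsyncWrite, such as tokio::fs::File and tokio::net::TcpStream; you only implement those two traits yourself when writing a custom reader or writer.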

Framing

Framing is the process of taking a byte stream and converting it into a stream of frames.
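As an illustration, here is a std-only sketch of a frame parser. The wire format (a 4-byte big-endian length followed by the payload) and the name parse_frames are assumptions chosen for simplicity; a real protocol such as Redis defines its own frame layout.

```rust
/// Splits `buf` into complete frames and reports how many bytes were consumed.
/// Assumed wire format: 4-byte big-endian payload length, then the payload.
/// A trailing partial frame is left unconsumed so the caller can wait for more data.
fn parse_frames(buf: &[u8]) -> (Vec<Vec<u8>>, usize) {
    let mut frames = Vec::new();
    let mut pos = 0;

    while buf.len() - pos >= 4 {
        let len =
            u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]) as usize;
        if buf.len() - pos - 4 < len {
            break; // the payload has not fully arrived yet
        }
        frames.push(buf[pos + 4..pos + 4 + len].to_vec());
        pos += 4 + len;
    }

    (frames, pos)
}

fn main() {
    // Two complete frames followed by a truncated third one.
    let mut stream = Vec::new();
    for msg in ["GET", "hello"] {
        stream.extend_from_slice(&(msg.len() as u32).to_be_bytes());
        stream.extend_from_slice(msg.as_bytes());
    }
    stream.extend_from_slice(&[0, 0, 0, 9, b'p']);

    let (frames, consumed) = parse_frames(&stream);
    println!("{} frames, {} bytes consumed", frames.len(), consumed);
}
```

In a real connection handler the unconsumed tail would stay in the read buffer until the next socket read completes the frame.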

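[! Tip] Use Bytes (from the bytes crate) when sending messages over a network. You could use Vec<u8> or &[u8], but Bytes is a reference-counted byte buffer: cloning or slicing it doesn't copy the underlying data, which makes it cheap to hand the same message to several tasks.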