Introduction to Async

Async code exists to give a programming language more control over how code runs concurrently. A famous example of an async programming language is JavaScript, which can only execute code on a single thread, yet still needs to support multiple streams of work happening at the same time.

For instance, say you want to make a network request, but while you're communicating with the external service, you want to update the web page with a live-updating progress bar.

One way we can accomplish this in a single-threaded setup is by multiplexing: allowing one task to temporarily pause so that another task can resume. Eventually, both tasks will run to completion, but neither will block the other.
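As a brief sketch of this idea in Rust (assuming the tokio crate, which we'll see properly below), two sleeping tasks can be multiplexed onto a single thread with join!:

use std::time::Duration;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Both tasks run on one thread. Each time one awaits its timer,
    // the other is free to make progress.
    let (a, b) = tokio::join!(
        async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            "task 1 done"
        },
        async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            "task 2 done"
        },
    );
    // Total elapsed time is ~100ms, not ~200ms, because the waits overlap.
    println!("{a}, {b}");
}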

Threading vs Async

I've already hinted at another mechanism for concurrency: threads! Rust supports multithreading, so why do we need async code? Well, there are many reasons. Maybe you're writing code for an embedded device that only supports single-threaded operation. Maybe you're working in the Linux kernel itself and can't just create threads as you please. Maybe you just want more control over which tasks can run and in what order, rather than being at the mercy of your OS scheduler.

Whatever your reasoning, async Rust should be general enough to support your needs.

Concurrency vs Parallelism

Concurrency and Parallelism are different, but related concepts.

Concurrency is the process of performing tasks out of order. For example, if you have two tasks, a simple setup is to perform task 1, then task 2. But if these tasks involve any waiting, then you're wasting time. Instead, concurrency is the idea that you can do some of task 1 and some of task 2 in any order. Eventually both will be complete and the same end goal is reached, but potentially in much less time.

Parallelism is the process of performing tasks at the same time. This involves multiple workers (CPU cores, services, threads, people, etc.), each performing a single task. Using the example above, tasks 1 and 2 would both be started immediately and complete as soon as possible, though this might take double the resources in the worst case.

Both concepts have the same goal of reducing the time a combination of tasks may take, but in different ways. Parallelism requires more resources, but concurrency can't achieve the same throughput levels.

Async Rust provides you with concurrency. However, most async runtimes also make use of thread pools in order to provide a fixed amount of parallelism. In theory, this lets you achieve as much throughput as possible with the fewest resources available on your system.

Async Rust

Under the hood, async-await in Rust is syntax sugar that makes it easy to build cooperative coroutines, unlike the preemptive green threads you might see in Go or Java. This means that you must explicitly mark where a task may pause using .await. The runtime isn't able to pause a task just because it's taking a long time. For that reason, you typically have to be careful not to do too much CPU-bound work in between .await points.
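If you do have heavy CPU-bound work, most runtimes offer an escape hatch. Here's a minimal sketch using tokio (the expensive_hash function is a hypothetical stand-in for real work):

async fn handle_upload(data: Vec<u8>) -> u64 {
    // Hashing a large buffer could hog the worker thread between .await
    // points, stalling every other task scheduled on it. Moving it onto
    // tokio's dedicated blocking thread pool keeps the async workers free.
    tokio::task::spawn_blocking(move || expensive_hash(&data))
        .await
        .expect("hashing task panicked")
}

// A stand-in for some CPU-heavy computation.
fn expensive_hash(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64))
}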

Example of Async

use axum::{
    routing::get,
    Router,
};

#[tokio::main]
async fn main() {
    // build our application with a single route
    let app = Router::new().route("/", get(index));

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

async fn index() -> &'static str {
    let client = reqwest::Client::new();
    let res = client.post("http://example.com/counter")
        .send()
        .await;

    match res {
        Ok(_) => "Welcome to my site!",
        Err(_) => "Error registering your visit",
    }
}

This is a very typical async application, featuring axum as the HTTP server library, reqwest as the HTTP client library, and tokio as the async runtime.

The key feature here is that incoming server requests happen 'asynchronously'. While each request handler is waiting on the response, it yields control of the thread to another request that needs to be handled.

Let's break down what that means:

As a request comes in, our server will launch the request handler as a new async 'task'.

// Task 1:
let client = reqwest::Client::new();
let res = client.post("http://example.com/counter")
    .send()
    .await;
    // register interest in sending and receiving data.
    // Our runtime will pause this task

// Task 2:
let client = reqwest::Client::new();
let res = client.post("http://example.com/counter")
    .send()
    .await;
    // register interest in sending and receiving data.
    // Our runtime will pause this task

// Task 1:
// our runtime has monitored the OS and found that the data is now ready,
// resumes task 1.
match res {
    Ok(_) => "Welcome to my site!",
    Err(_) => "Error registering your visit",
}

// Task 2:
// our runtime has monitored the OS and found that the data is now ready,
// resumes task 2.
match res {
    Ok(_) => "Welcome to my site!",
    Err(_) => "Error registering your visit",
}

Paired with multiple threads to fully utilise the CPU cores, a good runtime can achieve hundreds of thousands of requests per second, with many thousands of concurrent requests in flight.

Over the course of this book, we will find out how these runtimes actually work, from a high level down to some very low-level implementation details.

What is a runtime

At a high level, in order to run your async functions, you need a runtime to manage which tasks are ready, and to get those tasks processed in an efficient way.

The most popular runtimes at the time of writing are tokio, smol, and async-std. All three achieve similar end results by utilising work-stealing and IO polling.

Anatomy of a runtime

A runtime is a collection of worker threads that share some state between them. At its core, the runtime has a shared queue of ready tasks, and possibly a shared queue of waiting tasks. The job of a worker is to progress a task forward, poll the OS to see if any other tasks that were waiting for IO are ready, and count down any waiting timers.

For efficiency, most runtime workers also have their own local queue of tasks, and employ tactics like work distribution and work stealing in order to not starve any workers, so they can maintain maximum parallelism and throughput.

Put into some pseudo-Rust code, this worker process might look like:

loop {
    // check if anything new is ready
    timers.count_down();
    os.poll();

    // get a task from the local queue
    let mut task = local_queue.pop_front();

    // if our local queue has no tasks, take some from the global queue
    if task.is_none() {
        local_queue.take_from(&mut global_queue);
        task = local_queue.pop_front();
    }

    // if there still isn't a task available, check the other workers
    for worker in &mut workers {
        if task.is_some() { break }

        local_queue.steal_from(worker);
        task = local_queue.pop_front();
    }

    // if there was a task, run it.
    // otherwise, pause this thread.
    if let Some(task) = task {
        task.progress();
    } else {
        pause();
    }
}

In reality, there's a bit more logic involved to maximise CPU efficiency, but that's the gist of it.

Configuration

You might be working on platforms without access to the std library. This means you have no access to the OS, and no access to threads. In that case, you can simplify that event loop significantly, as you have no global queue and no workers to steal from. You also don't need to worry about OS polling or pausing.
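In the same pseudo-Rust style as above, that simplified single-threaded loop might shrink to something like:

loop {
    // check if any timers have expired
    timers.count_down();

    // run the next ready task; with one thread and no OS,
    // the local queue is the only source of work
    if let Some(task) = local_queue.pop_front() {
        task.progress();
    }
    // no OS to poll and no thread to park, so we just spin back
    // around (possibly executing a low-power wait instruction)
}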

Similarly, you might be using a 'thread-per-core' architecture, which removes the potentially expensive work-stealing step, but can reduce throughput in some workloads. This also allows you to run tasks that are !Send (that can't be sent between threads). This is the technique employed by actix-rt, which utilises tokio's LocalSet. Glommio is another runtime with this feature.
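Here's a minimal sketch of what that enables, using tokio's LocalSet directly (the Rc is deliberately !Send):

use std::rc::Rc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Rc is !Send, so this value can never leave the current thread.
    let counter = Rc::new(std::cell::Cell::new(0));

    let local = tokio::task::LocalSet::new();
    local.run_until(async {
        let counter2 = Rc::clone(&counter);
        // spawn_local accepts !Send futures; tokio::spawn would reject this.
        tokio::task::spawn_local(async move {
            counter2.set(counter2.get() + 1);
        })
        .await
        .unwrap();
    })
    .await;

    assert_eq!(counter.get(), 1);
}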

Choosing a runtime

Unfortunately, while there are a lot of options for runtimes, there's often not a lot of choice you as a user can make.

If you want to use the axum/reqwest libraries as shown in the previous example, you must use tokio. This is because internally they use the tokio::spawn function and the tokio::net and tokio::time modules. All of these must be called in the context of a tokio runtime, and they will panic if the thread-local runtime context is not set.

Some libraries are generic over runtimes. hyper, the HTTP engine that axum and reqwest are built on, is generic over executors, using traits like Executor to abstract away tokio::spawn, and Accept to abstract away tokio::net::TcpListener.
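As a rough sketch of what that abstraction enables, assuming hyper 0.14's Executor trait and the smol runtime (the adapter name is our own), an executor adapter can be as small as:

use std::future::Future;

// Adapter that hands hyper's internal spawns to smol instead of tokio.
#[derive(Clone)]
struct SmolExecutor;

impl<F> hyper::rt::Executor<F> for SmolExecutor
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    fn execute(&self, fut: F) {
        // detach() lets the spawned task run to completion in the background
        smol::spawn(fut).detach();
    }
}

hyper's server builder can then be pointed at this via its executor method instead of defaulting to tokio::spawn.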

Similarly, there exist the executor-trait and reactor-trait crates, which are used by lapin in order to allow the async runtime to be configured via trait objects. This adds some allocation overhead, but can potentially speed up compile times if you don't need extreme performance.

Because tokio is orders of magnitude more popular than smol and async-std, there also exist compatibility layers like async-compat, which run tokio-based async code outside of a tokio context. This comes with the added cost that any tokio actions are spawned onto an extra single-threaded tokio runtime, reducing overall resource efficiency.
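A brief sketch of that layer in action, assuming async-compat's Compat wrapper: reqwest internally needs a tokio reactor, but the outer executor here is smol.

use async_compat::Compat;

fn main() {
    // Compat provides a tokio context behind the scenes, so
    // tokio-based libraries like reqwest work under smol.
    smol::block_on(Compat::new(async {
        let body = reqwest::get("http://example.com")
            .await
            .unwrap()
            .text()
            .await
            .unwrap();
        println!("{body}");
    }));
}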

Using a runtime

Spawning

Spawning a task is analogous to spawning a thread. It creates a new top-level task, with no parent, for the runtime to process. This usually returns a JoinHandle, again similar to the threaded version. Awaiting the join handle will pause the current task until the spawned task is complete.

/// Downloads a set of pages concurrently
async fn download_pages(pages: Vec<String>) -> Vec<(String, Vec<u8>)> {
    let mut work = Vec::with_capacity(pages.len());
    for page in pages {
        // spawn each download job into its own task.
        // this ensures that all the download tasks run concurrently
        let handle = tokio::spawn(async move {
            let bytes = download_page(&page).await;
            (page, bytes)
        });

        // store the handles
        work.push(handle);
    }

    let mut output = Vec::with_capacity(pages.len());
    for handle in work {
        // join lets us get the output of the task
        output.push(handle.await.unwrap())
    }

    output
}

/// Downloads the page contents at the URL
async fn download_page(url: &str) -> Vec<u8> {
    todo!()
}

Depending on the runtime, you may also get the ability to 'abort' or 'cancel' a task. In theory this should remove the task from any queues, but it's never that simple in practice, and it usually just sets a flag so that the task is skipped instead.
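With tokio, for example, aborting looks like this (the endless loop is a hypothetical stand-in for a task you no longer need):

async fn abort_example() {
    let handle = tokio::spawn(async {
        loop {
            // a task that would otherwise run forever
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });

    // request cancellation; the task is dropped at its next .await point
    handle.abort();

    // awaiting an aborted task reports the cancellation
    let err = handle.await.unwrap_err();
    assert!(err.is_cancelled());
}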

Just as with thread::spawn, this API requires that tasks have a 'static lifetime.

Network

Much like std::net, general-purpose runtimes expose their own network primitives. And much like how std has the Read/Write traits, these runtimes will often have similar AsyncRead/AsyncWrite traits.
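For instance, here's a minimal echo server sketch using tokio's networking types and its AsyncReadExt/AsyncWriteExt extension traits:

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        // accept() yields while waiting, so other tasks can run
        let (mut stream, _addr) = listener.accept().await?;

        // handle each connection in its own task
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            // echo bytes back until the client hangs up
            while let Ok(n) = stream.read(&mut buf).await {
                if n == 0 { break; }
                if stream.write_all(&buf[..n]).await.is_err() { break; }
            }
        });
    }
}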

Timers

Finally, most runtimes offer efficient alternatives to std::thread::sleep. These are often extended to provide interval clocks that fire repeatedly, as well as timeouts that can cancel tasks that take too long.
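In tokio, those look like the following (the slow_operation here is a hypothetical placeholder for some long-running work):

use tokio::time::{interval, sleep, timeout, Duration};

async fn timer_examples() {
    // yields to the runtime instead of blocking the thread
    sleep(Duration::from_secs(1)).await;

    // a clock that fires repeatedly
    let mut ticker = interval(Duration::from_millis(500));
    for _ in 0..3 {
        ticker.tick().await;
        println!("tick");
    }

    // cancel a task that takes too long
    match timeout(Duration::from_secs(5), slow_operation()).await {
        Ok(value) => println!("finished with {value}"),
        Err(_) => println!("timed out"),
    }
}

// hypothetical stand-in for some long-running work
async fn slow_operation() -> u32 {
    sleep(Duration::from_secs(10)).await;
    42
}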

Futures