For most people, asynchronous Rust is as simple as cargo add tokio -F full, and then adding #[tokio::main] to your main function. Then, wham bam alakazam, you’re running Rust asynchronously - you’ve got tasks and polling and all the other wonders of async Rust.

If you’ve ever felt particularly curious, you might’ve chosen to cargo expand, and you’d find something like this:

fn main () {
    let body = async {
        {
            ::std::io::_print(format_args!("Hello, world!\n"));
        };
    };
    #[allow(clippy::expect_used, clippy::diverging_sub_expression)]
    {
        return tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("Failed building the Runtime")
            .block_on(body);
    }
}

Here, we can see that it just starts a runtime with some custom options - if you add options to tokio::main, then it’ll change stuff here, but it doesn’t really tell us anything interesting.

If you then try to find the source code, it doesn’t really explain all that much - you can dig into the tokio source code, and you’ll eventually find Runtime::block_on which’ll eventually lead to CurrentThread or one of the other executors. The problem is that they’re not the easiest to read for a beginner.[1]

My goal for this essay is to try to explain how an executor works by building the most basic, least performant executor you will have ever seen. We’ll build it step-by-step, and hopefully you’ll be able to understand other executors better afterwards.

For reading this, I’d recommend an intermediate understanding of Rust, and a pretty strong understanding of what a future is - if you have the former but not the latter, I can enthusiastically recommend Amos’ article on futures because it really gets to the core of the matter in a way I aim to replicate.

What does an async executor do, anyways?

I’m going to start this essay much the same as how I started my allocators essay - that is that I’ll try to find a standard library trait (or traits) which describe the required behaviour.

The easiest place to start is to look at std::future::Future - we aren’t necessarily going to be writing any, but I know that we’ll definitely be calling them:

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

Here, we can see that when we call the poll method, we need to provide a std::task::Context with an anonymous lifetime - if we go to check that, we can see that it just provides access to a std::task::Waker which it gets from the constructor.
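To make that a bit more concrete, here is a minimal sketch of polling a future by hand. It doesn’t use anything from our executor: NoopWake and poll_once are hypothetical names made up for this example, and the Waker is built with the standard library’s std::task::Wake trait so that the snippet needs no unsafe code.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing when woken (hypothetical helper for this sketch).
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` exactly once and returns whatever `poll` reported.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    // `Waker` can be built from an `Arc<W>` for any `W: Wake + Send + Sync + 'static`.
    let waker = Waker::from(Arc::new(NoopWake));
    // The `Context` just borrows the `Waker`.
    let mut cx = Context::from_waker(&waker);
    // Pin the future to the stack so that we are allowed to call `poll`.
    let mut fut = pin!(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
}

fn main() {
    // A future that is ready straight away...
    println!("{:?}", poll_once(std::future::ready(7)));
    // ...and one that never finishes.
    println!("{:?}", poll_once(std::future::pending::<u32>()));
}
```

An executor is more or less this function in a loop, plus a Waker that actually does something useful when it gets woken.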

A Waker allows a task to wake itself back up and encapsulates a std::task::RawWaker which holds two pointers:

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

The data pointer holds pretty much any data that the executor needs to hold - it’s expected to be used as an equivalent of std::any::Any.[2]

std::task::RawWakerVTable holds a list of function pointers, and if we check the documentation we can see that each function takes in a *const () which is always the data pointer from the RawWaker, and then each function has the following purpose:

Function    | Purpose
clone       | This function gets used to create a new RawWaker for use when we clone a Waker.
wake        | This function is called when wake is called on the Waker and should wake up the relevant task. This also should consume the Waker and so needs to deal with any relevant cleanup (eg. freeing memory).
wake_by_ref | This function is the same as wake but doesn’t consume the Waker, so doesn’t need to deal with cleanup.
drop        | This is called when the Waker is dropped and has to Drop the contents of the WakerData.
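To see when each of those four functions actually gets called, here is a small sketch that counts the calls. Counters, counters_from, the on_* functions, COUNTING_VTABLE and exercise are all hypothetical names for this example only; the data pointer simply points at a struct of atomic counters rather than at anything an executor would need.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

/// Hypothetical bookkeeping struct: one counter per vtable entry.
#[derive(Default)]
struct Counters {
    clones: AtomicUsize,
    wakes: AtomicUsize,
    wakes_by_ref: AtomicUsize,
    drops: AtomicUsize,
}

/// Reinterprets the type-erased `data` pointer as our `Counters`.
unsafe fn counters_from<'a>(data: *const ()) -> &'a Counters {
    unsafe { &*(data as *const Counters) }
}

unsafe fn on_clone(data: *const ()) -> RawWaker {
    let counters = unsafe { counters_from(data) };
    counters.clones.fetch_add(1, Ordering::Relaxed);
    // The clone shares the same data pointer and the same vtable.
    RawWaker::new(data, &COUNTING_VTABLE)
}

unsafe fn on_wake(data: *const ()) {
    let counters = unsafe { counters_from(data) };
    counters.wakes.fetch_add(1, Ordering::Relaxed);
}

unsafe fn on_wake_by_ref(data: *const ()) {
    let counters = unsafe { counters_from(data) };
    counters.wakes_by_ref.fetch_add(1, Ordering::Relaxed);
}

unsafe fn on_drop(data: *const ()) {
    let counters = unsafe { counters_from(data) };
    counters.drops.fetch_add(1, Ordering::Relaxed);
}

static COUNTING_VTABLE: RawWakerVTable =
    RawWakerVTable::new(on_clone, on_wake, on_wake_by_ref, on_drop);

/// Triggers each vtable entry once and returns
/// `[clones, wakes, wakes_by_ref, drops]`.
fn exercise() -> [usize; 4] {
    let counters = Counters::default();
    let data = &counters as *const Counters as *const ();
    // SAFETY: `counters` outlives every waker created in this function, and
    // the vtable functions only touch atomics, so they are thread-safe.
    let waker = unsafe { Waker::from_raw(RawWaker::new(data, &COUNTING_VTABLE)) };

    let copy = waker.clone(); // calls `clone`
    copy.wake_by_ref(); // calls `wake_by_ref`
    copy.wake(); // calls `wake`, which consumes `copy` (no `drop` call for it)
    drop(waker); // calls `drop`

    [
        counters.clones.load(Ordering::Relaxed),
        counters.wakes.load(Ordering::Relaxed),
        counters.wakes_by_ref.load(Ordering::Relaxed),
        counters.drops.load(Ordering::Relaxed),
    ]
}

fn main() {
    println!("{:?}", exercise());
}
```

The bit worth noticing is that a Waker consumed by wake never reaches the drop function, which is why wake has to do its own cleanup.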

So, we now know roughly what our Waker needs, so let’s think about what else an async runtime could need.

What else?

We probably need a list of tasks to poll and a list of results. I think it’ll be easiest to look at these as we go, so we’ll dive straight in.

Executor Time!

I haven’t built an async executor before, so this essay is more of the come explore with me variety, rather than the guided tour of my thesis variety. We’ll start with a relatively simple single-threaded executor and then see what happens from there.

First, we’ll begin a new cargo project with cargo new --bin basic_async_executor, and then open that project with whatever editor you think is best.

We can then start by laying out our definition for our executor.

use std::future::Future;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::pin::Pin;

pub type BoxedFuture<T> = Pin<Box<dyn Future<Output = T>>>;

pub struct Executor {
    tasks_to_poll: Vec<BoxedFuture<u32>>,
    tasks_receiver: Receiver<BoxedFuture<u32>>, 
    tasks_sender: Sender<BoxedFuture<u32>>,
}

impl Default for Executor {
    fn default() -> Self {
        let (tx, rx) = channel();
        Self {
            tasks_to_poll: vec![],
            tasks_sender: tx,
            tasks_receiver: rx,
        }
    }
}

Here, you can see that we start with some imports - Future, std::pin::Pin and channels. Even though this is all single-threaded we need to use the synchronisation primitives because of the method signatures in std::task::RawWakerVTable - they all have the data as a non-mutable object, so we have to use interior mutability which is easiest with the synchronisation primitives we have in the standard library.

We then also define a type alias to not have to write out the whole future definition every time, which just makes life a bit easier. For our boxed future, we also need to have wrapped it in Pin. This is because of the way that Futures work, and I could try to explain it but I think I’d probably do it poorly and so will defer to Amos’ Article.

We then define a struct for our actual executor - a list of tasks that we need to poll, and a receiver for tasks to add themselves to the list of tasks to be polled. They both have the concrete type of u32 because that can communicate a fair few distinct values and it makes it easier not to have to deal with multiple types yet. As I mentioned earlier, we’re not trying to make a good one, we’re trying to make one that works.

Our basic event loop will go something like this:

  1. Check the receiver for tasks to poll.
  2. Loop through the tasks we need to poll.
  3. For each task, poll it.
  4. For each task, if it is pending, continue on.
  5. For each task, if it is ready, then just print out the result.
  6. Repeat ad infinitum.

Since that should be relatively simple, we can also write out that loop:

impl Executor {
    pub fn block_on(mut self, future: BoxedFuture<u32>) {
        self.tasks_to_poll.push(future);

        loop {
            for task in self.tasks_receiver.try_iter() {
                self.tasks_to_poll.push(task);
            }

            for mut task in std::mem::take(&mut self.tasks_to_poll) {
                let mut cx = todo!(); // placeholder until we can build a Context
                match task.as_mut().poll(&mut cx) {
                    Poll::Ready(res) => println!("Received Result: {res}"),
                    Poll::Pending => {}
                }
            }
        }
    }
}

Here, we’ve now got a skeleton for our basic block_on function, with the only missing bit being our context, which will probably be the more difficult part.

Context

First, I think the best place to start would be to create the data for each waker that lives inside the *const ().

#[derive(Clone)]
pub struct WakerData;

As you can see, it’s currently empty because it isn’t yet clear what data we’ll need.

Now we need to write the functions for our VTable. We need to pay pretty careful attention here because every function in the VTable is an unsafe fn, and the documentation for RawWakerVTable requires that all of them are thread-safe. Here, we can follow that by making sure to use the synchronisation primitives.

use std::task::RawWakerVTable;

const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

unsafe fn clone (data: *const ()) -> RawWaker {
    todo!()
}

unsafe fn wake (data: *const ()) {
    todo!()
}

unsafe fn wake_by_ref (data: *const ()) {
    todo!()
}

unsafe fn drop (data: *const ()) {
    todo!()
}

Remember when I mentioned that I was learning at the same time? Well we’ve come across our first problem here - we need to be able to wake up a task using only the data in our WakerData. The first thing that comes to mind is the idea that we could have each WakerData store an index into the list of tasks, and our sender sends that index rather than the entire future. This is because we can’t send a partially completed future from inside the future, where we can’t access it.

Following Rust best practices, we’ll use an enum rather than a boolean for by ref versus owned - I’m not normally a shill for mindlessly following best practices but this one makes 100% sense to me. It makes it far easier to see in GitHub diffs and to know what the code does at a glance.

use std::task::Poll;

pub struct Executor {
    tasks_to_poll: Vec<Option<BoxedFuture<u32>>>,
    tasks_receiver: Receiver<usize>, 
    tasks_sender: Sender<usize>,
}

impl Default for Executor {
    fn default () -> Self {
        let (tasks_sender, tasks_receiver) = channel();
        Self {
            tasks_to_poll: Vec::new(),
            tasks_sender,
            tasks_receiver
        }
    }
}

impl Executor {
    pub fn block_on(mut self, future: BoxedFuture<u32>) {
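        self.tasks_to_poll.push(Some(future));
        // Queue the main task (index 0) so that the loop polls it at least once.
        self.tasks_sender.send(0).unwrap();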

        loop {
            for index in self.tasks_receiver.try_iter() {
                if index >= self.tasks_to_poll.len() {
                    panic!("index out of bounds");
                }

                let mut cx = todo!(); // placeholder until we can build a Context

                if let Some(task) = &mut self.tasks_to_poll[index] {
                    match task.as_mut().poll(&mut cx) {
                        Poll::Ready(res) => {
                            println!("[executor] Received Result: {res}");
                            self.tasks_to_poll[index] = None;
                        },
                        Poll::Pending => {}
                    }
                }
            }
        }
    }
}

Here you can see that I’ve moved to a list of Options where each can hold a future, and that future gets removed when the task finishes. We technically don’t need to do this, but it allows the program to free the memory used by those tasks and ensures we don’t poll a task after completion.

We’ll also need to add the index to the waker data, as well as a use statement for the RawWaker parts we’ll be doing next.

use std::task::RawWaker;

#[derive(Clone)]
pub struct WakerData {
    tasks_sender: Sender<usize>,
    id: usize,
}

Now, we can get back to our VTable.

Clone

Clone should be relatively easy - we just need to make a new RawWaker. However, we do need to be careful on a few fronts to try to avoid undefined behaviour:

  1. We need to make sure that we create a new Sender - this avoids freeing the same Sender multiple times.
  2. We need to ensure that the WakerData inside data does not get dropped as clone should only read the original, not drop it.
  3. We need to make sure that the new data gets sent to the heap to make sure it will not just be dropped at the end of the function.

unsafe fn clone (data: *const ()) -> RawWaker {
    let old_data = std::ptr::read(data as *const WakerData);
    let new_data = old_data.clone(); //cloning clones all the interior contents, creating a new `Sender`
    std::mem::forget(old_data);

    let boxed = Box::new(new_data);
    let raw_ptr = Box::into_raw(boxed);

    RawWaker::new(raw_ptr as *const (), &VTABLE)
}

Wake & Wake By Ref

These are pretty easy - we just need to read in the pointer and send the task id in the sender. The difference between the two is ownership: wake consumes the Waker, so it has to free the WakerData once it’s done, whereas wake_by_ref only borrows it and needs to std::mem::forget its copy so that the contents of the pointer don’t get dropped and made invalid.

unsafe fn wake(data: *const ()) {
    // Take ownership back from the raw pointer; the Box (and the WakerData
    // inside it) gets freed when `data` goes out of scope.
    let data = Box::from_raw(data as *mut WakerData);
    data.tasks_sender
        .send(data.id)
        .expect("unable to send task id to executor");
}

unsafe fn wake_by_ref(data: *const ()) {
    let data = std::ptr::read(data as *const WakerData);
    data.tasks_sender
        .send(data.id)
        .expect("unable to send task id to executor");
    // The Waker still owns the data, so we must not run its destructor here.
    std::mem::forget(data);
}

Drop

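For Drop, we need to undo the Box::into_raw from earlier. std::ptr::drop_in_place looks tempting, but it only runs the destructor of the WakerData and never gives the heap allocation itself back, so instead we rebuild the Box with Box::from_raw and let it drop. Our function is itself called drop and shadows the one from the prelude, so we have to spell out std::mem::drop.

unsafe fn drop (data: *const ()) {
    // Rebuilding the Box drops the WakerData and frees its allocation.
    std::mem::drop(Box::from_raw(data as *mut WakerData));
}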

Virtual Function Table

We can finally then create the virtual function table. Here, I’m using a const item because the constructor for RawWakerVTable is a const fn and the table just contains function pointers, so this makes it easiest to use.

pub const VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);

Creating the Context

However, we still need to create the context to poll with. The best way to do this is to work backwards through the documentation to work out what we need.

  1. We need to finish with a Context, which seems to have a constructor method of Context::from_waker which takes in a Waker.
  2. Waker has Waker::from_raw which takes in a RawWaker. This is an unsafe function, where the invariant is that we need to make sure that our RawWaker and our RawWakerVTable are all sound and follow their invariants.
  3. RawWaker has a constructor which takes in a pointer to some WakerData and our VTABLE.

So, we can then use this code block inside of our block_on loop to create our Context! When you add this, don’t forget to add a use std::task::{Waker, Context} to the top.

let waker_data = WakerData {
    id: index,
    tasks_sender: self.tasks_sender.clone(),
};
let boxed_waker_data = Box::new(waker_data);
let raw_waker_data = Box::into_raw(boxed_waker_data);

let raw_waker =
    RawWaker::new(raw_waker_data as *const WakerData as *const (), &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
let mut cx = Context::from_waker(&waker);

Similar to the clone method, we need to make sure to put the WakerData onto the heap so that it outlives this scope and is only freed through the VTable when the Waker is dropped, rather than getting double-freed.
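As an aside, the standard library can also build the whole VTable for us if we implement the std::task::Wake trait on a type held in an Arc. This isn’t the route this essay takes, but a rough sketch of the same index-sending idea could look like the following. IndexWaker, make_waker and woken_ids are hypothetical names, and the Mutex around the Sender is only there as an assumption-free way to satisfy the Sync bound that Wake requires.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical safe counterpart to the essay's `WakerData`.
struct IndexWaker {
    id: usize,
    // `Wake` requires `Send + Sync`; wrapping the `Sender` in a `Mutex`
    // guarantees that regardless of whether `Sender` itself is `Sync`.
    tasks_sender: Mutex<Sender<usize>>,
}

impl Wake for IndexWaker {
    fn wake(self: Arc<Self>) {
        // Consuming the `Arc` is the cleanup; just reuse the by-ref logic.
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        let sender = self.tasks_sender.lock().expect("sender mutex poisoned");
        // An error here only means that the receiving side has gone away.
        let _ = sender.send(self.id);
    }
}

/// Builds a `Waker` for task `id` without any unsafe code.
fn make_waker(id: usize, tasks_sender: Sender<usize>) -> Waker {
    Waker::from(Arc::new(IndexWaker {
        id,
        tasks_sender: Mutex::new(tasks_sender),
    }))
}

/// Wakes task 3 twice (once by reference, once by value) and returns the
/// ids that arrived on the channel.
fn woken_ids() -> Vec<usize> {
    let (tx, rx) = channel();
    let waker = make_waker(3, tx);
    waker.wake_by_ref();
    waker.clone().wake();
    rx.try_iter().collect()
}

fn main() {
    println!("{:?}", woken_ids());
}
```

The Arc takes care of clone and drop, so the only things left to write are the two wake functions. The hand-rolled VTable is still worth doing once, because it shows what the Arc version is hiding.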

Creating a Future

We can then create a very basic future that immediately returns to test our executor:

struct BasicFuture;

impl Future for BasicFuture {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(123)
    }
}

And create a main method to run it:

fn main () {
    let fut = BasicFuture;
    let exec = Executor::default();
    exec.block_on(Box::pin(fut));
}

We then get this output:

[executor] Received Result: 123

And the program hangs forever. This, however, we can easily fix! Since we know that once our main future (the one we called block_on with) is finished, all tasks are finished, we can just break the loop when that one finishes!

Just put this in our Poll::Ready branch, and make sure to label the loop.

if index == 0 {
    break 'main;
}

Here, I’ve used the 'main label, but you could use anything - this is to make sure that it breaks the right loop, as break breaks the inner-most loop by default.
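If labelled breaks are new to you, here is a tiny standalone sketch of the same shape: an outer loop, an inner loop, and a break that leaves both. The function polls_before_main and its batches parameter are made up for this example and have nothing to do with the real executor state.

```rust
/// Counts how many task indices get "polled" before index 0 (our stand-in
/// for the main task) shows up, walking the batches in order.
fn polls_before_main(batches: &[Vec<usize>]) -> usize {
    let mut polled = 0;
    'main: for batch in batches {
        for &index in batch {
            if index == 0 {
                // A plain `break` would only leave the inner `for` and carry
                // on with the next batch; the label leaves both loops.
                break 'main;
            }
            polled += 1;
        }
    }
    polled
}

fn main() {
    let batches = vec![vec![2, 1], vec![3, 0, 4], vec![5]];
    // Indices 2, 1 and 3 are counted, then 0 stops everything.
    println!("{}", polls_before_main(&batches));
}
```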

Creating a more complicated future

Whilst that future works, we can also create more complicated futures to test whether things are actually running concurrently. For a basic timer future, we can use this one.

struct TimerFuture {
    start: Option<Instant>,
    time: Duration,
    timeout_ms: u32
}

impl TimerFuture {
    pub fn new (timeout_ms: u32) -> Self {
        Self {
            start: None,
            timeout_ms,
            time: Duration::from_millis(timeout_ms as u64)
        }
    }
}

impl Future for TimerFuture {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.start {
            None => {
                self.start = Some(Instant::now());
            },
            Some(x) => if x.elapsed() >= self.time {
                return Poll::Ready(self.timeout_ms);
            }
        }

        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
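One thing to be aware of with this TimerFuture is that it wakes itself on every poll, so the executor keeps polling it in a tight loop until the time is up. That is fine for testing, but a possible alternative is to hand a clone of the Waker to a helper thread that sleeps and then wakes the task once. The sketch below is standalone and makes a few assumptions: SleepFuture, ThreadWaker and this block_on are hypothetical names, the deadline starts at construction rather than at the first poll, and the tiny block_on here parks the thread instead of using our executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Hypothetical timer that asks a helper thread to wake it at the deadline.
struct SleepFuture {
    deadline: Instant,
    timer_started: bool,
}

impl SleepFuture {
    fn new(delay: Duration) -> Self {
        Self { deadline: Instant::now() + delay, timer_started: false }
    }
}

impl Future for SleepFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            return Poll::Ready(());
        }
        if !self.timer_started {
            self.timer_started = true;
            // Caveat: this keeps the waker from the FIRST poll only, which is
            // fine here because our executor always uses the same one.
            let waker = cx.waker().clone();
            let deadline = self.deadline;
            thread::spawn(move || {
                thread::sleep(deadline.saturating_duration_since(Instant::now()));
                waker.wake();
            });
        }
        Poll::Pending
    }
}

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny stand-in executor: returns the output and the number of polls.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
        // Sleep until someone calls `wake` (or a spurious wakeup happens).
        thread::park();
    }
}

fn main() {
    let start = Instant::now();
    let ((), polls) = block_on(SleepFuture::new(Duration::from_millis(50)));
    println!("slept for {:?} using {polls} polls", start.elapsed());
}
```

The trade-off is one extra thread per timer, which real runtimes avoid with a shared timer driver, but it does mean the future is polled a handful of times rather than continuously.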

We can also then add the futures crate and this snippet which creates three of our futures, and then prints how long it takes for them all to run at the same time. If we’re doing actual concurrency, they should hopefully only take roughly as long as the longest timeout.

use std::time::{Instant, Duration};

fn main() {
    Executor::default().block_on(Box::pin(async move {
        println!("[main] Creating futures");

        let one = TimerFuture::new(250);
        let two = TimerFuture::new(500);
        let three = TimerFuture::new(750);

        println!("[main] Created all");

        let t = Instant::now();
        let res = futures::join!(one, two, three);
        let elapsed = t.elapsed();
        println!("[main] Got {res:?} in {elapsed:?}");

        0
    }));
}

And if we run that, we then get:

[main] Creating futures
[main] Created all
[main] Got (250, 500, 750) in 750.380037ms
[executor] Received Result: 0

Yes! We now have a basic executor that works!

But is it safe?

So, all of the code looks relatively safe but we still have to check. My tool for that is miri, so let’s cargo +nightly miri run and we get the exact same result without any complaints!

Running multiple tasks at once

Right now, we’re only running our main task, which works well, but also means that we have to use the futures machinery to run multiple futures at once. Right now, all of our work with keeping track of multiple tasks is also all going to waste!

So, we need to fix that.

What will we need?

  1. We need to move the task execution to a new thread - this way we can add more tasks whilst tasks are running.
  2. We need a way to add tasks to the executor.

Moving tasks to a new thread

For this, we basically just need to move most of the Executor block_on logic to a new thread, so let’s change that method. For now, we’ll also change it to not take in a task and we can call it a start method.

For reference, this is our current code (WakerData::new here is just a small constructor standing in for the struct literal from earlier):

pub fn block_on(mut self, future: BoxedFuture<u32>) {
    self.tasks_to_poll.push(Some(future));
    self.tasks_sender.send(0).unwrap();

    'main: loop {
        for index in self.tasks_receiver.try_iter() {
            if index >= self.tasks_to_poll.len() {
                panic!("index out of bounds");
            }

            let waker_data = WakerData::new(self.tasks_sender.clone(), index);
            let boxed_waker_data = Box::new(waker_data);
            let raw_waker_data = Box::into_raw(boxed_waker_data);

            let raw_waker =
                RawWaker::new(raw_waker_data as *const WakerData as *const (), &VTABLE);
            let waker = unsafe { Waker::from_raw(raw_waker) };
            let mut cx = Context::from_waker(&waker);   

            if let Some(task) = &mut self.tasks_to_poll[index] {
                if let Poll::Ready(res) = task.as_mut().poll(&mut cx) {
                    println!("[executor] Received Result: {res}");
                    self.tasks_to_poll[index] = None;

                    if index == 0 {
                        break 'main;
                    }

                }
            }
        }
    }
}

And our first try at moving stuff around gets us this:

pub fn start(&self) {
    std::thread::spawn(|| {
        'main: loop {
            for index in self.tasks_receiver.try_iter() {
                if index >= self.tasks_to_poll.len() {
                    panic!("index out of bounds");
                }
                println!("[executor] polling {index}");

                let waker_data = WakerData::new(self.tasks_sender.clone(), index);
                let boxed_waker_data = Box::new(waker_data);
                let raw_waker_data = Box::into_raw(boxed_waker_data);

                let raw_waker =
                    RawWaker::new(raw_waker_data as *const WakerData as *const (), &VTABLE);
                let waker = unsafe { Waker::from_raw(raw_waker) };
                let mut cx = Context::from_waker(&waker);   

                if let Some(task) = &mut self.tasks_to_poll[index] {
                    if let Poll::Ready(res) = task.as_mut().poll(&mut cx) {
                        println!("[executor] Received Result: {res}");
                        self.tasks_to_poll[index] = None;

                        if index == 0 {
                            break 'main;
                        }
                    }
                }
            }
        }
    });
}

However, we immediately encounter problems. Firstly - we get told that our futures cannot be sent across threads:

error[E0277]: `(dyn futures::Future<Output = u32> + 'static)` cannot be sent between threads safely
    --> src/main.rs:41:28
     |
41   |           std::thread::spawn(|| {
     |  _________------------------_^
     | |         |
     | |         required by a bound introduced by this call
42   | |             'main: loop {
43   | |                 for index in self.tasks_receiver.try_iter() {
44   | |                     if index >= self.tasks_to_poll.len() {
...    |
71   | |             }
72   | |         });
     | |_________^ `(dyn futures::Future<Output = u32> + 'static)` cannot be sent between threads safely
     |
     = help: the trait `std::marker::Send` is not implemented for `(dyn futures::Future<Output = u32> + 'static)`, which is required by `{closure@src/main.rs:41:28: 41:30}: std::marker::Send`

This is because we are trying to move all of our tasks from one thread to another, and to do that they need to implement std::marker::Send. That one is relatively easy to fix - we just need to change our BoxedFuture to have a Send bound, like this:

pub type BoxedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

Our next error is then:

error[E0521]: borrowed data escapes outside of method
  --> src/main.rs:41:9
   |
40 |       pub fn start(&self) {
   |                    -----
   |                    |
   |                    `self` is a reference that is only valid in the method body
   |                    let's call the lifetime of this reference `'1`
41 | /         std::thread::spawn(|| {
42 | |             'main: loop {
43 | |                 for index in self.tasks_receiver.try_iter() {
44 | |                     if index >= self.tasks_to_poll.len() {
...  |
71 | |             }
72 | |         });
   | |          ^
   | |          |
   | |__________`self` escapes the method body here
   |            argument requires that `'1` must outlive `'static`

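This one happens because the closure borrows from self, but a spawned thread can outlive the method call, so std::thread::spawn requires everything the closure captures to have a 'static lifetime.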

The easiest way to fix this one[3] is to change the way that our start function works - we can make it into a constructor, rather than just a method. That way, we can let the thread own things like tasks_to_poll and the tasks_receiver and we don’t need to worry about borrowing and other issues.
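Stripped of all the executor parts, the ownership idea looks something like this sketch. Both function names are hypothetical; the first one mirrors what we are about to do (the thread owns its data), and the second shows std::thread::scope, which allows borrowing but only because it joins its threads before returning, so it wouldn’t suit a long-lived executor thread.

```rust
use std::thread;

/// `thread::spawn` needs a `'static` closure, so the closure takes ownership
/// of `numbers` with `move` instead of borrowing it.
fn sum_on_spawned_thread(numbers: Vec<u32>) -> u32 {
    let handle = thread::spawn(move || numbers.iter().sum::<u32>());
    handle.join().expect("worker thread panicked")
}

/// Scoped threads are always joined before `scope` returns, so they are
/// allowed to borrow from the caller.
fn sum_on_scoped_thread(numbers: &[u32]) -> u32 {
    thread::scope(|s| {
        s.spawn(|| numbers.iter().sum::<u32>())
            .join()
            .expect("worker thread panicked")
    })
}

fn main() {
    let numbers = vec![1, 2, 3];
    println!("{}", sum_on_scoped_thread(&numbers));
    println!("{}", sum_on_spawned_thread(numbers));
}
```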

Making start a constructor

Firstly, we need to decide on what our Executor will hold and not hold.

Currently, we have this:

pub struct Executor {
    tasks_to_poll: Vec<Option<BoxedFuture<u32>>>,
    tasks_receiver: Receiver<usize>, 
    tasks_sender: Sender<usize>,
}

We already decided to move tasks_to_poll, tasks_receiver and tasks_sender to the thread, and since we’re now using a thread we’ll hold onto a std::thread::JoinHandle.

That gives our new definition as:

use std::thread::JoinHandle;

pub struct Executor {
    thread: JoinHandle<()>
}

We can then finish off our new constructor:

    pub fn start() -> Self {
        let thread = std::thread::spawn(move || {
            let mut tasks_to_poll: Vec<Option<BoxedFuture<u32>>> = vec![];
            let (tasks_sender, tasks_receiver) = channel();

            //snip: executor loop, with all `self.`s removed
        });

        Self { thread }
    }

One slight issue now - we can’t actually add any new tasks! For that, we’ll need another Sender/Receiver pair.

Sending tasks

Firstly, we’ll add the Sender to our struct definition:

pub struct Executor {
    thread: JoinHandle<()>,
    new_task_sender: Sender<BoxedFuture<u32>>
}

Then add a function for sending tasks when we get them:

impl Executor {
    pub fn add_task (&self, f: impl Future<Output = u32> + Send + 'static) {
        self.new_task_sender.send(Box::pin(f)).expect("unable to send new task to executor");
    }
}

We can even use a fun impl Trait to allow users to pass in any future to be used and then we deal with Boxing/Pinning it. As with BoxedFuture, we need to add the Send bound and here we also need to add the 'static bound to make sure that the task can live forever as we’ll be sending it to a different thread with no idea when we’ll be executing it.

We then just need to add some more logic to our start function to add any new tasks. This logic also needs to send those tasks to the tasks_sender to ensure they get polled!

impl Executor {
    pub fn start() -> Self {
        let (new_task_sender, new_task_receiver) = channel();
        let thread = std::thread::spawn(move || {
            let mut tasks_to_poll: Vec<Option<BoxedFuture<u32>>> = vec![];
            let (tasks_sender, tasks_receiver) = channel();

            'main: loop {
                for task in new_task_receiver.try_iter() {
                    let index = tasks_to_poll.len();
                    tasks_to_poll.push(Some(task));
                    tasks_sender.send(index).unwrap();
                }

                //snip: polling loop
            }
        });
        
        Self { thread, new_task_sender }
    }
}

Time to test!

If we then add a few simple futures to our main function, let’s see what happens!

fn main() {
    let executor = Executor::start();
    executor.add_task(async {0});
    executor.add_task(async {1});
}

These futures don’t do anything vaguely interesting, but we should see some print statements with their results if this works.

$ cargo run
# snip: warnings from cargo
$

Nothing!

Nothing?

First step when things don’t seem to work? Print statements! Let’s add one at each ‘stage’ of our main function to see how far it gets.

fn main() {
    println!("[main] start");
    let executor = Executor::start();
    println!("[main] started executor");
    executor.add_task(async {0});
    executor.add_task(async {1});
    println!("[main] added tasks");
}

And then if we run it:

$ cargo run
# snip: warnings from cargo
[main] start
[main] started executor
[main] added tasks
$

So it appears that our main function is all running, but we aren’t getting anything from our executor. I’ve got a hunch, so I’m going to add some print statements to our thread:

impl Executor {
    pub fn start() -> Self {
        let (new_task_sender, new_task_receiver) = channel();
        let thread = std::thread::spawn(move || {
            println!("[executor] thread started");
            let mut tasks_to_poll: Vec<Option<BoxedFuture<u32>>> = vec![];
            let (tasks_sender, tasks_receiver) = channel();
            println!("[executor] channels created");

            'main: loop {
                println!("[executor] loop");
                //snip: adding tasks from new_tasks_receiver
                println!("[executor] got new tasks");
                //snip: polling
            }
        });

        Self { thread, new_task_sender }
    }
}

And then:

$ cargo run
# snip: warnings from cargo
[main] start
[main] started executor
[main] added tasks
$

It looks like our thread is never actually running. If we check the docs for thread::spawn, it looks like our thread has become detached. This means that our program starts the thread, and runs all of the rest of the logic before our thread can do anything and then exits.

That means that we need to join the thread at the end of the program. We unfortunately cannot simply do that in Drop, as JoinHandle::join takes self by value, but Drop only gives us &mut self.

impl Executor {
    pub fn join (self) {
        self.thread.join().expect("error joining thread");
    }
}

fn main() {
    println!("[main] start");
    let executor = Executor::start();
    println!("[main] started executor");
    executor.add_task(async {0});
    executor.add_task(async {1});
    println!("[main] added tasks");
    executor.join();
}
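For completeness, there is a common workaround for the Drop problem that this essay doesn’t use: store the handle in an Option and take() it out inside drop, which gives us an owned JoinHandle from a &mut self. Here is a sketch with a hypothetical Worker type standing in for our Executor; the AtomicBool is only there so that we can observe that the thread really finished.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the essay's `Executor`.
struct Worker {
    // `Option` so that `Drop` can move the handle out through `&mut self`.
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    fn start(finished: Arc<AtomicBool>) -> Self {
        let thread = thread::spawn(move || {
            // Pretend to do some work, then record that we ran to completion.
            finished.store(true, Ordering::SeqCst);
        });
        Self { thread: Some(thread) }
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        if let Some(thread) = self.thread.take() {
            // A panic in the worker shows up here as `Err`; we ignore it
            // because panicking inside `drop` is best avoided.
            let _ = thread.join();
        }
    }
}

/// Starts a worker, drops it, and reports whether the thread had finished
/// by the time `drop` returned.
fn worker_finished_before_drop_returned() -> bool {
    let finished = Arc::new(AtomicBool::new(false));
    {
        let _worker = Worker::start(Arc::clone(&finished));
    } // `_worker` is dropped here, which joins the thread
    finished.load(Ordering::SeqCst)
}

fn main() {
    println!("{}", worker_finished_before_drop_returned());
}
```

An explicit join method, like the one above, has the advantage that the caller can see where the program blocks, so it’s a perfectly reasonable choice too.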

And then?

$ cargo run
# snip: cargo warnings
[main] start
[main] started executor
[main] added tasks
[executor] thread started
[executor] channels created
[executor] loop
[executor] got new tasks
[executor] polling 0
[executor] Received Result: 0

Nearly? It looks like only the first task is running…. Which actually makes sense, because when the first task added finishes, we stop the executor 🤦.

Stopping the Executor at the right time

My initial instinct is just to remove the check and break, but then the issue becomes that we then can’t stop the executor.

The next possibility is changing it out for a check whether all the tasks are done. This would work, but it would also mean that if we got through a single cycle of the loop before any tasks were added, it would finish straight away.

I propose we add an atomic variable to allow the executor to stop, and then if that variable is true and all the tasks are finished, we can stop the loop. We can have that variable default to false, and then set it to true when we call Executor::join.

Firstly, the struct definition:

use std::sync::{Arc, atomic::AtomicBool};

pub struct Executor {
    thread: JoinHandle<()>,
    new_task_sender: Sender<BoxedFuture<u32>>,
    can_stop: Arc<AtomicBool>
}

Then the join method:

use std::sync::atomic::Ordering;

impl Executor {
    pub fn join (self) {
        self.can_stop.store(true, Ordering::Relaxed);
        self.thread.join().expect("error joining thread");
    }
}

And finally, a few changes to the constructor:

impl Executor {
    pub fn start () -> Self {
        let (new_task_sender, new_task_receiver) = channel();
        let can_stop = Arc::new(AtomicBool::new(false));

        let executor_can_stop = can_stop.clone();
        let thread = std::thread::spawn(move || {
            //snip: variable construction
            'main: loop {
                //snip: get tasks
                //snip: poll tasks. NB: we removed the check for breaking the loop

                if executor_can_stop.load(Ordering::Relaxed) {
                    if tasks_to_poll.iter().all(Option::is_none) {
                        break 'main;
                    }
                }
            }
        });

        Self { thread, new_task_sender, can_stop }
    }
}
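One thing to watch with the flag: as far as I can tell, if join flips can_stop just after the executor has drained new_task_receiver but before it reaches the check, any tasks sent in between could be dropped without ever being polled. A possible alternative, sketched here with plain u32 jobs instead of futures, is to use the channel itself as the stop signal. Dropping the Sender disconnects the channel, and try_recv only reports Disconnected once every queued message has been received. Worker, add and the doubling "work" are all hypothetical.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the executor: it "runs" jobs by doubling them.
struct Worker {
    jobs: Sender<u32>,
    thread: JoinHandle<Vec<u32>>,
}

impl Worker {
    fn start() -> Self {
        let (jobs, inbox) = channel::<u32>();
        let thread = thread::spawn(move || {
            let mut done = Vec::new();
            loop {
                match inbox.try_recv() {
                    Ok(job) => done.push(job * 2),
                    // Nothing queued right now, but more may still arrive.
                    Err(TryRecvError::Empty) => thread::yield_now(),
                    // Every sender is gone AND the queue is empty: stop.
                    Err(TryRecvError::Disconnected) => break,
                }
            }
            done
        });
        Self { jobs, thread }
    }

    fn add(&self, job: u32) {
        self.jobs.send(job).expect("worker has already stopped");
    }

    fn join(self) -> Vec<u32> {
        let Worker { jobs, thread } = self;
        // Dropping the only sender is the stop signal.
        drop(jobs);
        thread.join().expect("worker thread panicked")
    }
}

fn main() {
    let worker = Worker::start();
    worker.add(1);
    worker.add(2);
    worker.add(3);
    println!("{:?}", worker.join());
}
```

In the real executor the loop would also have to wait for every slot in tasks_to_poll to be None before breaking, exactly as the flag version does.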

Does it work?

If we run our same main function from earlier, we get:

$ cargo run
[main] start
[main] started executor
[main] added tasks
[executor] thread started
[executor] channels created
[executor] loop
[executor] got new tasks
[executor] polling 0
[executor] Received Result: 0
[executor] polling 1
[executor] Received Result: 1

Which looks all correct to me!

However, I want to test some more complicated futures. I’ve removed some of the print statements and created the following main function to test lots of futures at once.

fn main() {
    let executor = Executor::start();
   
    let create_task = |time, id| async move {
        let fut = TimerFuture::new(time);
        println!("[task {id}] created future");

        let start = Instant::now();
        let res = fut.await;
        let el = start.elapsed();

        println!("[task {id}] awaited future, got {res:?} in {el:?}");

        res
    };

    let task_1 = create_task(150, 1);
    let task_2 = create_task(150, 2);
    let task_3 = create_task(50, 3);
    let task_4 = create_task(200, 4);
    
    let start = Instant::now();
    executor.add_task(task_1);
    executor.add_task(task_2);
    executor.add_task(task_3);
    executor.add_task(task_4);

    println!("[main] created all tasks, joining executor");
    executor.join();
    let el = start.elapsed();
    println!("[main] joined, took {el:?}");
}

This creates a number of different tasks, some with similar times, some with different times. Ideally, they should all print out and say that they took as long as we would have expected, and they’ll finish in the order 3,1/2,1/2,4. Then, the whole thing should only take about as long as the longest task.

$ cargo run
[main] created all tasks, joining executor
[executor] adding new task @ 0
[executor] adding new task @ 1
[executor] adding new task @ 2
[executor] adding new task @ 3
[task 1] created future
[task 2] created future
[task 3] created future
[task 4] created future
[task 3] awaited future, got 50 in 50.005528ms
[executor] Received 50 from 2
[task 1] awaited future, got 150 in 150.008273ms
[executor] Received 150 from 0
[task 2] awaited future, got 150 in 150.023708ms
[executor] Received 150 from 1
[task 4] awaited future, got 200 in 200.004303ms
[executor] Received 200 from 3
[main] joined, took 200.377115ms

And that’s what we got! Sick!

We can even run it through miri, and whilst it might take longer, it should still work? We can also add the --release flag. This won’t make it any faster (as we’re still in miri), but it will remove some checks like integer overflows that can cause issues if you aren’t being careful.

$ cargo +nightly miri run --release
[main] created all tasks, joining executor
[executor] adding new task @ 0
[executor] adding new task @ 1
[executor] adding new task @ 2
[executor] adding new task @ 3
[task 1] created future
[task 2] created future
[task 3] created future
[task 4] created future
[task 3] awaited future, got 50 in 70.405ms
[executor] Received 50 from 2
[task 1] awaited future, got 150 in 290.625ms
[executor] Received 150 from 0
[task 2] awaited future, got 150 in 346.535ms
[executor] Received 150 from 1
[task 4] awaited future, got 200 in 391.34ms
[executor] Received 200 from 3
[main] joined, took 763.05ms

It took longer, but no task executed faster than we would have expected and crucially we don’t have any memory issues!

Conclusion

Bam! We’ve just finished making our async executor and it actually works. If I can work it out, next time will be running the tasks on multiple threads using a pool of workers.

The final code from this is here.


  1. Read: noob - the Tokio developers have a ton of features and are working with performance as a goal, rather than being easy to read for someone without any async background experience.

  2. Or in C, a void*.

  3. Or at least the way I’ve done it.