Building Python asyncio from scratch (to understand it)

In Python, asyncio is a fundamental library that empowers you to write concurrent code using the async/await syntax. This approach is particularly well-suited for applications that involve a significant amount of waiting for I/O (input/output) operations, such as network requests, file system access, or user input.

Traditional Multithreading vs. Asyncio

  • Multithreading: Threads let multiple tasks make progress concurrently, but in CPython the Global Interpreter Lock (GIL) prevents more than one thread from executing Python bytecode at a time. The GIL is released while a thread waits on I/O, so threads do help in I/O-bound scenarios; the cost is per-thread memory, OS context switches, and the need to guard shared state with locks, while CPU-bound code gains little.

  • Asyncio: Asyncio provides a more efficient alternative for I/O-heavy tasks. It employs a single thread that rapidly switches between coroutines (asynchronous functions) as they await I/O completion. This event-driven approach avoids the overhead associated with thread creation and context switching, making it ideal for handling many concurrent I/O operations.
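A minimal sketch of the single-thread claim, using asyncio.sleep as a stand-in for real I/O. The fake_io helper and the 0.2 second delay are illustrative assumptions, not part of any real workload:

```python
import asyncio
import threading
import time


async def fake_io(name, delay):
    # asyncio.sleep stands in for a network call that takes `delay` seconds
    await asyncio.sleep(delay)
    return name, threading.get_ident()


async def main():
    start = time.perf_counter()
    # All three waits overlap, so the total is close to 0.2s rather than 0.6s
    results = await asyncio.gather(
        fake_io("a", 0.2), fake_io("b", 0.2), fake_io("c", 0.2)
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
names = [name for name, _tid in results]
thread_ids = {tid for _name, tid in results}
print(names, f"{elapsed:.2f}s", "threads used:", len(thread_ids))
```

Every coroutine reports the same thread id, and the elapsed time is roughly that of one wait, not three.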

Example usage

import asyncio
import aiohttp

async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                return data
            else:
                raise Exception(f"Error: {response.status}")

async def main():
    url = "https://api.example.com/data"
    data = await fetch_data(url)
    print(data)

if __name__ == "__main__":
    asyncio.run(main())

The aiohttp code fetches data from an API asynchronously. It defines an async function fetch_data that makes a GET request and parses the JSON response. The main function calls fetch_data asynchronously and prints the retrieved data. This approach keeps your program responsive while waiting for the API response.

Main concepts

Below is some information about the primary components of asyncio:

  1. Event Loop: This part of asyncio is an essential construct, which oversees the execution of tasks, handling network operations, performing system I/O, and managing subprocesses among other things. It's the very core of any asyncio application.

  2. Coroutines: Coroutines are functions defined with async def that can suspend at an await and be resumed later. They contain the actual application logic of an asyncio program.

  3. Tasks: Representing an execution unit within the event loop, tasks manage the execution of coroutines and are responsible for their scheduling.

  4. Futures: Futures work as placeholders for the results of tasks, which might not have been computed yet.

  5. Transports and Protocols: These provide a pluggable low-level API for implementing a wide range of protocols.

  6. Streams: This high-level API facilitates creating and managing network connections as streams of data.

  7. Synchronization Primitives: asyncio provides these for managing concurrent operations.

  8. Queues: Essential tools for inter-task communication, queues are part of asyncio's high-level API.

  9. Sub-Processes: asyncio allows for the creation and management of sub-processes and, more importantly, provides interaction with their input/output/error pipes.

  10. Locks: These are part of asyncio's control flow features, which provide an algorithm for the management of access rights, where multiple tasks need access to shared resources.

Here's some syntax so you know how it looks in practice:

import asyncio

# Coroutine function
async def hello_world():
    print("Hello World")

async def main():
    # Running a coroutine
    await hello_world()

    # Task scheduling
    task = asyncio.create_task(hello_world())
    await task

    # Future: a placeholder that is resolved later by a callback
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_soon(future.set_result, "done")
    print(await future)

    # Streams (commented out: needs a server listening on localhost:8888)
    # reader, writer = await asyncio.open_connection('localhost', 8888)
    # data = await reader.read(100)

    # Queue
    queue = asyncio.Queue()
    await queue.put('item')
    item = await queue.get()

    # Lock
    lock = asyncio.Lock()
    async with lock:
        # access shared state
        pass

# Event loop: asyncio.run() creates a loop, runs main() on it, and closes it.
# await is only valid inside a coroutine, so everything lives in main().
asyncio.run(main())
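Queues from the list above are easiest to understand with two tasks talking to each other. The following is a small sketch; the producer and consumer names and the None sentinel are choices made for this illustration:

```python
import asyncio


async def producer(queue, items):
    for item in items:
        # put() suspends while the queue is full (maxsize=1 below)
        await queue.put(item)
    await queue.put(None)  # sentinel: tells the consumer to stop


async def consumer(queue, seen):
    while True:
        # get() suspends until the producer has put something
        item = await queue.get()
        if item is None:
            break
        seen.append(item)


async def main():
    queue = asyncio.Queue(maxsize=1)
    seen = []
    await asyncio.gather(producer(queue, [1, 2, 3]), consumer(queue, seen))
    return seen


seen = asyncio.run(main())
print(seen)
```

Because the queue holds only one item, the two coroutines are forced to take turns, which makes the hand-over between tasks visible.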

Under the hood

The main concept in asynchronous Python programming is running different functions (coroutines) concurrently, with their execution interleaved on a single thread. How does it work?

Generators

In order to talk about coroutines in Python, we have to talk about generators first. Generators are a type of iterable defined using a function syntax. They're used to create an iterator in a more memory efficient way, as they generate values on the fly rather than storing them in memory all at once.

Here is a simple generator:

def simple_generator():
    yield 'Hello'
    yield 'World'

Generators are a special kind of function that maintain their state in between invocations. They are implemented using a special kind of stack frame (Frame object), which isn't immediately discarded after the function yields.

In Python, whenever a function is called, a new stack frame is allocated to handle the function's local variables and to remember where to return once the function completes. When the function returns, the stack frame is discarded. With generators, however, this isn't the case.

Let's consider a few points for the above generator:

  1. When the generator function is first called, a generator object is created and none of the function's code is executed. A new frame object is created to hold the generator's execution context, but it is not activated yet.
gen = simple_generator()  # Creates the frame object; no generator code runs yet
  2. When next() is called on the generator object for the first time, the frame becomes active and the code in the generator function runs until it hits the first yield. The yield produces a value, which is returned by the next() call. After the yield, the frame's state (local variables, instruction pointer, etc.) is saved and control returns to the caller, but the frame object still exists in memory.
print(next(gen))  # Activates the frame, prints: Hello
  3. Each subsequent call to next() resumes execution where it left off (it reactivates the frame with the saved state) and runs until the next yield or until the function exits. When the generator function exits, the frame is discarded.
print(next(gen))  # Resumes the frame, prints: World
print(next(gen))  # Raises StopIteration; the frame is discarded
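The lifecycle described in these steps can be observed from the outside. This sketch uses inspect.getgeneratorstate and the generator's gi_frame attribute; the counter generator is a made-up example:

```python
import inspect


def counter():
    total = 0
    total += 1
    yield total
    total += 1
    yield total


gen = counter()
states = [inspect.getgeneratorstate(gen)]      # created, nothing has run

first = next(gen)
states.append(inspect.getgeneratorstate(gen))  # paused at the first yield
saved_locals = dict(gen.gi_frame.f_locals)     # locals survive the pause

second = next(gen)
try:
    next(gen)
except StopIteration:
    pass
states.append(inspect.getgeneratorstate(gen))  # finished
frame_after = gen.gi_frame                     # the frame is gone

print(states, saved_locals, frame_after)
```

While the generator is suspended its frame still holds total, and once it finishes gi_frame becomes None.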

This ability to pause and resume execution is what makes generators so useful in asynchronous programming, as it’s the building block that asyncio and other asynchronous I/O libraries in Python use to switch between tasks.
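To make the "switching between tasks" idea concrete, here is a tiny round-robin scheduler built only on generators. It is a sketch for illustration, not how asyncio is implemented; worker and round_robin are hypothetical names:

```python
from collections import deque


def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # pause here so the scheduler can run another worker


def round_robin(*gens):
    ready = deque(gens)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)          # run the worker up to its next yield
        except StopIteration:
            continue           # worker finished, drop it
        ready.append(gen)      # still alive, queue it for another turn


log = []
round_robin(worker("a", 2, log), worker("b", 3, log))
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1', 'b:2']
```

Each yield is a point where a worker voluntarily gives up control, which is the role await plays in asyncio code.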

Async/await

The async and await keywords in Python are used to define and work with coroutines. They are part of Python's native support for asynchronous programming introduced in Python 3.5.

  • async: The async keyword declares a function as a coroutine function. Calling it does not run the body; it returns a coroutine object.
async def hello_world():
    print("Hello, World!")
  • await: The await keyword is used inside a coroutine to suspend it until an awaitable object (a coroutine, a Task, a Future, etc.) has produced its result. While the coroutine is suspended on something that is not ready yet, control goes back to the event loop, which can run other tasks.
async def main():
    await hello_world()  # main() pauses here until hello_world() has finished
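A coroutine object can be driven by hand, without any event loop, which shows how close it is to a generator. A small sketch (the add coroutine is invented for the example):

```python
import inspect


async def add(a, b):
    return a + b


coro = add(1, 2)                 # nothing has run yet
is_coro = inspect.iscoroutine(coro)

try:
    coro.send(None)              # start it, exactly like a generator
except StopIteration as exc:
    result = exc.value           # the return value travels in StopIteration

print(is_coro, result)  # True 3
```

This is essentially what an event loop does for every task: call send() and look at what comes back.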

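The await keyword essentially replaces the yield from syntax (added to the language in Python 3.3) that asyncio coroutines relied on when the library was introduced in Python 3.4. Before await existed, you'd use yield from to yield control from a coroutine. The @asyncio.coroutine decorator shown here was removed in Python 3.11, so this snippet only runs on older versions: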

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

The introduction of async and await provides a clearer, more concise way to write asynchronous code. Importantly, the await expression suspends the coroutine till the awaitable it waits on is finished, transferring the control flow back to the event loop, similarly to how yield from worked but with a syntax dedicated to asynchronous operations.
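The link between await and generators shows up in the __await__ protocol: await obj delegates to the iterator returned by obj.__await__(), much like yield from would. A sketch with a hypothetical Pause awaitable, driven by hand instead of by an event loop:

```python
class Pause:
    def __await__(self):
        # A generator: the bare yield suspends the whole await chain
        received = yield "paused"
        return received


async def main():
    value = await Pause()
    return value * 2


coro = main()
first = coro.send(None)      # runs until the yield inside __await__
try:
    coro.send(21)            # the value sent in becomes `received`
except StopIteration as exc:
    final = exc.value

print(first, final)  # paused 42
```

The value yielded inside __await__ surfaces at the outermost send() call, which is where an event loop would normally sit.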

Yield from

Let's look at a simple example. Suppose we have a generator that yields values from multiple sequences:

def chain_generators(*iterables):
    for iterable in iterables:
        for item in iterable:
            yield item


list(chain_generators([1, 2, 3], ["a", "b", "c"]))  # Output: [1, 2, 3, 'a', 'b', 'c']

Now, suppose these iterables were actually generators or even more complex iterables. The chain_generators function would become more complicated because we'd need to manage these generators manually. That's where yield from comes in.

Using yield from, we can replace our nested for loop with a simpler construct:

def chain_generators(*iterables):
    for iterable in iterables:
        yield from iterable

list(chain_generators([1, 2, 3], ["a", "b", "c"]))  # Output: [1, 2, 3, 'a', 'b', 'c']

In this version, yield from iterable replaces the inner loop entirely. The yield from expression will yield all values from the iterable, effectively flattening the sequences.

So yield from pauses the current generator until the child generator finishes its execution, passing every value the child yields straight through to the caller, and then continues.
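Besides forwarding values, yield from also evaluates to the child's return value, which is what makes it usable for coroutines. A short sketch with made-up child and parent generators:

```python
def child():
    yield 1
    yield 2
    return "child done"   # becomes the value of the yield from expression


def parent():
    result = yield from child()
    yield result


values = list(parent())
print(values)  # [1, 2, 'child done']
```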

Another important property of generators is that communication goes both ways: they can hand values to the calling code, and the caller can send values into them while they run.

The send() method resumes a generator from where it left off and makes its argument the value of the yield expression the generator was paused on. This lets the caller influence the generator's behavior at run time.

Here's a simple example:

def simple_generator():
    x = yield 'Hello'
    yield x

gen = simple_generator()

print(next(gen))      # Initiate generator, output: Hello

print(gen.send('World'))  # Send a value into the generator; it is assigned to x, output: World

When the command next(gen) is executed, the generator begins execution and stops at the yield 'Hello'. At this point, the generator yields 'Hello' and waits for a value to be sent.

In the next line, gen.send('World') resumes the generator and makes 'World' the value of the paused yield expression, so it is assigned to x. The generator then runs to the next line and yields x, which is now 'World'.

Note that a value other than None can only be sent once the generator is paused at a yield expression, so you usually call next() (or send(None)) once to start it. Calling send() with a non-None value on a generator that has not started raises a TypeError.
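Both rules can be checked with a slightly larger example. The running_total generator below is an illustration, not part of the event loop built later:

```python
def running_total():
    total = 0
    while True:
        value = yield total   # hand out the total, wait for the next value
        total += value


gen = running_total()

raised = False
try:
    gen.send(10)              # not started yet: non-None is rejected
except TypeError:
    raised = True

first = gen.send(None)        # same as next(gen); runs to the first yield
a = gen.send(5)
b = gen.send(7)
print(raised, first, a, b)  # True 0 5 12
```

The failed send() does not consume the generator; it can still be started normally afterwards.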

Event loop

The function of the asyncio event loop is to schedule calls, handle system events, and manage the multiplexing of I/O access. Here's a high-level view of how it works:

  1. Initialization and Context: When the event loop is created, it becomes the current event loop for the current execution context. By default, Python's asyncio library creates an event loop that is suitable for the underlying operating system.

  2. Registering Coroutines: When a coroutine is scheduled using loop.create_task(coro()), it's wrapped into a Task object. The task is a subclass of Future which wraps a coroutine and controls its execution. The newly created task is added to the loop's agenda.

  3. Polling for I/O and Other Events: The event loop uses system-dependent libraries like select, poll, epoll or kqueue to poll socket descriptors for I/O availability. It also listens for other types of events such as timeouts or system signals.

  4. Executing Callbacks and Tasks: When an event becomes ready (e.g., a socket has data to be read, a system signal is caught), the event loop executes all callbacks registered for this event. In case a ready event is tied to a Task, the event loop will resume the task's coroutine at the point where it was paused with await.

  5. Looping Back: Once all ready events are processed (callbacks are called, tasks are resumed and run till their next suspension point), the event loop goes back to step 3 - it polls for more I/O and other events.

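  6. Stopping the Loop: The loop stops running when it is explicitly asked to with loop.stop(), when the future passed to run_until_complete() finishes, or when an exception such as KeyboardInterrupt or SystemExit propagates out of a callback. Ordinary exceptions raised in callbacks are reported to the loop's exception handler and do not stop the loop.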
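The scheduling and stopping steps above can be tried directly against the real asyncio loop with plain callbacks, no coroutines involved. A minimal sketch; the delays are arbitrary:

```python
import asyncio

order = []
loop = asyncio.new_event_loop()

# Scheduled first, but due only after 50 ms
loop.call_later(0.05, order.append, "later")
# Scheduled second, but runs on the next loop iteration
loop.call_soon(order.append, "soon")
# Without an explicit stop, run_forever() would never return
loop.call_later(0.1, loop.stop)

loop.run_forever()
loop.close()
print(order)  # ['soon', 'later']
```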

We will implement a simple but working event loop that does not poll for I/O readiness with select. Instead, it keeps a list of scheduled events and, on every iteration, runs the one that is due earliest.
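For contrast, this is roughly what the readiness polling from step 3 looks like with the standard selectors module. It is a sketch of the idea, not asyncio's actual code; socket.socketpair stands in for a real network connection and on_readable is a hypothetical callback:

```python
import selectors
import socket

sel = selectors.DefaultSelector()   # picks epoll, kqueue, select, ...
a, b = socket.socketpair()
a.setblocking(False)
b.setblocking(False)

received = []


def on_readable(sock):
    received.append(sock.recv(1024))


# Ask to be told when `b` has data; the callback rides along as `data`
sel.register(b, selectors.EVENT_READ, on_readable)

idle = sel.select(timeout=0)        # nothing written yet, so nothing is ready

a.send(b"ping")
for key, _mask in sel.select(timeout=1):
    key.data(key.fileobj)           # run the callback for each ready socket

sel.unregister(b)
a.close()
b.close()
sel.close()
print(idle, received)
```

The loop only touches sockets the operating system reports as ready, instead of trying each one in turn.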

Implementation

Here's what we are building:

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Union, List, NamedTuple, Callable, Generator
from enum import Enum
import time


class FutureStatus(Enum):
    DONE = "DONE"
    SCHEDULED = "SCHEDULED"
    CANCELED = "CANCELED"


class Callback(NamedTuple):
    fn: Callable
    args: List


@dataclass
class Event:
    callback: Callback
    when: Union[datetime, None] = None


class loop:
    events: List[Event]

    def __init__(self) -> None:
        self.events = []

    def call_soon(self, c: Callback):
        self.events.append(Event(c, when=datetime.now()))

    def call_later(self, c: Callback, when: datetime):
        self.events.append(Event(c, when=when))


current_loop: loop = loop()


@dataclass
class Future:
    callbacks: List[Callback] = field(default_factory=list)
    status: FutureStatus = FutureStatus.SCHEDULED

    @property
    def result(self):
        return getattr(self, "_result", None)

    def done(self):
        return self.status != FutureStatus.SCHEDULED

    def set_result(self, value):
        self._result = value
        self.status = FutureStatus.DONE
        self._schedule_callbacks()

    def cancel(self):
        self.status = FutureStatus.CANCELED

    @property
    def canceled(self):
        return self.status == FutureStatus.CANCELED

    def __iter__(self):
        if self.status == FutureStatus.DONE:
            return self.result

        if self.status == FutureStatus.CANCELED:
            return None

        yield self

    def add_done_callback(self, c: Callback):
        self.callbacks.append(c)

    def _schedule_callbacks(self):
        for c in self.callbacks:
            current_loop.call_soon(c)

    __await__ = __iter__


def create_future():
    return Future()


def run(*coros: Generator):
    if len(coros) <= 0:
        return

    results = []

    for coro in coros:
        current_loop.call_soon(Callback(coro.send, [None]))

    while len(current_loop.events):
        current_loop.events.sort(key=lambda i: i.when)
        now = datetime.now()
        current_event = current_loop.events[0]
        print("current event", current_event, f"{now=}")
        if current_event.when <= now:
            try:
                res_or_fut = current_event.callback.fn(*current_event.callback.args)
            except StopIteration as exc:
                res_or_fut = exc.value

            if isinstance(res_or_fut, Future):
                res_or_fut.add_done_callback(
                    Callback(current_event.callback.fn, current_event.callback.args)
                )
            elif res_or_fut is not None:
                results.append(res_or_fut)

            del current_loop.events[0]
        else:
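            # Sleep only for the time remaining until the earliest event.
            # total_seconds() keeps the fractional part that .seconds drops.
            delay = max((current_event.when - datetime.now()).total_seconds(), 0)
            print("loop sleeping ...", delay)
            time.sleep(delay)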

    return results


def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""

    future = create_future()

    current_loop.call_later(
        Callback(Future.set_result, [future, result]),
        datetime.now() + timedelta(seconds=delay),
    )

    yield from future
    print("slept async", delay, "seconds")
    return future.result


if __name__ == "__main__":
    run(sleep(5), sleep(5), sleep(5))

This is a very simple version of asyncio that is missing a lot of key features, such as select calls and much more, but it works.

Let's go through it.

FutureStatus class: This is an enumeration that holds the three possible states of a Future instance - DONE, SCHEDULED, and CANCELED.

Callback class: This is a simple data structure to hold a reference to a callable and its arguments.

Event class: An event represents a unit in your event loop execution. Each event holds a Callback instance and a time for when the callback should be called.

loop class: The event loop class. Its call_soon method schedules a callback to run as soon as possible, and its call_later method schedules a callback to run at a given datetime.

current_loop: Represents your currently active event loop. All the scheduling is done on this.

Future class: A Future represents a computation that hasn't necessarily completed yet. In this implementation it stores a status and an eventual result, and lets you register callbacks that the loop schedules once set_result() is called. Unlike asyncio's Future, it does not carry exceptions.

run function: Function to handle running of tasks queued up inside the event loop until all tasks have been executed. It sorts events based on time they should execute, and sleeps if the next event isn't ready to execute now.

create_future function: Utility function to simplify the creation of a new future.

sleep function: The sleep function is a coroutine (as it yields from a future) that finishes after a set amount of time and sets the provided result as the future’s result.

run invocation in the main part: The last two lines of the script run three five-second sleeps concurrently. Running the program therefore prints the message from the sleep function three times after roughly five seconds, not fifteen.
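The run function re-sorts the whole event list on every iteration, which is fine for a demo. A common alternative is a heap, which keeps the earliest event at the front. The sketch below shows that idea in isolation; TimerQueue, call_at, and pop_due are hypothetical names, and times are plain floats to keep it short:

```python
import heapq
import itertools


class TimerQueue:
    def __init__(self):
        self._heap = []
        # Tie-breaker so entries with equal times keep insertion order
        # and callables are never compared with each other
        self._counter = itertools.count()

    def call_at(self, when, fn, *args):
        heapq.heappush(self._heap, (when, next(self._counter), fn, args))

    def pop_due(self, now):
        """Remove and return every (fn, args) whose time has come."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _when, _seq, fn, args = heapq.heappop(self._heap)
            due.append((fn, args))
        return due


fired = []
q = TimerQueue()
q.call_at(3.0, fired.append, "c")
q.call_at(1.0, fired.append, "a")
q.call_at(1.0, fired.append, "b")

for fn, args in q.pop_due(now=2.0):
    fn(*args)

remaining = len(q._heap)
print(fired, remaining)  # ['a', 'b'] 1
```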

Concurrent http requests

Let's now use this event loop to perform concurrent HTTP requests.

Here's my code for simple non-blocking HTTP requests using Python's socket module:

import socket
from event_loop import Future, Callback, run, current_loop, create_future
from datetime import timedelta, datetime


def request(url):
    # Parse the URL to get the host and path
    host = url.split("/")[2]
    path = "/" + "/".join(url.split("/")[3:])
    print("request to :", path)
    # Create a socket object
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
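    # Connect to the server (this call still blocks)
    s.connect((host, 80))

    # Form the HTTP request
    request_text = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"

    # Send the request while the socket is still blocking, so sendall()
    # cannot fail with BlockingIOError
    s.sendall(request_text.encode())

    # From here on, recv() returns immediately instead of waiting for data
    s.setblocking(False)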

    # Receive the response
    response = b""
    while True:

        future = create_future()
        current_loop.call_soon(
            Callback(Future.set_result, [future, None]),
        )

        try:
            yield from future
        finally:
            future.cancel()

        print("fetching", url)
        try:
            data = s.recv(1024)
            print("got some data:", data)
        except BlockingIOError:
            future = create_future()

            current_loop.call_soon(Callback(Future.set_result, [future, None]))
            yield from future
        else:
            if not data:
                break
            response += data

    # Close the connection
    s.close()

    return response.decode()

It opens a socket connection to the desired URL, switches the socket to non-blocking mode, and reads the response without ever blocking the event loop.
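The BlockingIOError behaviour this code relies on can be seen in isolation, without a network. In this sketch socket.socketpair plays the role of the client/server connection:

```python
import socket

left, right = socket.socketpair()
right.setblocking(False)

# Nothing has been sent yet: a non-blocking recv() raises instead of waiting
try:
    right.recv(1024)
    would_block = False
except BlockingIOError:
    would_block = True

left.sendall(b"hello")
left.close()                  # closing makes recv() eventually return b""

chunks = []
while True:
    try:
        data = right.recv(1024)
    except BlockingIOError:
        continue              # a coroutine would yield to the loop here
    if not data:
        break                 # empty bytes: the peer closed the connection
    chunks.append(data)
right.close()

body = b"".join(chunks)
print(would_block, body)
```

The continue in the except branch is a busy loop; in the request function that spot is where control is handed back to the event loop.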

Here are key parts and their connection to our custom event loop:

  1. request function : This generator function opens a socket connection to a server, sends an HTTP GET request, and then reads the response in non-blocking mode. If no data is available to read (a BlockingIOError is raised), the function creates a Future and schedules it to be resolved on the next event loop iteration. After yielding from this Future, the flow returns to the event loop. On the next loop iteration the Future is resolved and execution comes back into the request function, which tries to read from the socket again.

    This pattern repeats until there's data available to read from the socket, and the read data is appended to the response. This process continues until no more data is available (signifying end of the HTTP response), and the function then returns the response.

  2. Futures : Futures are placeholders for a result that might not be available yet. When a coroutine waits on a Future (yield from future), it pauses and gives up control to the event loop, which can then run other coroutines until the Future is resolved. The loop then goes back to the coroutines paused on this Future and resumes them where they left off, effectively creating concurrency in your app.

  3. Event Loop : The event loop goes through the scheduled events (each one a Callback plus the point in time when it should run) and checks whether the earliest one is due. If it isn't, the loop sleeps for the remaining time (essentially putting the entire program "to sleep"). If it is, the loop calls the callback. Often that callback is Future.set_result, which marks a Future as done and schedules the callbacks that resume the coroutines waiting on it.

  4. run Function : It accepts multiple generator-based coroutines, schedules them to be run, and executes the event loop until no events are left, so the coroutines run concurrently.

I would recommend revisiting this code in its entirety. Understanding the interplay between the task and the event loop is pivotal. The challenging yet most significant detail lies within comprehending how the low-level generator yields control back to the event loop. The event loop, in turn, adds a done callback to the Future that the task yielded. This callback is what continues the task's execution where it left off. Wrapping your head around these nuances may seem challenging, but it's imperative to understanding how this all becomes a cohesive unit.
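Stripped of timers and sockets, the hand-off described above fits in a few lines. This is a reduced sketch of the same mechanism, not the code from this article; MiniFuture, step, and the ready queue are names invented for it:

```python
from collections import deque

ready = deque()   # callbacks the "loop" still has to run
trace = []


class MiniFuture:
    def __init__(self):
        self.done = False
        self.result = None
        self.callbacks = []

    def set_result(self, value):
        self.done = True
        self.result = value
        ready.extend(self.callbacks)   # wake up whoever was waiting

    def __iter__(self):
        if not self.done:
            yield self                 # hand this future up to the loop
        return self.result


def step(coro):
    """Run the coroutine until it yields a future or finishes."""
    try:
        fut = coro.send(None)
    except StopIteration as exc:
        trace.append(("finished", exc.value))
        return
    trace.append(("suspended", None))
    # The done callback is what resumes the coroutine later
    fut.callbacks.append(lambda: step(coro))


def task(fut):
    value = yield from fut
    return value + 1


fut = MiniFuture()
ready.append(lambda: step(task(fut)))
ready.append(lambda: fut.set_result(41))
while ready:
    ready.popleft()()

print(trace)  # [('suspended', None), ('finished', 42)]
```

The coroutine never calls the loop and the loop never looks inside the coroutine; the yielded future is the only thing that connects them.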

Here's how I can visualize this for myself (please don't judge my drawings too hard):

[Figure: flow of control between a Python coroutine and the event loop]

Conclusion

This is a simplified, barebones model of concurrency built from an event loop, callbacks, and generator-based coroutines. It does not show everything that happens in the real asyncio event loop, which waits for network I/O efficiently by multiplexing it through system calls (select, poll, epoll, or similar) instead of retrying reads, and which offers many more features.

Written by

Denis Khodishchenko

Python Software Engineer @ Hennessey Digital | AWS Certified Solutions Architect Associate