Building Python asyncio from scratch (to understand it)
In Python, asyncio is a standard-library module for writing concurrent code with the async/await syntax. It is particularly well suited to programs that spend much of their time waiting on I/O (input/output) operations, such as network requests, file system access, or user input.
Traditional Multithreading vs. Asyncio
Multithreading: Threads let several tasks make progress concurrently, but in Python's CPython implementation the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode. That rules out parallel speedups for CPU-bound Python code. For I/O-bound work the GIL is released while a thread waits on I/O, so threads do help there, but each thread costs memory and OS scheduling overhead, and shared state needs careful locking.
Asyncio: asyncio offers a lighter alternative for I/O-heavy tasks. It runs everything on a single thread: each coroutine (a function defined with async def) runs until it awaits something that isn't ready yet, then hands control back to the event loop, which resumes another coroutine whose I/O is ready. Because switching happens only at these await points and no extra threads are created, this model scales well to large numbers of concurrent I/O operations.
Example usage
import asyncio
import aiohttp
async def fetch_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                return data
            else:
                raise Exception(f"Error: {response.status}")
async def main():
    url = "https://api.example.com/data"
    data = await fetch_data(url)
    print(data)
if __name__ == "__main__":
    asyncio.run(main())
The aiohttp code fetches data from an API asynchronously. It defines an async function fetch_data that makes a GET request and parses the JSON response. The main function awaits fetch_data and prints the retrieved data. While the request is in flight, the event loop is free to run other tasks, which is what keeps the program responsive while it waits for the API response.
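The example above makes a single request; the payoff comes when several I/O waits overlap. In this minimal sketch a hypothetical fake_fetch coroutine stands in for the aiohttp call (it only sleeps for delay seconds, no network involved), and asyncio.gather runs three of them concurrently:

```python
import asyncio
import time

async def fake_fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for a network request: await a timer instead of a socket
    await asyncio.sleep(delay)
    return f"{name} done"

async def fetch_all() -> list:
    # gather wraps each coroutine in a task and waits for all of them;
    # results come back in argument order, not completion order
    return await asyncio.gather(
        fake_fetch("a", 0.3),
        fake_fetch("b", 0.2),
        fake_fetch("c", 0.1),
    )

if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(fetch_all())
    elapsed = time.perf_counter() - start
    print(results, f"{elapsed:.2f}s")  # roughly 0.3 s in total, not 0.6 s
```

The waits overlap, so the total time is close to the longest single wait rather than the sum.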
Main concepts
Below is some information about the primary components of asyncio:
Event Loop: The core of every asyncio application. It runs tasks and callbacks, performs network and system I/O, and manages subprocesses, among other things.
Coroutines: Functions defined with async def. Calling one returns a coroutine object; its body runs only when it is awaited or wrapped in a task, and it can pause at every await. This is where your application's logic does its work.
Tasks: A task wraps a coroutine and schedules it on the event loop, so that several coroutines can run concurrently.
Futures: Low-level placeholders for the result of an asynchronous operation that may not have finished yet. Tasks are a kind of future.
Transports and Protocols: A pluggable low-level API for implementing a wide range of network protocols.
Streams: A high-level API for creating and managing network connections as streams of data.
Synchronization Primitives: Lock, Event, Condition, and Semaphore, similar to their threading counterparts, for coordinating concurrent tasks.
Queues: High-level tools for passing data between tasks.
Sub-Processes: asyncio can create and manage sub-processes and, more importantly, interact with their input, output, and error pipes.
Locks: One of the synchronization primitives; asyncio.Lock gives a single task exclusive access to a shared resource while other tasks that need it wait their turn.
Here's some syntax so you know how it looks in practice:
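import asyncio
# Coroutine function: calling it returns a coroutine object; nothing runs yet
async def hello_world():
    print("Hello World")
# Streams (assumes a server is listening on localhost:8888; not called below)
async def read_from_server():
    reader, writer = await asyncio.open_connection('localhost', 8888)
    data = await reader.read(100)
    writer.close()
    await writer.wait_closed()
    return data
async def main():
    # Event loop: asyncio.run() below creates it; inside a coroutine you get it like this
    loop = asyncio.get_running_loop()
    # Running a coroutine: await it directly
    await hello_world()
    # Task scheduling: wrap a coroutine in a Task so it runs concurrently
    task = asyncio.create_task(hello_world())
    await task
    # Future: a placeholder that something else fills in later
    future = loop.create_future()
    loop.call_soon(future.set_result, "future done")
    await asyncio.wait([future])
    print(future.result())
    # Queue
    queue = asyncio.Queue()
    await queue.put('item')
    item = await queue.get()
    # Lock
    lock = asyncio.Lock()
    async with lock:
        # access shared state
        pass
# Creates the event loop, runs main() to completion, then closes the loop
asyncio.run(main())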
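Queues and tasks are easiest to understand together. This small sketch (producer, consumer, and pipeline are illustrative names, not part of asyncio) passes items from one task to another through an asyncio.Queue and uses task_done()/join() to know when every item has been handled:

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await queue.put(i)  # suspends only while the queue is full

async def consumer(queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()  # suspends until an item is available
        seen.append(item * 10)
        queue.task_done()  # lets queue.join() track progress

async def pipeline(n: int) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    seen: list = []
    worker = asyncio.create_task(consumer(queue, seen))
    await producer(queue, n)
    await queue.join()  # wait until every queued item has been processed
    worker.cancel()  # the consumer loops forever, so stop it explicitly
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return seen

if __name__ == "__main__":
    print(asyncio.run(pipeline(5)))  # [0, 10, 20, 30, 40]
```

The small maxsize forces the producer to pause whenever the consumer falls behind, so the two tasks hand control back and forth through the queue.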
Under the hood
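The central idea of asynchronous Python programming is running several functions (coroutines) concurrently: not literally at the same time, but interleaved on a single thread, each one pausing so the others can run. How does that work?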
Generators
In order to talk about coroutines in Python, we have to talk about generators first. Generators are a type of iterable defined using a function syntax. They're used to create an iterator in a more memory efficient way, as they generate values on the fly rather than storing them in memory all at once.
Here is a simple generator:
def simple_generator():
    yield 'Hello'
    yield 'World'
Generator functions are special: the generator object they return keeps its state between calls. That state lives in a frame object, which the generator holds on to instead of discarding it when the function yields.
In Python, whenever a function is called, a new frame is created to hold the function's local variables and to remember where to return once the function completes. When a regular function returns, its frame is discarded. With generators, however, the frame survives every yield and is released only when the generator finishes.
Let's consider a few points for the above generator:
- When the generator function is first called, a generator object is created, but none of the function's code is executed. A new frame object is created to hold the generator's execution context, but it isn't active yet.
gen = simple_generator()  # Creates the frame object; no code in the body runs yet
- When next() is called on the generator object for the first time, the frame becomes active, and the code in the generator function runs until it hits the yield statement. The yield statement produces a value, which is returned by the next() call. After the yield, the frame's state (local variables, instruction pointer, etc.) is saved for later, and control returns to the caller, but the frame object still exists in memory.
print(next(gen))  # Activates the frame, prints: Hello
- Each subsequent call to next() resumes execution in the generator function where it left off (it reactivates the frame with the saved state) and runs until the next yield statement or until the function exits. When the generator function exits, next() raises StopIteration and the frame is discarded.
print(next(gen))  # Resumes the frame, prints: World
print(next(gen))  # Raises StopIteration; the frame is discarded
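You can watch this frame lifecycle from Python itself. The sketch below uses the standard inspect.getgeneratorstate() and the generator's gi_frame attribute to follow the three steps above; counting_generator is an illustrative name, and its local variable count exists only to give the frame some state to inspect:

```python
import inspect

def counting_generator():
    count = 0
    while count < 2:
        count += 1
        yield count

gen = counting_generator()
print(inspect.getgeneratorstate(gen))  # GEN_CREATED: frame exists, no code has run
print(next(gen))  # 1
print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED: paused at the yield
print(dict(gen.gi_frame.f_locals))  # {'count': 1}: locals survive between calls
print(next(gen))  # 2
print(next(gen, "exhausted"))  # the generator returns, so next() gives the default
print(inspect.getgeneratorstate(gen))  # GEN_CLOSED
print(gen.gi_frame)  # None: the frame has been released
```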
This ability to pause and resume execution is what makes generators so useful in asynchronous programming, as it’s the building block that asyncio and other asynchronous I/O libraries in Python use to switch between tasks.
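To see why pausing matters for concurrency, here is a toy round-robin scheduler. It is a sketch of the idea, not how asyncio itself schedules work: it keeps a queue of generators and advances each one a single step at a time, so their output interleaves on one thread. The worker and round_robin names are illustrative.

```python
from collections import deque

def worker(name, steps):
    for i in range(steps):
        yield f"{name}{i}"  # pause here; the scheduler decides who runs next

def round_robin(*gens):
    """Advance each generator one step in turn until all are exhausted."""
    ready = deque(gens)
    trace = []
    while ready:
        gen = ready.popleft()
        try:
            trace.append(next(gen))
        except StopIteration:
            continue  # finished: drop it from the queue
        ready.append(gen)  # not finished: back of the line
    return trace

print(round_robin(worker("a", 3), worker("b", 2)))
# ['a0', 'b0', 'a1', 'b1', 'a2']
```

An event loop adds one thing to this picture: instead of resuming generators blindly in turn, it resumes each one only when whatever it is waiting for is ready.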
Async/await
The async and await keywords in Python are used to define and work with coroutines. They are part of Python's native support for asynchronous programming, introduced in Python 3.5.
- async: The async keyword declares a function as a coroutine. When called, this function does not run immediately; instead, it returns a coroutine object.
async def hello_world():
    print("Hello, World!")
- await: The await keyword is used inside a coroutine to suspend its execution until an awaitable object (a coroutine, a Task, a Future, etc.) produces its result. If that result isn't ready yet, control goes back to the event loop, which can switch to other tasks in the meantime.
async def main():
    await hello_world()  # main() pauses here until hello_world() has completed
The await keyword essentially replaces yield from, which asyncio's generator-based coroutines used in Python 3.4 (the yield from syntax itself was added in Python 3.3). Before await was introduced, you'd use yield from to yield control from a coroutine:
import asyncio
@asyncio.coroutine  # deprecated in Python 3.8 and removed in Python 3.11
def old_style_coroutine():
    yield from asyncio.sleep(1)
The introduction of async and await provides a clearer, more concise way to write asynchronous code. Importantly, the await expression suspends the coroutine until the awaitable it waits on is finished, transferring control back to the event loop, much as yield from did, but with syntax dedicated to asynchronous operations.
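Because await is built on the same machinery as yield from, you can drive a native coroutine by hand with send(), which is essentially what an event loop does. In this sketch, Suspend is a hypothetical awaitable whose __await__ method is a generator; its yield is the point where control leaves the coroutine and reaches whoever called send(). The drive helper is also illustrative.

```python
class Suspend:
    """Hypothetical awaitable: pauses the coroutine once, returns what is sent back in."""

    def __await__(self):
        value = yield "suspended"  # this value travels out to the caller of send()
        return value  # becomes the result of the await expression

async def double_after_pause():
    received = await Suspend()
    return received * 2

def drive(coro, reply):
    first = coro.send(None)  # start the coroutine; it runs until the yield in Suspend
    try:
        coro.send(reply)  # resume it, delivering `reply` to that yield
    except StopIteration as exc:  # a finished coroutine reports its return value this way
        return first, exc.value
    raise RuntimeError("coroutine suspended more than once")

print(drive(double_after_pause(), 21))  # ('suspended', 42)
```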
Yield from
Let's look at a simple example. Suppose we have a generator that yields values from multiple sequences:
def chain_generators(*iterables):
    for iterable in iterables:
        for item in iterable:
            yield item
list(chain_generators([1, 2, 3], ["a", "b", "c"]))  # Output: [1, 2, 3, 'a', 'b', 'c']
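Now, suppose these iterables were actually generators. For plain iteration the nested loop still works, but it doesn't forward values passed in with send() (more on that below) or exceptions injected with throw() to the inner generator, and it discards the inner generator's return value. Handling all of that by hand would make chain_generators much more complicated. That's where yield from comes in.
Using yield from, we can replace our nested for loop with a simpler construct: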
def chain_generators(*iterables):
    for iterable in iterables:
        yield from iterable
list(chain_generators([1, 2, 3], ["a", "b", "c"]))  # Output: [1, 2, 3, 'a', 'b', 'c']
In this version, yield from iterable replaces the inner loop entirely. The yield from expression yields every value from the iterable in turn, effectively flattening the sequences.
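So yield from pauses the current (delegating) generator until the child generator finishes its execution; meanwhile, the child's values pass straight through to the caller. When the child finishes, the yield from expression evaluates to the child's return value and the parent continues.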
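That return value is what later lets a coroutine receive the result of the one it waited on. A small sketch with illustrative names:

```python
def child():
    yield 1
    yield 2
    return "child result"  # delivered to the parent, not to the consumer

def parent():
    result = yield from child()  # parent is suspended while child's values pass through
    yield f"parent got {result!r}"

print(list(parent()))
# [1, 2, "parent got 'child result'"]
```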
One more important thing about generators: they don't only send values out to the code that iterates over them, they can also receive values sent in while they are paused.
The send() method sends a value into a generator and resumes its execution from where it left off. The value becomes the result of the yield expression at which the generator was paused, which lets you steer a generator's behavior at run time.
Here's a simple example:
def simple_generator():
    x = yield 'Hello'
    yield x
gen = simple_generator()
print(next(gen))  # Start the generator, output: Hello
print(gen.send('World'))  # Send a value in; x becomes 'World', output: World
When next(gen) is executed, the generator starts running and stops at yield 'Hello'. At this point, the generator yields 'Hello' and waits to be resumed.
In the next line, gen.send('World') sends the string 'World' into the generator, where it becomes the value of x. The generator then resumes and, on the following line, yields x, which is now 'World'.
However, before you can send in a value other than None, the generator has to be paused at a yield expression, so you usually call next() (or the equivalent gen.send(None)) once to start it. If send() is called with a non-None value on a generator that hasn't started yet, a TypeError is raised. This is why the event loop later in this article starts every coroutine with coro.send(None).
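A generator that keeps a running total shows both points: values flow in through send(), and the generator has to be primed with next() or send(None) before it can receive anything. running_total is an illustrative name.

```python
def running_total():
    total = 0
    while True:
        value = yield total  # hand out the current total, then wait for the next number
        total += value

acc = running_total()
print(acc.send(None))  # 0  (priming: same as next(acc))
print(acc.send(5))  # 5
print(acc.send(10))  # 15

fresh = running_total()
try:
    fresh.send(5)  # not primed yet
except TypeError as exc:
    print("TypeError:", exc)
```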
Event loop
The function of the asyncio event loop is to schedule calls, handle system events, and manage the multiplexing of I/O access. Here's a high-level view of how it works:
1. Initialization and context: When asyncio.run() (or loop.run_until_complete()) starts a loop, it becomes the running event loop for the current thread. By default, asyncio picks an event loop implementation suited to the underlying operating system (selector-based on Unix, proactor-based on Windows).
2. Registering coroutines: When a coroutine is scheduled with loop.create_task(coro()), it is wrapped in a Task object. Task is a subclass of Future that wraps a coroutine and drives its execution. Creating the task schedules its first step on the loop.
3. Polling for I/O and other events: The event loop uses system facilities such as select, poll, epoll, or kqueue to check socket descriptors for I/O readiness. It also tracks other kinds of events, such as timeouts and system signals.
4. Executing callbacks and tasks: When an event becomes ready (e.g., a socket has data to read or a system signal is caught), the event loop runs the callbacks registered for that event. If a ready event is tied to a Task, the loop resumes the task's coroutine at the point where it paused with await.
5. Looping back: Once all ready events are processed (callbacks have been called and tasks have run until their next suspension point), the event loop goes back to step 3 and polls for more I/O and other events.
6. Stopping the loop: The loop stops when loop.stop() is called, or when the future passed to loop.run_until_complete() (which asyncio.run() uses) completes, whether with a result or an exception. Ordinary exceptions raised in other callbacks don't stop the loop: they are passed to the loop's exception handler, and an exception raised inside a task is stored on that task.
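The real loop exposes the same scheduling primitives that the toy implementation below imitates: loop.call_soon() queues a callback for the next loop iteration, and loop.call_later() queues one for a point in the future. A short sketch (the record helper is illustrative) shows the order in which they run:

```python
import asyncio

def record(log, label):
    log.append(label)

async def main():
    loop = asyncio.get_running_loop()
    log = []
    loop.call_later(0.05, record, log, "later")  # due 50 ms from now
    loop.call_soon(record, log, "soon")  # due on the next loop iteration
    await asyncio.sleep(0.1)  # suspend so the loop can run both callbacks
    return log

print(asyncio.run(main()))  # ['soon', 'later']
```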
We will implement a simple but working event loop that does not poll for I/O events with select. Instead, it keeps a list of scheduled events, runs each one once its time has come, and sleeps until the next one is due.
Implementation
Here's what we are building (saved as event_loop.py, since the HTTP example later imports from that module):
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Generator, List, NamedTuple, Optional
from enum import Enum
import time
class FutureStatus(Enum):
    DONE = "DONE"
    SCHEDULED = "SCHEDULED"
    CANCELED = "CANCELED"
class Callback(NamedTuple):
    # A function plus the positional arguments to call it with
    fn: Callable
    args: List
@dataclass
class Event:
    # A callback that the loop should run at (or after) `when`
    callback: Callback
    when: Optional[datetime] = None
class loop:
    events: List[Event]
    def __init__(self) -> None:
        self.events = []
    def call_soon(self, c: Callback):
        # Run c on the next pass of the loop
        self.events.append(Event(c, when=datetime.now()))
    def call_later(self, c: Callback, when: datetime):
        # Run c once `when` has passed
        self.events.append(Event(c, when=when))
# The single, global event loop that everything schedules on
current_loop: loop = loop()
@dataclass
class Future:
    callbacks: List[Callback] = field(default_factory=list)
    status: FutureStatus = FutureStatus.SCHEDULED
    @property
    def result(self):
        return getattr(self, "_result", None)
    def done(self):
        return self.status != FutureStatus.SCHEDULED
    def set_result(self, value):
        self._result = value
        self.status = FutureStatus.DONE
        self._schedule_callbacks()
    def cancel(self):
        self.status = FutureStatus.CANCELED
    @property
    def canceled(self):
        return self.status == FutureStatus.CANCELED
    def __iter__(self):
        # Already resolved: `yield from future` evaluates to the result immediately
        if self.status == FutureStatus.DONE:
            return self.result
        if self.status == FutureStatus.CANCELED:
            return None
        # Otherwise hand this future to the loop; the coroutine is resumed once it is done
        yield self
        return self.result
    def add_done_callback(self, c: Callback):
        self.callbacks.append(c)
    def _schedule_callbacks(self):
        # Ask the loop to run every waiter's callback on its next pass
        for c in self.callbacks:
            current_loop.call_soon(c)
    __await__ = __iter__
def create_future():
    return Future()
def run(*coros: Generator):
    if not coros:
        return []
    results = []
    # Kick off every coroutine: its first step is coro.send(None)
    for coro in coros:
        current_loop.call_soon(Callback(coro.send, [None]))
    while current_loop.events:
        # Earliest event first
        current_loop.events.sort(key=lambda e: e.when)
        now = datetime.now()
        current_event = current_loop.events[0]
        if current_event.when <= now:
            current_loop.events.pop(0)
            callback = current_event.callback
            try:
                res_or_fut = callback.fn(*callback.args)
            except StopIteration as exc:
                # The coroutine finished; its return value is in exc.value
                res_or_fut = exc.value
            if isinstance(res_or_fut, Future):
                # The coroutine is waiting on a future: resume it (send(None)) once it is done
                res_or_fut.add_done_callback(Callback(callback.fn, callback.args))
            elif res_or_fut is not None:
                results.append(res_or_fut)
        else:
            # Nothing is due yet: sleep exactly until the earliest event.
            # total_seconds() keeps the fractional part; the original `.seconds`
            # dropped it and could turn a tiny negative gap into almost a day.
            delay = (current_event.when - now).total_seconds()
            print("loop sleeping ...", round(delay, 3))
            time.sleep(delay)
    return results
def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    future = create_future()
    # Resolve the future `delay` seconds from now
    current_loop.call_later(
        Callback(Future.set_result, [future, result]),
        datetime.now() + timedelta(seconds=delay),
    )
    yield from future
    print("slept async", delay, "seconds")
    return future.result
if __name__ == "__main__":
    run(sleep(5), sleep(5), sleep(5))
This is a very simple version of asyncio. It is missing many key features, such as select-based I/O polling, but it works.
Let's go through it.
FutureStatus class: An enumeration of the three possible states of a Future instance: DONE, SCHEDULED, and CANCELED.
Callback class: A simple data structure that holds a reference to a callable and its arguments.
Event class: A unit of work in the event loop. Each event holds a Callback instance and the time at which the callback should be called.
loop class: The event loop. Its call_soon method schedules a callback to run as soon as possible, and its call_later method schedules a callback to run at a given datetime.
current_loop: The currently active event loop. All scheduling goes through it.
Future class: A Future represents a result that may not be available yet. A coroutine waits for it with yield from future, which hands the future to the event loop; callbacks added with add_done_callback are scheduled on the loop once set_result() is called. Unlike asyncio.Future, this version does not store exceptions.
run function: Runs the given coroutines, plus everything they schedule, until no events are left. It sorts events by the time they are due, runs the earliest one if its time has come, and otherwise sleeps until it is. When a coroutine yields a Future, run registers a callback that resumes the coroutine once that Future is done.
create_future function: A small helper that creates a new Future.
sleep function: A generator-based coroutine (it uses yield from on a future) that schedules set_result on a future delay seconds from now, waits for it, and returns the provided result.
run invocation in the main block: The if __name__ == "__main__": block at the end starts three five-second sleeps concurrently. If you run the program, the print message from the sleep function appears three times after about five seconds, not fifteen.
Concurrent HTTP requests
Let's now use this event loop to perform concurrent HTTP requests.
Here's my code for simple non-blocking HTTP requests using Python's socket module:
import socket
from event_loop import Future, Callback, run, current_loop, create_future
def _yield_to_loop():
    # Helper (added for readability): suspend once and let the loop resume us
    # on its next pass, so other coroutines can run in between
    future = create_future()
    current_loop.call_soon(Callback(Future.set_result, [future, None]))
    yield from future
def request(url):
    # Parse the URL to get the host and path
    host = url.split("/")[2]
    path = "/" + "/".join(url.split("/")[3:])
    print("request to:", host, path)
    # Create a TCP socket and connect (note: connect() itself still blocks)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, 80))
    s.setblocking(False)
    # Form and send the HTTP request; "Connection: close" makes the server
    # close the socket once the response is complete
    request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
    s.sendall(request.encode())
    # Receive the response, yielding to the event loop before every read
    response = b""
    while True:
        yield from _yield_to_loop()
        print("fetching", url)
        try:
            data = s.recv(1024)
        except BlockingIOError:
            # No data has arrived yet: try again on a later pass of the loop
            continue
        if not data:
            # recv() returned b"": the server closed the connection
            break
        response += data
    # Close the connection
    s.close()
    return response.decode(errors="replace")
if __name__ == "__main__":
    # Example URLs (assumed reachable over plain HTTP on port 80)
    pages = run(request("http://example.com/"), request("http://example.org/"))
    print([page.splitlines()[0] for page in pages])
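The request generator opens a socket connection to the URL's host, switches it to non-blocking mode, sends a GET request, and reads the response piece by piece without blocking the other coroutines.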
Here are its key parts and how they connect to our custom event loop:
request function: This generator function opens a socket connection to a server and sends an HTTP GET request. Before reading, it creates a Future that the loop resolves on its next pass and yields from it, which hands control back to the event loop. When the loop resolves that Future, execution comes back into the request function, which tries to read from the socket. If no data is available yet, recv() raises BlockingIOError and the function yields to the loop again; otherwise, the data is appended to the response. This repeats until recv() returns an empty bytes object, which means the server has closed the connection and the HTTP response is complete. The function then returns the response.
Futures: A Future is a placeholder for a result that may not be available yet. When a coroutine yields from a Future, it pauses and gives control to the event loop, which can run other coroutines until the Future is resolved. The loop then goes back to the coroutines paused on that Future and resumes them where they left off, which is what makes the program concurrent.
Event loop: The event loop goes through its scheduled events (each one a callback plus the time at which to run it) and checks whether the next event's time has come. If it hasn't, the loop sleeps for the remaining time, putting the whole program to sleep. If it has, the loop runs the callback; for Future.set_result, this marks the Future as done and schedules the callbacks of every coroutine waiting on it.
run function: Accepts several generator-based coroutines, schedules their first step, and runs the event loop until no events remain, so their work proceeds concurrently.
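The BlockingIOError path is easy to reproduce locally without a web server. This sketch uses socket.socketpair() as a stand-in for the HTTP connection (nonblocking_demo is an illustrative name): a non-blocking recv() fails while no data has arrived, and the standard selectors module, which provides the kind of readiness polling this toy loop skips, reports when the socket becomes readable.

```python
import selectors
import socket

def nonblocking_demo():
    sender, receiver = socket.socketpair()  # two connected sockets, no network needed
    receiver.setblocking(False)
    events = []
    try:
        receiver.recv(1024)  # nothing has been sent yet
    except BlockingIOError:
        events.append("would block")  # the case request() handles by yielding
    sender.sendall(b"hello")
    with selectors.DefaultSelector() as sel:
        sel.register(receiver, selectors.EVENT_READ)
        ready = sel.select(timeout=1.0)  # wait until the socket is readable
        events.append(f"{len(ready)} socket(s) ready")
    events.append(receiver.recv(1024))
    sender.close()
    receiver.close()
    return events

print(nonblocking_demo())  # ['would block', '1 socket(s) ready', b'hello']
```

A real event loop registers every waiting socket with a selector like this and resumes only the coroutines whose sockets are ready, instead of retrying reads on every pass.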
I recommend going through this code in its entirety. Understanding the interplay between a coroutine and the event loop is key. The trickiest and most important detail is how the low-level generator yields control back to the event loop. The loop, in turn, adds a done callback to the Future that the coroutine yielded, and that callback is what continues the coroutine's execution where it left off. Wrapping your head around these details may take a while, but it's essential for seeing how everything fits together.
Here's how I can visualize this for myself (please don't judge my drawings too hard):
Conclusion
This is a simplified, bare-bones demonstration of concurrency powered by an event loop, callbacks, and generator-based coroutines. It doesn't show the full picture of a real asyncio event loop, which handles many concurrent network connections efficiently by multiplexing them with system calls (select, poll, or similar) and offers many more features.
Written by
Denis Khodishchenko
Python Software Engineer @ Hennessey Digital | AWS Certified Solutions Architect Associate