asyncio wrapper for any sync functions

HU, PiliHU, Pili
4 min read

Wrap sync into async

Core technique:

  • data = await asyncio.to_thread(perform_blocking_get, url) is the universal way to put any blocking function inside an async architecture.
  • await asyncio.sleep(1) is used to wait inside aync function. Note that if time.sleep is used, this function basically blocks the execution of other functions.

In this demo:

  • A local HTTP server that simulates IN_PROGRESS and DONE (after 4 cumulative requests).
  • A client that implements a async check_status function that polls an HTTP endpoint for current status, it awaits (yields execution to the async loop for other tasks) when IN_PROGRESS is received, and returns when DONE is received.

Code

Below is the all in one code, that is good to execute directly.

import asyncio
import requests
from typing import Dict, Any

def perform_blocking_get(url: str) -> Dict[str, Any]:
    """
    A regular (synchronous) function that performs the blocking GET request.
    This is the function we will run in a separate thread.
    """
    try:
        # The requests.get() call will block this worker thread,
        # but not the main asyncio event loop.
        response = requests.get(url, timeout=10) # Set a timeout for the request itself

        # Raise an exception for bad status codes (4xx or 5xx)
        response.raise_for_status()

        # Decode the JSON response
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"A requests error occurred: {e}")
        # Re-raising allows the main coroutine to handle it.
        raise


async def check_status(url: str, timeout: int = 60) -> Dict[str, Any]:
    """
    Polls an HTTP endpoint using blocking `requests` in a separate thread.
    """
    print(f"Starting to poll endpoint: {url} (using blocking requests)")
    start_time = asyncio.get_running_loop().time()

    while True:
        # Check for overall polling timeout
        elapsed_time = asyncio.get_running_loop().time() - start_time
        if elapsed_time >= timeout:
            raise asyncio.TimeoutError(f"Polling timed out after {timeout} seconds.")

        try:
            print("Checking status...")
            # Run the blocking function in a worker thread without blocking the event loop
            data = await asyncio.to_thread(perform_blocking_get, url)

            status = data.get("status")

            if status == "DONE":
                print("Status is DONE. Returning the result.")
                return data
            elif status == "IN_PROGRESS":
                print("Status is IN_PROGRESS. Waiting for 1 second...")
                # asyncio.sleep is a non-blocking wait, allowing other tasks to run.
                await asyncio.sleep(1)
            else:
                raise ValueError(f"Received unexpected status: '{status}'")

        except Exception as e:
            print(f"An error occurred during the poll cycle: {e}")
            raise


import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
import json

# --- Dummy Server State ---
REQUEST_COUNT = 0

class MockHandler(BaseHTTPRequestHandler):
    """A mock handler that simulates a long-running process."""
    def do_GET(self):
        global REQUEST_COUNT
        REQUEST_COUNT += 1

        print(f"[Server] Received request #{REQUEST_COUNT}")

        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()

        if REQUEST_COUNT < 4:
            response_data = {"status": "IN_PROGRESS", "progress": f"{REQUEST_COUNT * 33}%"}
        else:
            response_data = {"status": "DONE", "result": "Processing complete!", "final_id": 12345}

        self.wfile.write(json.dumps(response_data).encode("utf-8"))

def run_server(server_class=HTTPServer, handler_class=MockHandler, port=8080):
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    print(f"Starting blocking mock server on port {port}")
    httpd.serve_forever()


async def main():
    """Main function to run the server and the client."""
    # Run the blocking server in a separate, daemonized thread
    server_thread = threading.Thread(target=run_server)
    server_thread.daemon = True # Allows main program to exit even if thread is running
    server_thread.start()
    await asyncio.sleep(0.1) # Give the server a moment to start up

    endpoint_url = "http://localhost:8080"

    try:
        final_result = await check_status(endpoint_url, timeout=10)
        print("\n--- Polling Finished ---")
        print("Final Result:", final_result)
        print("------------------------")
    except asyncio.TimeoutError:
        print("\nPolling failed due to a timeout.")
    except Exception as e:
        print(f"\nAn error occurred during polling: {e}")
    finally:
        # In a real app, you would add logic to shut down the server gracefully
        print("Main function finished. Server thread will exit.")


if __name__ == "__main__":
    asyncio.run(main())

Wrap async into sync

The other direction is relatively simple.

Suppose there is an aync func(), one can add await keywords to make it work in the sync environment.

result = await func()

A more curated version is to use callback on the async task.

  • a = func() -- without the await keyword, a would be a coroutine, instead of actual function return.
  • task = asyncio.create_task(a) --
  • task.add_done_callback(my_callback) -- set callback.

Good thing is that this allows one to set multiple concurrent tasks and use one await to yield the execution to the main loop in one go.

import asyncio

def my_callback(task):
    """This function is executed when the task finishes."""
    print("--- Callback starts ---")
    if task.exception():
        print(f"The task failed with an exception: {task.exception()}")
    else:
        # Get the result from the finished task
        result = task.result()
        print(f"The task finished successfully. Result: {result}")
    print("--- Callback ends ---")


async def func():
    """An example async function that takes time."""
    print("func() started...")
    await asyncio.sleep(2)  # Simulate a network request
    # Uncomment the line below to test exception handling
    # raise ValueError("Something went wrong!")
    return "Hello from func"


async def main():
    print("Calling func() to get the coroutine object.")
    a = func()

    print("Scheduling the coroutine as a task and adding a callback.")
    task = asyncio.create_task(a)
    task.add_done_callback(my_callback)

    # The rest of main() can continue to run without waiting
    print("main() can do other work now...")
    await asyncio.sleep(3) # Keep the program running to see the callback execute
    print("main() is finished.")
0
Subscribe to my newsletter

Read articles from HU, Pili directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

HU, Pili
HU, Pili

Just run the code, or yourself.