Efficient Async Programming: Celery and FastAPI in Action

You know how moving apps can feel like changing a tire while you're driving down the highway? Yeah, it was kinda like that recently! We were shifting a pretty important interview creation service at ZekoAI from AWS Lambda over to FastAPI on Kubernetes, and boy, did we hit a few bumps. Especially when dealing with async Python, Celery background tasks, and Redis. So, I thought I'd share the story of what tripped us up, mainly with those tricky event loops, and how we figured things out.
What We Wanted: Speedy FastAPI with Celery Helpers
The plan was simple: make our new FastAPI service super quick. A bunch of steps in creating an interview (like asking an LLM questions or saving files) took a while, so they were perfect jobs to hand off to Celery to do in the background. Since FastAPI loves async, and lots of our code was already doing async stuff (talking to databases, LLMs, S3), going all-in with async/await just made sense.
First Little Puzzle: Running async Stuff in Celery
Okay, first thing: Celery usually just runs tasks one after the other, nice and simple (synchronously). But our helper functions were async def! How do you make those work? Turns out, there's a standard trick: just wrap your async function call inside asyncio.run().
# tasks.py
import asyncio
from app import celery_app
from .utils import do_async_work  # Imagine this is your async def function

@celery_app.task(name="my_async_task")
def run_async_task_wrapper(arg1, arg2):
    """The plain old Celery task that wraps the async stuff."""
    # asyncio.run() just spins up a little loop for the async function
    return asyncio.run(do_async_work(arg1, arg2))

# Calling it from somewhere else:
# run_async_task_wrapper.delay(value1, value2)
Easy enough! That let our async code run happily inside the Celery worker. Or so we thought...
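If you want to see the pattern in isolation (no Celery or Redis install needed), the wrapper boils down to this runnable toy, with a stand-in coroutine in place of our real async helper:

```python
import asyncio

async def do_async_work(a, b):
    # Stand-in for the real work (LLM calls, S3 uploads, DB queries)
    await asyncio.sleep(0)
    return a + b

def run_async_task_wrapper(a, b):
    # Same shape as the Celery task body above, minus the @task decorator:
    # asyncio.run() builds a loop, runs the coroutine, then tears the loop down.
    return asyncio.run(do_async_work(a, b))

print(run_async_task_wrapper(2, 3))  # prints 5
```

The key point: the synchronous wrapper is what Celery sees and schedules; the event loop exists only for the duration of that one call.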
Uh Oh, Redis Trouble: "Event loop is closed"? Whaaat?
Things got weird when our async functions needed to chat with Redis (using redis-py's async features). We thought we were being smart by setting up a shared Redis connection pool when the app and workers first started up. Saves time making new connections, right?
# When the app/worker starts up...
import redis.asyncio

# CAUTION: This global pool was the sneaky culprit!
redis_connection_pool = redis.asyncio.ConnectionPool(host=..., decode_responses=True)

# Inside an async function (like set_sample_questions_redis)
# that got called directly OR through that Celery asyncio.run() trick
async def set_sample_questions_redis(...):
    # Using that global pool we made earlier
    redis_client = RedisHandler(pool=redis_connection_pool)
    await redis_client.get_key(...)  # <-- BANG! Error right here in the Celery task
    await redis_client.set_key(...)
Now, when we called set_sample_questions_redis directly using await from FastAPI, it usually worked fine. But when Celery ran the exact same function using our asyncio.run() wrapper, we got these head-scratching errors:
RuntimeError: Event loop is closed
# Or this fun one:
RuntimeError: Task <...> got Future <...> attached to a different loop
Confusing, right?!
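You can actually reproduce that second error with nothing but the standard library, no Redis involved: create a Future inside one asyncio.run() call, then await it inside another. A toy repro:

```python
import asyncio

async def make_future():
    # This Future is bound to the loop asyncio.run() creates for this call...
    return asyncio.get_running_loop().create_future()

fut = asyncio.run(make_future())  # ...and that loop is closed when run() returns

async def await_future():
    await fut  # a brand-new loop now tries to use loop #1's Future

try:
    asyncio.run(await_future())
except RuntimeError as e:
    error_message = str(e)
    print(error_message)  # ...got Future <...> attached to a different loop
```

Our Redis pool hit exactly this: its connections were "Futures" belonging to the worker's startup loop, being awaited from the throwaway loop asyncio.run() made.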
Figuring it Out: The Event Loop Clash!
So, what was the deal? Turns out, asyncio.run() basically sets up its own little temporary workspace with its own power source (that's the event loop) just for the function it's running. But our Redis connection pool? We'd made that way back when the worker first started, and it was hooked up to the worker's original power source.
When our code inside that temporary asyncio.run() workspace tried to use the Redis pool, it was like trying to plug a tool designed for one power outlet into a completely different one. The tool (the Redis pool's connections) just wasn't compatible with the temporary workspace's power (the new event loop), leading to those crashes.
That explained why direct await calls mostly worked (they were using the main power source the pool was already plugged into) but the asyncio.run() calls always failed!
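To see loops come and go for yourself, grab the running loop from inside two separate asyncio.run() calls. Each call builds a fresh loop and closes it on the way out, which is exactly why anything tied to the first loop is dead weight in the second:

```python
import asyncio

loops = []

async def grab_loop():
    # Record which event loop this call is running on
    loops.append(asyncio.get_running_loop())

asyncio.run(grab_loop())  # workspace #1
asyncio.run(grab_loop())  # workspace #2

print(loops[0] is loops[1])  # False: two different event loops
print(loops[0].is_closed())  # True: the first loop was already torn down
```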
The Fix: Make the Tools Inside the Workshop!
The big "aha!" moment was realizing that things sensitive to the event loop (like that connection pool) need to be created and used inside the same workspace (the same event loop).
Since our set_sample_questions_redis function was being run in two different "workspaces" (the main FastAPI one and the temporary Celery asyncio.run() one), the simplest, most reliable fix was to just create a new connection pool right inside the function every time it runs. This way, the pool is always plugged into the right power source for that specific run.
# utils.py (or wherever set_sample_questions_redis lives)
import json

from redis.asyncio import ConnectionPool
from app import settings  # Need our Redis connection details
from .redis_handler import RedisHandler  # Our helper class

async def set_sample_questions_redis(interview_id: str, sample_questions: dict):
    """
    Saves/updates sample questions in Redis.
    Makes its own pool each time for event loop compatibility!
    """
    pool = None  # Need this defined here so 'finally' can see it
    try:
        # Make a fresh pool right here! It'll use the current event loop.
        pool = ConnectionPool(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            max_connections=10,  # A reasonable number
            decode_responses=True,  # Usually want strings back
        )
        # Use this new pool
        redis_client = RedisHandler(pool=pool)
        redis_key = f"{interview_id}SampleQuestions"
        # ... (the actual work: get old data, mix in new, save it) ...
        existing_data_raw = await redis_client.get_key(redis_key)
        # ... mix it up ...
        await redis_client.set_key(redis_key, json.dumps(existing_questions))
        await redis_client.set_expiry(redis_key, 60 * 60 * 24)  # 1 day
        return True  # Hooray!
    except Exception as e:
        # Uh oh, log it
        # ... error logging ...
        return False
    finally:
        # Super important: Clean up the pool we made!
        if pool:
            await pool.disconnect()
Doing it this way totally fixed those annoying loop errors! Normally, you'd want a shared pool for best performance, since creating and destroying pools has some overhead. But in this specific Celery + asyncio.run() situation, that global pool was exactly what kept crashing us. Having it actually work without errors was way more important, right? Correctness first!
While we were wrestling with those event loops, we also bumped into another little async-related snag.
Bonus Tip: Celery Results Aren't Simple Data!
Another little "gotcha" we saw mentioned (though we sidestepped it!) relates to calling Celery tasks with .delay(). When you call .delay(), it gives you back an AsyncResult object right away – think of it like a tracking number for your background job.
Now, if you needed the background job to calculate something and return it (like maybe the S3 key after it uploads a file), you'd have a problem trying to immediately save that AsyncResult object somewhere using json.dumps(). JSON just doesn't know what to do with that complex Python object! People often work around this by saving the task_id string from the AsyncResult instead.
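Here's the gotcha in miniature, using a stand-in class so it runs without Celery installed (the real celery.result.AsyncResult also exposes the task id as .id):

```python
import json

class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult, just for this demo."""
    def __init__(self, task_id):
        self.id = task_id

result = FakeAsyncResult("d9a2c5e0-1111-4abc-9def-000000000000")

try:
    json.dumps({"upload_job": result})  # the naive attempt...
except TypeError as e:
    print(e)  # ...blows up: the object is not JSON serializable

payload = json.dumps({"upload_job": result.id})  # the task_id string works fine
print(payload)
```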
But in our case, for saving the job description to S3, we realized we didn't actually need the task to return the S3 key. We could just decide what the key should be before even calling the task!
# What we actually did in get_jd_summary.py
import time

# Decide the S3 key *before* calling the task
object_key = f"ait_odam_jds/{user_id}_{time.time()}.txt"

# Call the task, passing the pre-decided key. We don't need its return value!
save_jd_s3_task.delay(job_description, object_key)

# ... later, when saving data to Redis ...
# We already know the object_key, so we can just use it directly!
response["jd_object_key"] = object_key
await redis_client.set_key(interview_id, json.dumps(response))  # Works perfectly!
So, by generating the identifier (the S3 object key) upfront, we completely avoided needing to get anything back from the Celery task or worrying about how to store its result. Simple and effective!
Wrapping Up: Watch Those Loops!
So yeah, moving to async tools like FastAPI while mixing in Celery means you really gotta pay attention to how you handle async resources, especially those event loops!
Our main lessons learned:
- asyncio.run() is your friend for running async code in regular Celery tasks.
- Watch out for async things (like connection pools) you create globally if you plan to use them inside asyncio.run(). That event loop mismatch is sneaky!
- Making those sensitive resources inside the async function you pass to asyncio.run() is a solid way to keep things working correctly.
- Remember, Celery's .delay() gives you a tracker (AsyncResult), not the final answer. Save the task_id if you need to refer back to the job.
It took some head-scratching, but figuring out how Celery, asyncio, and things like Redis pools play together (or fight!) was key to getting our service running smoothly, and we'd happily make that trade again: reliability first, then squeeze out the performance. Hopefully, our little adventure saves you some trouble!
Written by Hemendra Chaudhary