Why you should use async as a Data Engineer


Async coding is one of those things that I struggled to wrap my head around for a LONG time when I started coding in Python, but once I had a few use cases that required it, it became apparent just how valuable it can be.
As a data engineer, creating data pipelines typically follows a few patterns:
- Pull data from a business-specific database (watermark, full file, CDC)
- Pull data from a 3rd party service (Salesforce, Segment, LogRocket)
  - Sometimes using a specific third party connector (Fivetran, Airbyte)
- Pull data from SharePoint, Google Drive, Excel (all the fun ones you don't want to pull from, but somehow… it always happens)
There's a variety of other sources, but typically if it's a third party service, Fivetran (or another managed service) might be your first approach. If Fivetran doesn't support it, you might try other connectors. However, if no service supports it (or your existing service stops supporting it - not speaking from experience or anything), you might build a custom set of code for pulling from that service.
There might also be pricing constraints. I had a personal experience where the cost of using a third party connector to pull from a third party service was INCREDIBLY high. So, you might have to build your own solution for this reason alone.
When you're put in a situation where you have to build something yourself, you'll likely code it synchronously (i.e., pull it, then do something with it). However, if you have to pull a LOT of data, it might take forever for your code to pull it. That's where async programming can REALLY come in handy.
Let’s talk about how to utilize async programming in data pipelines!
Requirements
We’re going to keep requirements really simple. Really all you need is Python. I’m utilizing version 3.12 currently, but anything 3.10+ is probably fine (and even older versions should generally be fine).
We also will utilize a variety of modules:
import requests #not in the standard library - you may have to install it
import datetime #standard python package
import os #standard python package
import dotenv #(optional) - utilize this to load environment variables from a file (python-dotenv)
import polars as pl #polars - utilizing for transforming/writing data
#async modules that ship with python
from concurrent.futures import ThreadPoolExecutor #we'll utilize this for pooling our tasks
import asyncio #standard asyncio package
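If requests, python-dotenv, or polars aren't already installed in your environment, a quick install covers them (assuming pip; substitute your own package manager):
pip install requests python-dotenv polars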
Our Project
For demonstration purposes, finding a free API that doesn't have harsh limits on calls per hour (or day) is difficult, which is why I went with NASA's free APIs. We're going to be utilizing the Asteroid NeoWs API - you can refer to the documentation here: https://api.nasa.gov/ - there is a limit of 1,000 requests PER hour, but that should be reasonable enough for this demo.
We're going to pull the "Near Earth Objects" feed, pull all the objects since January 1, 2020, and write them to a parquet file (maybe to mimic writing them to a data lake for another process to pick up).
There are other APIs too - if you're into the outdoors, there's also the RIDB API: https://ridb.recreation.gov/docs - but feel free to find other APIs.
Code
Setting standard variables
We’ll need to define a few standard variables:
- api_url - this is the endpoint of our API; for us it will be https://api.nasa.gov/neo/rest/v1/feed
- api_key - this is the key for authenticating to the API URL (this isn't required for all APIs, but I need it for the NEO feed) - I will define this utilizing an environment file
dotenv.load_dotenv() #(optional) load variables from a local .env file so API_KEY is available
api_url = "https://api.nasa.gov/neo/rest/v1/feed"
api_key = os.environ.get("API_KEY")
Defining our Tasks to Execute
A big part of async coding is splitting the whole job into individual components that we can reasonably execute, where each task is independent. There are ways of communicating between tasks to manage dependencies (if needed), but for ease, it's best if the tasks are independent of each other.
With data pipelines, generally the easiest thing to split on is date (or datetime). There might be other obvious things to split on - IDs, order, or something else. Our API can be queried based on date, so we'll split our tasks by date - specifically, we'll split all our tasks into groups based on week.
start_date = datetime.date(2020, 1, 1)
end_date = datetime.date.today()
date_list = []
current_date = start_date
while current_date <= end_date:
    date_list.append(
        {
            "start_date": current_date.strftime("%Y-%m-%d"),
            "end_date": (current_date + datetime.timedelta(days=6)).strftime("%Y-%m-%d"),
        }
    )
    current_date += datetime.timedelta(days=7)
The date_list object will consist of a date range for every week since January 1, 2020, in seven-day increments; e.g.,
date_list[0] = {'start_date': '2020-01-01', 'end_date': '2020-01-07'}
date_list[1] = {'start_date': '2020-01-08', 'end_date': '2020-01-14'}
Building our function to pull from the API
To execute tasks asynchronously, we’ll have to build our base function to pull from the API, taking our dates as input.
def pull_api(start_date:str, end_date:str):
    print(f"Pulling data for {start_date} to {end_date}")
    try:
        #set our request url
        request_url = api_url
        #set our request parameters
        request_params = {
            "start_date": start_date,
            "end_date": end_date,
            "api_key": api_key,
        }
        #make our api call
        response = requests.get(request_url, params=request_params)
        #raise an error if we get a bad response
        response.raise_for_status()
        #parse the response json
        neos = []
        for neo in response.json()["near_earth_objects"].values():
            neos += neo
        #check if the output path exists - if not, create it
        if not os.path.exists("./data"): os.mkdir("./data")
        #create dataframe from our neos list
        df = pl.DataFrame(neos)
        #write dataframe out as parquet
        df.write_parquet(f"./data/neos_part_{start_date}_{end_date}.parquet")
        return {"start_date": start_date, "end_date": end_date, "result": "success", "exception": None}
    except Exception as ex:
        return {"start_date": start_date, "end_date": end_date, "result": "error", "exception": str(ex)}
You'll notice a couple of things:
- I am writing out my data to a placeholder path ./data - depending on your need, you could write this data to local storage, cloud storage, a database, or do something else entirely with it - completely up to your specific use case
- I am wrapping more or less everything in a try/except, then returning a dictionary with key:values representing the run + exception (if one happened) - the reason for this is that when I execute this asynchronously, I want to be able to catch these errors without necessarily failing anything
There's a variety of ways to approach a function like this, and you need to be mindful of concurrency issues with the modules you're utilizing. requests holds up fine here because each call to requests.get() uses its own connection, so running it across threads is safe. As well, you might have different needs/requirements on what to do with this data, so play around with it.
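If you want a little more resilience, one option (a sketch, not part of the original pull_api - the make_session helper and retry settings are assumptions on my part) is to give each call a requests.Session that automatically retries transient failures and rate-limit responses:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session() -> requests.Session:
    #hypothetical helper: a session that retries flaky responses before giving up
    session = requests.Session()
    retry = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session
Inside pull_api, you could then swap requests.get(request_url, params=request_params) for make_session().get(request_url, params=request_params).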
Building our Async Container Function
Next we need to build a function to execute our tasks asynchronously, utilizing the function we just built.
async def pull_api_async(dates:list, max_workers:int=4):
    #create our thread pool executor
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        #grab the running event loop
        loop = asyncio.get_running_loop()
        #create the tasks to be executed
        tasks = [
            loop.run_in_executor(
                executor,
                pull_api,
                *(
                    d["start_date"], #start_date argument of our pull_api function
                    d["end_date"], #end_date argument of our pull_api function
                )
            )
            for d in dates
        ]
        #create a list to store the results of our execution
        results = []
        #gather the results of our tasks
        for r in await asyncio.gather(*tasks, return_exceptions=True):
            results.append(r)
        #return the results
        return results
My function will take a list of the tasks I'm planning on executing (dates). As well, I'm going to configure it to limit the max number of workers (i.e., the number of tasks that will execute simultaneously). To limit the number of tasks running I'm utilizing ThreadPoolExecutor - I prefer this simply because it's a super simple way to throttle the number of tasks executing.
Within the function, I grab the running event loop. Once I have the loop, I utilize it to create a set of tasks to execute by looping through each entry in my dates list. loop.run_in_executor() takes the executor (i.e., the ThreadPoolExecutor), the function to execute (pull_api), and the arguments to pass (in the order pull_api() expects them).
Once I've got my tasks, I create an empty list (results) to append the results/outcomes to, then I loop through all my tasks utilizing asyncio.gather(), which simply takes all my tasks, and I specify that exceptions should be returned. This last part is vital to ensure our tasks will continue even if one fails. You can always change this if needed.
Finally, I simply return my results.
Executing our Tasks
Executing our tasks is very simple!
#create a loop
loop = asyncio.get_event_loop()
#get "futures" of our tasks
future = asyncio.ensure_future(
    pull_api_async(
        dates=date_list,
        max_workers=4,
    )
)
#pull the results from our loop when it finishes executing
results = loop.run_until_complete(future)
We create a loop that we'll utilize to run our tasks/futures. We create our future, which is simply our async tasks that have yet to execute. We then utilize the loop to run the future until all subtasks are complete!
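As a side note, on Python 3.10+ you can collapse the loop/future boilerplate above into a single call (an equivalent sketch, not what the snippet above does) - just be aware it won't work where an event loop is already running, like notebooks or Databricks:
#equivalent entry point on modern python
results = asyncio.run(pull_api_async(dates=date_list, max_workers=4))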
That's it! Once you've got your results, you can do whatever you want with them. I typically will loop through them and see if there was an exception; however, you have lots of options depending on your use case!
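For example, a quick pass over the results to surface failures might look like this (a sketch; the keys match what pull_api returns above):
#print any date ranges that errored so they can be investigated or re-run
for r in results:
    if isinstance(r, dict) and r["result"] == "error":
        print(f"Failed: {r['start_date']} to {r['end_date']} - {r['exception']}")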
Considerations
When building an async solution like this, there are quite a few considerations:
API Rate Limiting
Most APIs have limits on how many calls you can make per second, minute, hour, or day. Make sure to read up on the documentation for your API - you might have to build a solution to manage how many requests you make in a given period.
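For instance, one crude way to stay under something like NASA's 1,000 requests per hour (a sketch - the throttle helper and the 3.6 second spacing are my own assumptions, not part of the pipeline above) is to space calls out across all worker threads:
import threading
import time

_throttle_lock = threading.Lock()
_last_call = [0.0]

def throttle(min_interval: float = 3.6):
    #hypothetical helper: block until at least min_interval seconds have passed
    #since the previous request, shared across every worker thread
    with _throttle_lock:
        wait = _last_call[0] + min_interval - time.monotonic()
        if wait > 0:
            time.sleep(wait)
        _last_call[0] = time.monotonic()
Calling throttle() at the top of pull_api would then pace the requests regardless of how many workers you run.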
Watermarking
You can actually utilize watermarking with some APIs, so you can limit the number of times you have to hit them. If you've already pulled data through yesterday, you can utilize that to change the parameters you pass to your API. I highly recommend finding a way to do this, utilizing a date, ID, or something else - it will drastically speed up your pipeline.
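As a rough illustration (the watermark file and its format are assumptions here, not something the NeoWs API provides), you could persist the last successfully pulled end date and resume from the day after it:
import json

WATERMARK_PATH = "./data/watermark.json"

def get_watermarked_start(default: datetime.date = datetime.date(2020, 1, 1)) -> datetime.date:
    #hypothetical helper: resume from the day after the last successful pull
    if os.path.exists(WATERMARK_PATH):
        with open(WATERMARK_PATH) as f:
            last_end = datetime.date.fromisoformat(json.load(f)["last_end_date"])
        return last_end + datetime.timedelta(days=1)
    return default

def save_watermark(last_end_date: str):
    #hypothetical helper: call after a fully successful run
    with open(WATERMARK_PATH, "w") as f:
        json.dump({"last_end_date": last_end_date}, f)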
Exception Handling
The biggest thing is deciding how you want to handle exceptions. This very much depends on your setup and what you're trying to accomplish. I have a pipeline where, if a particular day fails, it gets appended to the group executed during the next run; once it runs successfully, it's removed from the list.
Just consider your use case/process and figure out how to deal with exceptions, because they WILL happen.
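A barebones version of that idea (a sketch; the failed_ranges.json file and its layout are assumptions) would collect the failed date ranges from results and stash them for the next run to retry:
import json

#persist the ranges that errored so the next run can prepend them to date_list
failed = [
    {"start_date": r["start_date"], "end_date": r["end_date"]}
    for r in results
    if isinstance(r, dict) and r["result"] == "error"
]
if failed:
    with open("./data/failed_ranges.json", "w") as f:
        json.dump(failed, f)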
Cancelling Tasks
Async programming is both great and terrible. Once you set that train moving, stopping it is difficult. So if you need the ability to cancel it, definitely think about it up front. You either have to kill the PID or put in signal handling, which is a whole separate thing. So just consider the need/use case. It might make things a bit more complicated, so be mindful when testing - test with smaller groups of tasks, or have a way to kill it.
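One lightweight option (a sketch layered on the loop/future pattern from earlier; whether it's enough depends on your environment) is to catch Ctrl+C and cancel the outstanding future rather than orphaning it:
try:
    results = loop.run_until_complete(future)
except KeyboardInterrupt:
    #cancel whatever hasn't started yet; requests already in flight still finish on their threads
    future.cancel()
    loop.run_until_complete(asyncio.gather(future, return_exceptions=True))
    raise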
Technical Skill Set
Depending on your team, technical skill set might be a limiting factor. Even if it can be coded, maintenance will come up, so consider what skill set you have on your team and weigh the pros and cons of building a solution like this.
As well, consider whether another solution is possible via a pre-built tool. They are way less complex to set up, way more standard, and they require way less technical know-how. So, if it's significantly easier (and cheaper), choose the pre-built tool.
Databricks/Spark
Just as a call out, if you are executing this via Spark/Databricks, you might have to install a separate module because there are already async processes/event loops running within Databricks. You'll have to install nest-asyncio, and before you execute async tasks, you'll have to execute nest_asyncio.apply().
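In that environment the setup looks something like this (a sketch; how you install packages varies by workspace):
import nest_asyncio
nest_asyncio.apply() #patch the already-running event loop so run_until_complete works

loop = asyncio.get_event_loop()
results = loop.run_until_complete(pull_api_async(dates=date_list, max_workers=4))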
Conclusion
As you can see, async programming can be VERY powerful, but it comes with some considerations. If you find yourself limited by pre-built tools, cost, or otherwise, async programming can be very handy. Just consider your use case and limitations. There's a reason tools like Fivetran are so popular, but they aren't a silver bullet either.
Happy coding!
Full Solution
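Here's everything from this post assembled into one script - treat it as a sketch to adapt (I've used asyncio.run() as the entry point and folded the pieces together; adjust paths, env handling, and worker count to your setup):
import requests
import datetime
import os
import dotenv
import polars as pl
from concurrent.futures import ThreadPoolExecutor
import asyncio

dotenv.load_dotenv()
api_url = "https://api.nasa.gov/neo/rest/v1/feed"
api_key = os.environ.get("API_KEY")

#build weekly date ranges from January 1, 2020 through today
start_date = datetime.date(2020, 1, 1)
end_date = datetime.date.today()
date_list = []
current_date = start_date
while current_date <= end_date:
    date_list.append(
        {
            "start_date": current_date.strftime("%Y-%m-%d"),
            "end_date": (current_date + datetime.timedelta(days=6)).strftime("%Y-%m-%d"),
        }
    )
    current_date += datetime.timedelta(days=7)

def pull_api(start_date: str, end_date: str):
    #pull one week of near earth objects and write it out as parquet
    print(f"Pulling data for {start_date} to {end_date}")
    try:
        request_params = {"start_date": start_date, "end_date": end_date, "api_key": api_key}
        response = requests.get(api_url, params=request_params)
        response.raise_for_status()
        neos = []
        for neo in response.json()["near_earth_objects"].values():
            neos += neo
        if not os.path.exists("./data"): os.mkdir("./data")
        pl.DataFrame(neos).write_parquet(f"./data/neos_part_{start_date}_{end_date}.parquet")
        return {"start_date": start_date, "end_date": end_date, "result": "success", "exception": None}
    except Exception as ex:
        return {"start_date": start_date, "end_date": end_date, "result": "error", "exception": str(ex)}

async def pull_api_async(dates: list, max_workers: int = 4):
    #fan the weekly pulls out across a thread pool and gather the outcomes
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(executor, pull_api, d["start_date"], d["end_date"])
            for d in dates
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    results = asyncio.run(pull_api_async(dates=date_list, max_workers=4))
    for r in results:
        if isinstance(r, dict) and r["result"] == "error":
            print(f"Failed: {r['start_date']} to {r['end_date']} - {r['exception']}")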