Automated podcast processing pipeline with Airflow, Vosk, pydub, and SQLite

This is an automated podcast processing pipeline that:
Downloads podcast episodes from Marketplace (a business news podcast)
Stores episode info in a database
Converts speech to text (transcription)
Does all this automatically on a daily schedule
It's like having a robot assistant that:
Downloads new podcast episodes every day
Keeps track of what's been processed
Turns spoken words into text automatically
# System operations
import os
import json
# HTTP requests
import requests
# XML parsing
import xmltodict
# Airflow components
from airflow.decorators import dag, task
import pendulum
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
# Speech recognition
from vosk import Model, KaldiRecognizer
# Audio processing
from pydub import AudioSegment
These imports tell us we're using:
HTTP requests for downloading podcasts
XML parsing for podcast RSS feed
SQLite for database storage
Vosk for speech recognition
PyDub for audio processing
Airflow for task scheduling
Then the constants:
PODCAST_URL = "https://www.marketplace.org/feed/podcast/marketplace/"
EPISODE_FOLDER = "episodes"
FRAME_RATE = 16000
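FRAME_RATE is 16,000 Hz because that's the sample rate the Vosk English models expect. One caveat: the download task below assumes the episodes folder already exists, so if you're running this fresh it's worth creating it up front. A defensive one-liner:
import os

EPISODE_FOLDER = "episodes"  # same constant as above

# Make sure the download folder exists before any task tries to write into it.
os.makedirs(EPISODE_FOLDER, exist_ok=True)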
Now the main DAG (Directed Acyclic Graph) setup:
@dag(
    dag_id='podcast_summary',
    schedule_interval="@daily",
    start_date=pendulum.datetime(2022, 5, 30),
    catchup=False,  # don't backfill runs for dates before today
)
def podcast_summary():
Database creation:
create_database = SqliteOperator(
    task_id='create_table_sqlite',
    sql=r"""
    CREATE TABLE IF NOT EXISTS episodes (
        link TEXT PRIMARY KEY,
        title TEXT,
        filename TEXT,
        published TEXT,
        description TEXT,
        transcript TEXT
    );
    """,
    sqlite_conn_id="podcasts"  # an Airflow connection pointing at the SQLite file
)
Episode fetching:
@task()
def get_episodes():
    data = requests.get(PODCAST_URL)
    feed = xmltodict.parse(data.text)
    episodes = feed["rss"]["channel"]["item"]
    print(f"Found {len(episodes)} episodes.")
    return episodes

podcast_episodes = get_episodes()
Sample output:
Found 25 episodes.
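Each item in the parsed feed is a dictionary keyed by the RSS tags. The exact fields vary by feed, but for the keys this pipeline touches, an item looks roughly like this (values are illustrative):
{
    "title": "Marketplace for January 19, 2024",
    "link": "https://www.marketplace.org/shows/marketplace/...",  # used as the primary key
    "pubDate": "Fri, 19 Jan 2024 18:00:00 -0500",
    "description": "Today on Marketplace...",
    "enclosure": {
        "@url": "https://.../episode.mp3",  # the actual audio file
        "@type": "audio/mpeg",
    },
}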
Loading episodes:
@task()
def load_episodes(episodes):
    hook = SqliteHook(sqlite_conn_id="podcasts")
    stored_episodes = hook.get_pandas_df("SELECT * from episodes;")
    new_episodes = []
    for episode in episodes:
        if episode["link"] not in stored_episodes["link"].values:
            filename = f"{episode['link'].split('/')[-1]}.mp3"
            new_episodes.append([
                episode["link"],
                episode["title"],
                episode["pubDate"],
                episode["description"],
                filename
            ])
    # Write only the rows that aren't already stored, then hand them downstream.
    hook.insert_rows(
        table='episodes',
        rows=new_episodes,
        target_fields=["link", "title", "published", "description", "filename"]
    )
    return new_episodes
Sample output:
Added 3 new episodes to database
Current total: 28 episodes
Downloading episodes:
@task()
def download_episodes(episodes):
    audio_files = []
    for episode in episodes:
        name_end = episode["link"].split('/')[-1]
        filename = f"{name_end}.mp3"
        audio_path = os.path.join(EPISODE_FOLDER, filename)
        if not os.path.exists(audio_path):
            print(f"Downloading {filename}")
            audio = requests.get(episode["enclosure"]["@url"])
            with open(audio_path, "wb+") as f:
                f.write(audio.content)
        # Track every file (downloaded now or already on disk) for the next task.
        audio_files.append({
            "link": episode["link"],
            "filename": filename
        })
    return audio_files
Sample output:
Downloading marketplace_20240119.mp3
Downloaded: 25.4 MB
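One practical note: audio.content buffers the whole MP3 in memory, which is fine for typical episodes but wasteful for very large files. A sketch of a streaming alternative (the 1 MB chunk size is an arbitrary choice, not something the original code uses):
response = requests.get(episode["enclosure"]["@url"], stream=True)
response.raise_for_status()
with open(audio_path, "wb") as f:
    for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1 MB at a time
        f.write(chunk)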
Speech to text conversion:
@task()
def speech_to_text(audio_files, new_episodes):
    hook = SqliteHook(sqlite_conn_id="podcasts")
    untranscribed = hook.get_pandas_df("SELECT * from episodes WHERE transcript IS NULL;")
    model = Model(model_name="vosk-model-en-us-0.22-lgraph")
    rec = KaldiRecognizer(model, FRAME_RATE)
    rec.SetWords(True)
    for index, row in untranscribed.iterrows():
        print(f"Transcribing {row['filename']}")
        filepath = os.path.join(EPISODE_FOLDER, row["filename"])
        mp3 = AudioSegment.from_mp3(filepath)
        mp3 = mp3.set_channels(1)             # Vosk expects mono audio
        mp3 = mp3.set_frame_rate(FRAME_RATE)  # ...sampled at 16 kHz
        # Feed the recognizer in 20-second chunks (pydub slices are in milliseconds).
        transcript = ""
        step = 20000
        for i in range(0, len(mp3), step):
            print(f"Progress: {i / len(mp3):.0%}")
            segment = mp3[i:i + step]
            rec.AcceptWaveform(segment.raw_data)
            result = json.loads(rec.Result())
            transcript += result["text"] + " "
        # Save the transcript; an UPDATE keeps the rest of the row intact.
        hook.run("UPDATE episodes SET transcript = ? WHERE link = ?",
                 parameters=[transcript, row["link"]])
Sample output:
Transcribing marketplace_20240119.mp3
Progress: 25%
Progress: 50%
Progress: 75%
Progress: 100%
Transcript saved: "Today on Marketplace, the Federal Reserve..."
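Once transcripts land in the table, pulling them back out is easy with the same hook. A quick sketch (the query and the 80-character preview are just for illustration):
hook = SqliteHook(sqlite_conn_id="podcasts")
rows = hook.get_records("SELECT title, transcript FROM episodes WHERE transcript IS NOT NULL;")
for title, transcript in rows:
    print(f"{title}: {transcript[:80]}...")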
Finally, the task outputs are wired together at the bottom of podcast_summary() (passing one task's return value into the next is what gives Airflow the execution order), and the DAG is instantiated:
new_episodes = load_episodes(podcast_episodes)
audio_files = download_episodes(podcast_episodes)
speech_to_text(audio_files, new_episodes)

summary = podcast_summary()
This code creates a complete pipeline that:
Gets podcast info from RSS feed
Stores it in SQLite database
Downloads audio files
Transcribes audio to text
Runs automatically every day
The beauty is that it's all automated and handles new episodes incrementally!
- Airflow:
from airflow.decorators import dag, task
Airflow is like a job scheduler on steroids. Think of it as your personal robot manager that:
Runs tasks in a specific order
Handles failures
Keeps track of what's done
Shows you nice visualizations of your tasks
Example:
@dag(
    dag_id='podcast_summary',
    schedule_interval="@daily",  # Runs every day
    start_date=pendulum.datetime(2022, 5, 30)
)
- DAG (Directed Acyclic Graph):
@dag  # This decorator turns a function into a DAG
def podcast_summary():
    # Tasks go here
It's like a recipe with steps that:
Must go in a specific order
Can't loop back (that's the "acyclic" part)
Shows dependencies between tasks
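To make those properties concrete, here's a toy DAG (names and schedule are made up for illustration). Passing one task's return value into the next defines the order extract -> transform -> load, and there's no way to loop back:
import pendulum
from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def toy_pipeline():
    @task()
    def extract():
        return [1, 2, 3]

    @task()
    def transform(data):
        return [x * 2 for x in data]

    @task()
    def load(data):
        print(data)

    # Data dependencies define the graph: extract -> transform -> load
    load(transform(extract()))

toy_dag = toy_pipeline()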
- Task:
@task()  # This decorator marks a function as a task
def get_episodes():
    # Task code here
Tasks are individual jobs within your DAG. Like:
Download this file
Process that data
Save to database
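Tasks also carry their own failure handling. A hypothetical example (the retry settings here are arbitrary): Airflow will rerun this task up to twice, waiting five minutes between attempts, before marking it failed:
from datetime import timedelta
from airflow.decorators import task

@task(retries=2, retry_delay=timedelta(minutes=5))
def download_file():
    ...  # fetch something that might occasionally fail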
- Pendulum:
import pendulum
start_date = pendulum.datetime(2022, 5, 30)
It's a better datetime library that:
Handles timezones better
Makes date math easier
Is more intuitive than Python's built-in datetime
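A few quick examples of what that looks like in practice:
import pendulum

# Timezone-aware from the start
start = pendulum.datetime(2022, 5, 30, tz="UTC")

# Date math reads like English
print(start.add(weeks=1).to_date_string())   # 2022-06-06

# Converting between timezones is one call
print(start.in_timezone("America/New_York"))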
- xmltodict:
import xmltodict
# Converting XML to Python dictionary
data = requests.get(PODCAST_URL)
feed = xmltodict.parse(data.text)
Before:
<podcast>
  <title>Example</title>
  <episodes>
    <episode>First Show</episode>
  </episodes>
</podcast>
After:
{
    'podcast': {
        'title': 'Example',
        'episodes': {
            'episode': 'First Show'
        }
    }
}
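One quirk worth knowing: xmltodict gives you a single value when a tag appears once, but a list when it repeats. feed["rss"]["channel"]["item"] is a list only because the Marketplace feed has many <item> tags:
import xmltodict

one = xmltodict.parse("<shows><episode>A</episode></shows>")
many = xmltodict.parse("<shows><episode>A</episode><episode>B</episode></shows>")

print(type(one["shows"]["episode"]))   # <class 'str'>  -- a single value
print(type(many["shows"]["episode"]))  # <class 'list'> -- repeated tags become a list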
- Airflow SQLite:
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
# Creating a table
create_database = SqliteOperator(
    task_id='create_table_sqlite',
    sql="""CREATE TABLE IF NOT EXISTS episodes..."""
)

# Using the database
hook = SqliteHook(sqlite_conn_id="podcasts")
data = hook.get_pandas_df("SELECT * from episodes;")
It's Airflow's way of:
Creating SQLite databases
Running SQL queries
Managing database connections safely
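The hook can write as well as read; insert_rows is what the loading task above relies on. A small sketch (table and values are illustrative):
hook = SqliteHook(sqlite_conn_id="podcasts")

# Each inner list lines up positionally with target_fields.
hook.insert_rows(
    table="episodes",
    rows=[["https://example.com/ep1", "Episode One"]],
    target_fields=["link", "title"],
)

# Arbitrary SQL with bound parameters also works.
hook.run("UPDATE episodes SET title = ? WHERE link = ?",
         parameters=["New Title", "https://example.com/ep1"])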
The whole system works together like this:
Airflow manages the schedule
DAG organizes the tasks
Tasks do the actual work
Pendulum handles dates/times
xmltodict processes the podcast feed
SQLite stores all the data
Think of it like a factory line where:
Airflow is the factory manager
DAG is the blueprint
Tasks are the workers
The other tools are the specialized equipment
