Automated podcast processing pipeline w/ Airflow, Vosk, pydub, SQLite 🗣️

Anix Lynch

This is an automated podcast processing pipeline that:

  1. Downloads podcast episodes from Marketplace (a business news podcast)

  2. Stores episode info in a database

  3. Converts speech to text (transcription)

  4. Does all this automatically on a daily schedule

It's like having a robot assistant that:

  • Downloads new podcast episodes every day 📥

  • Keeps track of what's been processed 📊

  • Turns spoken words into text automatically 🗣️➡️📝

# System operations
import os
import json

# HTTP requests
import requests

# XML parsing
import xmltodict

# Airflow components
from airflow.decorators import dag, task
import pendulum
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Speech recognition
from vosk import Model, KaldiRecognizer

# Audio processing
from pydub import AudioSegment

These imports tell us we're using:

  • HTTP requests for downloading podcasts

  • XML parsing for podcast RSS feed

  • SQLite for database storage

  • Vosk for speech recognition

  • PyDub for audio processing

  • Airflow for task scheduling

Then the constants (FRAME_RATE is 16000 because the Vosk English models expect 16 kHz mono audio):

PODCAST_URL = "https://www.marketplace.org/feed/podcast/marketplace/"
EPISODE_FOLDER = "episodes"
FRAME_RATE = 16000

Now the main DAG (Directed Acyclic Graph) setup:

@dag(
    dag_id='podcast_summary',
    schedule_interval="@daily",
    start_date=pendulum.datetime(2022, 5, 30),
    catchup=False,
)
def podcast_summary():
    # every task in the snippets below lives inside this function body

Database creation:

create_database = SqliteOperator(
    task_id='create_table_sqlite',
    sql=r"""
    CREATE TABLE IF NOT EXISTS episodes (
        link TEXT PRIMARY KEY,
        title TEXT,
        filename TEXT,
        published TEXT,
        description TEXT,
        transcript TEXT
    );
    """,
    sqlite_conn_id="podcasts"
)
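
Note that sqlite_conn_id="podcasts" refers to an Airflow connection that must exist before the DAG runs. One way to create it, assuming the Airflow CLI is available and you want the database in a local episodes.db file:

airflow connections add 'podcasts' --conn-type 'sqlite' --conn-host 'episodes.db'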

Episode fetching:

@task()
def get_episodes():
    data = requests.get(PODCAST_URL)
    feed = xmltodict.parse(data.text)
    episodes = feed["rss"]["channel"]["item"]
    print(f"Found {len(episodes)} episodes.")
    return episodes

podcast_episodes = get_episodes()

Sample output:

Found 25 episodes.

Loading episodes:

@task()
def load_episodes(episodes):
    hook = SqliteHook(sqlite_conn_id="podcasts")
    stored_episodes = hook.get_pandas_df("SELECT * from episodes;")
    new_episodes = []
    for episode in episodes:
        if episode["link"] not in stored_episodes["link"].values:
            filename = f"{episode['link'].split('/')[-1]}.mp3"
            new_episodes.append([
                episode["link"],
                episode["title"],
                episode["pubDate"],
                episode["description"],
                filename
            ])

    # Insert only the rows we haven't seen before
    hook.insert_rows(
        table="episodes",
        rows=new_episodes,
        target_fields=["link", "title", "published", "description", "filename"]
    )
    print(f"Added {len(new_episodes)} new episodes to database")
    print(f"Current total: {len(stored_episodes) + len(new_episodes)} episodes")
    return new_episodes

Sample output:

Added 3 new episodes to database
Current total: 28 episodes

Downloading episodes:

@task()
def download_episodes(episodes):
    os.makedirs(EPISODE_FOLDER, exist_ok=True)  # make sure the target folder exists
    audio_files = []
    for episode in episodes:
        name_end = episode["link"].split('/')[-1]
        filename = f"{name_end}.mp3"
        audio_path = os.path.join(EPISODE_FOLDER, filename)
        if not os.path.exists(audio_path):
            print(f"Downloading {filename}")
            audio = requests.get(episode["enclosure"]["@url"])
            with open(audio_path, "wb+") as f:
                f.write(audio.content)
        audio_files.append({"link": episode["link"], "filename": filename})
    return audio_files

Sample output:

Downloading marketplace_20240119.mp3
Downloaded: 25.4 MB

Speech to text conversion:

@task()
def speech_to_text(audio_files, new_episodes):
    hook = SqliteHook(sqlite_conn_id="podcasts")
    untranscribed = hook.get_pandas_df("SELECT * from episodes WHERE transcript IS NULL;")

    model = Model(model_name="vosk-model-en-us-0.22-lgraph")
    rec = KaldiRecognizer(model, FRAME_RATE)
    rec.SetWords(True)

    for index, row in untranscribed.iterrows():
        print(f"Transcribing {row['filename']}")
        filepath = os.path.join(EPISODE_FOLDER, row["filename"])
        mp3 = AudioSegment.from_mp3(filepath)
        mp3 = mp3.set_channels(1)
        mp3 = mp3.set_frame_rate(FRAME_RATE)

        # Feed the audio to Vosk in chunks and stitch the text together
        step = 20000  # pydub slices in milliseconds, so this is 20-second chunks
        transcript = ""
        for i in range(0, len(mp3), step):
            print(f"Progress: {i / len(mp3):.0%}")
            segment = mp3[i:i + step]
            rec.AcceptWaveform(segment.raw_data)
            result = json.loads(rec.Result())
            transcript += result["text"] + " "

        # Save the finished transcript back to the episode's row
        hook.run(
            "UPDATE episodes SET transcript = ? WHERE link = ?;",
            parameters=(transcript.strip(), row["link"])
        )

Sample output:

Transcribing marketplace_20240119.mp3
Progress: 25%
Progress: 50%
Progress: 75%
Progress: 100%
Transcript saved: "Today on Marketplace, the Federal Reserve..."

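One thing the snippets above don't show is how the tasks get chained together inside the DAG body. Passing one task's output into another is what draws the edges of the graph; here's a minimal sketch of the wiring, using the names from the snippets above:

    podcast_episodes = get_episodes()
    create_database.set_downstream(podcast_episodes)  # create the table before anything queries it

    new_episodes = load_episodes(podcast_episodes)
    audio_files = download_episodes(podcast_episodes)
    speech_to_text(audio_files, new_episodes)
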
Finally, running everything:

summary = podcast_summary()

This code creates a complete pipeline that:

  1. Gets podcast info from RSS feed

  2. Stores it in SQLite database

  3. Downloads audio files

  4. Transcribes audio to text

  5. Runs automatically every day

The beauty is that it's all automated and handles new episodes incrementally! 🚀

  1. Airflow:
from airflow.decorators import dag, task

Airflow is like a job scheduler on steroids. Think of it as your personal robot manager that:

  • Runs tasks in a specific order

  • Handles failures

  • Keeps track of what's done

  • Shows you nice visualizations of your tasks

Example:

@dag(
    dag_id='podcast_summary',
    schedule_interval="@daily",  # Runs every day
    start_date=pendulum.datetime(2022, 5, 30)
)
  2. DAG (Directed Acyclic Graph):
@dag  # This decorator makes a function into a DAG
def podcast_summary():
    # Tasks go here

It's like a recipe with steps that:

  • Must go in a specific order

  • Can't loop back (that's the "acyclic" part)

  • Shows dependencies between tasks
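
To make the ordering idea concrete, here's a minimal, hypothetical DAG (toy_pipeline is made up, not part of the podcast project) where calling one task with another's output defines the edges of the graph:

from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id='toy_pipeline',  # hypothetical id for illustration
    schedule_interval="@daily",
    start_date=pendulum.datetime(2022, 5, 30, tz="UTC"),
    catchup=False,
)
def toy_pipeline():
    @task()
    def extract():
        return [1, 2, 3]

    @task()
    def double(data):
        return [x * 2 for x in data]

    double(extract())  # extract must finish before double starts; no cycles allowed

toy_pipeline()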

  3. Task:
@task()  # This decorator marks a function as a task
def get_episodes():
    # Task code here

Tasks are individual jobs within your DAG. Like:

  • Download this file

  • Process that data

  • Save to database
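
Tasks can also carry their own settings, such as retries. A small hypothetical sketch (retries and retry_delay are standard task arguments):

from datetime import timedelta
from airflow.decorators import task

# Hypothetical task: retry up to 3 times, waiting 5 minutes between attempts
@task(retries=3, retry_delay=timedelta(minutes=5))
def download_file():
    ...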

  4. Pendulum:
import pendulum
start_date = pendulum.datetime(2022, 5, 30)

Pendulum is a better datetime library that:

  • Handles timezones better

  • Makes date math easier

  • Is more intuitive than Python's default datetime
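
A quick sketch of those points in code (a minimal example, nothing project-specific):

import pendulum

# Timezone-aware from the start (Airflow expects tz-aware start dates)
start = pendulum.datetime(2022, 5, 30, tz="UTC")

# Date math reads naturally
print(start.add(days=7))                  # 2022-06-06T00:00:00+00:00
print(start.in_timezone("Asia/Bangkok"))  # same instant, Bangkok wall-clock time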

  5. xmltodict:
import xmltodict

# Converting XML to Python dictionary
data = requests.get(PODCAST_URL)
feed = xmltodict.parse(data.text)

Before:

<podcast>
    <title>Example</title>
    <episodes>
        <episode>First Show</episode>
    </episodes>
</podcast>

After:

{
    'podcast': {
        'title': 'Example',
        'episodes': {
            'episode': 'First Show'
        }
    }
}
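
You can verify that conversion with a small self-contained sketch:

import xmltodict

xml = """
<podcast>
    <title>Example</title>
    <episodes>
        <episode>First Show</episode>
    </episodes>
</podcast>
"""

doc = xmltodict.parse(xml)
print(doc["podcast"]["title"])                # Example
print(doc["podcast"]["episodes"]["episode"])  # First Show
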
  6. Airflow SQLite:
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook

# Creating table
create_database = SqliteOperator(
    task_id='create_table_sqlite',
    sql="""CREATE TABLE IF NOT EXISTS episodes..."""
)

# Using the database
hook = SqliteHook(sqlite_conn_id="podcasts")
data = hook.get_pandas_df("SELECT * from episodes;")

It's Airflow's way of:

  • Creating SQLite databases

  • Running SQL queries

  • Managing database connections safely
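
A small sketch of the hook in action (the table and connection names follow the project above; the inserted row is made up for illustration):

from airflow.providers.sqlite.hooks.sqlite import SqliteHook

hook = SqliteHook(sqlite_conn_id="podcasts")

# Parameterized SQL keeps quoting/escaping safe
hook.run(
    "INSERT OR IGNORE INTO episodes (link, title) VALUES (?, ?);",
    parameters=("https://example.com/ep1", "Episode 1"),
)

print(hook.get_pandas_df("SELECT link, title FROM episodes;"))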

The whole system works together like this:

  1. Airflow manages the schedule 📅

  2. DAG organizes the tasks 📋

  3. Tasks do the actual work 🔨

  4. Pendulum handles dates/times ⏰

  5. xmltodict processes the podcast feed 📝

  6. SQLite stores all the data 💾

Think of it like a factory line where:

  • Airflow is the factory manager

  • DAG is the blueprint

  • Tasks are the workers

  • The other tools are the specialized equipment
