YouTube Media Downloader Pipeline

Daksh Sawhney
17 min read

Tired of Paying for YouTube Premium? I Built My Own DevOps-Powered Media Downloader

What started as a simple idea — "Why not download videos myself?" — quickly evolved into a fully containerized, monitored, and CI/CD-automated pipeline that does way more than just download.

From a desktop GUI built in Python, to logging with Loguru, observability with Prometheus + Grafana, and seamless GitHub Actions automation — I designed a full-stack system that mimics a production-grade DevOps environment.

This post walks you through how I architected the project, the tech I used, and how you can deploy it in just one command with Docker.


TL;DR

  • Built my own GUI-based YouTube media manager

  • Used SQLite for backend metadata storage

  • Added real-time monitoring with Prometheus & Grafana

  • Logged everything using Loguru with log rotation

  • Packaged entire pipeline into Docker containers

  • Automated builds & tests using GitHub Actions

Folder Structure

media-downloader-pipeline/
├── downloads/
└── scripts/
    └── download.sh

Install yt-dlp

# From apt (the packaged version may lag behind)
sudo apt install yt-dlp

# Or grab the latest release binary directly
sudo curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp
sudo chmod a+rx /usr/local/bin/yt-dlp
yt-dlp --version

yt-dlp is a command-line tool for downloading videos, audio, and playlists from YouTube and many other sites.
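
A few handy invocations before we script anything (all standard yt-dlp flags; the URL is a placeholder):

yt-dlp -F "https://youtu.be/VIDEO_ID"                     # list available formats
yt-dlp -x --audio-format mp3 "https://youtu.be/VIDEO_ID"  # extract audio only (needs ffmpeg)
yt-dlp -f best "https://youtu.be/VIDEO_ID"                # pick the best single-file format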

Phase 1 - Shell Scripting

Write download.sh

#!/bin/bash

read -p "Enter video URL: " url
yt-dlp -o "../downloads/%(title)s.%(ext)s" "$url"

cd ../downloads
# Sort files into folders named after their extension
for file in *; do
    if [[ -f "$file" ]]; then   # skip folders created on earlier runs
        ext="${file##*.}"
        mkdir -p "$ext"
        mv "$file" "$ext/"
    fi
done
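
To try it out, make the script executable and run it from inside scripts/, since the paths above are relative to that folder:

chmod +x scripts/download.sh
cd scripts
./download.sh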

Phase 2 - Python - Metadata Capturer

Fetching a video's metadata without even downloading it, with the help of yt-dlp in Python.

Inside the Youtube_Downloader_project folder, run code . in the terminal to open the project; if the yt-dlp package isn't listed there, install it with pip install yt-dlp.

from yt_dlp import YoutubeDL
import os
import json
from datetime import timedelta

url = input("Enter Url: ")

# Options dictionary for the YoutubeDL object
ydl_opts = {
    'quiet': True,             # suppress most console output
    'skip_download': True,     # don't download the video, just fetch info
    'dump_single_json': True   # return all metadata as a single JSON object
}

# Create a YoutubeDL instance using ydl_opts
with YoutubeDL(ydl_opts) as ydl:
    # try-catch for invalid url's
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

# print("\n")
# print(info)
# print("\n")

print("\n")
print("Title: ", info.get('title'))

# Show duration in HH:MM:SS (fall back to 0 when yt-dlp reports none, e.g. live streams)
duration = str(timedelta(seconds=info.get('duration') or 0))
print("Duration:", duration)

print("Format: ", info.get('ext'))
print("Tags:", info.get('tags'))

# Store this info in video_metadata.json

# Build the object we want to write to the metadata file
metadata = {
    "Title": info.get("title"),
    "Duration": duration,  # reuse the HH:MM:SS string computed above
    "Format": info.get("ext"),
}

# Check if video_metadata.json is present or not, if not create it
metadata_file = "video_metadata.json"
if not os.path.exists(metadata_file):
    with open(metadata_file, "w") as f:
        json.dump([], f, indent=4)

# Check if it is not empty
with open(metadata_file, "r") as f:
    try:
        data = json.load(f)
    except json.JSONDecodeError:
        data = []

# Append only works on lists
data.append(metadata)

# Now push the data into metadata_file
with open(metadata_file, "w") as f:
    json.dump(data, f, indent=4)
print("✅ Metadata saved to video_metadata.json")

Phase 3 - SQLite setup

import sqlite3

# Connect to the database (creates videos.db if it doesn't exist)
conn = sqlite3.connect('videos.db')

# Create a cursor to execute SQL commands
cursor = conn.cursor()

# Create the table if it does not exist already
cursor.execute('''
    CREATE TABLE IF NOT EXISTS videos_metadata (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        duration TEXT,
        format TEXT
    )
''')

print("Table 'videos_metadata' created or already exists.")

conn.commit() # saves changes.

conn.close() # closes connection safely.
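
You can confirm the table exists with the sqlite3 CLI:

sqlite3 videos.db ".schema videos_metadata"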

Insert to DB

from yt_dlp import YoutubeDL
import sqlite3
from datetime import timedelta

url = input("Enter url: ")

# Options for yt-dlp: metadata only, no download
ydl_opts = {
    'quiet': True,
    'skip_download': True,
    'dump_single_json': True
}

# Create an instance with these options
with YoutubeDL(ydl_opts) as ydl:
    # Verify if url is correct or not
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

duration = str(timedelta(seconds=info.get("duration") or 0))  # guard against a missing duration

# Connect to sqlite
conn = sqlite3.connect('videos.db')
cursor = conn.cursor()

# Insert into table
cursor.execute('''
    INSERT INTO videos_metadata (title, duration, format)
    VALUES (?, ?, ?)    
''', (info.get('title'), duration, info.get('ext'))
)

# Save and close
conn.commit()
conn.close()

print("✅ Metadata inserted into SQLite database.")

Fetch items from db

import sqlite3

# Connect to db
conn = sqlite3.connect('videos.db')
cursor = conn.cursor()

cursor.execute('SELECT * FROM videos_metadata')
rows = cursor.fetchall()

# print(rows) # It is a list of tuples

for row in rows:
    id, title, duration, format = row  # assigning values of tuple
    print(f"ID: {id}")
    print(f"Title: {title}")
    print(f"Duration: {duration}")
    print(f"Format: {format}")
    print("-" * 30)

# Close the Database
conn.close()

Phase 4 - Setup CLI Interface with argparse

Create cli_interface.py:

import argparse
import sqlite3

def list_videos():
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata")
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

def search_by_title(keyword):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE title LIKE ?", ('%' + keyword + '%',))
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

def filter_by_format(fmt):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE format = ?", (fmt,))
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

# Set up argparse CLI
parser = argparse.ArgumentParser(description="🎬 Manage downloaded video metadata")
parser.add_argument('--list', action='store_true', help='List all stored videos')
parser.add_argument('--search', type=str, help='Search videos by title')
parser.add_argument('--filter-format', type=str, help='Filter videos by format')

args = parser.parse_args()

# CLI Actions
if args.list:
    list_videos()
elif args.search:
    search_by_title(args.search)
elif args.filter_format:
    filter_by_format(args.filter_format)
else:
    print("❌ No valid option provided. Use --help for available commands.")

Phase 5 - Writing the download function and adding a --download flag to extract.py

from yt_dlp import YoutubeDL
import os

def download_video(url):
    output_dir = "downloads"

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)  # creating folder by os

    # Create options
    ydl_opts = {
        'quiet': False,
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'nocheckcertificate': True,  # note: yt-dlp's option key has no underscores
        'format': 'best'
    }

    # Create instance
    with YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([url])
            print(f"✅ Download completed: saved in '{output_dir}'")
        except Exception as e:
            print(f"❌ Download failed: {e}")

extract.py

from yt_dlp import YoutubeDL
import os
import json
from datetime import timedelta
import argparse
from download import download_video

# Creating CLI Arguments
def parse_args():
    parser = argparse.ArgumentParser(description="Extract video metadata from a URL using yt-dlp.")

    parser.add_argument(
        '--save',
        action="store_true",
        help="Save metadata to video_metadata.json"
    )

    parser.add_argument(
        '--download',
        action="store_true",
        help="Download the video after extracting metadata"
    )

    parser.add_argument(
        '--url',
        required=False,
        type=str,
        help="URL of the video to extract metadata from (optional, if not provided will use input prompt)"
    )

    return parser.parse_args()

args = parse_args()

url = args.url or input("Enter Url: ")

# Options dictionary for the YoutubeDL object
ydl_opts = {
    'quiet': True,             # suppress most console output
    'skip_download': True,     # don't download the video, just fetch info
    'dump_single_json': True   # return all metadata as a single JSON object
}

# Create a YoutubeDL instance using ydl_opts
with YoutubeDL(ydl_opts) as ydl:
    # try-catch for invalid url's
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

print("\n")
print("Title: ", info.get('title'))

# Show duration in HH:MM:SS
duration = str(timedelta(seconds=info.get('duration') or 0))
print("Duration:", duration)

print("Format: ", info.get('ext'))
print("Tags:", info.get('tags'))

# Store this info in video_metadata.json

# Build the object we want to write to the metadata file
metadata = {
    "Title": info.get("title"),
    "Duration": duration,  # reuse the HH:MM:SS string computed above
    "Format": info.get("ext"),
}

# Check if video_metadata.json is present or not, if not create it
if args.save:
    metadata_file = "video_metadata.json"
    if not os.path.exists(metadata_file):
        with open(metadata_file, "w") as f:
            json.dump([], f, indent=4)

    # Check if it is not empty
    with open(metadata_file, "r") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            data = []

    # Append only works on lists
    data.append(metadata)

    # Now push the data into metadata_file
    with open(metadata_file, "w") as f:
        json.dump(data, f, indent=4)
    print("✅ Metadata saved to video_metadata.json")

if args.download:
    download_video(url)
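
With the flags defined above, a typical run looks like:

python extract.py --url "https://youtu.be/VIDEO_ID" --save --download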

Phase 6 - Combining all functions into main.py

from yt_dlp import YoutubeDL
import os
import sys
from datetime import timedelta
import argparse

# extract.py
from metadata.extract import parse_args_extract  # argparse setup
from metadata.extract import ydl_instance_creation  # extract info
from metadata.extract import save_summary  # save summary to JSON

# DB
from metadata.db_setup import db_setup  # table setup
from metadata.insert_to_db import insert_db  # insert into DB
from metadata.fetch_metadata_db import fetch_db  # fetch data from DB

# Download
from metadata.download import download_video

# cli_interface
from metadata.cli_interface import arg_parser_cli
from metadata.cli_interface import list_videos
from metadata.cli_interface import search_by_title
from metadata.cli_interface import search_by_filter

if __name__ == "__main__":

    args = parse_args_extract()
    url = args.url or input("Enter Url: ")
    info = ydl_instance_creation(url)
    if not info:
        print("❌ Failed to extract video info.")
        exit(1)
    duration = str(timedelta(seconds=info.get("duration")))
    save_summary(info, args)

    print("\n")

    db_setup()
    # insert_db(info, duration)
    fetch_db()

    print("\n")

    args_cli = arg_parser_cli()
    if args_cli.list:
        list_videos()
    elif args_cli.search:
        search_by_title(args_cli.search)
    elif args_cli.filter:
        search_by_filter(args_cli.filter)
    else:
        print("❌ No valid option provided. Use --help for available commands.")

An error appears because two parse_args functions both try to consume the same sys.argv, so we need a condition that decides which parser runs:

if __name__ == "__main__":    
    # Need to give condition for passing two parse_args fxns
    if any(arg in ["--list", "--filter", "--search"] for arg in sys.argv):
        args_cli = arg_parser_cli()     # Passing cli_interface one
        if args_cli.list:
            list_videos()
        elif args_cli.search:
            search_by_title(args_cli.search)
        elif args_cli.filter:
            search_by_filter(args_cli.filter)
        else:
            print("❌ No valid option provided. Use --help for available commands.")
    else:
        args = parse_args_extract()
        url = args.url or input("Enter Url: ")
        info = ydl_instance_creation(url)
        if not info:
            print("❌ Failed to extract video info.")
            exit(1)
        duration = str(timedelta(seconds=info.get("duration")))
        save_summary(info, args)

        print("\n")

        db_setup()
        if args.add_to_db:
            insert_db(info, duration)
        fetch_db()

        print("\n")

        if args.download:
            download_video(url)

Modifying the download.sh file

#!/bin/bash

# read -p "Enter video URL: " url
# yt-dlp -o "../downloads/%(title)s.%(ext)s" "$url"

DOWNLOAD_DIR="downloads"

# Sorting files based on extension
for file in "$DOWNLOAD_DIR"/*; do
    if [[ -f "$file" ]]; then
        ext="${file##*.}"

        # Add a date folder inside the extension folder for proper record-keeping
        TODAY=$(date +"%d-%m-%Y")
        mkdir -p "$DOWNLOAD_DIR/$ext/$TODAY"    # Create Date Folder

        mv "$file" "$DOWNLOAD_DIR/$ext/$TODAY/" 2>/dev/null     # Move the file
    fi
done

echo "Files sorted by extension into directories."

Updating download.py to call the sorting script automatically

from yt_dlp import YoutubeDL
import os
import subprocess

def download_video(url):
    output_dir = "downloads"

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)  # creating folder by os

    # Create options
    ydl_opts = {
        'quiet': False,
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'nocheckcertificate': True,  # note: yt-dlp's option key has no underscores
        'format': 'best'
    }

    # Create instance
    with YoutubeDL(ydl_opts) as ydl:
        try:
            print(f"🔄 Downloading video from {url}...")
            ydl.download([url])
            print(f"✅ Download completed: saved in '{output_dir}'")
        except Exception as e:
            print(f"❌ Download failed: {e}")

    # Automatically call download.sh to move files into their respective folders
    try:
        subprocess.run(["./scripts/download.sh"], check=True)
        print("✅  Downloaded videos sorted successfully by extension and date.")
    except subprocess.CalledProcessError as e:
        print(f"❌ Failed to sort downloaded videos: {e}")

if __name__ == "__main__":
    video_url = input("Enter the video URL to download: ")
    download_video(video_url)

Delete Flag Functionality

delete_from_db.py

import sqlite3


def find_entries_by_keyword(keyword):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE LOWER(title) LIKE ?", ('%' + keyword.lower() + '%',))
    results = cursor.fetchall()
    conn.close()
    return results

def delete_db(ids):
    conn = sqlite3.connect('videos.db')
    cursor = conn.cursor()
    # executemany runs the DELETE once per id; rowcount then reports the total deleted
    cursor.executemany('DELETE FROM videos_metadata WHERE id = ?', [(i,) for i in ids])
    conn.commit()
    deleted_count = cursor.rowcount
    conn.close()
    return deleted_count

main.py

if any(arg in ["--list", "--filter", "--search", "--delete"] for arg in sys.argv):
        args_cli = arg_parser_cli()     # Passing cli_interface one
        if args_cli.list:
            list_videos()
        elif args_cli.search:
            search_by_title(args_cli.search)
        elif args_cli.filter:
            search_by_filter(args_cli.filter)
        elif args_cli.delete:
            # Condition if delete comes, none other will come
            matches = find_entries_by_keyword(args_cli.delete)
            if not matches:
                print("❌ No entries found with that keyword.")
            else:
                print("Found entries:")
                for match in matches:
                    print(f"ID: {match[0]}, Title: {match[1]}, Duration: {match[2]}")
                confirm = input(f"Are you sure you want to delete these {len(matches)} entries? (y/n): ")
                if confirm.lower() in ('y', 'yes'):
                    deleted = delete_db([row[0] for row in matches])
                    print(f"✅ Deleted {deleted} entries.")
                else:
                    print("❎ Deletion cancelled.")
        else:
            print("❌ No valid option provided. Use --help for available commands.")

Phase 7 - GUI Interface - Tkinter

gui_interface.py

import tkinter as tk
from tkinter import ttk, messagebox

from metadata.fetch_metadata_db import fetch_db

def start_gui():
    root = tk.Tk() # Create the main window
    root.title("Video Metadata Manager") # Set the window title
    root.geometry("600x400") # Set the window size

    tk.Label(root, text="Search by Keyword", font=("Arial", 16)).pack(pady=10) # Creates a label (text on the window)

    keyword_var = tk.StringVar() # Creates a special variable like a box to store text entered by the user
    tk.Entry(root, textvariable=keyword_var, width=50).pack(pady=10) # Adds an input box where user can type a word
    # Input box is connected to keyword_var so we can see what user types

    result_box = tk.Listbox(root, width=80, height=10) # Creates a Listbox where search results will be shown.
    result_box.pack(pady=10) # Adds the Listbox to the window with some padding

    # Define search logic
    def search():
        keyword = keyword_var.get().lower() # gets text from the input field
        result_box.delete(0, tk.END) # Delete old results - clear listbox before showing results

        data = fetch_db()
        for row in data:
            title = row[1]
            if keyword in title.lower():
                result_box.insert(tk.END, f"🎥 {title} | Duration: {row[2]} | Format: {row[3]}") # tk.END means add it to end of the list

    tk.Button(root, text="Search", command=search).pack()

    root.mainloop() # Start the GUI event loop so it waits for user actions

main.py

# GUI
if "--gui" in sys.argv:
    start_gui()
    exit()

Adding More Buttons in GUI

Delete

def delete():
        selected = result_box.curselection() # Get the index of the selected item in the Listbox # It is a tuple of selected items
        if not selected:
            messagebox.showwarning("No Selection", "Please select a video to delete.")
            return

        # Need to fetch the ID from the selected item to call the delete function
        selected_text = result_box.get(selected[0]) # text of the first selected item
        id = int(selected_text.split('|')[0].replace("🎥","").strip()) # extract the ID from the line

        # Call the delete function (delete_db expects a list of IDs)
        delete_count = delete_db([id])
        if delete_count > 0:
            messagebox.showinfo("Success", f"✅ Deleted {delete_count} video(s) with ID: {id}.")
            search() # Refresh list
        else:
            messagebox.showerror("Error", f"❌ No video found with ID: {id}.")

Download

The download function needs a URL, so I also had to store each video's URL in the DB.
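
A minimal one-off migration for that, assuming the videos.db and table from Phase 3 (running it twice raises sqlite3.OperationalError, since the column would already exist):

import sqlite3

# One-off migration: add a url column so downloads can be triggered later from the GUI
conn = sqlite3.connect("videos.db")
conn.execute("ALTER TABLE videos_metadata ADD COLUMN url TEXT")
conn.commit()
conn.close()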

def download():
        selection = result_box.curselection()
        if not selection:
            messagebox.showwarning("No Selection", "Please select a video to download.")
            return

        # fetch the selected text and extract id from it
        selected_text = result_box.get(selection[0])
        id = selected_text.split("|")[0].replace("🎥", "").strip()

        # Now we need to fetch url from looking into db with help of id to download
        data = fetch_db()
        print("Data:", data)
        url = None

        for row in data:
            if str(row[0]) == id:
                url = row[4]
                break

        if not url:
            print("❌ No URL found for the selected video.")
            return

        # download the video
        try:
            download_video(url)
            messagebox.showinfo("Success", f"✅ Downloaded video with ID: {id}.")
        except Exception as e:
            messagebox.showerror("Error", f"❌ Failed to download video: {str(e)}")

Clear All Function

def clear_all():
        result_box.delete(0, tk.END) # Delete old results - clear listbox before showing results
        keyword_var.set("")

Phase 8 - Logging with Python's logging module

logging.py

import logging
import os 
from datetime import datetime

# Create log directory if not exists
os.makedirs("Logging/logs", exist_ok=True)

# Log filename with date
log_filename = datetime.now().strftime("Logging/logs/log_%d-%m-%Y.log")

# Setting up the logging configuration (for the whole file/app):
logging.basicConfig(
    filename=log_filename,  # Save logs in a file
    level=logging.INFO, # Only log info and above (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format='%(asctime)s - %(levelname)s - %(message)s' # Format of the log messages
)

def get_logger():
    return logging.getLogger(__name__)

Inside the GUI, import the logger and add log calls wherever actions happen:

from Logging.logging import get_logger

logger = get_logger()
# Add Everywhere

logger.error(f"Failed to download video with ID: {id}. Error: {str(e)}")

logger.info(f"Download action performed for ID: {id}")

Phase 9 - Advanced Logging with Loguru

# For production-grade logging: rotation, retention, compression and more
from loguru import logger
import os

LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

# Optionally derive a per-script log file name:
# FILE_NAME = os.path.splitext(os.path.basename(__file__))[0]  # strip the path and the .py extension

logger.add(
    os.path.join(LOG_DIR, "app.log"),
    rotation="10 MB",  # Rotate log file when it reaches 10 MB
    retention="7 days",  # Keep logs for 7 days
    level="INFO",  # Log level
    compression="zip",  # Compress rotated logs
    enqueue=True,
    format="{time} {level} {message}",  # Log format
)

logger.info("Logger initialized successfully.")

Adding Log Viewer Button in GUI

def show_logs():
        log_window = tk.Toplevel(root) # Create a new window for logs
        log_window.title(" Application Logs") # Set the title of the log window
        log_window.geometry("700x400") # Set the size of the log window

        log_text = tk.Text(log_window, wrap=tk.WORD) # Create a Text widget to display logs
        log_text.pack(expand=True, fill="both") # Add the Text widget to the log window

        def update_logs():
            try:
                with open('logs/app.log', 'r') as f:
                    content = f.read()
                    log_text.delete("1.0", tk.END)  # was using 0 in listbox but in Text, we need to use "1.0" means delete from string 1 i.e. first line
                    log_text.insert(tk.END, content) # Read the log file and insert its content into the Text widget
                    log_text.see(tk.END) # Scroll to the end of the Text widget
            except Exception as e:
                log_text.delete("1.0", tk.END)
                log_text.insert(tk.END, f"❌ Error reading log file: {str(e)}")
            log_window.after(3000, update_logs) # re-run every 3 seconds

        update_logs()

    # Separate frame for the logs button (bottom row)
    logs_frame = tk.Frame(root)
    logs_frame.pack(pady=(0, 10))
    tk.Button(logs_frame, text="Show Logs", command=show_logs).pack()

Phase 10 - Testing using PyTest

import pytest


from metadata.fetch_metadata_db import fetch_db
from metadata.delete_from_db import delete_db
from metadata.download import download_video

def test_fetch_db():
    data = fetch_db()
    assert isinstance(data, list) # "Data should be a list" -- isinstance() is used to check type
    assert len(data) >= 0

def test_delete_db_with_invalid_id():
    result = delete_db([-1]) # it is in [(i,) for i in ids] format so wrap inside []
    assert result == 0 # Should raise an error or return 0 for invalid ID

def test_delete_db_with_valid_id():
    # Assuming there is a video with ID 1 in the database for testing purposes
    result = delete_db([1])
    assert result >= 0  # Should return the number of deleted rows, which should be >= 0

def test_download_video_with_invalid_url():
    invalid_url = "http://invalid-url.com/video.mp4"
    # download_video catches yt-dlp errors internally and prints a message,
    # so the call should complete without raising
    download_video(invalid_url)

Run using this Command

pytest tests/
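
These tests currently hit whatever videos.db sits in the working directory. A minimal isolation sketch, assuming every module opens the DB via the relative path videos.db (the fixture name and conftest.py location are my additions):

# tests/conftest.py
import sqlite3
import pytest

@pytest.fixture
def isolated_db(tmp_path, monkeypatch):
    # Run the test from a temp directory so "videos.db" resolves there
    monkeypatch.chdir(tmp_path)
    conn = sqlite3.connect("videos.db")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS videos_metadata (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            duration TEXT,
            format TEXT
        )
    """)
    conn.commit()
    conn.close()

Request it in the DB tests, e.g. def test_fetch_db(isolated_db): ..., so they run against a fresh table instead of the real database.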

Integrating the tests into CI with GitHub Actions

Inside .github/workflows/python-tests.yml:

name: Python Tests

on:
  push:
    branches:
      - master
  pull_request:
    branches:
      - master

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python 3.12
        uses: actions/setup-python@v4
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest
          pip install yt-dlp
          pip install loguru

      - name: Run tests
        run: |
          pytest --maxfail=1 --disable-warnings -q

Build Stage

Creating Dockerfile

FROM python:3.12-slim

# Install tkinter for the GUI (the trailing backslashes split one command across lines)
RUN apt-get update && apt-get install -y \
    python3-tk \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# --no-cache-dir keeps pip's download cache out of the image to reduce its size

COPY . .

CMD ["python3", "main.py", "--gui"]

The GUI can't run inside Docker (there's no display), so we add a --cli flag: when --cli and --url are passed, the app runs in CLI mode (for Docker); otherwise it launches the GUI locally.

args = parse_args_extract()
# GUI mode ---------- for Docker -------------
if not args.cli:
    print("🎨 Launching GUI mode...")
    start_gui()
    exit()
else:
    print("💻 CLI extract/download mode active...")
# ---------------------------------------------
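
Building and running the image locally might look like this (the tag is arbitrary; mounting downloads/ keeps the files on the host after the container exits):

docker build -t yt_video_dwnlder .
docker run --rm -it -v "$(pwd)/downloads:/app/downloads" \
  yt_video_dwnlder python3 main.py --cli --url "https://youtu.be/VIDEO_ID"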

Adding CI/CD with GitHub Actions

Create a file ci.yml inside the .github/workflows folder:

# .github/workflows/ci.yml
name: CI

on:
  push:
    branches:
      - master

jobs:
  build-and-push:
    runs-on: ubuntu-latest

    steps:
    - name: Checkout Code
      uses: actions/checkout@v3

    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v3

    - name: Login to DockerHub
      uses: docker/login-action@v3
      with:
        username: ${{ secrets.DOCKER_USERNAME }}
        password: ${{ secrets.DOCKER_TOKEN }}

    - name: Build and Push Docker Image
      uses: docker/build-push-action@v5
      with:
        context: .
        push: true
        tags: ${{ secrets.DOCKER_USERNAME }}/yt_video_dwnlder:latest

Phase 11 - Monitoring using Prometheus

Create a small background HTTP server inside the app to expose metrics.

Set up a lightweight Prometheus exporter:

pip install prometheus_client
# Creating and exposing metrics over HTTP
from prometheus_client import start_http_server, Summary
import threading
import time
import random

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

def start_metrics_server():
    # Start up the server to expose the metrics.
    start_http_server(8000)  # Start exposing on port 8000
    print("Prometheus metrics server started on port 8000")

# Simulate some work to record metrics
@REQUEST_TIME.time() # measures how long process_request() takes.
def process_request():
    time.sleep(random.random())

def run_metrics_server_in_thread(): # Run the metrics server in a separate thread
    metrics_thread = threading.Thread(target=start_metrics_server)
    metrics_thread.daemon = True
    metrics_thread.start()

Adding Counters to track GUI actions in real time

# Creating and exposing metrics over HTTP
from prometheus_client import start_http_server, Summary, Counter, Histogram
import threading
import time
import random

# Create a metric to track time spent and requests made.
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Action counters #* Using Counter to INCREMENT different function called
search_counter = Counter("search_requests_total", "Total number of search actions performed")
download_counter = Counter("download_requests_total", "Total number of download actions performed")
delete_counter = Counter("delete_requests_total", "Total number of delete actions performed")

# Download duration histogram - measures time to download
download_duration_histogram = Histogram("download_duration_seconds", "Time taken to download video")

def start_metrics_server():
    # Start up the server to expose the metrics.
    start_http_server(8000)  # Start exposing on port 8000
    print("Prometheus metrics server started on port 8000")

# Simulate some work to record metrics
@REQUEST_TIME.time() # measures how long process_request() takes.
def process_request():
    time.sleep(random.random())

def run_metrics_server_in_thread(): # Run the metrics server in a separate thread
    metrics_thread = threading.Thread(target=start_metrics_server)
    metrics_thread.daemon = True
    metrics_thread.start()
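
Wiring these into the GUI handlers is then a one-liner per action. A sketch, assuming the module above lives at metrics.py; Counter.inc() and Histogram.time() are standard prometheus_client APIs:

from metrics import (search_counter, download_counter,
                     download_duration_histogram,
                     run_metrics_server_in_thread)

run_metrics_server_in_thread()  # expose http://localhost:8000/metrics once at startup

def on_search(keyword):
    search_counter.inc()  # count every search action
    # ... run the actual search here ...

def on_download(url):
    download_counter.inc()
    with download_duration_histogram.time():  # record elapsed seconds into the histogram
        pass  # download_video(url) would go here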

Running Prometheus inside Docker

Run using its official image

docker run -d \
  --name prometheus \
  -p 9090:9090 \
  -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
  prom/prometheus
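
The prometheus.yml mounted above needs a scrape job pointing at the app's exporter from Phase 11. A minimal sketch; since Prometheus runs in a container and the app on the host, the target uses host.docker.internal (on Linux, add --add-host=host.docker.internal:host-gateway to the docker run for that name to resolve):

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "media-downloader"
    static_configs:
      - targets: ["host.docker.internal:8000"]  # the app's Prometheus metrics server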

Similarly, install Grafana using Docker:

docker run -d \
  --name=grafana \
  -p 3000:3000 \
  grafana/grafana

Prometheus and Grafana must share a Docker network so they can reach each other by container name:

docker network create monitoring
docker stop prometheus && docker rm prometheus

docker run -d \
  --name=prometheus \
  --network=monitoring \
  -p 9090:9090 \
  -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
  prom/prometheus
docker stop grafana && docker rm grafana

docker run -d \
  --name=grafana \
  --network=monitoring \
  -p 3000:3000 \
  grafana/grafana

Now in the Grafana UI (http://localhost:3000), add Prometheus as a data source with the URL http://prometheus:9090 (the container name resolves because both containers sit on the monitoring network), then build dashboards on the exported metrics.