YouTube Media Downloader Pipeline


Tired of Paying for YouTube Premium? I Built My Own DevOps-Powered Media Downloader
What started as a simple idea — "Why not download videos myself?" — quickly evolved into a fully containerized, monitored, and CI/CD-automated pipeline that does way more than just download.
From a desktop GUI built in Python, to logging with Loguru, observability with Prometheus + Grafana, and seamless GitHub Actions automation — I designed a full-stack system that mimics a production-grade DevOps environment.
This post walks you through how I architected the project, the tech I used, and how you can deploy it in just one command with Docker.
TL;DR
Built my own GUI-based YouTube media manager
Used SQLite for backend metadata storage
Added real-time monitoring with Prometheus & Grafana
Logged everything using Loguru with log rotation
Packaged entire pipeline into Docker containers
Automated builds & tests using GitHub Actions
Folder Structure
media-downloader-pipeline/
├── downloads/
└── scripts/
    └── download.sh
Install yt-dlp
# Option 1: install from apt (can be outdated)
sudo apt install yt-dlp
# Option 2: grab the latest release binary instead
sudo curl -L https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp -o /usr/local/bin/yt-dlp
sudo chmod a+rx /usr/local/bin/yt-dlp
yt-dlp --version
yt-dlp is a command-line tool used to **download videos, audio, and playlists** from YouTube and many other sites.
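A couple of everyday invocations, just to show what it covers (the URL is a placeholder, and audio extraction needs ffmpeg installed):
yt-dlp -F "<VIDEO_URL>"                      # list the available formats
yt-dlp -x --audio-format mp3 "<VIDEO_URL>"   # download audio only and convert it to mp3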
Phase 1 - Shell Scripting
Write download.sh
#!/bin/bash
read -p "Enter video URL: " url
yt-dlp -o "../downloads/%(title)s.%(ext)s" "$url"
cd ../downloads || exit 1
# Sort files into folders based on their extension
for file in *; do
    ext="${file##*.}"
    mkdir -p "$ext"
    mv "$file" "$ext/" 2>/dev/null   # suppress errors (e.g. when hitting directories)
done
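After a run, the files inside downloads/ end up grouped by extension, for example (titles are illustrative):
downloads/
├── mp4/
│   └── Some Video Title.mp4
└── webm/
    └── Another Clip.webm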
Phase 2 - Python - Metadata Capturer
Fetching a video's metadata without downloading it, using yt-dlp from Python.
If the yt-dlp package isn't being picked up by the editor, open the project folder in VS Code: inside the Youtube_Downloader_project folder, run `code .` in the terminal.
from yt_dlp import YoutubeDL
import os
import json
from datetime import timedelta

url = input("Enter Url: ")

# Dictionary of options for the YoutubeDL object
ydl_opts = {
    'quiet': True,              # suppresses most console output to keep things clean
    'skip_download': True,      # don't download the actual video, just fetch info
    'dump_single_json': True    # retrieve all the video metadata as JSON
}

# Create a YoutubeDL instance using ydl_opts
with YoutubeDL(ydl_opts) as ydl:
    # try/except for invalid URLs
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

print("\n")
print("Title: ", info.get('title'))
# Show duration in HH:MM:SS
duration = str(timedelta(seconds=info.get('duration')))
print("Duration:", duration)
print("Format: ", info.get('ext'))
print("Tags:", info.get('tags'))

# Store this info into video_metadata.json
# Build our own object with just the fields we want to keep
metadata = {
    "Title": info.get("title"),
    "Duration": duration,
    "Format": info.get("ext"),
}

# Create video_metadata.json if it does not exist yet
metadata_file = "video_metadata.json"
if not os.path.exists(metadata_file):
    with open(metadata_file, "w") as f:
        json.dump([], f, indent=4)

# Load existing data (fall back to an empty list if the file is empty or corrupt)
with open(metadata_file, "r") as f:
    try:
        data = json.load(f)
    except json.JSONDecodeError:
        data = []

# append() only works on lists
data.append(metadata)

# Write the updated data back to the metadata file
with open(metadata_file, "w") as f:
    json.dump(data, f, indent=4)

print("✅ Metadata saved to video_metadata.json")
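After a couple of runs, video_metadata.json holds a list of entries shaped like this (values are illustrative):
[
    {
        "Title": "Some Video Title",
        "Duration": "0:03:45",
        "Format": "mp4"
    }
]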
Phase 3 - SQLite setup
import sqlite3

# Connect to the database (creates the file if it does not exist)
conn = sqlite3.connect('videos.db')
# Create a cursor to execute SQL commands
cursor = conn.cursor()

# Create the table if it does not exist already
cursor.execute('''
    CREATE TABLE IF NOT EXISTS videos_metadata (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        duration TEXT,
        format TEXT
    )
''')
print("Table 'videos_metadata' created or already exists.")

conn.commit()   # save changes
conn.close()    # close the connection safely
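To sanity-check the table, you can optionally inspect the schema with a few extra lines (not part of the pipeline itself):
import sqlite3

conn = sqlite3.connect('videos.db')
cursor = conn.cursor()
# PRAGMA table_info returns one row per column: (cid, name, type, notnull, default_value, pk)
cursor.execute("PRAGMA table_info(videos_metadata)")
for column in cursor.fetchall():
    print(column[1], column[2])   # column name and declared type
conn.close()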
Insert to DB
from yt_dlp import YoutubeDL
import sqlite3
from datetime import timedelta

url = input("Enter url: ")

# Options for yt-dlp: fetch metadata only, no download
ydl_opts = {
    'quiet': True,
    'skip_download': True,
    'dump_single_json': True
}

# Create an instance with these options
with YoutubeDL(ydl_opts) as ydl:
    # Verify that the URL is valid
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

duration = str(timedelta(seconds=info.get("duration")))

# Connect to SQLite
conn = sqlite3.connect('videos.db')
cursor = conn.cursor()

# Insert into the table
cursor.execute('''
    INSERT INTO videos_metadata (title, duration, format)
    VALUES (?, ?, ?)
''', (info.get('title'), duration, info.get('ext')))

# Save and close
conn.commit()
conn.close()
print("✅ Metadata inserted into SQLite database.")
Fetch items from db
import sqlite3

# Connect to the DB
conn = sqlite3.connect('videos.db')
cursor = conn.cursor()
cursor.execute('SELECT * FROM videos_metadata')
rows = cursor.fetchall()   # a list of tuples

for row in rows:
    id, title, duration, format = row   # unpack the tuple
    print(f"ID: {id}")
    print(f"Title: {title}")
    print(f"Duration: {duration}")
    print(f"Format: {format}")
    print("-" * 30)

# Close the database
conn.close()
Phase 4 - Setup CLI Interface with argparse
Create cli_interface.py:
import argparse
import sqlite3

def list_videos():
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata")
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

def search_by_title(keyword):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE title LIKE ?", ('%' + keyword + '%',))
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

def filter_by_format(fmt):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE format = ?", (fmt,))
    rows = cursor.fetchall()
    for row in rows:
        id, title, duration, fmt = row
        print(f"ID: {id} | Title: {title} | Duration: {duration} | Format: {fmt}")
    conn.close()

# Set up the argparse CLI
parser = argparse.ArgumentParser(description="🎬 Manage downloaded video metadata")
parser.add_argument('--list', action='store_true', help='List all stored videos')
parser.add_argument('--search', type=str, help='Search videos by title')
parser.add_argument('--filter-format', type=str, help='Filter videos by format')
args = parser.parse_args()

# CLI actions
if args.list:
    list_videos()
elif args.search:
    search_by_title(args.search)
elif args.filter_format:
    filter_by_format(args.filter_format)
else:
    print("❌ No valid option provided. Use --help for available commands.")
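With that in place, the metadata can be queried straight from the terminal, for example (assuming cli_interface.py sits in the project root):
python cli_interface.py --list
python cli_interface.py --search "tutorial"
python cli_interface.py --filter-format mp4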
Phase 5 - Writing the download function and adding a --download flag to extract.py
from yt_dlp import YoutubeDL
import os

def download_video(url):
    output_dir = "downloads"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)   # create the folder via os
    # Options for yt-dlp
    ydl_opts = {
        'quiet': False,
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'no_check_certificate': True,
        'format': 'best'
    }
    # Create the instance and download
    with YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([url])
            print(f"✅ Download completed: saved in '{output_dir}'")
        except Exception as e:
            print(f"❌ Download failed: {e}")
Then in extract.py:
from yt_dlp import YoutubeDL
import os
import json
from datetime import timedelta
import argparse
from download import download_video

# CLI arguments
def parse_args():
    parser = argparse.ArgumentParser(description="Extract video metadata from a URL using yt-dlp.")
    parser.add_argument(
        '--save',
        action="store_true",
        help="Save metadata to video_metadata.json"
    )
    parser.add_argument(
        '--download',
        action="store_true",
        help="Download the video after extracting metadata"
    )
    parser.add_argument(
        '--url',
        required=False,
        type=str,
        help="URL of the video (optional; falls back to an input prompt)"
    )
    return parser.parse_args()

args = parse_args()
url = args.url or input("Enter Url: ")

# Options for the YoutubeDL object: metadata only, no download
ydl_opts = {
    'quiet': True,              # suppresses most console output
    'skip_download': True,      # don't download the actual video, just fetch info
    'dump_single_json': True    # retrieve all the video metadata as JSON
}

# Create a YoutubeDL instance using ydl_opts
with YoutubeDL(ydl_opts) as ydl:
    # try/except for invalid URLs
    try:
        info = ydl.extract_info(url, download=False)
    except Exception as e:
        print(f"Error: {e}")
        exit(1)

print("\n")
print("Title: ", info.get('title'))
# Show duration in HH:MM:SS
duration = str(timedelta(seconds=info.get('duration')))
print("Duration:", duration)
print("Format: ", info.get('ext'))
print("Tags:", info.get('tags'))

# Build the object we want to store
metadata = {
    "Title": info.get("title"),
    "Duration": duration,
    "Format": info.get("ext"),
}

# Save metadata only when --save is passed
if args.save:
    metadata_file = "video_metadata.json"
    if not os.path.exists(metadata_file):
        with open(metadata_file, "w") as f:
            json.dump([], f, indent=4)
    # Load existing data (empty list if the file is empty or corrupt)
    with open(metadata_file, "r") as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            data = []
    data.append(metadata)
    with open(metadata_file, "w") as f:
        json.dump(data, f, indent=4)
    print("✅ Metadata saved to video_metadata.json")

if args.download:
    download_video(url)
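Typical invocations at this stage (assuming the script is saved as extract.py):
python extract.py --url "<VIDEO_URL>" --save
python extract.py --url "<VIDEO_URL>" --save --download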
Phase 6 - Combining all functions into main.py
from yt_dlp import YoutubeDL
import os
from datetime import timedelta
import argparse
# extract.py
from metadata.extract import parse_args_extract      # argparse function
from metadata.extract import ydl_instance_creation   # extract info
from metadata.extract import save_summary            # save summary to JSON
# DB
from metadata.db_setup import db_setup                # table setup
from metadata.insert_to_db import insert_db           # insert into DB
from metadata.fetch_metadata_db import fetch_db       # fetch data from DB
# cli_interface
from metadata.cli_interface import arg_parser_cli
from metadata.cli_interface import list_videos
from metadata.cli_interface import search_by_title
from metadata.cli_interface import search_by_filter

if __name__ == "__main__":
    args = parse_args_extract()
    url = args.url or input("Enter Url: ")
    info = ydl_instance_creation(url)
    if not info:
        print("❌ Failed to extract video info.")
        exit(1)
    duration = str(timedelta(seconds=info.get("duration")))
    save_summary(info, args)
    print("\n")
    db_setup()
    # insert_db(info, duration)
    fetch_db()
    print("\n")
    args_cli = arg_parser_cli()
    if args_cli.list:
        list_videos()
    elif args_cli.search:
        search_by_title(args_cli.search)
    elif args_cli.filter:
        search_by_filter(args_cli.filter)
    else:
        print("❌ No valid option provided. Use --help for available commands.")
An error occurs here because two different parse_args functions are being used, so we need a condition that decides which parser handles the arguments:
# Note: this version also needs `import sys` and the download_video import at the top of main.py
if __name__ == "__main__":
    # Decide which parser to run based on the flags passed
    if any(arg in ["--list", "--filter", "--search"] for arg in sys.argv):
        args_cli = arg_parser_cli()   # use the cli_interface parser
        if args_cli.list:
            list_videos()
        elif args_cli.search:
            search_by_title(args_cli.search)
        elif args_cli.filter:
            search_by_filter(args_cli.filter)
        else:
            print("❌ No valid option provided. Use --help for available commands.")
    else:
        args = parse_args_extract()
        url = args.url or input("Enter Url: ")
        info = ydl_instance_creation(url)
        if not info:
            print("❌ Failed to extract video info.")
            exit(1)
        duration = str(timedelta(seconds=info.get("duration")))
        save_summary(info, args)
        print("\n")
        db_setup()
        if args.add_to_db:
            insert_db(info, duration)
        fetch_db()
        print("\n")
        if args.download:
            download_video(url)
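A single entry point now covers both flows, for example:
python main.py --url "<VIDEO_URL>" --save --download
python main.py --list
python main.py --search "tutorial"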
Modifying the download.sh file
#!/bin/bash
# read -p "Enter video URL: " url
# yt-dlp -o "../downloads/%(title)s.%(ext)s" "$url"
DOWNLOAD_DIR="downloads"
# Sort files based on their extension
for file in "$DOWNLOAD_DIR"/*; do
    if [[ -f "$file" ]]; then
        ext="${file##*.}"
        # Add a date folder inside each extension folder for proper record-keeping
        TODAY=$(date +"%d-%m-%Y")
        mkdir -p "$DOWNLOAD_DIR/$ext/$TODAY"                  # create the date folder
        mv "$file" "$DOWNLOAD_DIR/$ext/$TODAY/" 2>/dev/null   # move the file
    fi
done
echo "Files sorted by extension into directories."
Updating download.py to call this script after every download:
from yt_dlp import YoutubeDL
import os
import subprocess

def download_video(url):
    output_dir = "downloads"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)   # create the folder via os
    # Options for yt-dlp
    ydl_opts = {
        'quiet': False,
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'no_check_certificate': True,
        'format': 'best'
    }
    # Create the instance and download
    with YoutubeDL(ydl_opts) as ydl:
        try:
            print(f"🔄 Downloading video from {url}...")
            ydl.download([url])
            print(f"✅ Download completed: saved in '{output_dir}'")
        except Exception as e:
            print(f"❌ Download failed: {e}")
    # Automatically call download.sh to move files into their respective folders
    try:
        subprocess.run(["./scripts/download.sh"], check=True)
        print("✅ Downloaded videos sorted successfully by extension and date.")
    except subprocess.CalledProcessError as e:
        print(f"❌ Failed to sort downloaded videos: {e}")

if __name__ == "__main__":
    video_url = input("Enter the video URL to download: ")
    download_video(video_url)
Delete Flag Functionality
delete_from_db.py
import sqlite3

def find_entries_by_keyword(keyword):
    conn = sqlite3.connect("videos.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM videos_metadata WHERE LOWER(title) LIKE ?", ('%' + keyword.lower() + '%',))
    results = cursor.fetchall()
    conn.close()
    return results

def delete_db(ids):
    conn = sqlite3.connect('videos.db')
    cursor = conn.cursor()
    # executemany runs the DELETE once per (id,) tuple in the list
    cursor.executemany('DELETE FROM videos_metadata WHERE id = ?', [(i,) for i in ids])
    conn.commit()
    deleted_count = cursor.rowcount
    conn.close()
    return deleted_count
if any(arg in ["--list", "--filter", "--search", "--delete"] for arg in sys.argv):
    args_cli = arg_parser_cli()   # use the cli_interface parser
    if args_cli.list:
        list_videos()
    elif args_cli.search:
        search_by_title(args_cli.search)
    elif args_cli.filter:
        search_by_filter(args_cli.filter)
    elif args_cli.delete:
        # If --delete is passed, no other CLI action runs
        matches = find_entries_by_keyword(args_cli.delete)
        if not matches:
            print("❌ No entries found with that keyword.")
        else:
            print("Found entries:")
            for match in matches:
                print(f"ID: {match[0]}, Title: {match[1]}, Duration: {match[2]}")
            confirm = input(f"Are you sure you want to delete these {len(matches)} entries? (y/n): ")
            if confirm.lower() in ('y', 'yes'):
                deleted = delete_db([row[0] for row in matches])
                print(f"✅ Deleted {deleted} entries.")
            else:
                print("❎ Deletion cancelled.")
else:
    # ...the extract/download flow from the previous version of main.py runs here
Phase 7 - GUI Interface - Tkinter
gui_interface.py
import tkinter as tk
from tkinter import ttk, messagebox
from metadata.fetch_metadata_db import fetch_db

def start_gui():
    root = tk.Tk()                            # create the main window
    root.title("Video Metadata Manager")      # set the window title
    root.geometry("600x400")                  # set the window size

    tk.Label(root, text="Search by Keyword", font=("Arial", 16)).pack(pady=10)   # a label (text on the window)

    keyword_var = tk.StringVar()   # a special variable, like a box that stores the text the user enters
    tk.Entry(root, textvariable=keyword_var, width=50).pack(pady=10)   # input box, connected to keyword_var

    result_box = tk.Listbox(root, width=80, height=10)   # Listbox where search results will be shown
    result_box.pack(pady=10)

    # Search logic
    def search():
        keyword = keyword_var.get().lower()   # get the text from the input field
        result_box.delete(0, tk.END)          # clear old results before showing new ones
        data = fetch_db()
        for row in data:
            title = row[1]
            if keyword in title.lower():
                # Put the ID first so the later buttons can parse it; tk.END appends to the end of the list
                result_box.insert(tk.END, f"🎥 {row[0]} | {title} | Duration: {row[2]} | Format: {row[3]}")

    tk.Button(root, text="Search", command=search).pack()
    root.mainloop()   # start the GUI event loop so it waits for user actions

# In main.py: launch the GUI when --gui is passed (needs `import sys`)
if "--gui" in sys.argv:
    start_gui()
    exit()
Adding More Buttons in GUI
Delete
def delete():
    selected = result_box.curselection()   # tuple with the index of the selected Listbox item
    if not selected:
        messagebox.showwarning("No Selection", "Please select a video to delete.")
        return
    # Fetch the ID from the selected item so we can call the delete function
    selected_text = result_box.get(selected[0])                        # text of the first selected item
    id = int(selected_text.split('|')[0].replace("🎥", "").strip())    # extract the ID from the start of the line
    # Call the delete function (it expects a list of IDs)
    delete_count = delete_db([id])
    if delete_count > 0:
        messagebox.showinfo("Success", f"✅ Deleted {delete_count} video(s) with ID: {id}.")
        search()   # refresh the list
    else:
        messagebox.showerror("Error", f"❌ No video found with ID: {id}.")
Download
I also needed to store the URL in the DB, because the download function needs the URL to actually download anything.
def download():
    selection = result_box.curselection()
    if not selection:
        messagebox.showwarning("No Selection", "Please select a video to download.")
        return
    # Fetch the selected text and extract the ID from it
    selected_text = result_box.get(selection[0])
    id = selected_text.split("|")[0].replace("🎥", "").strip()
    # Look up the URL in the DB using the ID
    data = fetch_db()
    url = None
    for row in data:
        if str(row[0]) == id:
            url = row[4]
            break
    if not url:
        print("❌ No URL found for the selected video.")
        return
    # Download the video
    try:
        download_video(url)
        messagebox.showinfo("Success", f"✅ Downloaded video with ID: {id}.")
    except Exception as e:
        messagebox.showerror("Error", f"❌ Failed to download video: {str(e)}")
Clear All Function
def clear_all():
    result_box.delete(0, tk.END)   # clear the Listbox
    keyword_var.set("")            # clear the search field
Phase 8 - Logging with Python's logging module
import logging
import os
from datetime import datetime

# Create the log directory if it does not exist
os.makedirs("Logging/logs", exist_ok=True)

# Log filename with the current date
log_filename = datetime.now().strftime("Logging/logs/log_%d-%m-%Y.log")

# Configure logging for the whole app
logging.basicConfig(
    filename=log_filename,                               # save logs to a file
    level=logging.INFO,                                  # log INFO and above (INFO, WARNING, ERROR, CRITICAL)
    format='%(asctime)s - %(levelname)s - %(message)s'   # format of the log messages
)

def get_logger():
    return logging.getLogger(__name__)
Inside the GUI, import the logger and log actions wherever they happen:
from Logging.logging import get_logger
logger = get_logger()
# Used throughout the GUI handlers, for example:
logger.error(f"Failed to download video with ID: {id}. Error: {str(e)}")
logger.info(f"Download action performed for ID: {id}")
Phase 9 - Advanced Logging with Loguru
# Loguru gives production-grade logging, log rotation, and much more
from loguru import logger
import os

LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)

logger.add(
    os.path.join(LOG_DIR, "app.log"),
    rotation="10 MB",        # rotate the log file when it reaches 10 MB
    retention="7 days",      # keep logs for 7 days
    level="INFO",            # log level
    compression="zip",       # compress rotated logs
    enqueue=True,            # send records through a queue so logging never blocks
    format="{time} {level} {message}",   # log format
)
logger.info("Logger initialized successfully.")
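With that sink configured, the rest of the code just imports logger and logs as usual (assuming the setup above lives in a module the app imports once at startup):
from loguru import logger   # the sink added above applies to this shared logger

logger.info("Download started")
logger.warning("Low disk space in downloads/")
logger.error("Download failed: network unreachable")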
Adding Log Viewer Button in GUI
def show_logs():
    log_window = tk.Toplevel(root)            # create a new window for the logs
    log_window.title("Application Logs")      # set the title of the log window
    log_window.geometry("700x400")            # set the size of the log window
    log_text = tk.Text(log_window, wrap=tk.WORD)   # Text widget to display the logs
    log_text.pack(expand=True, fill="both")

    def update_logs():
        try:
            with open('logs/app.log', 'r') as f:
                content = f.read()
            # Listbox uses index 0, but Text uses "1.0" (line 1, character 0)
            log_text.delete("1.0", tk.END)
            log_text.insert(tk.END, content)   # insert the log file content into the Text widget
            log_text.see(tk.END)               # scroll to the end
        except Exception as e:
            log_text.delete("1.0", tk.END)
            log_text.insert(tk.END, f"❌ Error reading log file: {str(e)}")
        log_window.after(3000, update_logs)    # re-run every 3 seconds to refresh the logs

    update_logs()

# Separate frame for the logs button (bottom row)
logs_frame = tk.Frame(root)
logs_frame.pack(pady=(0, 10))
tk.Button(logs_frame, text="Show Logs", command=show_logs).pack()
Phase 10 - Testing using PyTest
import pytest
from metadata.fetch_metadata_db import fetch_db
from metadata.delete_from_db import delete_db
from metadata.download import download_video

def test_fetch_db():
    data = fetch_db()
    assert isinstance(data, list)   # isinstance() checks the type; data should be a list
    assert len(data) >= 0

def test_delete_db_with_invalid_id():
    result = delete_db([-1])        # delete_db expects a list of IDs, so wrap it in []
    assert result == 0              # no rows should be deleted for an invalid ID

def test_delete_db_with_valid_id():
    # Assumes a video with ID 1 exists in the database for testing purposes
    result = delete_db([1])
    assert result >= 0              # returns the number of deleted rows, which should be >= 0

def test_download_video_with_invalid_url():
    invalid_url = "http://invalid-url.com/video.mp4"
    with pytest.raises(Exception):  # yt-dlp raises DownloadError (from yt_dlp.utils) for a bad URL
        download_video(invalid_url)
Run using this Command
pytest tests/
Integrating the tests into CI with GitHub Actions
Inside .github/workflows/python-tests.yml:
name: Python Tests

on:
  push:
    branches:
      - master
  pull_request:
    branches:
      - master

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python 3.12
        uses: actions/setup-python@v4
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest
          pip install yt-dlp
          pip install loguru

      - name: Run tests
        run: |
          pytest --maxfail=1 --disable-warnings -q
Build Stage
Creating Dockerfile
FROM python:3.12-slim

# Install the system package needed for the Tkinter GUI
# (the trailing \ splits the command across lines)
RUN apt-get update && apt-get install -y \
    python3-tk \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
# --no-cache-dir skips pip's cache to keep the image size down
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python3", "main.py", "--gui"]
The GUI can't run inside Docker (there's no display), so I added a --cli flag: when --cli and --url are passed, the app runs in CLI mode (suitable for Docker); otherwise it launches the GUI locally.
args = parse_args_extract()
# ---------- GUI vs CLI mode (CLI is for Docker) ----------
if not args.cli:
    print("🎨 Launching GUI mode...")
    start_gui()
    exit()
else:
    print("💻 CLI extract/download mode active...")
# ----------------------------------------------------------
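With the Dockerfile and the --cli flag in place, a local build and run might look like this (the image name and the downloads/ volume mount are illustrative):
docker build -t yt_video_dwnlder .
docker run --rm -it -v "$(pwd)/downloads:/app/downloads" yt_video_dwnlder python3 main.py --cli --url "<VIDEO_URL>"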
Adding CI/CD with GitHub Actions
Create ci.yml inside the .github/workflows folder:
# .github/workflows/ci.yml
name: CI

on:
  push:
    branches:
      - master

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Code
        uses: actions/checkout@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Login to DockerHub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_TOKEN }}

      - name: Build and Push Docker Image
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: ${{ secrets.DOCKER_USERNAME }}/yt_video_dwnlder:latest
Phase 11 - Monitoring using Prometheus
The idea is to run a small background HTTP server inside the app that exposes metrics, i.e. a lightweight Prometheus exporter.
pip install prometheus_client
# Creating and exposing metrics over HTTP
from prometheus_client import start_http_server, Summary
import threading
import time
import random

# Metric to track time spent processing requests
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

def start_metrics_server():
    # Start the server that exposes the metrics
    start_http_server(8000)   # expose metrics on port 8000
    print("Prometheus metrics server started on port 8000")

# Simulate some work to record metrics
@REQUEST_TIME.time()   # measures how long process_request() takes
def process_request():
    time.sleep(random.random())

def run_metrics_server_in_thread():
    # Run the metrics server in a separate (daemon) thread so it does not block the GUI
    metrics_thread = threading.Thread(target=start_metrics_server)
    metrics_thread.daemon = True
    metrics_thread.start()
Adding Counters to track GUI actions in real time
# Creating and exposing metrics over HTTP
from prometheus_client import start_http_server, Summary, Counter, Histogram
import threading
import time
import random

# Metric to track time spent processing requests
REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request')

# Action counters: each Counter is incremented when the corresponding GUI action runs
search_counter = Counter("search_requests_total", "Total number of search actions performed")
download_counter = Counter("download_requests_total", "Total number of download actions performed")
delete_counter = Counter("delete_requests_total", "Total number of delete actions performed")

# Download duration histogram: measures how long downloads take
download_duration_histogram = Histogram("download_duration_seconds", "Time taken to download video")

def start_metrics_server():
    # Start the server that exposes the metrics
    start_http_server(8000)   # expose metrics on port 8000
    print("Prometheus metrics server started on port 8000")

# Simulate some work to record metrics
@REQUEST_TIME.time()   # measures how long process_request() takes
def process_request():
    time.sleep(random.random())

def run_metrics_server_in_thread():
    # Run the metrics server in a separate (daemon) thread
    metrics_thread = threading.Thread(target=start_metrics_server)
    metrics_thread.daemon = True
    metrics_thread.start()
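The counters above get incremented from the GUI handlers; a minimal sketch of that wiring (the metrics module name and the stand-in function bodies are assumptions, not the post's exact code):
import time
from metrics import search_counter, download_counter, download_duration_histogram, run_metrics_server_in_thread

run_metrics_server_in_thread()   # expose metrics on port 8000 before the GUI starts

def search_with_metrics(keyword):
    search_counter.inc()                       # count every search action
    print(f"searching for {keyword}...")       # stand-in for the real Listbox search

def download_with_metrics(url):
    download_counter.inc()                     # count every download action
    with download_duration_histogram.time():   # record how long the download takes
        time.sleep(0.5)                        # stand-in for download_video(url)

search_with_metrics("python")
download_with_metrics("https://example.com/video")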
Running Prometheus inside Docker
Run using its official image
docker run -d \
--name prometheus \
-p 9090:9090 \
-v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
prom/prometheus
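The command mounts a prometheus.yml that tells Prometheus what to scrape; a minimal sketch, assuming the app's metrics server on port 8000 is reachable from the container (host.docker.internal works on Docker Desktop; on Linux use the host's IP or add --add-host=host.docker.internal:host-gateway):
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "media_downloader"
    static_configs:
      - targets: ["host.docker.internal:8000"]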
Similarly, run Grafana using Docker:
docker run -d \
--name=grafana \
-p 3000:3000 \
grafana/grafana
Prometheus and Grafana need to share a Docker network to talk to each other, so create one and recreate both containers attached to it:
docker network create monitoring
docker stop prometheus && docker rm prometheus
docker run -d \
--name=prometheus \
--network=monitoring \
-p 9090:9090 \
-v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml \
prom/prometheus
docker stop grafana && docker rm grafana
docker run -d \
--name=grafana \
--network=monitoring \
-p 3000:3000 \
grafana/grafana
Now in the Grafana UI:
Go to http://localhost:3000
Log in with the default credentials: admin / admin
Go to Configuration → Data Sources
Choose Prometheus
Set the URL to http://prometheus:9090
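From there, dashboard panels can be built on the metrics exposed earlier; a couple of illustrative PromQL queries using the metric names defined above:
rate(download_requests_total[5m])
histogram_quantile(0.95, rate(download_duration_seconds_bucket[5m]))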