Building a Pro-Level Threaded File Downloader in Python: A Step-by-Step Evolution

Table of contents
- 🚀 From Simple Script to Production-Ready Tool
- 🧱 Stage 1: The Naive Threaded Downloader
- 🔧 Stage 2: Cleaning It Up
- 🧠 Stage 3: Using ThreadPoolExecutor
- 🎉 Stage 4: Real-Time Progress Bars with tqdm
- 🧾 Stage 5: Configurable Logging (Console or File)
- ✅ Final Version: Complete Pro-Grade Downloader
- 🔚 Recap
- 📦 What’s Next?
- 🙌 Final Thoughts

🚀 From Simple Script to Production-Ready Tool
Every developer eventually writes a file downloader. But not every developer upgrades it to handle:
- Parallel downloads
- Real-time progress bars
- Configurable logging
- Streamed downloads (for large files)
- Graceful error handling
In this post, we’ll walk through the evolution of a threaded downloader in Python — from a basic script to a robust, professional-grade tool.
🧱 Stage 1: The Naive Threaded Downloader
We started with this basic idea: use Python threads to download multiple files at once.
import threading
import requests


# NOTE: this is the deliberately naive first version. The bugs below
# (the `request` vs `requests` name, the malformed URL, and the "Stating"
# typo) are intentional — they are dissected in the "Issues Detected"
# section that follows.
def download_file(url, filename):
    """Fetch *url* and write the whole response body to *filename*."""
    print(f"Stating {filename}")
    resp = request.get(url)  # BUG: should be requests.get(url)
    with open(filename, 'wb') as f:
        f.write(resp.content)
    print(f"Finished {filename}")


urls = [
    ('https://apple.com', 'apple.txt'),
    ('https: //google.com', 'google.txt')  # BUG: malformed URL (stray space)
]

# Start one thread per (url, filename) pair...
threads = []
for url, filename in urls:
    thread = threading.Thread(target=download_file, args=(url, filename))
    threads.append(thread)
    thread.start()

# ...then wait for all of them to finish.
for thread in threads:
    thread.join()
❌ Issues Detected
- `request.get()` should be `requests.get()`
- Malformed URL: `'https: //google.com'`
- `"Stating"` is a typo
- No error handling or timeouts
- No headers (may cause 403 errors)
🔧 Stage 2: Cleaning It Up
Let’s fix the basic issues and add robustness.
import threading
import requests


def download_file(url, filename):
    """Download *url* to *filename* with a UA header, a timeout, and error handling."""
    print(f"Starting {filename}")
    # Some servers reject requests without a browser-like User-Agent (403).
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()  # turn 4xx/5xx responses into exceptions
        with open(filename, 'wb') as f:
            f.write(resp.content)
        print(f"Finished {filename}")
    except Exception as e:
        print(f"Error downloading {filename}: {e}")


urls = [
    ('https://apple.com', 'apple.txt'),
    ('https://google.com', 'google.txt')
]

# One thread per download, started immediately...
threads = []
for url, filename in urls:
    thread = threading.Thread(target=download_file, args=(url, filename))
    threads.append(thread)
    thread.start()

# ...then joined so the script waits for every download.
for thread in threads:
    thread.join()
✅ Now we’re handling HTTP errors, timeouts, and malformed input. Much better.
🧠 Stage 3: Using ThreadPoolExecutor
Python’s `concurrent.futures.ThreadPoolExecutor` gives us cleaner concurrency and better resource management.
import logging
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
def is_valid_url(url):
    """Return True when *url* has both a scheme (e.g. https) and a host."""
    parsed = urlparse(url)
    return all([parsed.scheme, parsed.netloc])
def download_file(url, filename):
    """Stream *url* to *filename* in 8 KiB chunks, logging start/finish/failure.

    stream=True keeps the whole body out of memory, so large files scale.
    """
    logging.info(f"Starting download: {filename}")
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        with requests.get(url, headers=headers, timeout=10, stream=True) as response:
            response.raise_for_status()  # turn 4xx/5xx into exceptions
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        logging.info(f"Finished download: {filename}")
    except Exception as e:
        logging.error(f"Failed to download {url}: {e}")
def main():
    """Validate the URL list and download every entry concurrently."""
    logging.basicConfig(level=logging.INFO)
    urls = [
        ('https://apple.com', 'apple.txt'),
        ('https://google.com', 'google.txt'),
    ]
    # Drop any pair whose URL fails basic validation before spawning work.
    valid_urls = [(url, filename) for url, filename in urls if is_valid_url(url)]
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(download_file, url, filename) for url, filename in valid_urls]
        for future in as_completed(futures):
            future.result()  # re-raise any exception from the worker


if __name__ == "__main__":
    main()
🎉 Stage 4: Real-Time Progress Bars with tqdm
Let's make it interactive! We’ll display live download progress bars for each file, using `tqdm`.
Install `tqdm` if you haven’t:
pip install tqdm
Here’s how we use it:
from tqdm import tqdm


def download_file(url, filename, position=0):
    """Stream *url* to *filename* with a live tqdm progress bar.

    position: terminal row for this bar, so concurrent bars don't clash.
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        with requests.get(url, headers=headers, timeout=10, stream=True) as response:
            response.raise_for_status()
            # Content-Length may be absent; total=0 makes tqdm show an open-ended bar.
            total = int(response.headers.get('content-length', 0))
            with open(filename, 'wb') as f, tqdm(
                total=total,
                unit='B',
                unit_scale=True,
                desc=filename,
                position=position,
                leave=False,
                dynamic_ncols=True
            ) as progress:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
                        progress.update(len(chunk))
        # tqdm.write prints without corrupting the active progress bars.
        tqdm.write(f"[SUCCESS] Finished download: {filename}")
    except Exception as e:
        tqdm.write(f"[ERROR] Failed {filename}: {e}")
Each download has its own bar, using `position=...` to avoid clashing in the terminal.
🧾 Stage 5: Configurable Logging (Console or File)
Let’s give the user an option to log to console or a file — a common requirement in production.
def configure_logging(to_file=False, log_file='download.log'):
    """Route log output to the console, and optionally also to *log_file*."""
    handlers = [logging.StreamHandler()]
    if to_file:
        handlers.append(logging.FileHandler(log_file))
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=handlers
    )
✅ Final Version: Complete Pro-Grade Downloader
import logging
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
from tqdm import tqdm
def configure_logging(to_file=False, log_file='download.log'):
    """Route log output to the console, and optionally also to *log_file*."""
    handlers = [logging.StreamHandler()]
    if to_file:
        handlers.append(logging.FileHandler(log_file))
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=handlers
    )
def is_valid_url(url):
    """Return True when *url* has both a scheme (e.g. https) and a host."""
    parsed = urlparse(url)
    return all([parsed.scheme, parsed.netloc])
def download_file(url, filename, position=0):
    """Stream *url* to *filename* with a live tqdm progress bar.

    position: terminal row for this bar, so concurrent bars don't clash.
    Failures are both shown via tqdm.write and recorded with logging.
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        with requests.get(url, headers=headers, timeout=10, stream=True) as response:
            response.raise_for_status()
            # Content-Length may be absent; total=0 makes tqdm show an open-ended bar.
            total = int(response.headers.get('content-length', 0))
            with open(filename, 'wb') as f, tqdm(
                total=total,
                unit='B',
                unit_scale=True,
                desc=filename,
                position=position,
                leave=False,
                dynamic_ncols=True
            ) as progress:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
                        progress.update(len(chunk))
        # tqdm.write prints without corrupting the active progress bars.
        tqdm.write(f"[SUCCESS] Finished download: {filename}")
    except Exception as e:
        tqdm.write(f"[ERROR] {url} → {filename}: {e}")
        logging.error(f"Download error: {url} → {filename}: {e}")
def main(log_to_file=False):
    """Validate URLs and download them concurrently; optionally log to a file."""
    configure_logging(to_file=log_to_file)
    urls = [
        ('https://apple.com', 'apple.txt'),
        ('https://google.com', 'google.txt')
    ]
    # Drop any pair whose URL fails basic validation before spawning work.
    valid_urls = [(url, filename) for url, filename in urls if is_valid_url(url)]
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for i, (url, filename) in enumerate(valid_urls):
            # i doubles as the tqdm row index so bars don't overwrite each other.
            future = executor.submit(download_file, url, filename, i)
            futures.append(future)
        for future in as_completed(futures):
            future.result()  # re-raise any exception from the worker


if __name__ == "__main__":
    main(log_to_file=True)  # Set to False for console logging
🔚 Recap
| Feature Added | Benefit |
| --- | --- |
| ✅ Threads → ThreadPoolExecutor | Cleaner concurrency |
| ✅ try-except | Graceful failure |
| ✅ tqdm | Real-time UX |
| ✅ Logging config | Adaptable for dev or prod |
| ✅ Streamed I/O | Scales for large files |
📦 What’s Next?
To make this even better, you could:
- Add `argparse` or `click` for CLI options
- Implement retry logic with exponential backoff
- Support async downloads via `aiohttp`
- Package it into a pip-installable CLI tool
🙌 Final Thoughts
This started as a simple Python script, but we leveled it up into something you can use in real apps, CI pipelines, and automation scripts.
Feel free to fork it into your own tool — and if you’d like a GitHub-ready version or CLI wrapper, just drop a comment!
#Python #Threading #Downloader #Tqdm #Logging #Programming #Coding #Automation
Subscribe to my newsletter
Read articles from Athul A directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
