05 Data Ingestion Modules

How to Build a Flexible Data Ingestion Pipeline in Python Using YAML Configs
Managing machine learning pipelines can get messy—fast. Hardcoding paths, URLs, and settings in your scripts is a recipe for chaos when your project grows. That’s why using YAML configuration files and a modular code structure is a game-changer for maintainability and scalability. In this article, I’ll walk you through a real-world setup for data ingestion using YAML and Python, inspired by best practices in the ML community.
Table of Contents
Why Use YAML for ML Configurations?
Anatomy of a Config File (config.yaml)
Configuration Management in Python
Creating Typed Config Entities
Building the Data Ingestion Component
Orchestrating the Data Ingestion Pipeline
Bringing It All Together in main.py
Final Thoughts
Why Use YAML for ML Configurations?
YAML is a human-readable format that’s perfect for configuration files in AI/ML projects. It keeps your settings clear, version-controlled, and separate from your code, making your workflow easier to manage and reproduce. YAML’s indentation-based structure also feels natural for Python developers.
Anatomy of a Config File (config.yaml)
Here’s a sample config.yaml for a data ingestion pipeline:
artifacts_root: artifacts

data_ingestion:
  root_dir: artifacts/data_ingestion
  source_URL: https://raw.githubusercontent.com/krishnaik06/datasets/main/winequality-data.zip
  local_data_file: artifacts/data_ingestion/winequality-red.zip
  unzip_dir: artifacts/data_ingestion
What’s happening here?

artifacts_root: Where all pipeline outputs will be stored.
data_ingestion: Contains all settings related to downloading and extracting the dataset.
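If you want to sanity-check the file before wiring up any classes, a couple of lines with PyYAML (assumed installed) will load it into nested dictionaries:

```python
import yaml  # PyYAML

with open("config.yaml") as f:
    config = yaml.safe_load(f)  # parses the YAML into nested Python dicts

print(config["artifacts_root"])                # artifacts
print(config["data_ingestion"]["source_URL"])  # the dataset URL
```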
Configuration Management in Python
Instead of scattering config parsing logic everywhere, use a dedicated manager class. Here’s how:
from pathlib import Path

from src.Predict_Pipe.constants import *
from src.Predict_Pipe.utils.common import read_yaml, create_directories
from src.Predict_Pipe.entity.config_entity import DataIngestionConfig


class ConfigurationManager:
    def __init__(
        self,
        config_filepath: Path = CONFIG_FILE_PATH,
        params_filepath: Path = PARAMS_FILE_PATH,
        schema_filepath: Path = SCHEMA_FILE_PATH,
    ):
        # Resolve to absolute paths so error messages are unambiguous
        config_filepath = config_filepath.resolve()
        params_filepath = params_filepath.resolve()
        schema_filepath = schema_filepath.resolve()

        # Fail fast if any required file is missing
        if not config_filepath.exists():
            raise FileNotFoundError(f"Config file not found: {config_filepath}")
        if not params_filepath.exists():
            raise FileNotFoundError(f"Params file not found: {params_filepath}")
        if not schema_filepath.exists():
            raise FileNotFoundError(f"Schema file not found: {schema_filepath}")

        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        config = self.config.data_ingestion
        create_directories([config.root_dir])

        return DataIngestionConfig(
            root_dir=config.root_dir,
            source_URL=config.source_URL,
            local_data_file=config.local_data_file,
            unzip_dir=config.unzip_dir,
        )
Why is this cool?
Centralizes all config loading and validation.
Ensures required files exist before proceeding.
Creates artifact directories automatically.
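Two helpers do the heavy lifting here but aren’t shown: read_yaml and create_directories from utils.common. Here’s a minimal sketch of what they typically look like in this kind of project, assuming the python-box package for dot-notation access; the exact implementations in the repo may differ:

```python
from pathlib import Path

import yaml                 # PyYAML
from box import ConfigBox   # python-box: dict subclass with attribute access


def read_yaml(path: Path) -> ConfigBox:
    # Parse the YAML file and wrap it so keys work as attributes,
    # e.g. config.data_ingestion.root_dir instead of config["data_ingestion"]["root_dir"]
    with open(path) as f:
        return ConfigBox(yaml.safe_load(f))


def create_directories(paths: list):
    # Create each directory (and any missing parents); no error if it already exists
    for p in paths:
        Path(p).mkdir(parents=True, exist_ok=True)
```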
Creating Typed Config Entities
Typed config entities help you avoid bugs and make your code more readable:
from dataclasses import dataclass
from pathlib import Path


@dataclass
class DataIngestionConfig:
    root_dir: Path
    source_URL: str
    local_data_file: Path
    unzip_dir: Path
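One optional hardening, not used in the snippet above, so treat it as a suggestion: declare the dataclass frozen so a pipeline stage can’t accidentally mutate its config at runtime.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)  # instances become read-only after construction
class DataIngestionConfig:
    root_dir: Path
    source_URL: str
    local_data_file: Path
    unzip_dir: Path
```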
Building the Data Ingestion Component
This component handles downloading and extracting your dataset, using the config object for all paths and URLs.
import os
import urllib.request as request
import zipfile

from src.Predict_Pipe.logging import logger
from src.Predict_Pipe.entity.config_entity import DataIngestionConfig


class DataIngestion:
    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_file(self):
        # Skip the download if the archive is already present locally
        if not os.path.exists(self.config.local_data_file):
            filename, headers = request.urlretrieve(
                url=self.config.source_URL,
                filename=self.config.local_data_file
            )
            logger.info(f"{filename} downloaded! Info: \n{headers}")
        else:
            logger.info("File already exists")

    def extract_zip_file(self):
        # Extract the downloaded archive into the configured directory
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok=True)
        with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)
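To smoke-test the component on its own, say in a notebook, you can build the config entity by hand with the same values as config.yaml (normally ConfigurationManager does this for you):

```python
import os

from src.Predict_Pipe.components.data_ingestion import DataIngestion
from src.Predict_Pipe.entity.config_entity import DataIngestionConfig

# Hand-built config mirroring config.yaml, just for a quick standalone test
cfg = DataIngestionConfig(
    root_dir="artifacts/data_ingestion",
    source_URL="https://raw.githubusercontent.com/krishnaik06/datasets/main/winequality-data.zip",
    local_data_file="artifacts/data_ingestion/winequality-red.zip",
    unzip_dir="artifacts/data_ingestion",
)

os.makedirs(cfg.root_dir, exist_ok=True)  # ConfigurationManager normally creates this
ingestion = DataIngestion(config=cfg)
ingestion.download_file()     # no-op if the zip is already on disk
ingestion.extract_zip_file()  # unpacks into unzip_dir
```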
Orchestrating the Data Ingestion Pipeline
The pipeline class ties everything together and ensures each stage runs in order.
from src.Predict_Pipe.config.configuration import ConfigurationManager
from src.Predict_Pipe.components.data_ingestion import DataIngestion
from src.Predict_Pipe.logging import logger

STAGE_NAME = "Data Ingestion stage"


class DataIngestionTrainingPipeline:
    def __init__(self):
        pass

    def initiate_data_ingestion(self):
        # Build the typed config, then run download and extraction in order
        config = ConfigurationManager()
        data_ingestion_config = config.get_data_ingestion_config()
        data_ingestion = DataIngestion(config=data_ingestion_config)
        data_ingestion.download_file()
        data_ingestion.extract_zip_file()


if __name__ == "__main__":
    try:
        logger.info(f">>>>>> stage {STAGE_NAME} started <<<<<<")
        obj = DataIngestionTrainingPipeline()
        obj.initiate_data_ingestion()
        logger.info(f">>>>>> stage {STAGE_NAME} completed <<<<<<\n\nx==========x")
    except Exception as e:
        logger.exception(e)
        raise e
Bringing It All Together in main.py
Your main.py becomes super clean: just import and run the pipeline stages you need.
import sys
import os

# Make the src/ package importable when running main.py from the project root
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "src"))
if src_path not in sys.path:
    sys.path.append(src_path)

from src.Predict_Pipe.logging import logger
from src.Predict_Pipe.pipeline.data_ingestion import DataIngestionTrainingPipeline

logger.info("Logging has started")

STAGE_NAME = "Data Ingestion stage"

try:
    logger.info(f">>>>>> stage {STAGE_NAME} started <<<<<<")
    obj = DataIngestionTrainingPipeline()
    obj.initiate_data_ingestion()
    logger.info(f">>>>>> stage {STAGE_NAME} completed <<<<<<\n\nx==========x")
except Exception as e:
    logger.exception(e)
    raise e
Here’s a snapshot of the console output when the stage runs successfully:
Final Thoughts
Using YAML for configuration and a modular, class-based structure for your pipeline makes your ML projects:
Easier to maintain: Change configs without touching your code.
More robust: Typed configs and validation prevent silly mistakes.
Scalable: Add new pipeline stages with minimal changes.
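To make the scalability point concrete: wiring in the next stage means adding one import and one try/except block to main.py. The DataValidationTrainingPipeline name and module path below are placeholders for the stage covered next, but the shape is identical:

```python
# main.py (excerpt): a new stage plugs in with the same few lines.
# DataValidationTrainingPipeline and its module path are hypothetical here.
from src.Predict_Pipe.pipeline.data_validation import DataValidationTrainingPipeline

STAGE_NAME = "Data Validation stage"

try:
    logger.info(f">>>>>> stage {STAGE_NAME} started <<<<<<")
    obj = DataValidationTrainingPipeline()
    obj.initiate_data_validation()  # assumed entry point, mirroring data ingestion
    logger.info(f">>>>>> stage {STAGE_NAME} completed <<<<<<\n\nx==========x")
except Exception as e:
    logger.exception(e)
    raise e
```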
If you want to see a working notebook example, check out this reference notebook.
Pro tip: Always keep your configs clean and your pipeline modular. Your future self will thank you!
Tags:
#Python #MachineLearning #YAML #MLOps #Pipeline #BestPractices
In the next section, we’ll go through data validation.