Top 10 Python Scripts to Automate Data Science Tasks

Timothy Kimutai
17 min read

Data science workflows often involve repetitive, time-consuming tasks that can drain productivity and delay insights. From cleaning messy datasets to generating consistent reports, these manual processes create bottlenecks that prevent data scientists from focusing on what matters most: extracting meaningful insights and building robust models.

Python has emerged as the undisputed champion for automating these tedious workflows. With its rich ecosystem of libraries and intuitive syntax, Python enables data scientists to transform hours of manual work into automated scripts that run reliably and consistently. Whether you're dealing with data preprocessing, model training, or report generation, automation not only saves time but also reduces human error and ensures reproducibility.

The following ten Python scripts represent essential automation tools that every data scientist should have in their toolkit. Each script addresses a common pain point in the data science workflow, providing practical solutions that can be implemented immediately and customized for specific needs.

1. Automated Data Cleaning with Pandas

Data cleaning typically consumes 60–80% of a data scientist’s time, making it the perfect candidate for automation. This script handles the most common data quality issues in a standardized, repeatable way.

import pandas as pd
import numpy as np

def automated_data_cleaning(df):
    """
    Comprehensive data cleaning pipeline
    """
    # Work on a copy so the caller's DataFrame is not mutated
    df = df.copy()

    # Remove duplicate rows
    df = df.drop_duplicates()

    # Handle missing values
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns

    # Fill numeric columns with median
    df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

    # Fill categorical columns with mode
    for col in categorical_cols:
        df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'Unknown')

    # Remove outliers using IQR method
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        df = df[~((df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR)))]

    # Standardize column names
    df.columns = df.columns.str.lower().str.replace(' ', '_')

    return df

# Usage
df_clean = automated_data_cleaning(raw_df)

Real-world use case: A retail company processes daily sales data from multiple stores with inconsistent formatting, missing entries, and occasional data entry errors. This script ensures all datasets follow the same quality standards before analysis.
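
For the multi-store scenario, the cleaner drops into a simple batch loop. A minimal sketch, assuming the daily exports land in a daily_sales/ folder (the folder name and glob pattern are placeholders):

import glob
import pandas as pd

# Hypothetical folder of daily store exports; adjust the pattern to your layout
store_files = glob.glob('daily_sales/*.csv')

cleaned_frames = [automated_data_cleaning(pd.read_csv(path)) for path in store_files]

# Combine all stores into one standardized dataset
all_sales = pd.concat(cleaned_frames, ignore_index=True)
all_sales.to_csv('daily_sales_clean.csv', index=False)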

Benefits of automation:

  • Consistent data quality across all datasets

  • Reduces cleaning time from hours to minutes

  • Prevents downstream errors in analysis and modeling

  • Enables immediate data processing in production pipelines

2. Exploratory Data Analysis with ydata-profiling

Manual exploratory data analysis (EDA) can take hours of writing repetitive code. This script generates comprehensive data profiles automatically, providing instant insights into your dataset’s characteristics.

from ydata_profiling import ProfileReport
import pandas as pd
import numpy as np

def generate_eda_report(df, title="Data Analysis Report"):
    """Generate a comprehensive EDA report automatically"""
    profile = ProfileReport(
        df,
        title=title,
        explorative=True,
        correlations={'auto': {'calculate': True}},
        missing_diagrams={'heatmap': True},
        interactions={'continuous': True}
    )

    # Save report as HTML
    profile.to_file(f"{title.replace(' ', '_').lower()}.html")

    # Generate summary statistics
    summary = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
        'duplicate_rows': df.duplicated().sum(),
        'numeric_columns': len(df.select_dtypes(include=[np.number]).columns),
        'categorical_columns': len(df.select_dtypes(include=['object']).columns)
    }

    return profile, summary

# Usage
profile, summary = generate_eda_report(df, "Customer Dataset Analysis")
print(f"Dataset has {summary['missing_percentage']:.2f}% missing values")

Real-world use case: A marketing team receives a new customer dataset every month and needs to quickly understand customer demographics, behavior patterns, and data quality issues before launching targeted campaigns.
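
For a recurring monthly drop like this, the call can be parameterized by month so every run leaves a named HTML artifact behind. A small sketch; the file naming convention below is an assumption:

import pandas as pd

month = '2024_06'  # hypothetical naming convention for the monthly delivery
df_month = pd.read_csv(f'customers_{month}.csv')

profile, summary = generate_eda_report(df_month, f'Customer Dataset {month}')
print(f"{month}: {summary['total_rows']} rows, "
      f"{summary['missing_percentage']:.1f}% missing, "
      f"{summary['duplicate_rows']} duplicates")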

Benefits of automation:

  • Generates publication-ready reports in seconds

  • Identifies data quality issues and patterns automatically

  • Provides interactive visualizations for stakeholder presentations

  • Standardizes EDA process across different teams and projects

3. Interactive Data Visualization Dashboard with Plotly and Dash

Creating dynamic dashboards for stakeholders traditionally requires extensive development time. This script creates interactive dashboards that update automatically with new data.

import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd

def create_automated_dashboard(df):
    # Create interactive dashboard with automatic chart generation

    app = dash.Dash(__name__)

    # Get numeric and categorical columns
    numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()

    app.layout = html.Div([
        html.H1("Automated Data Dashboard", style={'textAlign': 'center'}),

        html.Div([
            html.Label("Select X-axis:"),
            dcc.Dropdown(
                id='x-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols + categorical_cols],
                value=numeric_cols[0] if numeric_cols else categorical_cols[0]
            )
        ], style={'width': '48%', 'display': 'inline-block'}),

        html.Div([
            html.Label("Select Y-axis:"),
            dcc.Dropdown(
                id='y-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols],
                value=numeric_cols[1] if len(numeric_cols) > 1 else numeric_cols[0]
            )
        ], style={'width': '48%', 'float': 'right', 'display': 'inline-block'}),

        dcc.Graph(id='main-graph'),
        dcc.Graph(id='distribution-graph')
    ])

    @app.callback(
        [Output('main-graph', 'figure'),
         Output('distribution-graph', 'figure')],
        [Input('x-axis-dropdown', 'value'),
         Input('y-axis-dropdown', 'value')]
    )
    def update_graphs(x_axis, y_axis):
        # Scatter plot
        scatter_fig = px.scatter(df, x=x_axis, y=y_axis, title=f'{y_axis} vs {x_axis}')

        # Distribution plot
        if x_axis in numeric_cols:
            dist_fig = px.histogram(df, x=x_axis, title=f'Distribution of {x_axis}')
        else:
            # value_counts().reset_index() column names differ across pandas
            # versions, so name them explicitly
            counts = df[x_axis].value_counts().reset_index()
            counts.columns = [x_axis, 'count']
            dist_fig = px.bar(counts, x=x_axis, y='count',
                              title=f'Count of {x_axis}')

        return scatter_fig, dist_fig

    return app

# Usage
dashboard = create_automated_dashboard(df)
dashboard.run(debug=True)  # on older Dash versions: dashboard.run_server(debug=True)

Real-world use case: A sales manager needs real-time insights into team performance metrics. The dashboard automatically updates with new sales data and allows filtering by region, product, or time period without requiring technical knowledge.
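
The simplest way to keep such a dashboard current is to rebuild it from the latest export on startup and let a scheduler restart it. A sketch, where the CSV path, host, and port are placeholders:

import pandas as pd

# Reload the most recent export before building the app; re-running this
# script (e.g., from a scheduled job) picks up new data
latest = pd.read_csv('sales_latest.csv')
dashboard = create_automated_dashboard(latest)
dashboard.run(debug=False, host='0.0.0.0', port=8050)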

Benefits of automation:

  • Creates professional dashboards without frontend development skills

  • Enables self-service analytics for non-technical stakeholders

  • Updates automatically with new data

  • Reduces dependency on data visualization specialists

4. Web Scraping for Data Collection with BeautifulSoup

Manually collecting data from websites is tedious and error-prone. This script automates web scraping with built-in error handling and rate limiting.

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random

def automated_web_scraper(urls, delay_range=(1, 3)):
    # Automated web scraper with error handling and rate limiting

    scraped_data = []

    for i, url in enumerate(urls):
        try:
            # Random delay to avoid being blocked
            time.sleep(random.uniform(*delay_range))

            # Make request with headers to appear more human-like
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            # Parse HTML
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract data (customize based on target website structure)
            data = {
                'url': url,
                'title': soup.find('title').text.strip() if soup.find('title') else 'N/A',
                'meta_description': '',
                'headings': [h.text.strip() for h in soup.find_all(['h1', 'h2', 'h3'])[:5]],
                'scraped_at': pd.Timestamp.now()
            }

            # Extract meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                data['meta_description'] = meta_desc.get('content', '')

            scraped_data.append(data)
            print(f"Scraped {i+1}/{len(urls)}: {url}")

        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            scraped_data.append({
                'url': url,
                'error': str(e),
                'scraped_at': pd.Timestamp.now()
            })

    return pd.DataFrame(scraped_data)

# Usage
urls_to_scrape = [
    'https://example1.com',
    'https://example2.com',
    'https://example3.com'
]

scraped_df = automated_web_scraper(urls_to_scrape)
scraped_df.to_csv('scraped_data.csv', index=False)

Real-world use case: A market research team needs to monitor competitor pricing across 500+ product pages daily. This script collects pricing data automatically and identifies price changes without manual checking.
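
The price-change detection itself reduces to a diff between two scrape snapshots. A sketch, assuming each run is extended to save a CSV with url and price columns (the per-site price extraction would be added to the scraper):

import pandas as pd

# Hypothetical snapshots from two consecutive runs
today = pd.read_csv('prices_today.csv')
yesterday = pd.read_csv('prices_yesterday.csv')

merged = today.merge(yesterday, on='url', suffixes=('_today', '_yesterday'))
changed = merged[merged['price_today'] != merged['price_yesterday']]
print(f"{len(changed)} of {len(merged)} products changed price")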

Benefits of automation:

  • Collects data 24/7 without human intervention

  • Handles errors gracefully and continues processing

  • Scales to thousands of URLs easily

  • Maintains consistent data collection schedules

5. Automating Model Training with Scikit-learn Pipelines

Model training often involves repetitive preprocessing steps and parameter tuning. This script creates reusable pipelines that standardize the entire machine learning workflow.

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import joblib

def create_automated_ml_pipeline(df, target_column, model_type='classification'):
    # Create and train automated ML pipeline
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Identify column types
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_features = X.select_dtypes(include=['object']).columns

    # Create preprocessing pipeline
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    # Create full pipeline with model
    if model_type == 'classification':
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    else:
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=100, random_state=42)

    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    # Split data and train
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train pipeline
    pipeline.fit(X_train, y_train)

    # Evaluate model
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)
    test_score = pipeline.score(X_test, y_test)

    # Generate predictions and report
    y_pred = pipeline.predict(X_test)

    results = {
        'pipeline': pipeline,
        'cv_scores': cv_scores,
        'test_score': test_score,
        'classification_report': classification_report(y_test, y_pred) if model_type == 'classification' else None
    }

    # Save pipeline
    joblib.dump(pipeline, f'automated_ml_pipeline_{target_column}.pkl')

    return results

# Usage
results = create_automated_ml_pipeline(df, 'target_column', 'classification')
print(f"Cross-validation score: {results['cv_scores'].mean():.3f} (+/- {results['cv_scores'].std() * 2:.3f})")

Real-world use case: A financial institution needs to retrain fraud detection models weekly with new transaction data. This pipeline automatically handles data preprocessing, model training, and validation without manual intervention.
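
Because the fitted pipeline is persisted with joblib, weekly retraining and day-to-day scoring can stay decoupled. A minimal sketch of the scoring side; the file names are placeholders, and the new data must carry the same raw feature columns used in training:

import joblib
import pandas as pd

# Load the pipeline saved by create_automated_ml_pipeline
pipeline = joblib.load('automated_ml_pipeline_target_column.pkl')

# Preprocessing happens inside the pipeline, so raw columns are enough
new_transactions = pd.read_csv('new_transactions.csv')
new_transactions['fraud_score'] = pipeline.predict_proba(new_transactions)[:, 1]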

Benefits of automation:

  • Ensures consistent preprocessing across different datasets

  • Reduces model development time from days to hours

  • Prevents data leakage through proper pipeline structure

  • Enables easy model deployment and version control

6. Feature Engineering with Feature-engine

Feature engineering often requires domain expertise and repetitive coding. This script automates common feature engineering tasks with intelligent defaults and customizable options.

from feature_engine.creation import MathFeatures
from feature_engine.discretisation import EqualFrequencyDiscretiser
from feature_engine.encoding import RareLabelEncoder, OneHotEncoder
from feature_engine.transformation import LogTransformer
from feature_engine.selection import DropConstantFeatures, DropDuplicateFeatures
import pandas as pd
import numpy as np

def automated_feature_engineering(df, target_column=None):
    # Separate features and target
    if target_column:
        X = df.drop(columns=[target_column])
        y = df[target_column]
    else:
        X = df.copy()
        y = None

    numeric_vars = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_vars = X.select_dtypes(include=['object']).columns.tolist()

    print(f"Starting feature engineering with {len(X.columns)} features...")

    # 1. Remove constant and duplicate features
    constant_dropper = DropConstantFeatures()
    duplicate_dropper = DropDuplicateFeatures()

    X = constant_dropper.fit_transform(X)
    X = duplicate_dropper.fit_transform(X)

    # 2. Handle rare categories in categorical variables
    if categorical_vars:
        rare_encoder = RareLabelEncoder(tol=0.01, n_categories=10)
        X = rare_encoder.fit_transform(X)

    # 3. Create mathematical combinations of numeric features
    if len(numeric_vars) >= 2:
        math_combiner = MathFeatures(
            variables=numeric_vars[:5],  # Limit to first 5 to avoid feature explosion
            func=['sum', 'prod', 'mean']
        )
        X = math_combiner.fit_transform(X)

    # 4. Apply transformations to numeric variables
    # Log transformation for skewed variables
    skewed_vars = []
    for var in numeric_vars:
        if X[var].min() > 0:  # Log only positive values
            skewness = X[var].skew()
            if abs(skewness) > 1:
                skewed_vars.append(var)

    if skewed_vars:
        log_transformer = LogTransformer(variables=skewed_vars)
        X = log_transformer.fit_transform(X)

    # 5. Discretize continuous variables
    if len(numeric_vars) > 0:
        discretizer = EqualFrequencyDiscretiser(
            variables=numeric_vars[:3],  # Discretize first 3 numeric vars
            q=5,
            return_object=True
        )
        X_discrete = discretizer.fit_transform(X)

        # Add discretized versions with suffix
        for var in discretizer.variables:
            X[f'{var}_binned'] = X_discrete[var]

    # 6. One-hot encode categorical variables
    updated_categorical_vars = X.select_dtypes(include=['object']).columns.tolist()
    if updated_categorical_vars:
        ohe = OneHotEncoder(
            variables=updated_categorical_vars,
            drop_last=True
        )
        X = ohe.fit_transform(X)

    print(f"Feature engineering complete. New feature count: {len(X.columns)}")

    # Create feature importance summary
    feature_summary = {
        'original_features': len(df.columns) - (1 if target_column else 0),
        'final_features': len(X.columns),
        'features_created': len(X.columns) - len(df.columns) + (1 if target_column else 0),
        'numeric_features': len(X.select_dtypes(include=['int64', 'float64']).columns),
        'categorical_features': len(X.select_dtypes(include=['object']).columns),
        'binary_features': len([col for col in X.columns if X[col].nunique() == 2])
    }

    return X, feature_summary

# Usage
X_engineered, summary = automated_feature_engineering(df, 'target_column')
print(f"Created {summary['features_created']} new features")

Real-world use case: An e-commerce company wants to improve their recommendation system by creating meaningful features from user behavior data, product attributes, and transaction history without manually coding hundreds of feature combinations.
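
A quick way to judge whether the engineered features help is a cross-validated baseline on the output. A sketch, reusing X_engineered from the usage above and assuming a classification target:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Note: the transformers were fit on the full dataset, so treat this
# score as a quick sanity check rather than an unbiased estimate
y = df['target_column']
baseline = RandomForestClassifier(n_estimators=100, random_state=42)
scores = cross_val_score(baseline, X_engineered, y, cv=5)
print(f"CV accuracy with engineered features: {scores.mean():.3f}")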

Benefits of automation:

  • Systematically explores feature combinations that humans might miss

  • Applies domain-agnostic transformations consistently

  • Scales feature engineering to large datasets

  • Documents feature creation process for reproducibility

7. Automated Hyperparameter Tuning with Optuna

Manual hyperparameter tuning is time-consuming and often suboptimal. This script uses advanced optimization algorithms to find the best parameters automatically.

import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np

def automated_hyperparameter_tuning(X, y, model_type='random_forest', n_trials=100):

    def objective(trial):
        if model_type == 'random_forest':
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 20),
                'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
                'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
                'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
                'bootstrap': trial.suggest_categorical('bootstrap', [True, False])
            }

            model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)

        elif model_type == 'xgboost':
            import xgboost as xgb
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 10)
            }

            model = xgb.XGBClassifier(**params, random_state=42, n_jobs=-1)

        # Perform cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy', n_jobs=-1)
        return cv_scores.mean()

    # Create study and optimize
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)

    # Get best parameters and score
    best_params = study.best_params
    best_score = study.best_value

    # Train final model with best parameters
    if model_type == 'random_forest':
        best_model = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
    elif model_type == 'xgboost':
        import xgboost as xgb
        best_model = xgb.XGBClassifier(**best_params, random_state=42, n_jobs=-1)

    best_model.fit(X, y)

    # Generate optimization history
    optimization_history = pd.DataFrame({
        'trial': range(len(study.trials)),
        'value': [trial.value for trial in study.trials],
        'params': [trial.params for trial in study.trials]
    })

    results = {
        'best_model': best_model,
        'best_params': best_params,
        'best_score': best_score,
        'study': study,
        'optimization_history': optimization_history
    }

    return results

# Usage
tuning_results = automated_hyperparameter_tuning(X_train, y_train, 'random_forest', n_trials=50)
print(f"Best cross-validation score: {tuning_results['best_score']:.4f}")
print(f"Best parameters: {tuning_results['best_params']}")

# Plot optimization history
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(tuning_results['optimization_history']['trial'], 
         tuning_results['optimization_history']['value'])
plt.xlabel('Trial')
plt.ylabel('Accuracy')
plt.title('Hyperparameter Optimization Progress')
plt.show()

Real-world use case: A machine learning team needs to optimize models for different client projects with varying datasets and requirements. This script automatically finds optimal parameters for each use case without manual experimentation.
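
When the same search runs across many client projects, Optuna's study storage makes each one resumable and auditable. A sketch using a local SQLite backend; the study name and database path are hypothetical, and objective stands in for a function like the one defined inside automated_hyperparameter_tuning:

import optuna

# Persisted study: an interrupted search resumes instead of restarting
study = optuna.create_study(
    study_name='client_a_churn_model',
    storage='sqlite:///optuna_studies.db',
    direction='maximize',
    load_if_exists=True
)
study.optimize(objective, n_trials=25)
print(f"Trials so far: {len(study.trials)}, best score: {study.best_value:.4f}")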

Benefits of automation:

  • Finds better parameters than manual tuning

  • Saves weeks of manual experimentation

  • Uses intelligent search algorithms instead of grid search

  • Provides optimization insights and visualizations

8. Model Evaluation Reports with Yellowbrick

Creating comprehensive model evaluation reports manually requires writing extensive plotting and analysis code. This script generates professional evaluation reports automatically.

from yellowbrick.classifier import ClassificationReport, ROCAUC, ConfusionMatrix
from yellowbrick.model_selection import ValidationCurve, LearningCurve
from yellowbrick.features import FeatureImportances
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import pandas as pd

def automated_model_evaluation(model, X, y, model_name="Model"):
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Create figure with subplots
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle(f'{model_name} Evaluation Report', fontsize=16, fontweight='bold')

    # 1. Classification Report
    visualizer1 = ClassificationReport(model, ax=axes[0,0], support=True)
    visualizer1.fit(X_train, y_train)
    visualizer1.score(X_test, y_test)
    visualizer1.finalize()

    # 2. ROC-AUC Curve
    visualizer2 = ROCAUC(model, ax=axes[0,1])
    visualizer2.fit(X_train, y_train)
    visualizer2.score(X_test, y_test)
    visualizer2.finalize()

    # 3. Confusion Matrix
    visualizer3 = ConfusionMatrix(model, ax=axes[0,2])
    visualizer3.fit(X_train, y_train)
    visualizer3.score(X_test, y_test)
    visualizer3.finalize()

    # 4. Feature Importance (if model supports it)
    if hasattr(model, 'feature_importances_'):
        visualizer4 = FeatureImportances(model, ax=axes[1,0])
        visualizer4.fit(X_train, y_train)
        visualizer4.finalize()
    else:
        axes[1,0].text(0.5, 0.5, 'Feature Importance\nNot Available', 
                      ha='center', va='center', transform=axes[1,0].transAxes)

    # 5. Learning Curve
    visualizer5 = LearningCurve(model, ax=axes[1,1], scoring='accuracy')
    visualizer5.fit(X, y)
    visualizer5.finalize()

    # 6. Validation Curve (example with max_depth for tree-based models)
    if hasattr(model, 'max_depth'):
        visualizer6 = ValidationCurve(
            model, ax=axes[1,2], param_name='max_depth', 
            param_range=range(1, 11), scoring='accuracy'
        )
        visualizer6.fit(X, y)
        visualizer6.finalize()
    else:
        axes[1,2].text(0.5, 0.5, 'Validation Curve\nNot Available', 
                      ha='center', va='center', transform=axes[1,2].transAxes)

    plt.tight_layout()
    plt.savefig(f'{model_name.lower().replace(" ", "_")}_evaluation_report.png', 
                dpi=300, bbox_inches='tight')
    plt.show()

    # Generate numerical summary
    model.fit(X_train, y_train)
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    summary = {
        'model_name': model_name,
        'train_accuracy': train_score,
        'test_accuracy': test_score,
        'overfit_gap': train_score - test_score,
        'total_features': X.shape[1],
        'training_samples': X_train.shape[0],
        'test_samples': X_test.shape[0]
    }

    return summary

def compare_multiple_models(models_dict, X, y):
    """
    Compare multiple models and generate comparison report
    """
    results = []

    for model_name, model in models_dict.items():
        print(f"Evaluating {model_name}...")
        summary = automated_model_evaluation(model, X, y, model_name)
        results.append(summary)

    # Create comparison DataFrame
    comparison_df = pd.DataFrame(results)
    comparison_df = comparison_df.sort_values('test_accuracy', ascending=False)

    # Save comparison report
    comparison_df.to_csv('model_comparison_report.csv', index=False)

    return comparison_df

# Usage
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(random_state=42, probability=True),
    'Logistic Regression': LogisticRegression(random_state=42)
}

comparison_results = compare_multiple_models(models, X, y)
print("\nModel Comparison Results:")
print(comparison_results[['model_name', 'test_accuracy', 'overfit_gap']])

Real-world use case: A consulting firm needs to present model performance results to clients with clear visualizations and metrics. This script generates professional reports that can be directly included in client presentations.
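
For client deliverables, the headline numbers can be exported in a format that drops straight into a document. A small sketch; note that DataFrame.to_markdown relies on the optional tabulate package:

# Round and export the comparison table for a client-facing report
table = comparison_results[['model_name', 'test_accuracy', 'overfit_gap']].round(3)
table.to_markdown('model_comparison.md', index=False)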

Benefits of automation:

  • Creates publication-ready evaluation reports

  • Ensures consistent evaluation metrics across projects

  • Identifies overfitting and performance issues automatically

  • Enables easy model comparison and selection

9. Automating Dataset Versioning with DVC

Data versioning is crucial for reproducible machine learning but often overlooked due to complexity. This script automates dataset versioning and experiment tracking.

import pandas as pd
import os
from datetime import datetime
import hashlib
import json

class AutomatedDataVersioning:
    def __init__(self, project_path="."):
        self.project_path = project_path
        self.data_dir = os.path.join(project_path, "data")
        self.dvc_dir = os.path.join(project_path, ".dvc")

        # Initialize directories
        os.makedirs(self.data_dir, exist_ok=True)

    def setup_dvc_project(self):
        """Initialize DVC project if not already initialized"""
        try:
            if not os.path.exists(self.dvc_dir):
                os.system(f"cd {self.project_path} && dvc init")
                print("DVC project initialized")
            else:
                print("DVC project already exists")
        except Exception as e:
            print(f"Error initializing DVC: {e}")

    def add_dataset_version(self, dataframe, dataset_name, description=""):
        """Add new version of dataset with automatic tracking"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Generate data hash for uniqueness
        data_string = dataframe.to_string()
        data_hash = hashlib.md5(data_string.encode()).hexdigest()[:8]

        # Create versioned filename
        filename = f"{dataset_name}_{timestamp}_{data_hash}.csv"
        filepath = os.path.join(self.data_dir, filename)

        # Save dataset
        dataframe.to_csv(filepath, index=False)

        # Create metadata
        metadata = {
            'dataset_name': dataset_name,
            'timestamp': timestamp,
            'description': description,
            'shape': dataframe.shape,
            'columns': list(dataframe.columns),
            'data_hash': data_hash,
            'file_size': os.path.getsize(filepath),
            'missing_values': dataframe.isnull().sum().sum(),
            'dtypes': dataframe.dtypes.to_dict()
        }

        # Save metadata
        metadata_file = filepath.replace('.csv', '_metadata.json')
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)

        # Add to DVC tracking
        try:
            os.system(f"cd {self.project_path} && dvc add {filepath}")
            print(f"Dataset version saved: {filename}")
            print(f"Shape: {dataframe.shape}, Hash: {data_hash}")
        except Exception as e:
            print(f"Error adding to DVC: {e}")

        return filepath, metadata

    def list_dataset_versions(self, dataset_name=None):
        """List all versions of datasets"""
        versions = []

        for file in os.listdir(self.data_dir):
            if file.endswith('_metadata.json'):
                with open(os.path.join(self.data_dir, file), 'r') as f:
                    metadata = json.load(f)
                    if dataset_name is None or metadata['dataset_name'] == dataset_name:
                        versions.append(metadata)

        if not versions:
            return pd.DataFrame()
        return pd.DataFrame(versions).sort_values('timestamp', ascending=False)

    def load_dataset_version(self, dataset_name, version_hash=None):
        """Load specific version of dataset"""
        versions = self.list_dataset_versions(dataset_name)

        if version_hash:
            version = versions[versions['data_hash'] == version_hash]
        else:
            version = versions.head(1)  # Latest version (kept as a DataFrame)

        if len(version) == 0:
            raise ValueError(f"Version not found for {dataset_name}")

        filename = f"{dataset_name}_{version.iloc[0]['timestamp']}_{version.iloc[0]['data_hash']}.csv"
        filepath = os.path.join(self.data_dir, filename)

        return pd.read_csv(filepath)

# Usage
versioning = AutomatedDataVersioning()
versioning.setup_dvc_project()

# Add new dataset version
filepath, metadata = versioning.add_dataset_version(
    df, 
    "customer_data", 
    "Initial customer dataset with demographics"
)

# List all versions
versions = versioning.list_dataset_versions("customer_data")
print(versions[['dataset_name', 'timestamp', 'shape', 'data_hash']])

# Load specific version
df_v1 = versioning.load_dataset_version("customer_data", version_hash="abc12345")

Real-world use case: A machine learning team working on a customer churn model needs to track different versions of their training data as new customer segments are added and features are engineered, ensuring they can reproduce any previous model results.
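
To share these versions across a team, a DVC remote can back the local cache, using the same shell-out style as the class above. A sketch; the S3 bucket URL is a placeholder:

import os

# One-time remote setup, then push tracked data after each new version
os.system('dvc remote add -d storage s3://my-bucket/dvc-store')
os.system('dvc push')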

Benefits of automation:

  • Ensures reproducibility of machine learning experiments

  • Tracks data lineage and changes automatically

  • Prevents data loss and enables rollback capabilities

  • Integrates with Git for complete project versioning

10. Scheduling & Monitoring Scripts with APScheduler

Data science workflows often need to run on schedules or respond to events. This script creates a robust scheduling system with monitoring and error handling.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pandas as pd
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import os

class AutomatedDataPipeline:
    def __init__(self, config):
        self.config = config
        self.scheduler = BackgroundScheduler()
        self.setup_logging()

    def setup_logging(self):
        """Setup logging for pipeline monitoring"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def send_notification(self, subject, message, is_error=False):
        """Send email notification on success or failure"""
        try:
            if 'email' in self.config:
                msg = MIMEText(message)
                msg['Subject'] = f"{'ERROR: ' if is_error else ''}{subject}"
                msg['From'] = self.config['email']['from']
                msg['To'] = self.config['email']['to']

                server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
                server.starttls()
                server.login(self.config['email']['username'], self.config['email']['password'])
                server.send_message(msg)
                server.quit()

                self.logger.info(f"Notification sent: {subject}")
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")

    def data_collection_job(self):
        """Automated data collection job"""
        try:
            self.logger.info("Starting data collection job")

            # Simulate data collection (replace with actual logic)
            data = pd.DataFrame({
                'timestamp': [datetime.now()],
                'records_collected': [1000],
                'status': ['success']
            })

            # Save collected data
            filename = f"collected_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            data.to_csv(filename, index=False)

            self.logger.info(f"Data collection completed: {filename}")
            self.send_notification("Data Collection Success", f"Collected {len(data)} records")

        except Exception as e:
            error_msg = f"Data collection failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Collection Failed", error_msg, is_error=True)

    def model_training_job(self):
        """Automated model training job"""
        try:
            self.logger.info("Starting model training job")

            # Load latest data
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for training")

            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Simulate model training (replace with actual logic)
            from sklearn.ensemble import RandomForestClassifier
            from sklearn.model_selection import cross_val_score

            # Dummy training process
            model = RandomForestClassifier(n_estimators=100)
            # X, y = prepare_features(df)  # Your feature preparation logic
            # scores = cross_val_score(model, X, y, cv=5)

            model_filename = f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            # joblib.dump(model, model_filename)

            self.logger.info(f"Model training completed: {model_filename}")
            self.send_notification("Model Training Success", f"Model saved as {model_filename}")

        except Exception as e:
            error_msg = f"Model training failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Model Training Failed", error_msg, is_error=True)

    def data_quality_check(self):
        """Automated data quality monitoring"""
        try:
            self.logger.info("Starting data quality check")

            # Find latest data file
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for quality check")

            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Perform quality checks
            quality_report = {
                'total_records': len(df),
                'missing_values': df.isnull().sum().sum(),
                'duplicate_records': df.duplicated().sum(),
                'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
            }

            # Set quality thresholds
            if quality_report['missing_percentage'] > 10:
                raise ValueError(f"High missing data: {quality_report['missing_percentage']:.2f}%")

            if quality_report['duplicate_records'] > len(df) * 0.05:
                raise ValueError(f"High duplicate rate: {quality_report['duplicate_records']} records")

            self.logger.info("Data quality check passed")
            self.send_notification("Data Quality Check", f"Quality metrics: {quality_report}")

        except Exception as e:
            error_msg = f"Data quality check failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Quality Alert", error_msg, is_error=True)

    def setup_schedules(self):
        """Setup automated schedules for all jobs"""
        # Daily data collection at 2 AM
        self.scheduler.add_job(
            self.data_collection_job,
            CronTrigger(hour=2, minute=0),
            id='data_collection',
            replace_existing=True
        )

        # Weekly model training on Sundays at 3 AM
        self.scheduler.add_job(
            self.model_training_job,
            CronTrigger(day_of_week=6, hour=3, minute=0),
            id='model_training',
            replace_existing=True
        )

        # Hourly data quality checks
        self.scheduler.add_job(
            self.data_quality_check,
            CronTrigger(minute=0),
            id='quality_check',
            replace_existing=True
        )

        self.logger.info("All schedules configured")

    def start_pipeline(self):
        """Start the automated pipeline"""
        self.setup_schedules()
        self.scheduler.start()
        self.logger.info("Automated pipeline started")

        try:
            # Keep the script running
            import time
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Pipeline stopped by user")
            self.scheduler.shutdown()

# Configuration
config = {
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'username': 'your_email@gmail.com',
        'password': 'your_app_password',
        'from': 'your_email@gmail.com',
        'to': 'alerts@company.com'
    }
}

# Usage
pipeline = AutomatedDataPipeline(config)

# Run jobs manually for testing
pipeline.data_collection_job()
pipeline.data_quality_check()

# Start automated pipeline (runs continuously)
# pipeline.start_pipeline()

Real-world use case: An e-commerce company needs to update their recommendation models daily with new user behavior data, check data quality every hour, and retrain models weekly, all while monitoring for failures and sending alerts to the data team.
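
Before leaving the pipeline to run unattended, it is worth confirming what the scheduler will actually execute. A small check, assuming the pipeline object created above:

# Inspect the schedule without leaving the pipeline running
pipeline.setup_schedules()
pipeline.scheduler.start()
for job in pipeline.scheduler.get_jobs():
    print(f"{job.id}: next run at {job.next_run_time}")
pipeline.scheduler.shutdown()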

Benefits of automation:

  • Ensures consistent execution of data pipelines

  • Provides immediate alerts when issues occur

  • Reduces manual monitoring and intervention

  • Scales to complex multi-step workflows


Conclusion

These ten Python scripts represent a comprehensive toolkit for automating the most time-consuming and error-prone aspects of data science workflows. From data cleaning and exploration to model training and deployment monitoring, each script addresses a critical pain point that data scientists face daily.

The real power of these automation scripts lies not just in their individual capabilities, but in how they can be combined into end-to-end workflows. Imagine a pipeline that automatically collects data, performs quality checks, engineers features, trains models, evaluates performance, and deploys the best-performing model — all while maintaining version control and sending notifications about the process.
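
As a rough illustration, the pieces compose naturally because each script consumes and returns plain DataFrames or fitted objects. A hedged sketch, assuming the functions defined above are importable and the file and column names are placeholders:

import pandas as pd

raw = pd.read_csv('incoming_batch.csv')                            # hypothetical input
clean = automated_data_cleaning(raw)                               # script 1
profile, summary = generate_eda_report(clean, 'Incoming Batch')    # script 2
X, feat_summary = automated_feature_engineering(clean, 'target')   # script 6
results = create_automated_ml_pipeline(clean, 'target')            # script 5
print(f"Test score: {results['test_score']:.3f}")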

By implementing these scripts in your data science projects, you’ll experience significant productivity gains: tasks that once took hours can be completed in minutes, manual errors are virtually eliminated, and you can focus your expertise on the strategic aspects of data science rather than repetitive operational tasks.

Start by implementing one or two scripts that address your most pressing automation needs. As you become comfortable with the approach, gradually expand your automation toolkit. Remember that the initial time investment in setting up these scripts will pay dividends throughout your data science career, freeing you to tackle more complex problems and deliver insights faster than ever before.

These scripts provide the foundation for building robust, automated data science workflows that can handle the demands of modern data-driven organizations.
