Top 10 Python Scripts to Automate Data Science Tasks


Data science workflows often involve repetitive, time-consuming tasks that can drain productivity and delay insights. From cleaning messy datasets to generating consistent reports, these manual processes create bottlenecks that prevent data scientists from focusing on what matters most: extracting meaningful insights and building robust models.
Python has emerged as the undisputed champion for automating these tedious workflows. With its rich ecosystem of libraries and intuitive syntax, Python enables data scientists to transform hours of manual work into automated scripts that run reliably and consistently. Whether you're dealing with data preprocessing, model training, or report generation, automation not only saves time but also reduces human error and ensures reproducibility.
The following ten Python scripts represent essential automation tools that every data scientist should have in their toolkit. Each script addresses a common pain point in the data science workflow, providing practical solutions that can be implemented immediately and customized for specific needs.
1. Automated Data Cleaning with Pandas
Data cleaning typically consumes 60–80% of a data scientist’s time, making it the perfect candidate for automation. This script handles the most common data quality issues in a standardized, repeatable way.
import pandas as pd
import numpy as np

def automated_data_cleaning(df):
    """Comprehensive data cleaning pipeline."""
    # Remove duplicate rows
    df = df.drop_duplicates()

    # Handle missing values
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    categorical_cols = df.select_dtypes(include=['object']).columns

    # Fill numeric columns with the median
    df[numeric_cols] = df[numeric_cols].fillna(df[numeric_cols].median())

    # Fill categorical columns with the mode
    for col in categorical_cols:
        df[col] = df[col].fillna(df[col].mode()[0] if not df[col].mode().empty else 'Unknown')

    # Remove outliers using the IQR method
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        df = df[~((df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR)))]

    # Standardize column names
    df.columns = df.columns.str.lower().str.replace(' ', '_')

    return df

# Usage
df_clean = automated_data_cleaning(raw_df)
Real-world use case: A retail company processes daily sales data from multiple stores with inconsistent formatting, missing entries, and occasional data entry errors. This script ensures all datasets follow the same quality standards before analysis.
Benefits of automation:
Consistent data quality across all datasets
Reduces cleaning time from hours to minutes
Prevents downstream errors in analysis and modeling
Enables immediate data processing in production pipelines
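To put the cleaning function to work on a scenario like the retail example above, a minimal batch-processing sketch might look like the following. The data/raw_sales and data/clean_sales folder names are assumptions; point them at your own store exports.
from pathlib import Path
import pandas as pd

# Hypothetical folders for daily store exports; adjust to your own layout
raw_dir = Path("data/raw_sales")
clean_dir = Path("data/clean_sales")
clean_dir.mkdir(parents=True, exist_ok=True)

for csv_path in sorted(raw_dir.glob("*.csv")):
    df_raw = pd.read_csv(csv_path)
    df_clean = automated_data_cleaning(df_raw)
    df_clean.to_csv(clean_dir / csv_path.name, index=False)
    print(f"{csv_path.name}: {len(df_raw)} -> {len(df_clean)} rows after cleaning")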
2. Exploratory Data Analysis with ydata-profiling
Manual exploratory data analysis (EDA) can take hours of writing repetitive code. This script generates comprehensive data profiles automatically, providing instant insights into your dataset’s characteristics.
from ydata_profiling import ProfileReport
import pandas as pd
import numpy as np

def generate_eda_report(df, title="Data Analysis Report"):
    """Generate a comprehensive EDA report automatically."""
    # Explorative mode enables a fuller report (correlations, interactions,
    # and missing-value diagrams) without extra configuration
    profile = ProfileReport(
        df,
        title=title,
        explorative=True
    )

    # Save the report as HTML
    profile.to_file(f"{title.replace(' ', '_').lower()}.html")

    # Generate summary statistics
    summary = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
        'duplicate_rows': df.duplicated().sum(),
        'numeric_columns': len(df.select_dtypes(include=[np.number]).columns),
        'categorical_columns': len(df.select_dtypes(include=['object']).columns)
    }

    return profile, summary

# Usage
profile, summary = generate_eda_report(df, "Customer Dataset Analysis")
print(f"Dataset has {summary['missing_percentage']:.2f}% missing values")
Real-world use case: A marketing team receives a new customer dataset every month and needs to quickly understand customer demographics, behavior patterns, and data quality issues before launching targeted campaigns.
Benefits of automation:
Generates publication-ready reports in seconds
Identifies data quality issues and patterns automatically
Provides interactive visualizations for stakeholder presentations
Standardizes EDA process across different teams and projects
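For recurring deliveries like the monthly customer dataset above, recent ydata-profiling releases can also compare two profiles side by side. The sketch below is an illustration only: the customers_january.csv and customers_february.csv files are hypothetical monthly extracts, and the compare() method assumes a reasonably current library version.
import pandas as pd

# Hypothetical monthly extracts
df_jan = pd.read_csv("customers_january.csv")
df_feb = pd.read_csv("customers_february.csv")

profile_jan, _ = generate_eda_report(df_jan, "Customers January")
profile_feb, _ = generate_eda_report(df_feb, "Customers February")

# compare() is available in recent ydata-profiling releases
comparison = profile_jan.compare(profile_feb)
comparison.to_file("customers_month_over_month.html")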
3. Interactive Data Visualization Dashboard with Plotly and Dash
Creating dynamic dashboards for stakeholders traditionally requires extensive development time. This script creates interactive dashboards that update automatically with new data.
import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd

def create_automated_dashboard(df):
    """Create an interactive dashboard with automatic chart generation."""
    app = dash.Dash(__name__)

    # Get numeric and categorical columns
    numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object']).columns.tolist()

    app.layout = html.Div([
        html.H1("Automated Data Dashboard", style={'textAlign': 'center'}),
        html.Div([
            html.Label("Select X-axis:"),
            dcc.Dropdown(
                id='x-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols + categorical_cols],
                value=numeric_cols[0] if numeric_cols else categorical_cols[0]
            )
        ], style={'width': '48%', 'display': 'inline-block'}),
        html.Div([
            html.Label("Select Y-axis:"),
            dcc.Dropdown(
                id='y-axis-dropdown',
                options=[{'label': col, 'value': col} for col in numeric_cols],
                value=numeric_cols[1] if len(numeric_cols) > 1 else numeric_cols[0]
            )
        ], style={'width': '48%', 'float': 'right', 'display': 'inline-block'}),
        dcc.Graph(id='main-graph'),
        dcc.Graph(id='distribution-graph')
    ])

    @app.callback(
        [Output('main-graph', 'figure'),
         Output('distribution-graph', 'figure')],
        [Input('x-axis-dropdown', 'value'),
         Input('y-axis-dropdown', 'value')]
    )
    def update_graphs(x_axis, y_axis):
        # Scatter plot
        scatter_fig = px.scatter(df, x=x_axis, y=y_axis, title=f'{y_axis} vs {x_axis}')

        # Distribution plot
        if x_axis in numeric_cols:
            dist_fig = px.histogram(df, x=x_axis, title=f'Distribution of {x_axis}')
        else:
            counts = df[x_axis].value_counts().reset_index()
            counts.columns = [x_axis, 'count']
            dist_fig = px.bar(counts, x=x_axis, y='count', title=f'Count of {x_axis}')

        return scatter_fig, dist_fig

    return app

# Usage
dashboard = create_automated_dashboard(df)
dashboard.run(debug=True)  # use dashboard.run_server(debug=True) on older Dash versions
Real-world use case: A sales manager needs real-time insights into team performance metrics. The dashboard automatically updates with new sales data and allows filtering by region, product, or time period without requiring technical knowledge.
Benefits of automation:
Creates professional dashboards without frontend development skills
Enables self-service analytics for non-technical stakeholders
Updates automatically with new data
Reduces dependency on data visualization specialists
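The "updates automatically" part can be handled with Dash's dcc.Interval component, which re-runs a callback on a timer. This is a minimal sketch under the assumption that fresh data lands in a hypothetical sales.csv file with date and revenue columns.
import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd

app = dash.Dash(__name__)
app.layout = html.Div([
    dcc.Interval(id="refresh", interval=5 * 60 * 1000),  # re-run every 5 minutes (milliseconds)
    dcc.Graph(id="sales-graph")
])

@app.callback(Output("sales-graph", "figure"), Input("refresh", "n_intervals"))
def refresh_figure(_):
    df = pd.read_csv("sales.csv")  # hypothetical, regularly updated file
    return px.line(df, x="date", y="revenue", title="Sales (auto-refreshed)")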
4. Web Scraping for Data Collection with BeautifulSoup
Manually collecting data from websites is tedious and error-prone. This script automates web scraping with built-in error handling and rate limiting.
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random

def automated_web_scraper(urls, delay_range=(1, 3)):
    """Automated web scraper with error handling and rate limiting."""
    scraped_data = []

    for i, url in enumerate(urls):
        try:
            # Random delay to avoid being blocked
            time.sleep(random.uniform(*delay_range))

            # Make the request with headers to appear more human-like
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            # Parse HTML
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract data (customize based on the target website structure)
            data = {
                'url': url,
                'title': soup.find('title').text.strip() if soup.find('title') else 'N/A',
                'meta_description': '',
                'headings': [h.text.strip() for h in soup.find_all(['h1', 'h2', 'h3'])[:5]],
                'scraped_at': pd.Timestamp.now()
            }

            # Extract the meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                data['meta_description'] = meta_desc.get('content', '')

            scraped_data.append(data)
            print(f"Scraped {i+1}/{len(urls)}: {url}")

        except Exception as e:
            print(f"Error scraping {url}: {str(e)}")
            scraped_data.append({
                'url': url,
                'error': str(e),
                'scraped_at': pd.Timestamp.now()
            })

    return pd.DataFrame(scraped_data)

# Usage
urls_to_scrape = [
    'https://example1.com',
    'https://example2.com',
    'https://example3.com'
]
scraped_df = automated_web_scraper(urls_to_scrape)
scraped_df.to_csv('scraped_data.csv', index=False)
Real-world use case: A market research team needs to monitor competitor pricing across 500+ product pages daily. This script collects pricing data automatically and identifies price changes without manual checking.
Benefits of automation:
Collects data 24/7 without human intervention
Handles errors gracefully and continues processing
Scales to thousands of URLs easily
Maintains consistent data collection schedules
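For the competitor-pricing scenario, detecting changes is then a simple join between today's and yesterday's scrape. The price column and file names below are hypothetical; the scraper above would need a site-specific extraction step to populate them.
import pandas as pd

# Hypothetical daily scrape outputs that include a 'price' column per URL
today = pd.read_csv("scraped_prices_today.csv")
yesterday = pd.read_csv("scraped_prices_yesterday.csv")

merged = today.merge(yesterday, on="url", suffixes=("_today", "_yesterday"))
changes = merged[merged["price_today"] != merged["price_yesterday"]]
changes.to_csv("price_changes.csv", index=False)
print(f"{len(changes)} products changed price since yesterday")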
5. Automating Model Training with Scikit-learn Pipelines
Model training often involves repetitive preprocessing steps and parameter tuning. This script creates reusable pipelines that standardize the entire machine learning workflow.
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import joblib

def create_automated_ml_pipeline(df, target_column, model_type='classification'):
    """Create, train, and evaluate an automated ML pipeline."""
    X = df.drop(columns=[target_column])
    y = df[target_column]

    # Identify column types
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns
    categorical_features = X.select_dtypes(include=['object']).columns

    # Numeric preprocessing pipeline
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    # Categorical preprocessing pipeline
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )

    # Create the full pipeline with the model
    if model_type == 'classification':
        model = RandomForestClassifier(n_estimators=100, random_state=42)
    else:
        from sklearn.ensemble import RandomForestRegressor
        model = RandomForestRegressor(n_estimators=100, random_state=42)

    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    # Split data and train
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    pipeline.fit(X_train, y_train)

    # Evaluate the model
    cv_scores = cross_val_score(pipeline, X_train, y_train, cv=5)
    test_score = pipeline.score(X_test, y_test)

    # Generate predictions and report
    y_pred = pipeline.predict(X_test)
    results = {
        'pipeline': pipeline,
        'cv_scores': cv_scores,
        'test_score': test_score,
        'classification_report': classification_report(y_test, y_pred) if model_type == 'classification' else None
    }

    # Save the pipeline
    joblib.dump(pipeline, f'automated_ml_pipeline_{target_column}.pkl')

    return results

# Usage
results = create_automated_ml_pipeline(df, 'target_column', 'classification')
print(f"Cross-validation score: {results['cv_scores'].mean():.3f} (+/- {results['cv_scores'].std() * 2:.3f})")
Real-world use case: A financial institution needs to retrain fraud detection models weekly with new transaction data. This pipeline automatically handles data preprocessing, model training, and validation without manual intervention.
Benefits of automation:
Ensures consistent preprocessing across different datasets
Reduces model development time from days to hours
Prevents data leakage through proper pipeline structure
Enables easy model deployment and version control
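Because the whole pipeline (preprocessing plus model) is serialized with joblib, the weekly-retraining scenario can reload and score it without repeating any preprocessing code. A short sketch, assuming a hypothetical new_transactions.csv that matches the training schema:
import joblib
import pandas as pd

# Load the serialized pipeline saved by create_automated_ml_pipeline()
pipeline = joblib.load("automated_ml_pipeline_target_column.pkl")

# Hypothetical batch of new data with the same columns as the training features
new_data = pd.read_csv("new_transactions.csv")
predictions = pipeline.predict(new_data)
print(predictions[:10])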
6. Feature Engineering with Feature-engine
Feature engineering often requires domain expertise and repetitive coding. This script automates common feature engineering tasks with intelligent defaults and customizable options.
from feature_engine.creation import MathFeatures
from feature_engine.discretisation import EqualFrequencyDiscretiser
from feature_engine.encoding import RareLabelEncoder, OneHotEncoder
from feature_engine.transformation import LogTransformer
from feature_engine.selection import DropConstantFeatures, DropDuplicateFeatures
import pandas as pd
import numpy as np

def automated_feature_engineering(df, target_column=None):
    """Apply common feature engineering steps with sensible defaults."""
    # Separate features and target
    if target_column:
        X = df.drop(columns=[target_column])
        y = df[target_column]
    else:
        X = df.copy()
        y = None

    numeric_vars = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_vars = X.select_dtypes(include=['object']).columns.tolist()

    print(f"Starting feature engineering with {len(X.columns)} features...")

    # 1. Remove constant and duplicate features
    constant_dropper = DropConstantFeatures()
    duplicate_dropper = DropDuplicateFeatures()
    X = constant_dropper.fit_transform(X)
    X = duplicate_dropper.fit_transform(X)

    # Refresh column lists in case features were dropped
    numeric_vars = [v for v in numeric_vars if v in X.columns]
    categorical_vars = [v for v in categorical_vars if v in X.columns]

    # 2. Handle rare categories in categorical variables
    if categorical_vars:
        rare_encoder = RareLabelEncoder(tol=0.01, n_categories=10)
        X = rare_encoder.fit_transform(X)

    # 3. Create mathematical combinations of numeric features
    if len(numeric_vars) >= 2:
        math_combiner = MathFeatures(
            variables=numeric_vars[:5],  # Limit to the first 5 to avoid feature explosion
            func=['sum', 'prod', 'mean']
        )
        X = math_combiner.fit_transform(X)

    # 4. Apply log transformation to skewed, strictly positive variables
    skewed_vars = []
    for var in numeric_vars:
        if X[var].min() > 0:  # Log only positive values
            skewness = X[var].skew()
            if abs(skewness) > 1:
                skewed_vars.append(var)

    if skewed_vars:
        log_transformer = LogTransformer(variables=skewed_vars)
        X = log_transformer.fit_transform(X)

    # 5. Discretize continuous variables
    if len(numeric_vars) > 0:
        discretizer = EqualFrequencyDiscretiser(
            variables=numeric_vars[:3],  # Discretize the first 3 numeric vars
            q=5,
            return_object=True
        )
        X_discrete = discretizer.fit_transform(X)
        # Add discretized versions with a suffix
        for var in discretizer.variables:
            X[f'{var}_binned'] = X_discrete[var]

    # 6. One-hot encode categorical variables
    updated_categorical_vars = X.select_dtypes(include=['object']).columns.tolist()
    if updated_categorical_vars:
        ohe = OneHotEncoder(
            variables=updated_categorical_vars,
            drop_last=True
        )
        X = ohe.fit_transform(X)

    print(f"Feature engineering complete. New feature count: {len(X.columns)}")

    # Create a feature summary
    feature_summary = {
        'original_features': len(df.columns) - (1 if target_column else 0),
        'final_features': len(X.columns),
        'features_created': len(X.columns) - len(df.columns) + (1 if target_column else 0),
        'numeric_features': len(X.select_dtypes(include=['int64', 'float64']).columns),
        'categorical_features': len(X.select_dtypes(include=['object']).columns),
        'binary_features': len([col for col in X.columns if X[col].nunique() == 2])
    }

    return X, feature_summary

# Usage
X_engineered, summary = automated_feature_engineering(df, 'target_column')
print(f"Created {summary['features_created']} new features")
Real-world use case: An e-commerce company wants to improve their recommendation system by creating meaningful features from user behavior data, product attributes, and transaction history without manually coding hundreds of feature combinations.
Benefits of automation:
Systematically explores feature combinations that humans might miss
Applies domain-agnostic transformations consistently
Scales feature engineering to large datasets
Documents feature creation process for reproducibility
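To reuse exactly the same transformations on future data (and avoid leaking test-set statistics), the same feature-engine transformers can be chained in a scikit-learn Pipeline and fit on training data only. A minimal sketch, assuming X_train and X_test exist and contain at least one categorical column:
from sklearn.pipeline import Pipeline
from feature_engine.selection import DropConstantFeatures, DropDuplicateFeatures
from feature_engine.encoding import RareLabelEncoder, OneHotEncoder

fe_pipeline = Pipeline([
    ("drop_constant", DropConstantFeatures()),
    ("drop_duplicates", DropDuplicateFeatures()),
    ("rare_labels", RareLabelEncoder(tol=0.01, n_categories=10)),
    ("one_hot", OneHotEncoder(drop_last=True))
])

X_train_fe = fe_pipeline.fit_transform(X_train)  # fit on training data only
X_test_fe = fe_pipeline.transform(X_test)        # reuse the learned encodings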
7. Automated Hyperparameter Tuning with Optuna
Manual hyperparameter tuning is time-consuming and often suboptimal. This script uses advanced optimization algorithms to find the best parameters automatically.
import optuna
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
import pandas as pd

def automated_hyperparameter_tuning(X, y, model_type='random_forest', n_trials=100):
    """Search for the best hyperparameters with Optuna."""

    def objective(trial):
        if model_type == 'random_forest':
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 20),
                'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
                'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
                'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None]),
                'bootstrap': trial.suggest_categorical('bootstrap', [True, False])
            }
            model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
        elif model_type == 'xgboost':
            import xgboost as xgb
            params = {
                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
                'max_depth': trial.suggest_int('max_depth', 3, 10),
                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
                'reg_lambda': trial.suggest_float('reg_lambda', 0, 10)
            }
            model = xgb.XGBClassifier(**params, random_state=42, n_jobs=-1)

        # Perform cross-validation
        cv_scores = cross_val_score(model, X, y, cv=5, scoring='accuracy', n_jobs=-1)
        return cv_scores.mean()

    # Create the study and optimize
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)

    # Get the best parameters and score
    best_params = study.best_params
    best_score = study.best_value

    # Train the final model with the best parameters
    if model_type == 'random_forest':
        best_model = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
    elif model_type == 'xgboost':
        import xgboost as xgb
        best_model = xgb.XGBClassifier(**best_params, random_state=42, n_jobs=-1)
    best_model.fit(X, y)

    # Record the optimization history
    optimization_history = pd.DataFrame({
        'trial': range(len(study.trials)),
        'value': [trial.value for trial in study.trials],
        'params': [trial.params for trial in study.trials]
    })

    results = {
        'best_model': best_model,
        'best_params': best_params,
        'best_score': best_score,
        'study': study,
        'optimization_history': optimization_history
    }

    return results

# Usage
tuning_results = automated_hyperparameter_tuning(X_train, y_train, 'random_forest', n_trials=50)
print(f"Best cross-validation score: {tuning_results['best_score']:.4f}")
print(f"Best parameters: {tuning_results['best_params']}")

# Plot optimization history
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.plot(tuning_results['optimization_history']['trial'],
         tuning_results['optimization_history']['value'])
plt.xlabel('Trial')
plt.ylabel('Accuracy')
plt.title('Hyperparameter Optimization Progress')
plt.show()
Real-world use case: A machine learning team needs to optimize models for different client projects with varying datasets and requirements. This script automatically finds optimal parameters for each use case without manual experimentation.
Benefits of automation:
Finds better parameters than manual tuning
Saves weeks of manual experimentation
Uses intelligent search algorithms instead of grid search
Provides optimization insights and visualizations
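Optuna also ships its own plotting helpers, which can replace the manual matplotlib plot above and add a view of which parameters mattered most. These return Plotly figures, so plotly needs to be installed.
import optuna.visualization as vis

study = tuning_results['study']
vis.plot_optimization_history(study).show()   # objective value per trial
vis.plot_param_importances(study).show()      # which hyperparameters drove the score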
8. Model Evaluation Reports with Yellowbrick
Creating comprehensive model evaluation reports manually requires writing extensive plotting and analysis code. This script generates professional evaluation reports automatically.
from yellowbrick.classifier import ClassificationReport, ROCAUC, ConfusionMatrix
from yellowbrick.model_selection import ValidationCurve, LearningCurve, FeatureImportances
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
import pandas as pd

def automated_model_evaluation(model, X, y, model_name="Model"):
    """Generate a visual evaluation report for a single model."""
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Create a figure with subplots
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    fig.suptitle(f'{model_name} Evaluation Report', fontsize=16, fontweight='bold')

    # 1. Classification report
    visualizer1 = ClassificationReport(model, ax=axes[0, 0], support=True)
    visualizer1.fit(X_train, y_train)
    visualizer1.score(X_test, y_test)
    visualizer1.finalize()

    # 2. ROC-AUC curve
    visualizer2 = ROCAUC(model, ax=axes[0, 1])
    visualizer2.fit(X_train, y_train)
    visualizer2.score(X_test, y_test)
    visualizer2.finalize()

    # 3. Confusion matrix
    visualizer3 = ConfusionMatrix(model, ax=axes[0, 2])
    visualizer3.fit(X_train, y_train)
    visualizer3.score(X_test, y_test)
    visualizer3.finalize()

    # 4. Feature importance (if the model supports it)
    if hasattr(model, 'feature_importances_'):
        visualizer4 = FeatureImportances(model, ax=axes[1, 0])
        visualizer4.fit(X_train, y_train)
        visualizer4.finalize()
    else:
        axes[1, 0].text(0.5, 0.5, 'Feature Importance\nNot Available',
                        ha='center', va='center', transform=axes[1, 0].transAxes)

    # 5. Learning curve
    visualizer5 = LearningCurve(model, ax=axes[1, 1], scoring='accuracy')
    visualizer5.fit(X, y)
    visualizer5.finalize()

    # 6. Validation curve (max_depth example for tree-based models)
    if hasattr(model, 'max_depth'):
        visualizer6 = ValidationCurve(
            model, ax=axes[1, 2], param_name='max_depth',
            param_range=np.arange(1, 11), scoring='accuracy'
        )
        visualizer6.fit(X, y)
        visualizer6.finalize()
    else:
        axes[1, 2].text(0.5, 0.5, 'Validation Curve\nNot Available',
                        ha='center', va='center', transform=axes[1, 2].transAxes)

    plt.tight_layout()
    plt.savefig(f'{model_name.lower().replace(" ", "_")}_evaluation_report.png',
                dpi=300, bbox_inches='tight')
    plt.show()

    # Generate a numerical summary
    model.fit(X_train, y_train)
    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    summary = {
        'model_name': model_name,
        'train_accuracy': train_score,
        'test_accuracy': test_score,
        'overfit_gap': train_score - test_score,
        'total_features': X.shape[1],
        'training_samples': X_train.shape[0],
        'test_samples': X_test.shape[0]
    }

    return summary

def compare_multiple_models(models_dict, X, y):
    """Compare multiple models and generate a comparison report."""
    results = []
    for model_name, model in models_dict.items():
        print(f"Evaluating {model_name}...")
        summary = automated_model_evaluation(model, X, y, model_name)
        results.append(summary)

    # Create the comparison DataFrame
    comparison_df = pd.DataFrame(results)
    comparison_df = comparison_df.sort_values('test_accuracy', ascending=False)

    # Save the comparison report
    comparison_df.to_csv('model_comparison_report.csv', index=False)

    return comparison_df

# Usage
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

models = {
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'SVM': SVC(random_state=42, probability=True),
    'Logistic Regression': LogisticRegression(random_state=42)
}

comparison_results = compare_multiple_models(models, X, y)
print("\nModel Comparison Results:")
print(comparison_results[['model_name', 'test_accuracy', 'overfit_gap']])
Real-world use case: A consulting firm needs to present model performance results to clients with clear visualizations and metrics. This script generates professional reports that can be directly included in client presentations.
Benefits of automation:
Creates publication-ready evaluation reports
Ensures consistent evaluation metrics across projects
Identifies overfitting and performance issues automatically
Enables easy model comparison and selection
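If you want to try the comparison end to end before wiring in real data, a quick run on synthetic data from scikit-learn works as a smoke test; the dataset below is generated purely for illustration.
from sklearn.datasets import make_classification
import pandas as pd

# Synthetic stand-in dataset for a dry run of the evaluation report
X_demo, y_demo = make_classification(n_samples=1000, n_features=10,
                                     n_informative=5, random_state=42)
X_demo = pd.DataFrame(X_demo, columns=[f"feature_{i}" for i in range(10)])
y_demo = pd.Series(y_demo, name="target")

demo_results = compare_multiple_models(models, X_demo, y_demo)
print(demo_results)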
9. Automating Dataset Versioning with DVC
Data versioning is crucial for reproducible machine learning but often overlooked due to complexity. This script automates dataset versioning and experiment tracking.
import pandas as pd
import os
from datetime import datetime
import hashlib
import json

class AutomatedDataVersioning:
    def __init__(self, project_path="."):
        self.project_path = project_path
        self.data_dir = os.path.join(project_path, "data")
        self.dvc_dir = os.path.join(project_path, ".dvc")
        # Initialize directories
        os.makedirs(self.data_dir, exist_ok=True)

    def setup_dvc_project(self):
        """Initialize the DVC project if it is not already initialized."""
        try:
            if not os.path.exists(self.dvc_dir):
                os.system(f"cd {self.project_path} && dvc init")
                print("DVC project initialized")
            else:
                print("DVC project already exists")
        except Exception as e:
            print(f"Error initializing DVC: {e}")

    def add_dataset_version(self, dataframe, dataset_name, description=""):
        """Add a new version of a dataset with automatic tracking."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Generate a data hash for uniqueness
        data_string = dataframe.to_string()
        data_hash = hashlib.md5(data_string.encode()).hexdigest()[:8]

        # Create a versioned filename
        filename = f"{dataset_name}_{timestamp}_{data_hash}.csv"
        filepath = os.path.join(self.data_dir, filename)

        # Save the dataset
        dataframe.to_csv(filepath, index=False)

        # Create metadata
        metadata = {
            'dataset_name': dataset_name,
            'timestamp': timestamp,
            'description': description,
            'shape': dataframe.shape,
            'columns': list(dataframe.columns),
            'data_hash': data_hash,
            'file_size': os.path.getsize(filepath),
            'missing_values': dataframe.isnull().sum().sum(),
            'dtypes': dataframe.dtypes.to_dict()
        }

        # Save metadata alongside the data file
        metadata_file = filepath.replace('.csv', '_metadata.json')
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, indent=2, default=str)

        # Add to DVC tracking
        try:
            os.system(f"cd {self.project_path} && dvc add {filepath}")
            print(f"Dataset version saved: {filename}")
            print(f"Shape: {dataframe.shape}, Hash: {data_hash}")
        except Exception as e:
            print(f"Error adding to DVC: {e}")

        return filepath, metadata

    def list_dataset_versions(self, dataset_name=None):
        """List all versions of tracked datasets."""
        versions = []
        for file in os.listdir(self.data_dir):
            if file.endswith('_metadata.json'):
                with open(os.path.join(self.data_dir, file), 'r') as f:
                    metadata = json.load(f)
                if dataset_name is None or metadata['dataset_name'] == dataset_name:
                    versions.append(metadata)
        return pd.DataFrame(versions).sort_values('timestamp', ascending=False)

    def load_dataset_version(self, dataset_name, version_hash=None):
        """Load a specific version of a dataset (latest by default)."""
        versions = self.list_dataset_versions(dataset_name)
        if version_hash:
            versions = versions[versions['data_hash'] == version_hash]
        if len(versions) == 0:
            raise ValueError(f"Version not found for {dataset_name}")
        version = versions.iloc[0]  # Latest (or matching) version
        filename = f"{dataset_name}_{version['timestamp']}_{version['data_hash']}.csv"
        filepath = os.path.join(self.data_dir, filename)
        return pd.read_csv(filepath)

# Usage
versioning = AutomatedDataVersioning()
versioning.setup_dvc_project()

# Add a new dataset version
filepath, metadata = versioning.add_dataset_version(
    df,
    "customer_data",
    "Initial customer dataset with demographics"
)

# List all versions
versions = versioning.list_dataset_versions("customer_data")
print(versions[['dataset_name', 'timestamp', 'shape', 'data_hash']])

# Load a specific version
df_v1 = versioning.load_dataset_version("customer_data", version_hash="abc12345")
Real-world use case: A machine learning team working on a customer churn model needs to track different versions of their training data as new customer segments are added and features are engineered, ensuring they can reproduce any previous model results.
Benefits of automation:
Ensures reproducibility of machine learning experiments
Tracks data lineage and changes automatically
Prevents data loss and enables rollback capabilities
Integrates with Git for complete project versioning
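Since dvc add writes a small .dvc pointer file next to each dataset, committing those pointers (rather than the data itself) to Git ties every dataset version to a commit. A minimal helper along those lines, assuming the project is already a Git repository:
import os

def commit_dataset_version(filepath, message):
    """Commit the DVC pointer file and metadata to Git (the data stays in DVC storage)."""
    dvc_file = f"{filepath}.dvc"
    metadata_file = filepath.replace(".csv", "_metadata.json")
    os.system(f"git add {dvc_file} {metadata_file}")
    os.system(f'git commit -m "{message}"')

# Usage, reusing the filepath returned by add_dataset_version()
commit_dataset_version(filepath, "Add customer_data: initial demographics snapshot")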
10. Scheduling & Monitoring Scripts with APScheduler
Data science workflows often need to run on schedules or respond to events. This script creates a robust scheduling system with monitoring and error handling.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pandas as pd
import logging
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import os

class AutomatedDataPipeline:
    def __init__(self, config):
        self.config = config
        self.scheduler = BackgroundScheduler()
        self.setup_logging()

    def setup_logging(self):
        """Set up logging for pipeline monitoring."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('pipeline.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def send_notification(self, subject, message, is_error=False):
        """Send an email notification on success or failure."""
        try:
            if 'email' in self.config:
                msg = MIMEText(message)
                msg['Subject'] = f"{'ERROR: ' if is_error else ''}{subject}"
                msg['From'] = self.config['email']['from']
                msg['To'] = self.config['email']['to']

                server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
                server.starttls()
                server.login(self.config['email']['username'], self.config['email']['password'])
                server.send_message(msg)
                server.quit()

                self.logger.info(f"Notification sent: {subject}")
        except Exception as e:
            self.logger.error(f"Failed to send notification: {e}")

    def data_collection_job(self):
        """Automated data collection job."""
        try:
            self.logger.info("Starting data collection job")

            # Simulate data collection (replace with actual logic)
            data = pd.DataFrame({
                'timestamp': [datetime.now()],
                'records_collected': [1000],
                'status': ['success']
            })

            # Save the collected data
            filename = f"collected_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            data.to_csv(filename, index=False)

            self.logger.info(f"Data collection completed: {filename}")
            self.send_notification("Data Collection Success", f"Collected {len(data)} records")
        except Exception as e:
            error_msg = f"Data collection failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Collection Failed", error_msg, is_error=True)

    def model_training_job(self):
        """Automated model training job."""
        try:
            self.logger.info("Starting model training job")

            # Load the latest data
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for training")

            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Simulate model training (replace with actual logic)
            from sklearn.ensemble import RandomForestClassifier
            from sklearn.model_selection import cross_val_score

            # Dummy training process
            model = RandomForestClassifier(n_estimators=100)
            # X, y = prepare_features(df)  # Your feature preparation logic
            # scores = cross_val_score(model, X, y, cv=5)

            model_filename = f"model_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pkl"
            # joblib.dump(model, model_filename)

            self.logger.info(f"Model training completed: {model_filename}")
            self.send_notification("Model Training Success", f"Model saved as {model_filename}")
        except Exception as e:
            error_msg = f"Model training failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Model Training Failed", error_msg, is_error=True)

    def data_quality_check(self):
        """Automated data quality monitoring."""
        try:
            self.logger.info("Starting data quality check")

            # Find the latest data file
            data_files = [f for f in os.listdir('.') if f.startswith('collected_data_')]
            if not data_files:
                raise ValueError("No data files found for quality check")

            latest_file = max(data_files)
            df = pd.read_csv(latest_file)

            # Perform quality checks
            quality_report = {
                'total_records': len(df),
                'missing_values': df.isnull().sum().sum(),
                'duplicate_records': df.duplicated().sum(),
                'missing_percentage': (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
            }

            # Apply quality thresholds
            if quality_report['missing_percentage'] > 10:
                raise ValueError(f"High missing data: {quality_report['missing_percentage']:.2f}%")
            if quality_report['duplicate_records'] > len(df) * 0.05:
                raise ValueError(f"High duplicate rate: {quality_report['duplicate_records']} records")

            self.logger.info("Data quality check passed")
            self.send_notification("Data Quality Check", f"Quality metrics: {quality_report}")
        except Exception as e:
            error_msg = f"Data quality check failed: {str(e)}"
            self.logger.error(error_msg)
            self.send_notification("Data Quality Alert", error_msg, is_error=True)

    def setup_schedules(self):
        """Set up automated schedules for all jobs."""
        # Daily data collection at 2 AM
        self.scheduler.add_job(
            self.data_collection_job,
            CronTrigger(hour=2, minute=0),
            id='data_collection',
            replace_existing=True
        )

        # Weekly model training on Sundays at 3 AM
        self.scheduler.add_job(
            self.model_training_job,
            CronTrigger(day_of_week=6, hour=3, minute=0),
            id='model_training',
            replace_existing=True
        )

        # Hourly data quality checks
        self.scheduler.add_job(
            self.data_quality_check,
            CronTrigger(minute=0),
            id='quality_check',
            replace_existing=True
        )

        self.logger.info("All schedules configured")

    def start_pipeline(self):
        """Start the automated pipeline."""
        self.setup_schedules()
        self.scheduler.start()
        self.logger.info("Automated pipeline started")

        try:
            # Keep the script running
            import time
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.logger.info("Pipeline stopped by user")
            self.scheduler.shutdown()

# Configuration
config = {
    'email': {
        'smtp_server': 'smtp.gmail.com',
        'username': 'your_email@gmail.com',
        'password': 'your_app_password',
        'from': 'your_email@gmail.com',
        'to': 'alerts@company.com'
    }
}

# Usage
pipeline = AutomatedDataPipeline(config)

# Run jobs manually for testing
pipeline.data_collection_job()
pipeline.data_quality_check()

# Start the automated pipeline (runs continuously)
# pipeline.start_pipeline()
Real-world use case: An e-commerce company needs to update their recommendation models daily with new user behavior data, check data quality every hour, and retrain models weekly, all while monitoring for failures and sending alerts to the data team.
Benefits of automation:
Ensures consistent execution of data pipelines
Provides immediate alerts when issues occur
Reduces manual monitoring and intervention
Scales to complex multi-step workflows
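In addition to the per-job try/except blocks above, APScheduler's event system can report on every job run centrally, which is handy for monitoring as the number of jobs grows. A short sketch that hooks a listener onto the pipeline's scheduler:
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED

def job_listener(event):
    # Fired after every job run; event.exception is set when the job raised
    if event.exception:
        pipeline.logger.error(f"Job {event.job_id} failed: {event.exception}")
        pipeline.send_notification("Scheduled Job Failed", str(event.exception), is_error=True)
    else:
        pipeline.logger.info(f"Job {event.job_id} finished successfully")

pipeline.scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)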
Conclusion
These ten Python scripts represent a comprehensive toolkit for automating the most time-consuming and error-prone aspects of data science workflows. From data cleaning and exploration to model training and deployment monitoring, each script addresses a critical pain point that data scientists face daily.
The real power of these automation scripts lies not just in their individual capabilities, but in how they can be combined into end-to-end workflows. Imagine a pipeline that automatically collects data, performs quality checks, engineers features, trains models, evaluates performance, and deploys the best-performing model — all while maintaining version control and sending notifications about the process.
By implementing these scripts in your data science projects, you’ll experience significant productivity gains: tasks that once took hours can be completed in minutes, manual errors are virtually eliminated, and you can focus your expertise on the strategic aspects of data science rather than repetitive operational tasks.
Start by implementing one or two scripts that address your most pressing automation needs. As you become comfortable with the approach, gradually expand your automation toolkit. Remember that the initial time investment in setting up these scripts will pay dividends throughout your data science career, freeing you to tackle more complex problems and deliver insights faster than ever before.
These scripts provide the foundation for building robust, automated data science workflows that can handle the demands of modern data-driven organizations.