Pipelining in Machine Learning: Streamlining Your Workflow

Kshitij ShresthKshitij Shresth
5 min read

Unlocking the Power of Big Data Engineering: A Guide to Building Scalable  Data Pipelines Part2 | by Santosh Gowrisetty | Medium

In machine learning, effective data handling and model training are crucial for building robust and scalable models. One powerful way to streamline this process is through pipelining. Pipelining automates the workflow, ensuring that data pre-processing, feature engineering, and model training are seamlessly integrated.

What is Pipelining?

Pipelining in machine learning refers to the process of chaining together multiple data processing steps and machine learning models into a single, cohesive workflow. This approach helps in automating repetitive tasks, ensuring consistency, and improving code readability. It offers several advantages:

  1. Automation: Automates repetitive tasks, reducing manual intervention and errors.

  2. Consistency: Ensures consistent preprocessing and feature engineering across different models.

  3. Maintainability: Simplifies the workflow, making it easier to maintain and update.

  4. Scalability: Facilitates the scaling of workflows to handle larger datasets and more complex models.

without pipelining:

import pandas as pd

data = {
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 40, 45],
    'gender': ['Female', 'Male', 'Male', 'Male', 'Female']
}

df = pd.DataFrame(data)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

#Encoding 'gender' to numerical values
le = LabelEncoder()
df['gender_encoded'] = le.fit_transform(df['gender'])

#Preparing features and target
X = df[['gender_encoded']]
y = df['age']

#Split 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

#Scale
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

#Model training 
model = LogisticRegression()
model.fit(X_train_scaled, y_train)

#Prediction
y_pred = model.predict(X_test_scaled)

with pipelining:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer


preprocessor = ColumnTransformer(
    transformers=[
        ('gender', OneHotEncoder(), ['gender'])
    ],
    remainder='passthrough'
)

#Defining pipeline
pipeline = Pipeline([
    ('preprocessor', preprocessor),  #Preprocessing
    ('scaler', StandardScaler()),  #Scaling
    ('model', LogisticRegression())  #Classification
])

#Features and target
X = df[['gender']]
y = df['age']

#Split 
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

#Train pipeline
pipeline.fit(X_train, y_train)

#Predictions
y_pred = pipeline.predict(X_test)

By pipelining we consolidate pre-processing, scaling, and model training into a single cohesive workflow.

Building a Pipeline in Scikit-Learn

Scikit-learn provides a convenient Pipeline class to streamline the process:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
data = load_iris()
X = data.data
y = data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

#Define the pipeline here with Scaling and Classification
pipeline = Pipeline([
    ('scaler', StandardScaler()),  
    ('classifier', LogisticRegression())  
])

#Training
pipeline.fit(X_train, y_train)

#Prediction and evaluation
y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f'Accuracy: {accuracy:.2f}')

Standard Scaler standardizes the features by removing the mean and scaling to unit variance. Logistic Regression is the model used for classification and the pipeline automates the sequence of scaling followed by model training.

Adding Feature Engineering to the Pipeline

Pipelines can also include custom feature engineering steps:

from sklearn.feature_selection import SelectKBest, f_classif

#Define the extended pipeline this time with classification
extended_pipeline = Pipeline([
    ('scaler', StandardScaler()),  
    ('feature_selection', SelectKBest(score_func=f_classif, k=2)),  
    ('classifier', LogisticRegression())
])
extended_pipeline.fit(X_train, y_train)
y_pred = extended_pipeline.predict(X_test)

In this extended pipeline, SelectKBest selects the top k features based on their importance scores. Feature engineering is critical for improving model performance. Advanced pipelines use automated feature engineering tools like Featuretools automate feature creation by deriving new features from existing ones. Feature selection algorithm methods like Recursive Feature Elimination (RFE) or Embedded Methods (e.g., LASSO) are used to select the most relevant features, reducing dimensionality and improving model efficiency.

Model Ensemble

Model ensemble combines multiple models to improve performance and robustness. When incorporating model ensemble into a machine learning pipeline, the goal is to systematically manage the various stages of the workflow to achieve better performance and efficiency.

1. Bagging

Bootstrap Aggregating is used to improve the stability and accuracy of machine learning models by training multiple versions of the same model on different subsets of the training data. It helps in reducing variance and preventing overfitting.

Pipeline Integration is done by using a bagging technique like BaggingClassifier to train multiple instances of a base model after data has been cleaned. Evaluation is done of the combined predictions from the bagged models.

Here is a snippet using decision trees as base learners

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

data = load_iris()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#Define the pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('bagging', BaggingClassifier(base_estimator=DecisionTreeClassifier(), n_estimators=50, random_state=42))
])

#Train ing and evaluation
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
print(f"Bagging Accuracy: {accuracy_score(y_test, y_pred):.2f}")

2. Boosting

Boosting trains models sequentially, where each new model attempts to correct the errors of its predecessor. It’s effective for reducing both bias and variance, done by integrating a boosting technique like GradientBoostingClassifier into the pipeline.Here is an exmaple of boosting with gradient boosting.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
data = load_iris()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('boosting', GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, random_state=42))
])
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
print(f"Boosting Accuracy: {accuracy_score(y_test, y_pred):.2f}")

3. Stacking

Stacking combines multiple models to improve prediction performance. Base models are trained on the same dataset, and their predictions are used as inputs for a meta-model. Here we combine predictions from base models (SVM and Decision Tree) using a meta-model (Logistic Regression).

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import StackingClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
data = load_iris()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#Defining the base models and meta-model
base_models = [
    ('svm', SVC(probability=True)),
    ('dt', DecisionTreeClassifier())
]
meta_model = LogisticRegression()

#Stacked pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('stacking', StackingClassifier(estimators=base_models, final_estimator=meta_model))
])

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
print(f"Stacking Accuracy: {accuracy_score(y_test, y_pred):.2f}")

These advanced techniques not only enhance the robustness and accuracy of machine learning solutions but also streamline workflows, making them indispensable for developing high-performing, scalable models in production environments. Implementing these strategies effectively positions the pipeline for success in real-world applications.

0
Subscribe to my newsletter

Read articles from Kshitij Shresth directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Kshitij Shresth
Kshitij Shresth

hi.