Python Automation #3: Data ingestion w/pandas, SQLAlchemy, dataprep.connector, petl

Anix Lynch

1. Read Data from CSV, Excel, or JSON (pandas.read_csv, pandas.read_excel, pandas.read_json)

Read CSV:

import pandas as pd

# Read data from CSV
df = pd.read_csv("data.csv")
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice   25
1   2    Bob   30
2   3  Carol   27

Read Excel:

import pandas as pd

# Read data from Excel
df = pd.read_excel("data.xlsx", sheet_name="Sheet1")  # requires openpyxl for .xlsx files
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice   25
1   2    Bob   30
2   3  Carol   27

Read JSON:

import pandas as pd

# Read data from JSON
df = pd.read_json("data.json")
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice   25
1   2    Bob   30
2   3  Carol   27
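
For files too large to fit in memory, pandas can also read in chunks. A minimal sketch (the file name and chunk size are placeholders):

import pandas as pd

# Process a large CSV in chunks of 10,000 rows to keep memory bounded
total_rows = 0
for chunk in pd.read_csv("large_data.csv", chunksize=10_000):
    total_rows += len(chunk)
print(f"Processed {total_rows} rows")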

2. Load Data from Relational Databases (SQLAlchemy/pandas.read_sql)

import pandas as pd
from sqlalchemy import create_engine

# Create database connection
engine = create_engine("sqlite:///example.db")

# Load data from a SQL table
df = pd.read_sql("SELECT * FROM users", engine)
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice   25
1   2    Bob   30
2   3  Carol   27
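
When a query takes user input, bind parameters instead of formatting strings into the SQL. A minimal sketch against the same example.db:

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///example.db")

# Bound parameters avoid SQL injection and quoting bugs
query = text("SELECT * FROM users WHERE Age > :min_age")
df = pd.read_sql(query, engine, params={"min_age": 25})
print(df.head())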

3. Extract Data from APIs (dataprep.connector)

import asyncio
from dataprep.connector import connect

# Connect to an API ("example_api" is a placeholder connector config)
c = connect("example_api")

# query() is a coroutine; _count caps the number of records returned
df = asyncio.run(c.query("endpoint", _count=10))
print(df.head())

Output (Sample):

   ID    Name    Age
0   1   Alice    25
1   2     Bob    30
2   3   Carol    27

4. Stream Large Datasets (petl.fromcsv)

import petl as etl

# Stream large datasets from CSV
table = etl.fromcsv("large_data.csv")

# Preview first few rows
print(etl.head(table, 5))

Output (Sample):

+------+-------+-----+
|  ID  | Name  | Age |
+------+-------+-----+
|    1 | Alice |  25 |
|    2 | Bob   |  30 |
|    3 | Carol |  27 |
+------+-------+-----+
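
petl tables are lazy: nothing is read until you iterate, so rows stream from disk one at a time. A minimal sketch:

import petl as etl

table = etl.fromcsv("large_data.csv")

# data() iterates the data rows (header excluded) straight from disk
count = 0
for row in etl.data(table):
    count += 1
print(f"Streamed {count} rows without loading the file into memory")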

5. Write to Relational Databases (SQLAlchemy/pandas.to_sql)

import pandas as pd
from sqlalchemy import create_engine

# Create database connection
engine = create_engine("sqlite:///example.db")

# Sample DataFrame
df = pd.DataFrame({"ID": [1, 2], "Name": ["Alice", "Bob"], "Age": [25, 30]})

# Write data to a SQL table
df.to_sql("users", engine, if_exists="replace", index=False)
print("Data written successfully!")

Output:

Data written successfully!
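
For larger DataFrames, to_sql accepts a chunksize so rows are inserted in batches. A minimal sketch (the table name and chunk size are illustrative):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///example.db")
df = pd.DataFrame({"ID": range(1, 1001), "Age": [25] * 1000})

# Insert 250 rows per batch instead of all rows in one statement
df.to_sql("bulk_users", engine, if_exists="replace", index=False, chunksize=250)
print("Chunked write completed!")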

6. Connect to Cloud Databases (dataprep.connector.read_sql)

from dataprep.connector import read_sql

# Connection string for a cloud database, e.g. Postgres on AWS RDS
# (host, credentials, and names below are placeholders)
conn = "postgresql://user:password@your-rds-host:5432/database_name"

# Query data
df = read_sql(conn, "SELECT * FROM table_name LIMIT 10")
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice   25
1   2    Bob   30
2   3  Carol   27

7. Transform While Loading (petl.transform)

import petl as etl

# Load data and apply transformation
table = etl.fromcsv("data.csv")
transformed_table = etl.addfield(table, "FullName", lambda row: row.Name.upper())
print(etl.head(transformed_table, 5))

Output (Sample):

+------+-------+-----+----------+
|  ID  | Name  | Age | FullName |
+------+-------+-----+----------+
|    1 | Alice |  25 | ALICE    |
|    2 | Bob   |  30 | BOB      |
+------+-------+-----+----------+
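
Type casting is another common load-time transform; petl's convert applies it lazily as rows stream through:

import petl as etl

# Cast the Age column from string to int while loading
table = etl.fromcsv("data.csv")
typed = etl.convert(table, "Age", int)
print(etl.head(typed, 5))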

8. Batch Processing for Ingestion (petl.tocsv)

import petl as etl

# petl pipelines are lazy: rows stream from input to output,
# so even large files are processed without loading fully into memory
table = etl.fromcsv("large_data.csv")
etl.tocsv(table, "processed_data.csv")
print("Batch processing completed!")

Output:

Batch processing completed!

9. Export Data to Multiple Formats (pandas.to_csv, pandas.to_excel, pandas.to_json)

Export to CSV:

import pandas as pd

# Sample DataFrame
df = pd.DataFrame({"ID": [1, 2], "Name": ["Alice", "Bob"], "Age": [25, 30]})

# Export to CSV
df.to_csv("output.csv", index=False)
print("Exported to CSV!")

Export to Excel:

df.to_excel("output.xlsx", index=False)  # requires an Excel writer such as openpyxl
print("Exported to Excel!")

Export to JSON:

df.to_json("output.json", orient="records", indent=2)
print("Exported to JSON!")

Output:

Exported to CSV!
Exported to Excel!
Exported to JSON!
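
pandas infers compression from the file extension, which keeps large exports small. A minimal sketch:

import pandas as pd

df = pd.DataFrame({"ID": [1, 2], "Name": ["Alice", "Bob"], "Age": [25, 30]})

# Compression is inferred from the .gz suffix
df.to_csv("output.csv.gz", index=False)
print("Exported compressed CSV!")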

10. Handle API Rate Limits (dataprep.connector, _concurrency)

import asyncio
from dataprep.connector import connect

# Throttle requests by capping concurrency to one request at a time
# ("example_api" is a placeholder connector config)
c = connect("example_api", _concurrency=1)

df = asyncio.run(c.query("endpoint", _count=10))
print(df.head())

Output (Sample):

   ID  Name  Age
0   1  Alice  25
1   2    Bob  30
2   3  Carol  27

11. SQL Query Integration (SQLAlchemy/pandas.read_sql)

import pandas as pd
from sqlalchemy import create_engine

# Create database connection
engine = create_engine("sqlite:///example.db")

# Execute SQL query and load data
df = pd.read_sql("SELECT * FROM users WHERE Age > 25", engine)
print(df)

Output:

   ID    Name  Age
0   2     Bob   30
1   3   Carol   27
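
read_sql also accepts a chunksize, returning an iterator of DataFrames instead of one large frame. A minimal sketch:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite:///example.db")

# Stream query results 1,000 rows at a time
for chunk in pd.read_sql("SELECT * FROM users", engine, chunksize=1000):
    print(len(chunk), "rows in this chunk")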

12. Transform Data from APIs (dataprep.connector)

import asyncio
from dataprep.connector import connect

# Connect to an API ("example_api" is a placeholder connector config)
c = connect("example_api")

# Load data, then transform the resulting DataFrame with pandas
df = asyncio.run(c.query("endpoint", _count=5))
df["Transformed_Name"] = df["Name"].str.upper()
print(df)

Output:

   ID    Name  Age Transformed_Name
0   1   Alice   25           ALICE
1   2     Bob   30             BOB

13. Clean and Filter During Ingestion (petl.select, petl.setheader)

import petl as etl

# Load data and normalize headers to lowercase
table = etl.fromcsv("data.csv")
cleaned_table = etl.setheader(table, [h.lower() for h in etl.header(table)])

# Keep only rows with age over 25
filtered_table = etl.select(cleaned_table, lambda row: int(row.age) > 25)
print(etl.head(filtered_table, 5))

Output:

+------+-------+-----+
|  id  | name  | age |
+------+-------+-----+
|    2 | Bob   |  30 |
|    3 | Carol |  27 |
+------+-------+-----+

14. Handle Flat-File Ingestion (petl.fromcsv, petl.fromtsv)

CSV:

import petl as etl

# Load data from CSV
table = etl.fromcsv("data.csv")
print(etl.head(table, 5))

Output:

+------+-------+-----+
|  ID  | Name  | Age |
+------+-------+-----+
|    1 | Alice |  25 |
|    2 | Bob   |  30 |
+------+-------+-----+

TSV:

# Load data from TSV
table_tsv = etl.fromtsv("data.tsv")
print(etl.head(table_tsv, 5))

Output (TSV):

+------+-------+-----+
|  ID  | Name  | Age |
+------+-------+-----+
|    1 | Alice |  25 |
|    2 | Bob   |  30 |
+------+-------+-----+
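
fromcsv passes extra keyword arguments through to Python's csv module, so other delimiters work too. A sketch (data.txt and the semicolon delimiter are placeholders):

import petl as etl

# Semicolon-delimited flat file
table_semi = etl.fromcsv("data.txt", delimiter=";")
print(etl.head(table_semi, 5))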

15. Parallel Ingestion (dataprep.connector, _concurrency)

import asyncio
from dataprep.connector import connect

# Raise _concurrency so paginated requests are fetched in parallel
# ("example_api" is a placeholder connector config)
c = connect("example_api", _concurrency=10)

df = asyncio.run(c.query("endpoint", _count=10))
print(df.head())

Output:

   ID    Name  Age
0   1   Alice   25
1   2     Bob   30
2   3   Carol   27

16. ORM Integration (SQLAlchemy)

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

# Define ORM base
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# Create engine and session
engine = create_engine("sqlite:///example.db")
Session = sessionmaker(bind=engine)
session = Session()

# Query data using ORM
users = session.query(User).filter(User.age > 25).all()
for user in users:
    print(user.name, user.age)

Output:

Bob 30
Carol 27
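
The same session can write as well as read. Continuing with the User model and session above (the new row's values are made up):

# Insert a new row via the ORM and persist it
session.add(User(name="Dave", age=41))
session.commit()
print(session.query(User).count(), "users in table")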

17. Integrate Data Pipelines (petl, SQLAlchemy)

import petl as etl
from sqlalchemy import create_engine

# Load data from CSV
table = etl.fromcsv("data.csv")

# Write to a SQL table (todb loads into an existing table;
# the users table was created in step 5 above)
engine = create_engine("sqlite:///example.db")
etl.todb(table, engine, "users")
print("Data pipeline integrated!")

Output:

Data pipeline integrated!
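
petl also supports a fluent style, so transforms can be chained into the same lazy pipeline before the write. A sketch, reusing the users table created earlier:

import petl as etl
from sqlalchemy import create_engine

engine = create_engine("sqlite:///example.db")

# Convert, filter, and load in one lazy chain
table = (
    etl.fromcsv("data.csv")
    .convert("Age", int)
    .select(lambda row: row.Age > 20)
)
etl.todb(table, engine, "users")
print("Pipeline with transforms completed!")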

18. Preview Data During Ingestion (petl.head, pandas.head)

Using petl:

import petl as etl

# Preview data using petl
table = etl.fromcsv("data.csv")
print(etl.head(table, 5))

Output:

+------+-------+-----+
|  ID  | Name  | Age |
+------+-------+-----+
|    1 | Alice |  25 |
|    2 | Bob   |  30 |
+------+-------+-----+

Using pandas:

import pandas as pd

# Preview data using pandas
df = pd.read_csv("data.csv")
print(df.head())

Output:

   ID    Name  Age
0   1   Alice   25
1   2     Bob   30
2   3   Carol   27
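
Beyond the first rows, it helps to check dtypes and null counts before transforming. Continuing with the same df:

# Column names, dtypes, and non-null counts in one call
df.info()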

19. Connect to Custom APIs (dataprep.connector)

import asyncio
from dataprep.connector import connect

# Point connect() at a local folder of connector config files
# ("./custom_api_config" is a placeholder path)
c = connect("./custom_api_config")

# Endpoint parameters are passed as keyword arguments
df = asyncio.run(c.query("custom_endpoint", key="value", _count=10))
print(df.head())

Output:

   ID    Name  Age
0   1   Alice   25
1   2     Bob   30

20. Write to Cloud Storage (e.g., S3) (pandas.to_csv)

import pandas as pd
import boto3

# Save DataFrame to CSV
df = pd.DataFrame({"ID": [1, 2], "Name": ["Alice", "Bob"], "Age": [25, 30]})
df.to_csv("output.csv", index=False)

# Upload to S3
s3 = boto3.client("s3")
s3.upload_file("output.csv", "your_bucket_name", "output.csv")
print("File uploaded to S3!")

Output:

File uploaded to S3!
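
With the s3fs package installed, pandas can also read the file back directly from S3 (bucket and key are placeholders):

import pandas as pd

# pandas delegates s3:// URLs to s3fs; AWS credentials come from the
# usual environment variables or config files
df = pd.read_csv("s3://your_bucket_name/output.csv")
print(df.head())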
