Prefect for Workflow Orchestration

Gabriel Okemwa
3 min read

🚀 Revolutionizing Data Workflows with Prefect: The Future of Automation

In today's data-driven world, efficient workflow management is crucial for businesses to stay competitive. Enter Prefect, a cutting-edge workflow automation tool that's transforming how data engineers and data scientists handle complex data pipelines.

🔧 What is Prefect?

Prefect is an open-source workflow management system designed to handle modern data stacks. It allows you to build, schedule, and monitor data pipelines with unprecedented ease and flexibility.

🌟 Key Features:

  1. Dynamic Workflows: Prefect adapts to your data, not the other way around. It handles conditional logic and dynamic task generation effortlessly (see the sketch after this list).

  2. Failure Handling: With built-in retry mechanisms and failure notifications, Prefect ensures your workflows are resilient and reliable.

  3. Observability: Real-time monitoring and logging give you full visibility into your workflows, making debugging a breeze.

  4. Versioning: Keep track of changes in your workflows with built-in versioning support.
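To make the dynamic-workflow point concrete, here is a minimal sketch, assuming Prefect 2.x (the task names and the record count are hypothetical): the number of tasks the flow runs is decided at runtime by the data itself.

from prefect import flow, task


@task
def fetch_record_count() -> int:
    """Pretend to look up how many records need processing."""
    return 5


@task
def process_record(i: int) -> str:
    """Process a single record."""
    return f"processed record {i}"


@flow
def dynamic_flow():
    # Plain Python drives the workflow: the loop below generates one
    # task run per record, and the count is only known at runtime.
    count = fetch_record_count()
    if count > 0:
        print([process_record(i) for i in range(count)])


if __name__ == "__main__":
    dynamic_flow()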

💡 Why Prefect Stands Out:

  • Pythonic: Write your workflows in pure Python, leveraging your existing skills and libraries.

  • Scalable: From simple scripts to complex distributed systems, Prefect scales with your needs.

  • Cloud or Self-Hosted: Choose between Prefect Cloud for a managed solution or self-host for complete control.

Let's work on a simple project to demonstrate the power of Prefect. In this project I will show how to create a basic ETL (Extract, Transform, Load) pipeline using Prefect. We'll extract some data from an API, transform it, and then load it into a CSV file.

To get started, install the following Python packages:

pip install prefect requests pandas

Create a file named main.py and add the following code:

import requests
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(url: str) -> dict:
    """Extract data from an API"""
    # Results are cached by input: repeated calls with the same url
    # within an hour reuse the stored result instead of re-fetching.
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # surface HTTP errors so the task is marked Failed
    return response.json()


@task
def transform(data: dict) -> pd.DataFrame:
    """Transform the data into a DataFrame"""
    df = pd.DataFrame(data["results"])
    return df[["name", "height", "mass", "birth_year", "gender"]]


@task
def load(df: pd.DataFrame, path: str) -> None:
    """Load the data into a CSV file"""
    df.to_csv(path, index=False)


@flow
def etl_flow(url: str, path: str):
    """Main ETL flow"""
    raw_data = extract(url)
    transformed_data = transform(raw_data)
    load(transformed_data, path)


if __name__ == "__main__":
    etl_flow("https://swapi.dev/api/people", "star_wars_characters.csv")

Now run the file with python main.py. Prefect will log the flow and task runs to your terminal as the pipeline executes, and a star_wars_characters.csv file will appear in your working directory.

Prefect also comes with a dashboard, which you can access at http://127.0.0.1:4200/.
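The dashboard is served by the local Prefect server. Assuming Prefect 2.x, you can start it with:

prefect server start

Leave this running in a separate terminal; your flow runs will then show up in the UI at the address above.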

Here are the key concepts in Prefect:

  1. Tasks:

    • Fundamental units of work in Prefect

    • Python functions decorated with @task

    • Can be chained together to form complex workflows

  2. Flows:

    • Containers for tasks

    • Define the structure and dependencies of your workflow

    • Decorated with @flow

  3. State:

    • Represents the status of a task or flow execution

    • Examples: Pending, Running, Completed, Failed

  4. Schedules:

    • Allow for automated, periodic flow runs

    • Can be defined using cron syntax or intervals (see the scheduling sketch after this list)

  5. Storage:

    • Where flow code is stored

    • Options include local file system, GitHub, S3, etc.

  6. Agents:

    • Processes that execute scheduled flows

    • Can run on various platforms (local, cloud, etc.)

  7. Triggers:

    • Define conditions for task execution

    • Examples: all_successful, all_failed, any_successful

  8. Parameters:

    • Allow for dynamic inputs to flows

    • Can be set at runtime

  9. Retries:

    • Automatic re-execution of failed tasks

    • Configurable with retries and retry_delay_seconds (see the sketch after this list)

  10. Caching:

    • Stores task results for reuse

    • Improves efficiency by avoiding redundant computations

  11. Logging:

    • Built-in logging for tasks and flows

    • Helps with monitoring and debugging

  12. Results:

    • Outputs of task executions

    • Can be stored and passed between tasks
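To make retries and logging concrete, here is a minimal sketch, assuming Prefect 2.x; the flaky_call task and its 50% failure rate are made up for illustration:

from random import random

from prefect import flow, task, get_run_logger


@task(retries=3, retry_delay_seconds=5)
def flaky_call() -> str:
    """A task that fails randomly; Prefect re-runs it up to 3 times."""
    if random() < 0.5:
        raise RuntimeError("transient failure")
    return "ok"


@flow
def resilient_flow():
    logger = get_run_logger()  # Prefect's built-in, run-aware logger
    result = flaky_call()
    logger.info("flaky_call returned: %s", result)


if __name__ == "__main__":
    resilient_flow()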
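Scheduling can be sketched the same way. Assuming a recent Prefect 2.x release where Flow.serve is available, a hypothetical serve_etl.py could register the ETL flow from earlier to run daily at 09:00:

from main import etl_flow  # the flow defined in main.py above

if __name__ == "__main__":
    # Serve the flow as a long-running process with a cron schedule;
    # the Prefect server then triggers a run every morning at 09:00.
    etl_flow.serve(
        name="daily-star-wars-etl",
        cron="0 9 * * *",
        parameters={
            "url": "https://swapi.dev/api/people",
            "path": "star_wars_characters.csv",
        },
    )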
