Declarative Data Pipelines: Moving from Code to Configuration

Jonathan Bhaskar
14 min read

In data teams today, writing Python code to create data pipelines has become second nature, especially for analytics workloads. Creating new DAGs, tasks, and operators in Airflow - now the industry standard for data orchestration - is just part of our daily routine. But what if there's a simpler, more accessible approach to building these pipelines?

Before we explore this alternative, let's examine how another domain - DevOps - evolved its approach to building and shipping software, and see what lessons we can apply to data orchestration.

Evolution of Infrastructure Management

Let's examine a simple but common task: deploying a web application with a database. The way teams handled this task evolved dramatically over time.

Shell scripts (2000s)

In the early 2000s, deploying applications meant writing detailed shell scripts that listed every command needed. These scripts were brittle, hard to maintain, and required deep system knowledge:

#!/bin/bash

# Install dependencies
apt-get update
apt-get install -y nginx mysql-server

# Configure MySQL
mysql -u root -e "CREATE DATABASE myapp;"
mysql -u root -e "CREATE USER 'myapp'@'localhost' IDENTIFIED BY 'password';"
mysql -u root -e "GRANT ALL PRIVILEGES ON myapp.* TO 'myapp'@'localhost';"

# Deploy application
cd /var/www/html
tar -xzf myapp.tar.gz
chown -R www-data:www-data /var/www/html

# Start services
service mysql start
service nginx start

Configuration Management (2010s)

By 2010, tools like Puppet introduced a paradigm shift. Instead of listing commands, teams defined their desired system state in a declarative format. The tool would figure out how to achieve that state:

package { ['nginx', 'mysql-server']:
  ensure => installed,
}

service { 'nginx':
  ensure  => running,
  enable  => true,
  require => Package['nginx'],
}

service { 'mysql':
  ensure  => running,
  enable  => true,
  require => Package['mysql-server'],
}

exec { 'create-db':
  command => 'mysql -u root -e "CREATE DATABASE myapp;"',
  unless  => 'mysql -u root -e "SHOW DATABASES;" | grep myapp',
  require => Service['mysql'],
}

file { '/var/www/html/myapp':
  ensure  => directory,
  source  => 'puppet:///modules/myapp/files',
  recurse => true,
  owner   => 'www-data',
  group   => 'www-data',
}

Infrastructure as Code (2016+)

Cloud platforms like AWS took this declarative approach even further. With CloudFormation, engineers simply specified the resources they needed, and AWS handled all implementation details:

AWSTemplateFormatVersion: '2010-09-09'
Resources:
  WebServer:
    Type: AWS::EC2::Instance
    Properties:
      InstanceType: t2.micro
      ImageId: ami-0c55b159cbfafe1f0
      UserData: 
        Fn::Base64: !Sub |
          #!/bin/bash
          yum update -y
          yum install -y nginx

  Database:
    Type: AWS::RDS::DBInstance
    Properties:
      Engine: mysql
      DBName: myapp
      MasterUsername: admin
      MasterUserPassword: password
      DBInstanceClass: db.t2.micro

  SecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Allow web traffic
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0

The difference over the years is that, instead of writing out every step that needed to be executed (imperative), engineers moved to specifying what they wanted and letting the system figure out how to get there (declarative).

Evolution of Infrastructure Teams

This shift from imperative to declarative approaches fundamentally changed how infrastructure teams operated. Let's look at what this evolution meant in practice.

In the early days, deploying infrastructure involved a lot of back-and-forth between developers and system administrators: developers filed requests, sysadmins translated them into manual steps, and every change meant another round of tickets and waiting.

With declarative infrastructure, the interaction changed dramatically: developers could propose changes as configuration files, and infrastructure engineers reviewed and applied them like any other code change.

The key difference was that teams now had a common language - YAML configurations - that both developers and infrastructure engineers could understand and work with effectively. This shift to configuration-driven workflows revolutionized how infrastructure teams operated.

Current state of data pipelines

Data teams today face similar challenges to what infrastructure teams encountered in the 2000s. To understand these parallels, let's examine how data pipelines are typically built, using a common scenario: moving data from S3 to GCS, then loading it into BigQuery - a pattern you might use when you have transactional database backups in S3 but your analytics stack runs on BigQuery.

import tempfile
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Minimal defaults for illustration
default_args = {
    'owner': 'data-engineering',
    'start_date': datetime(2024, 1, 1),
}


def transfer_s3_to_gcs(
    s3_bucket,
    s3_key,
    gcs_bucket,
    gcs_object_name,
    aws_conn_id='aws_default',
    gcp_conn_id='google_cloud_default'
):
    # Initialize hooks
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
    gcs_hook = GCSHook(gcp_conn_id=gcp_conn_id)

    # Create a temporary file
    with tempfile.NamedTemporaryFile() as temp_file:
        # Download file from S3
        s3_hook.download_file(
            key=s3_key,
            bucket_name=s3_bucket,
            local_path=temp_file.name
        )

        # Upload file to GCS
        gcs_hook.upload(
            bucket_name=gcs_bucket,
            object_name=gcs_object_name,
            filename=temp_file.name
        )

# Create DAG
dag = DAG(
    's3_to_gcs_transfer',
    default_args=default_args,
    description='Transfer files from S3 to GCS',
    schedule_interval='@daily',
    catchup=False
)

# Define the transfer task
transfer_task = PythonOperator(
    task_id='transfer_s3_to_gcs',
    python_callable=transfer_s3_to_gcs,
    op_kwargs={
        's3_bucket': 'your-s3-bucket-name',
        's3_key': 'path/to/your/file.csv',
        'gcs_bucket': 'your-gcs-bucket-name',
        'gcs_object_name': 'path/to/destination/file.csv'
    },
    dag=dag
)

# Load from GCS to BigQuery
load_to_bq = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='your-gcs-bucket-name',
    source_objects=['path/to/destination/file.csv'],
    destination_project_dataset_table='your-project:dataset.table',
    dag=dag
)

# Set task dependencies
transfer_task >> load_to_bq

Compared with the DevOps workflows we saw earlier, this code is imperative in nature: we tell the orchestrator how to perform each step in the data pipeline, instead of declaring what needs to be done and letting it figure out the how.

Data team interactions

Unsurprisingly, data team workflows within companies mirror the interactions between developers and sysadmins in the 2000s. When a data scientist or analyst needs a new pipeline, they have to coordinate with data engineers who are often juggling multiple priorities. Requests queue up behind other work, context gets lost in handoffs, and even simple pipelines can take days or weeks to ship.

Reusability

Another significant downside of the current imperative approach in Airflow is the difficulty of code reuse. A look at Airflow's abstraction layers shows why.

While Airflow provides hooks and operators that can be reused across DAGs, the tasks themselves must be written directly in the DAG file along with the business logic. Even if we need to create a new DAG that shares most of the same tasks but adds just one additional step, we have to rewrite all the task definitions in the new DAG file.

Both the transfer_s3_to_gcs task and the load_to_bigquery task have to be defined again in the new DAG, even though they are identical to the tasks in the previous DAG.
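Here is a sketch of what that second DAG file might look like (imports and the shared transfer_s3_to_gcs function are the same as before and omitted); the validate_row_counts step and its callable are hypothetical, added only to illustrate the duplication:

dag_v2 = DAG(
    's3_to_gcs_transfer_with_validation',
    default_args=default_args,
    description='Same transfer and load, plus one validation step',
    schedule_interval='@daily',
    catchup=False
)

# Identical to the transfer task in the previous DAG
transfer_task = PythonOperator(
    task_id='transfer_s3_to_gcs',
    python_callable=transfer_s3_to_gcs,
    op_kwargs={
        's3_bucket': 'your-s3-bucket-name',
        's3_key': 'path/to/your/file.csv',
        'gcs_bucket': 'your-gcs-bucket-name',
        'gcs_object_name': 'path/to/destination/file.csv'
    },
    dag=dag_v2
)

# Identical to the load task in the previous DAG
load_to_bq = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='your-gcs-bucket-name',
    source_objects=['path/to/destination/file.csv'],
    destination_project_dataset_table='your-project:dataset.table',
    dag=dag_v2
)

# The only genuinely new piece of business logic
validate_task = PythonOperator(
    task_id='validate_row_counts',
    python_callable=validate_row_counts,  # hypothetical callable
    dag=dag_v2
)

transfer_task >> load_to_bq >> validate_task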

Tight coupling

The real challenge becomes apparent when we need to update implementation details. Consider a scenario where we need to make our transfer process more scalable. The current approach, which might work for smaller files, will fail for file sizes exceeding the Airflow worker's memory. Even worse, it could stall or crash the worker, disrupting other executing DAGs.

Instead, to let the workload scale independently, we create a Compute Engine instance and use its startup script (the STARTUP_SCRIPT constant below) to transfer the files.

from airflow import DAG
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineDeleteInstanceOperator,
    ComputeEngineInsertInstanceOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# STARTUP_SCRIPT (defined elsewhere) downloads the file from S3 and uploads it to GCS.

with DAG('s3_to_gcs_transfer',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    # Create and start instance
    # Use the STARTUP_SCRIPT to transfer files in the instance
    create_instance = ComputeEngineInsertInstanceOperator(
        task_id='create_transfer_instance',
        project_id='your-project-id', 
        zone='us-central1-a',
        body={
            'name': 'transfer-instance-{{ ds_nodash }}',
            'machineType': 'n1-standard-2',
            'metadata': {
                'items': [
                    {'key': 'startup-script', 'value': STARTUP_SCRIPT},
                    {'key': 's3_bucket', 'value': 'your-s3-bucket'},
                    {'key': 's3_key', 'value': 'path/to/your/file'},
                    {'key': 'gcs_bucket', 'value': 'your-gcs-bucket'},
                    {'key': 'gcs_path', 'value': 'destination-path'}
                ]
            }
        }
    )

    # Delete instance
    delete_instance = ComputeEngineDeleteInstanceOperator(
        task_id='delete_transfer_instance',
        project_id='your-project-id',
        zone='us-central1-a',
        resource_id='transfer-instance-{{ ds_nodash }}',
        trigger_rule='all_done'
    )

    # Load from GCS to BigQuery
    load_to_bq = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='your-gcs-bucket',
        source_objects=['path/to/destination/file.csv'],
        destination_project_dataset_table='project:dataset.table'
    )

    # In practice you would add a sensor between create_instance and load_to_bq
    # to wait for the startup script to finish before loading.
    create_instance >> load_to_bq >> delete_instance

When we need to update the DAGs to use this scalable approach, there isn't a clean way to do it. Every DAG using the transfer logic needs to be updated, tested, and redeployed individually. The refactoring effort multiplies with the number of files that need updating.

This limitation stems from Airflow's fundamental design pattern where business logic (the nodes and structure of the DAG) is tightly coupled with implementation logic (the actual task code). This coupling not only makes maintenance difficult but also creates a barrier between data engineers and other team members who could potentially define and modify pipelines themselves.

Declarative data pipelines

A declarative data platform could address these challenges by completely separating the technical implementation details (how to move data from S3 to GCS) from the pipeline business logic (which pipeline needs to move data from S3 to GCS). This separation would allow data engineers to focus on building robust, reusable components while enabling analysts and scientists to define pipelines without deep technical knowledge.

Config-based workflows

In summary, we can identify three key requirements for building an effective declarative data platform:

  1. Separation of Concerns: Pipeline definitions (what) should be completely separate from task implementations (how)

  2. Reusability: Tasks should be reusable across different pipelines without code duplication

  3. Simplified Interface: Teams should be able to define pipelines using a simple, declarative syntax

Here's how such a platform might work. Instead of writing Python code, you'd define your pipeline in YAML:

tasks:
  - name: daily_transfer
    task: s3_to_gcs
    params:
      s3_bucket: your-s3-bucket-name
      s3_key: path/to/your/file.csv
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv

  - name: daily_load_to_bq
    task: gcs_to_bq
    depends_on: daily_transfer
    params:
      gcs_bucket: your-gcs-bucket-name
      gcs_object_name: path/to/destination/file.csv
      bq_dataset: your-bq-dataset-name
      bq_table: your-bq-table-name

The actual implementation of these tasks would be written in Python, but crucially, they would be independent of any specific pipeline. This separation allows data engineers to optimize and maintain task implementations without affecting pipeline definitions.
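For instance, the s3_to_gcs task might be implemented once as a plain Python function registered with the platform. The register_task decorator and module layout below are hypothetical; the point is that the implementation knows nothing about the pipelines that call it:

# tasks/s3_to_gcs.py - a module in the platform's task library (hypothetical layout)
import boto3
from google.cloud import storage

from platform_tasks import register_task  # hypothetical task registry


@register_task("s3_to_gcs")
def s3_to_gcs(s3_bucket: str, s3_key: str, gcs_bucket: str, gcs_object_name: str) -> None:
    """Copy a single object from S3 to GCS.

    The function only knows its parameters; it has no knowledge of which
    pipeline, schedule, or DAG invokes it.
    """
    body = boto3.client("s3").get_object(Bucket=s3_bucket, Key=s3_key)["Body"].read()
    storage.Client().bucket(gcs_bucket).blob(gcs_object_name).upload_from_string(body)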

Benefits

This approach also offers other key advantages.

Reduced cognitive load

Declarative systems mirror how humans naturally think about problem-solving. When designing a new pipeline, we typically think top-down about what tasks need to be included. However, imperative systems like Airflow require bottom-up thinking, where you must detail every implementation step before building the pipeline.

Improved collaboration

Since creating and updating tasks are decoupled from business logic, teams can work more efficiently:

  • Data analysts can create and modify pipelines without deep technical knowledge

  • Data engineers can optimize task implementations without understanding pipeline-specific logic

  • Changes can be reviewed and deployed independently

UI and no-code tools

While this article proposes a YAML-based approach, visual workflow tools like AWS Glue offer similar declarative functionality. However, these UI-based tools come with some non-obvious limitations:

Limited DevOps Integration

Core software engineering practices like code review, testing, and automated deployments become difficult or impossible with UI-based tools.

Reduced Flexibility

While UI tools excel at common use cases, they often struggle with custom requirements. Teams frequently end up modifying their requirements to fit the tool's capabilities rather than the other way around.

AI & LLMs

The rise of Large Language Models (LLMs) presents interesting opportunities for declarative data platforms. LLMs excel at breaking down high-level instructions into logical steps but can struggle with generating efficient, maintainable implementation code.

A declarative platform could leverage these strengths by using LLMs to:

  • Help generate pipeline configurations from natural language descriptions

  • Suggest task combinations for common data patterns

  • Assist with pipeline documentation and metadata

This aligns well with LLMs' capabilities while avoiding their limitations around generating complex implementation code.

Implementation

With these challenges in mind, I built dagster-odp as a proof of concept for bringing declarative pipelines to Dagster. Let's examine how this implementation addresses the problems we've discussed, starting with why Dagster was chosen as the foundation.

Why not Airflow?

Two fundamental aspects of Airflow's design made it unsuitable for our needs:

  1. Task definition model: In Airflow, tasks can only exist within DAGs - they aren't independent, reusable components. This conflicts with our core requirement of having tasks be first-class citizens that can be reused across different pipelines. The tight coupling between tasks and DAGs makes it impossible to build a truly modular system.

  2. Dynamic DAG Creation: Our platform needs to create DAGs dynamically from YAML configurations. However, Airflow expects DAGs to be defined in Python files. Creating DAGs dynamically would mean generating Python code at runtime and writing this code to DAG files - an approach that's both complex and prone to errors.

Why Dagster?

Dagster's architecture aligns perfectly with our declarative approach, offering two key constructs that make it ideal for our platform:

Asset-Based Architecture

Instead of focusing on tasks the way Airflow does, Dagster uses assets as its primary abstraction. An asset represents a concrete piece of data - like a table, file, or model. Creating a new version of an asset through a pipeline run is called asset materialization.

This approach means the code is directly linked to the data object it creates, enabling rich metadata when a new version of an asset is created. For instance, when running a pipeline in Dagster, your code can push detailed information about the data asset it produced to the UI - its size, schema, quality metrics, and lineage. This close link between code and data makes it easier to build observable, maintainable data pipelines.
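As a rough sketch of how that looks, an asset can attach metadata to each materialization with MaterializeResult (available in recent Dagster versions); the asset name and values below are placeholders:

from dagster import MaterializeResult, MetadataValue, asset


@asset
def daily_export():
    # ... produce the data here ...
    return MaterializeResult(
        metadata={
            "size_bytes": MetadataValue.int(1_048_576),  # placeholder value
            "num_rows": MetadataValue.int(50_000),  # placeholder value
            "source": MetadataValue.text("s3://my-source-bucket/data/daily_export.csv"),
        }
    )

The metadata shows up on the asset's page in the Dagster UI and is tracked across materializations.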

Dynamic Definition Support

Dagster's component model is built around Python decorators rather than class inheritance, making it ideal for dynamic creation. For example:

from dagster import asset


@asset
def dynamic_asset(context):
    # Asset logic here
    pass

This design means we can programmatically create assets from YAML configurations without generating code files - a critical requirement for our platform.
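As a minimal sketch of the idea: because @asset is an ordinary function, it can be applied programmatically to a compute function built from a parsed YAML entry. The config dictionary and compute logic below are illustrative, not dagster-odp's actual internals:

from dagster import asset


def build_asset_from_config(config: dict):
    """Create a Dagster asset definition from one parsed YAML entry."""

    def _compute(context):
        context.log.info(f"Running task type {config['task_type']}")
        # ... dispatch to the registered task implementation here ...

    # Applying the decorator as a plain function call - no code files are generated.
    return asset(name=config["asset_key"], deps=config.get("depends_on", []))(_compute)


raw_data_gcs = build_asset_from_config(
    {"asset_key": "raw_data_gcs", "task_type": "s3_file_to_gcs"}
)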

Additional Dagster features

Beyond its core architecture, Dagster provides several features that make it an ideal foundation for our declarative platform:

  1. Local Development: Dagster prioritizes developer experience by allowing local execution without complex setup. This means you can develop and test pipelines on your local machine with the same code that will run in production, leading to faster iteration cycles and simpler debugging.

  2. DBT Integration: Dagster's DBT integration automatically discovers DBT models in your repository. This deep integration means you can treat individual DBT models as Dagster assets, run specific model selections, and maintain a complete picture of your data lineage across both DBT and non-DBT transformations.

  3. Partitions: Dagster's partitioning system lets you process data in logical chunks, whether time-based (like daily runs) or categorical (like per-country processing). This is particularly powerful for backfills and incremental processing, as you can easily rerun specific partitions without touching others.

  4. Asset checks: Data quality monitoring is built into Dagster's core abstractions through asset checks. Rather than treating quality checks as an afterthought, Dagster allows you to define checks that run automatically after asset materialization, enabling you to catch data issues before they propagate downstream. A minimal sketch of partitions and asset checks follows this list.
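The sketch below shows a daily-partitioned asset with an attached asset check; the asset name and placeholder row count are illustrative and not part of dagster-odp:

from dagster import (
    AssetCheckResult,
    DailyPartitionsDefinition,
    asset,
    asset_check,
)


# A daily-partitioned asset: each run materializes one day's slice.
@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def daily_events(context):
    context.log.info(f"Processing partition {context.partition_key}")
    # ... load and process the data for this date ...


# An asset check that runs after daily_events materializes.
@asset_check(asset=daily_events)
def daily_events_not_empty():
    row_count = 100  # placeholder: in practice, query the produced data
    return AssetCheckResult(passed=row_count > 0, metadata={"row_count": row_count})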

dagster-odp

To demonstrate how dagster-odp brings these concepts together, we'll implement the same S3 to BigQuery pipeline we discussed earlier, but using a declarative approach. The complete implementation consists of three main components: resource configuration, task definition, and workflow configuration.

Resource definition

First, we define our resource configuration. Resources in dagster-odp represent reusable connections and services, similar to Airflow hooks. While dagster-odp ships with pre-built GCP resources, we'll also create a custom S3 resource:

resources:
  - resource_kind: gcs
    params:
      project: my-gcp-project

  - resource_kind: s3

  - resource_kind: bigquery
    params:
      project: my-gcp-project
      location: us-east1

The S3 resource implementation shows how to create a custom resource:

@odp_resource("s3")
class S3Resource(ConfigurableResource):
    """A resource that provides access to Amazon S3."""

    def get_client(self) -> Any:
        return boto3.client(
        "s3",
        aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID").get_value(),
        aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY").get_value(),
        )

We use the odp_resource decorator to define the resource class. This allows it to be discovered by the framework and used in the YAML file. We define one method, get_client, which we will use in our task definition.

Task definition

With our resources defined, we can create the S3 to GCS transfer task by implementing a BaseTask subclass.

from dagster_odp.tasks.manager import BaseTask, odp_task

@odp_task(
    "s3_file_to_gcs",
    required_resources=["gcs", "s3"]
)
class S3ToGCSTransfer(BaseTask):
    """
    A task that transfers a file from Amazon S3 to Google Cloud Storage.
    """
    s3_bucket: str
    s3_key: str
    destination_file_uri: str

    def run(self) -> dict:
        s3_client = self._resources["s3"].get_client()
        gcs_client = self._resources["gcs"]

        # Download file from S3
        s3_response = s3_client.get_object(Bucket=self.s3_bucket, Key=self.s3_key)
        file_content = s3_response["Body"].read()

        # Split the gs:// URI into bucket and blob names
        bucket_name = self.destination_file_uri[5:].split("/")[0]
        blob_name = "/".join(self.destination_file_uri[5:].split("/")[1:])

        # Upload to GCS
        bucket = gcs_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_string(file_content)

        return {
            "s3_bucket": self.s3_bucket,
            "s3_key": self.s3_key,
            "destination_file_uri": self.destination_file_uri,
            "size": len(file_content),
        }

The task implementation returns metadata that will be visible in the Dagster UI after each run.

Workflow definition

Finally, we tie everything together in our workflow configuration. This YAML file defines our assets, their dependencies, and scheduling:

assets:
  - asset_key: raw_data_gcs
    task_type: s3_file_to_gcs
    params:
      s3_bucket: my-source-bucket
      s3_key: data/daily_export.csv
      destination_file_uri: gs://my-gcs-bucket/data/daily_export.csv

  - asset_key: raw_data_bq
    task_type: gcs_file_to_bq
    depends_on: [raw_data_gcs]
    params:
      source_file_uri: "{{raw_data_gcs.destination_file_uri}}"
      destination_table_id: "my-project.my_dataset.daily_data"
      job_config_params:
        source_format: CSV
        write_disposition: WRITE_TRUNCATE
        autodetect: true

jobs:
  - job_id: daily_data_transfer
    description: "Transfer data from S3 to BigQuery daily"
    asset_selection: [raw_data_gcs, raw_data_bq]
    triggers:
      - trigger_id: daily_schedule
        trigger_type: schedule
        params:
          schedule_kind: cron
          schedule_params:
            cron_schedule: "@daily"

These components are converted by dagster-odp into Dagster constructs like assets, jobs and schedules.
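To make the mapping concrete, here is a rough hand-written Dagster equivalent of what such a config describes. This is not dagster-odp's actual generated code - just a sketch using plain Dagster APIs, with placeholder asset bodies:

from dagster import Definitions, ScheduleDefinition, asset, define_asset_job


# Placeholder assets standing in for the ones built from the YAML config.
@asset
def raw_data_gcs():
    ...


@asset(deps=[raw_data_gcs])
def raw_data_bq():
    ...


daily_data_transfer = define_asset_job(
    name="daily_data_transfer",
    selection=["raw_data_gcs", "raw_data_bq"],
)

defs = Definitions(
    assets=[raw_data_gcs, raw_data_bq],
    jobs=[daily_data_transfer],
    schedules=[ScheduleDefinition(job=daily_data_transfer, cron_schedule="0 0 * * *")],
)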

dagster-odp Features

The example above demonstrates some key features of the framework:

  1. Asset dependencies through the depends_on field

  2. Metadata sharing between assets using handlebars syntax ({{raw_data_gcs.destination_file_uri}})

  3. Built-in scheduling with cron support

  4. Pre-built tasks for common operations like gcs_file_to_bq

The framework also provides several other capabilities:

  • Integration with DLT for data ingestion

  • Soda integration for data quality monitoring

  • DBT support with dependency management and variable passing

For a hands-on introduction to dagster-odp, check out the Getting Started guide. The guide walks through setting up your first project and building a simple pipeline using the concepts we've discussed.

The Move Towards Declarative Pipelines

dagster-odp isn't the only declarative data platform. Several platforms are exploring similar approaches, recognizing the need to make data pipelines more maintainable and accessible:

  1. Data-aware scheduling: Airflow's data-aware scheduling (v2.4+) introduces a concept similar to Dagster's assets, allowing DAGs to be dynamically chained together based on datasets (a short example follows this list). However, since DAGs must still be defined in Python files, creating these jobs dynamically from YAML remains challenging.

  2. DLT+: dlthub, an open source data ingestion tool, recently launched DLT+, offering end-to-end workflow definitions via YAML files. It's built specifically for data scientists, focusing on single-machine computation for both ingestion and processing tasks. To learn more, get in touch with them.

  3. Dagster: The Dagster team is working on their own declarative feature that will be deeply integrated into their orchestration product. To learn more or collaborate, jump into the Dagster Slack.
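As a point of comparison for the first item, here is roughly what Airflow's dataset-based scheduling looks like; the dataset URI and DAG names are illustrative:

import pendulum
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

export_file = Dataset("s3://my-source-bucket/data/daily_export.csv")  # illustrative URI

# Producer DAG: marks the dataset as updated when the export task succeeds.
with DAG("export_to_s3", start_date=pendulum.datetime(2024, 1, 1), schedule="@daily"):
    EmptyOperator(task_id="export", outlets=[export_file])

# Consumer DAG: triggered whenever the dataset is updated, instead of by a cron schedule.
with DAG("load_export", start_date=pendulum.datetime(2024, 1, 1), schedule=[export_file]):
    EmptyOperator(task_id="load")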

Conclusion

The evolution from imperative to declarative approaches in data engineering mirrors what we've seen in other domains like DevOps. As data pipelines become more complex, the need for maintainable, configurable, and collaborative solutions grows stronger. A declarative framework addresses these challenges by separating pipeline definitions from implementation details, enabling teams to work more efficiently while maintaining the flexibility needed for complex data workflows.

Whether you're building new pipelines or maintaining existing ones, considering a declarative approach could help make your data infrastructure more maintainable and your teams more productive.
