Breaking the Serverless Ceiling: Building a Durable AI Pipeline for Large Datasets

Carl K
7 min read

Serverless functions like AWS Lambda are ideal for scalable, low-maintenance data processing—until you hit the 'serverless durability paradox.' Their 15-minute runtime cap clashes with AI tasks that can take hours. Picture a CSV of 75,000 product descriptions needing SEO-optimized titles or key feature extraction. At 1-2 seconds per row, that’s over 20 hours—far beyond Lambda’s limit, even ignoring rate limits.
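To make the mismatch concrete, here's a quick back-of-the-envelope calculation (the per-row latency is an illustrative assumption, not a benchmark):

# Rough estimate of sequential, per-row processing time
rows = 75_000
seconds_per_row = 1.5            # assumed average latency per AI call

total_hours = rows * seconds_per_row / 3600
lambda_cap_hours = 15 / 60       # Lambda's maximum execution time

print(f"Sequential processing: ~{total_hours:.0f} hours")    # ~31 hours
print(f"Lambda runtime cap: {lambda_cap_hours:.2f} hours")    # 0.25 hours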

After testing various workarounds, I devised a solution that sidesteps Kubernetes or container orchestration.

The Orchestration Breakthrough

The key insight wasn't building a better serverless function, but creating a durable orchestration layer that maintains state while delegating compute-heavy work to ephemeral functions.

This architecture combines three technologies:

  1. Cloudflare Workflows handles orchestration and state management

  2. AWS Lambda performs compute-intensive but time-bounded tasks

  3. OpenAI's Batch API runs asynchronous AI operations

Here's a glimpse of the Workflow definition that makes this possible:

// Core workflow definition in Cloudflare Workflows
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import ky from 'ky';

export class DatasetEnhancerWorkflow extends WorkflowEntrypoint<Env, EnhancerParams> {
  async run(event: WorkflowEvent<EnhancerParams>, step: WorkflowStep) {
    // Step 1: Start the batch job
    const batchInfo = await step.do(
      'start_batch_job',
      {
        retries: { limit: 3, delay: '5 seconds', backoff: 'exponential' }
      },
      async () => {
        // Lambda function call to prepare and submit the batch
        const response = await ky.post(`${this.LAMBDA_BASE_URL}/start`, {
          json: { csv_url: event.payload.csv_url, instructions: event.payload.instructions }
        });
        return await response.json();
      }
    );

    // Step 2: Poll for completion with increasing wait times
    let waitTime: '30 seconds' | '1 minute' | '2 minutes' | '5 minutes' = '30 seconds';
    let statusData = null;

    while (true) {
      if (statusData !== null) {
        await step.sleep(`waiting_${waitTime}`, waitTime);
        // Gradually increase wait time to reduce API calls
        if (waitTime === '30 seconds') waitTime = '1 minute';
        else if (waitTime === '1 minute') waitTime = '2 minutes';
        else if (waitTime === '2 minutes') waitTime = '5 minutes';
      }

      // Check status with automatic retries
      statusData = await step.do('check_status', { /* retry config */ }, 
        async () => { /* status check code */ });

      if (statusData.status === 'completed') break;
    }

    // Step 3: Process results when complete
    const results = await step.do('fetch_results', { /* retry config */ },
      async () => { /* result fetching code */ });

    return { success: true, enhanced_csv_url: results.enhanced_csv_url };
  }
}

I chose Cloudflare Workflows over AWS Step Functions because its pricing isn't based on state transitions and it offers a simpler JavaScript environment. For workflows that need to run for hours or days, these advantages are significant.

How It Actually Works

The system operates through this sequence:

  1. A Cloudflare Workflow is triggered with the CSV URL and enhancement instructions

  2. The workflow calls the Lambda /start endpoint, which prepares the data and submits an OpenAI batch job

  3. The workflow sleeps between status checks, polling the batch through Lambda until it completes

  4. Once the batch is done, the workflow asks Lambda to merge the AI outputs with the original data and returns the enhanced CSV URL

What makes this work is Cloudflare Workflows' ability to persist state across steps, even through failures. Unlike a bare Lambda function, a workflow can sleep between status checks, automatically retry failed steps, and maintain its state for hours or days.

The Technical Implementation

Under the hood, the Lambda function handles three main operations (sketched minimally after the list):

  1. Starting a batch - Downloads and analyzes the CSV, prepares batch requests for OpenAI

  2. Checking status - Acts as a proxy to the OpenAI Batch API

  3. Processing results - Merges AI outputs with the original data
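As a rough illustration, the Lambda entry point can be a thin router over those three operations. This is a hedged sketch, not the actual gist code; the helper names (download_csv, create_batch_job, check_batch_status, process_batch_results), the module they live in, and the event shape are assumptions:

import json

# Hypothetical helper names; the real implementations live in the gist linked below
from enhancer import download_csv, create_batch_job, check_batch_status, process_batch_results

def handler(event, context):
    """Thin router: /start prepares and submits the batch, /status proxies the
    OpenAI Batch API, /results merges AI output back into the original CSV."""
    path = event.get("rawPath", "")              # assumes a Lambda Function URL event shape
    body = json.loads(event.get("body") or "{}")

    if path.endswith("/start"):
        local_csv = download_csv(body["csv_url"])
        batch_id, metadata = create_batch_job(
            file_path=local_csv,
            instructions=body["instructions"],
            target_columns=body.get("target_columns", []),
            enhancement_fields=body.get("enhancement_fields"),
        )
        result = {"batch_id": batch_id, "metadata": metadata}
    elif path.endswith("/status"):
        result = check_batch_status(body["batch_id"])
    elif path.endswith("/results"):
        result = process_batch_results(body["batch_id"], body["metadata"])
    else:
        return {"statusCode": 404, "body": json.dumps({"error": "unknown route"})}

    return {"statusCode": 200, "body": json.dumps(result)}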

Here's how we dynamically create batches for processing with the OpenAI API:

from typing import Optional

import polars as pl

# EnhancementFieldConfig is a small config model (name/type/description) defined elsewhere in the Lambda code
def get_messages_batch(
    indexed_csv_path: str,
    instructions: str,
    target_columns: list[str],
    enhancement_fields: Optional[list[EnhancementFieldConfig]] = None,
) -> list:
    """
    Generate batch messages for enhancement using Polars to read the indexed CSV.
    Creates one request per row that handles all target columns together.
    """
    # Create field descriptions for the prompt
    field_descriptions = ""
    if enhancement_fields:
        field_descriptions = "\n\nYou need to generate the following fields:\n"
        for field in enhancement_fields:
            field_descriptions += (
                f"- {field.name} ({field.type}): {field.description}\n"
            )

    # Read the indexed CSV with Polars
    df = pl.read_csv(indexed_csv_path)

    # Generate batch messages - one per row
    messages_batch = []
    for row in df.iter_rows(named=True):
        # Get row index from the indexed CSV
        row_index = row["_row_index"]

        # Compile all target column values
        target_values = {}
        for col in target_columns:
            value = str(row[col]) if row[col] is not None else ""
            target_values[col] = value

        # Skip if all target values are empty
        if all(not value for value in target_values.values()):
            continue

        # Build context from non-target columns
        context_items = []
        for col, value in row.items():
            if col != "_row_index" and col not in target_columns and value is not None:
                context_items.append(f"{col}: {value}")
        row_context = ", ".join(context_items)

        # Create a single message that includes all target columns
        system_content = """You are an AI assistant enhancing CSV data. 
                            Follow the user's instructions carefully to enhance 
                            multiple columns from a single row."""

        target_columns_text = "\n".join(
            [f"{col}: {target_values[col]}" for col in target_columns]
        )

        messages_batch.append(
            [
                {"role": "system", "content": system_content},
                {
                    "role": "user",
                    "content": (
                        f"Instructions: {instructions}{field_descriptions}\n\n"
                        f"Row data to enhance:\n{target_columns_text}\n\n"
                        f"Row context: {row_context}\n"
                        f"Row index: {row_index}"
                    ),
                },
            ]
        )

    return messages_batch
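As a quick illustration of how this is called (the file name and instructions are hypothetical, just to show the output shape):

messages = get_messages_batch(
    indexed_csv_path="products_indexed.csv",
    instructions="Write an SEO-optimized title and extract key product features.",
    target_columns=["description"],
)
# Each entry in `messages` is a [system, user] pair whose user content embeds the
# row's target columns, the surrounding context columns, and its _row_index.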

What made this implementation particularly effective:

  • Dynamic model generation using Pydantic, allowing flexible schema design based on the enhancement fields requested:

      from typing import Any

      from pydantic import Field, create_model

      def create_dynamic_model(field_configs):
          """Create a Pydantic model dynamically based on field configurations."""
          fields = {
              "row_index": (int, Field(..., description="Row index in the original CSV")),
          }
    
          for field in field_configs:
              # Map string type names to Python types
              python_type = {
                  "string": str,
                  "number": float,
                  "boolean": bool, 
                  "array": list[str]
              }.get(field.type, Any)
    
              fields[field.name] = (
                  python_type,
                  Field(..., description=field.description)
              )
    
          # Create model dynamically
          return create_model("DynamicEnhancementResult", **fields)
    
  • Smart batching that includes relevant context from other columns to improve AI outputs

  • Indexed CSV creation with DuckDB to ensure perfect alignment between source data and AI results (a minimal sketch of this indexing step appears after this list)

      # Assumed module-level setup (see the linked gist for the full file):
      #   import os, tempfile, logging
      #   from typing import Optional, Tuple
      #   from openai import OpenAI
      #   from instructor.batch import BatchJob
      #   openai_client = OpenAI(); logger = logging.getLogger(__name__)
      def create_batch_job(
          file_path: str,
          instructions: str,
          target_columns: list[str],
          model: str = "gpt-4o",
          enhancement_fields: Optional[list[EnhancementFieldConfig]] = None,
      ) -> Tuple[str, dict]:
          """
          Create a batch job for CSV enhancement:
          - Use DuckDB to create an indexed CSV
          - Use Polars to read the indexed CSV and prepare the messages
          """
          try:
              # Step 1: Use DuckDB to create an indexed version of the CSV
              indexed_csv_path = create_indexed_csv(file_path)
    
              # Step 2: Generate batch messages using Polars to read the indexed CSV
              messages_batch = get_messages_batch(
                  indexed_csv_path, instructions, target_columns, enhancement_fields
              )
    
              # Step 3: Create the batch job
              temp_batch_file = tempfile.NamedTemporaryFile(delete=False, suffix=".jsonl")
    
              # Create the dynamic model (matches the single-argument definition shown above)
              response_model = create_dynamic_model(enhancement_fields)
              logger.info(
                  f"Created dynamic model with fields: {list(response_model.model_fields.keys())}"
              )
    
              # Create batch job
              BatchJob.create_from_messages(
                  messages_batch=messages_batch,
                  model=model,
                  file_path=temp_batch_file.name,
                  response_model=response_model,
              )
    
              with open(temp_batch_file.name, "rb") as f:
                  batch_file = openai_client.files.create(file=f, purpose="batch")
    
              batch = openai_client.batches.create(
                  input_file_id=batch_file.id,
                  endpoint="/v1/chat/completions",
                  completion_window="24h",
                  metadata={
                      "description": f"CSV enhancement: {instructions[:50]}...",
                      "type": "csv_enhancement",
                  },
              )
    
              request_metadata = {
                  "batch_file_id": batch_file.id,
                  "instruction": instructions,
                  "model": model,
                  "indexed_csv_path": indexed_csv_path,  # Store the path to the indexed CSV
                  "total_messages": len(messages_batch),
                  "model_fields": list(response_model.model_fields.keys()),
              }
    
              os.unlink(temp_batch_file.name)
              # Note: We don't delete indexed_csv_path as we need it for processing later
    
              logger.info(
                  f"Created batch job: {batch.id} with {len(messages_batch)} requests"
              )
              return batch.id, request_metadata
          except Exception as e:
              logger.error(f"Error creating batch job: {str(e)}")
              raise
    
  • High-performance processing with Polars for efficient CSV manipulation
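The DuckDB indexing step referenced above isn't shown in the snippet, so here is a minimal sketch of what create_indexed_csv can look like. The output path and query details are assumptions; the point is simply to prepend a stable _row_index column that survives the round trip through the Batch API:

import duckdb

def create_indexed_csv(file_path: str) -> str:
    """Write a copy of the CSV with a persistent _row_index column (sketch)."""
    indexed_csv_path = file_path.replace(".csv", "_indexed.csv")

    # row_number() gives each source row a stable index, so AI outputs can be
    # merged back onto exactly the right rows after the batch completes.
    duckdb.sql(
        f"""
        COPY (
            SELECT row_number() OVER () - 1 AS _row_index, *
            FROM read_csv_auto('{file_path}')
        ) TO '{indexed_csv_path}' (FORMAT CSV, HEADER)
        """
    )
    return indexed_csv_path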

Real-World Results

I tested this architecture on a dataset of 75,000 celebrity news articles, interviews, and social media posts. The goal was to extract complex entity relationships (who worked with whom, relationships, business partnerships) and create a structured knowledge graph.

The results were impressive:

  • Completed in ~8 hours (vs. an estimated week with a traditional synchronous approach throttled by rate limits)

  • 50% cost reduction compared to synchronous API usage (batch API discounts)

  • 99.1% accuracy in relationship extraction (validated on a sample)

  • Zero monitoring needed during processing

This saved countless hours of manual work while producing a rich dataset of celebrity relationships spanning a decade, with every connection properly sourced and dated.

While building this, I encountered several challenges that required careful solutions:

  • State persistence - Made each step idempotent to allow safe retries

  • Error isolation - Failed rows are tracked separately without stopping the entire process (see the sketch after this list)

  • Recovery mechanisms - Each component can recover independently after failures
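To make the error-isolation point concrete, here is a minimal sketch of how the results step can separate successful rows from failures while merging. The parse_output_record helper and its module are hypothetical stand-ins (the exact parsing depends on how the structured outputs were serialized); the real merging code lives in the linked gist:

import json

import polars as pl

# Hypothetical helper: turns one raw batch-output record into the dict produced by
# the dynamic response model ({"row_index": ..., "<field>": ...}).
from results import parse_output_record

def merge_batch_results(indexed_csv_path: str, batch_output_path: str):
    """Join parsed AI outputs onto the indexed CSV; collect failures instead of raising."""
    df = pl.read_csv(indexed_csv_path)

    parsed_rows, failed_rows = [], []
    with open(batch_output_path) as f:
        for line in f:                       # OpenAI batch output is JSONL, one record per request
            record = json.loads(line)
            try:
                parsed_rows.append(parse_output_record(record))
            except Exception as exc:
                # A malformed or refused response only costs us this one row
                failed_rows.append({"custom_id": record.get("custom_id"), "error": str(exc)})

    enhanced = df.join(
        pl.DataFrame(parsed_rows),
        left_on="_row_index",
        right_on="row_index",                # row_index comes back from the dynamic model
        how="left",
    )
    return enhanced, failed_rows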

When to Use This Approach

This architecture is ideal for:

  • Processing thousands to millions of CSV rows with AI

  • Long-running batch operations (hours)

  • Situations where you need reliability without infrastructure management

It's less suited for:

  • Real-time processing needs

  • Very small datasets where simplicity trumps durability

  • Interactive data exploration workflows

Beyond CSV Processing

While I've focused on CSV enhancement, this pattern works for many other scenarios:

  • Document processing pipelines for contracts or reports

  • Media analysis workflows for images or audio

  • Research systems applying complex reasoning to large datasets

  • Product catalog enhancement for millions of items

This approach sidesteps the usual 15-minute serverless limit, making AI-powered batch jobs far more feasible.

Conclusion

This solution isn't revolutionary because it invents new technology—it's effective because it combines existing tools in a way that overcomes their individual limitations. By letting Cloudflare Workflows handle orchestration and state while delegating the actual processing to Lambda and OpenAI (other model providers such as Anthropic, Bedrock, and soon Gemini also support batch mode), we get the best of all worlds: durability, scalability, and minimal operational overhead.

In an era of rapidly advancing AI capabilities, our ability to apply those capabilities at scale often lags behind. This architecture helps close that gap by making industrial-scale AI processing accessible without industrial-scale infrastructure management.

Gist with lambda code: https://gist.github.com/cmakafui/95ae13d2d60968c81e98cbe946c80e6c

References

  1. https://developers.cloudflare.com/workflows/get-started/guide/

  2. https://platform.openai.com/docs/guides/batch

  3. https://github.com/instructor-ai/instructor/blob/main/docs/examples/batch_job_oai.md
