Integrando AWS Glue com Apache Kafka: Pipeline de Inventário em Tempo Real com Spark e Avro

Felipe RodriguesFelipe Rodrigues
10 min read

Neste artigo, vou compartilhar uma implementação real de integração entre AWS Glue e Apache Kafka para processamento de dados de inventário empresarial. Esta solução aborda desde a extração de movimentações de estoque de um PostgreSQL (RDS) até a publicação em tópicos Kafka com serialização Avro para sistemas downstream.

Visão Geral da Arquitetura

A arquitetura implementada segue um fluxo clássico de ETL distribuído, onde integramos diferentes tecnologias da AWS com o Apache Kafka para criar um pipeline robusto e escalável.

flowchart TD
    subgraph "Fontes de Dados"
        A[("🗄️ PostgreSQL RDS<br/>inventory_db<br/><i>Tabela: inventory_movements</i>")]
        E[("🔐 AWS Secrets Manager<br/><i>Credenciais & Config</i>")]
    end

    subgraph "Camada de Processamento"
        B[("⚡ AWS Glue Job<br/>Spark Distribuído<br/><i>4 Workers G.1X</i>")]
    end

    subgraph "Stream de Eventos"
        D[("📋 Schema Registry<br/><i>Validação Schema Avro</i>")]
        C[("🚀 Apache Kafka<br/>inventory.stock.updates.v1<br/><i>Eventos Tempo Real</i>")]
    end

    subgraph "Monitoramento"
        F[("📊 CloudWatch<br/><i>Logs & Métricas</i>")]
    end

    A -->|"Job Bookmarks<br/>Leitura Incremental"| B
    E -->|"Conexão Segura"| B
    B -->|"Buscar Schema Mais Recente"| D
    B -->|"Mensagens Serializadas<br/>em Avro"| C
    D -.->|"Validação de Schema"| C
    B -->|"Logs de Execução"| F

    classDef sourceStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000
    classDef processStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000
    classDef streamStyle fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px,color:#000
    classDef monitorStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000

    class A,E sourceStyle
    class B processStyle
    class C,D streamStyle
    class F monitorStyle

Componentes Principais

  • Origem dos Dados: Tabela inventory_movements no banco inventory_db

  • Processamento: AWS Glue com Apache Spark

  • Destino: Tópico Kafka inventory.stock.updates.v1

  • Serialização: Formato Avro com Schema Registry

Extração de Dados do RDS

A extração é implementada pela classe DataReader, que utiliza as melhores práticas de segurança e performance do AWS Glue.

Conexão Segura com PostgreSQL

dynamic_frame = glue_context.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": jdbc_url,
        "user": username,
        "password": password,
        "dbtable": "inventory_movements"
    }
)

As credenciais são recuperadas do AWS Secrets Manager (database-connection-secret), garantindo que informações sensíveis nunca sejam expostas no código.

Processamento Incremental com Job Bookmarks

Um dos recursos mais poderosos implementados é o Job Bookmarks do AWS Glue:

flowchart TD
    subgraph "Fluxo de Execução do Job"
        A[("🔖 Job Bookmark<br/><b>Último ID: 1000</b><br/><i>Processamento Incremental</i>")]
        B[("📝 Consulta SQL<br/><b>WHERE id > 1000</b><br/><i>Filtrar Novos Registros</i>")]
        C[("⚙️ Processamento Spark<br/><b>Apenas Dados Novos</b><br/><i>Transformação Distribuída</i>")]
        D[("✅ Atualizar Bookmark<br/><b>Novo Max ID: 1500</b><br/><i>Persistência de Estado</i>")]
    end

    A -->|"Gerar Filtro"| B
    B -->|"Executar Query"| C
    C -->|"Processamento Concluído"| D
    D -.->|"Próxima Execução"| A

    classDef bookmarkStyle fill:#e3f2fd,stroke:#0d47a1,stroke-width:3px,color:#000
    classDef queryStyle fill:#f1f8e9,stroke:#33691e,stroke-width:3px,color:#000
    classDef processStyle fill:#fce4ec,stroke:#880e4f,stroke-width:3px,color:#000
    classDef updateStyle fill:#fff8e1,stroke:#ff6f00,stroke-width:3px,color:#000

    class A bookmarkStyle
    class B queryStyle
    class C processStyle
    class D updateStyle

Vantagens do Job Bookmark:

  • 🚀 Performance: Reduz volume de dados em até 95%

  • 💰 Custo: Menor tempo de execução = menor custo

  • 🔄 Confiabilidade: Estado persistido automaticamente pelo Glue

Configuração:

  • Coluna monitorada: id

  • Ordenação: asc

  • Argumento: --job-bookmark-option: job-bookmark-enable

Esta abordagem reduz drasticamente o volume de dados processados, otimizando tempo e custos ao processar apenas registros novos ou modificados.

Processamento e Transformação com Spark

A classe DataProcessor implementa a lógica de negócio utilizando a API DataFrame do Spark de forma distribuída e declarativa.

Transformações Implementadas

def process_inventory_data(self, df):
    return df \
        .withColumn("product_code_normalized", F.upper(F.col("product_code"))) \
        .withColumn("warehouse_location", F.coalesce(F.col("warehouse"), F.lit("MAIN"))) \
        .withColumn("stock_status", 
            F.when(F.col("current_stock") <= F.col("reorder_point"), "LOW_STOCK")
            .when(F.col("current_stock") <= F.col("safety_stock"), "CRITICAL")
            .when(F.col("current_stock") >= F.col("max_stock"), "OVERSTOCK")
            .otherwise("NORMAL")
        ) \
        .withColumn("movement_value", F.col("quantity") * F.col("unit_cost")) \
        .withColumn("restock_needed", 
            F.when(F.col("current_stock") <= F.col("reorder_point"), 
                   F.col("max_stock") - F.col("current_stock"))
            .otherwise(F.lit(0))
        ) \
        .cast({"current_stock": "int", "movement_value": "decimal(10,2)"}) \
        .select("movement_id", "product_code_normalized", "warehouse_location", 
                "current_stock", "stock_status", "movement_value", "restock_needed", 
                "movement_timestamp")

Lógica de Negócio Aplicada:

  • Normalização: Códigos de produto em maiúsculo para consistência

  • Classificação de Status: Baseada em pontos de reposição e estoque de segurança

  • Cálculos Financeiros: Valor total da movimentação (quantidade × custo unitário)

  • Alertas Automáticos: Identificação de itens que precisam reposição

  • Tratamento de Nulos: Warehouse padrão para locais não especificados

Distribuição e Paralelização

O Spark particiona automaticamente o DataFrame e distribui as transformações entre os workers:

flowchart TD
    subgraph "Distribuição do Cluster Spark"
        A[("📊 DataFrame de Entrada<br/><b>1M Registros</b><br/><i>inventory_movements</i>")]
    end

    subgraph "Nós de Trabalho"
        B[("⚡ Worker 1<br/><b>Partições 1-4</b><br/><i>G.1X - 4 cores</i>")]
        C[("⚡ Worker 2<br/><b>Partições 5-8</b><br/><i>G.1X - 4 cores</i>")]
        D[("⚡ Worker 3<br/><b>Partições 9-12</b><br/><i>G.1X - 4 cores</i>")]
        E[("⚡ Worker 4<br/><b>Partições 13-16</b><br/><i>G.1X - 4 cores</i>")]
    end

    subgraph "Saída"
        F[("✨ Resultado Processado<br/><b>Dados Transformados</b><br/><i>Pronto para Kafka</i>")]
    end

    A -->|"Partição Automática"| B
    A -->|"Partição Automática"| C
    A -->|"Partição Automática"| D
    A -->|"Partição Automática"| E

    B -->|"Processamento Paralelo"| F
    C -->|"Processamento Paralelo"| F
    D -->|"Processamento Paralelo"| F
    E -->|"Processamento Paralelo"| F

    classDef inputStyle fill:#e8eaf6,stroke:#3f51b5,stroke-width:3px,color:#000
    classDef workerStyle fill:#e0f2f1,stroke:#00695c,stroke-width:3px,color:#000
    classDef outputStyle fill:#fff3e0,stroke:#ef6c00,stroke-width:3px,color:#000

    class A inputStyle
    class B,C,D,E workerStyle
    class F outputStyle

Paralelização Inteligente:

  • 🔄 16 Partições processadas simultaneamente

  • 4 Workers com 4 cores cada = 16 threads

  • 📈 Throughput: ~250K registros/worker em paralelo

Publicação no Apache Kafka

A integração com Kafka é implementada pela classe KafkaProducer, que resolve um desafio importante: a serialização Avro com Schema Registry usando o sink nativo do Spark.

Integração com Schema Registry

def get_latest_schema(self):
    response = requests.get(
        f"{self.schema_registry_url}/subjects/{self.topic}-value/versions/latest",
        auth=(self.username, self.password)
    )
    return response.json()

O sistema busca dinamicamente a versão mais recente, garantindo compatibilidade automática com mudanças de schema.

Serialização Avro com Confluent Wire Format

Uma das partes mais técnicas da implementação é a serialização manual no formato Confluent Wire:

def avro_confluent_serializer(row, schema, schema_id):
    # Serialização Avro
    avro_bytes = avro.io.BinaryEncoder(io.BytesIO())
    avro.io.DatumWriter(schema).write(row_dict, avro_bytes)

    # Confluent Wire Format: Magic Byte (1) + Schema ID (4) + Payload
    magic_byte = b'\x00'
    schema_id_bytes = struct.pack('>I', schema_id)

    return magic_byte + schema_id_bytes + avro_bytes.getvalue()

Fluxo de Serialização

flowchart TD
    subgraph "Pipeline de Serialização"
        A[("📋 Linha do DataFrame<br/><b>Dados de Inventário</b><br/><i>{product_id, stock, status}</i>")]
        G[("🏗️ Schema Registry<br/><b>Schema Mais Recente</b><br/><i>Lookup de Versão & ID</i>")]
    end

    subgraph "Processamento Avro"
        B[("🔄 Serialização Avro<br/><b>Codificação Binária</b><br/><i>Formato Compacto</i>")]
        C[("🎯 Magic Byte<br/><b>0x00</b><br/><i>Identificador Confluent</i>")]
        D[("🔢 Schema ID<br/><b>4 bytes</b><br/><i>Versão do Schema</i>")]
        E[("📦 Payload Avro<br/><b>Dados Binários</b><br/><i>Registro Serializado</i>")]
    end

    subgraph "Saída"
        F[("🚀 Mensagem Kafka<br/><b>Formato Wire Completo</b><br/><i>Pronto para Consumidores</i>")]
    end

    A -->|"Obter Schema"| G
    G -->|"Info do Schema"| B
    A -->|"Serializar"| B
    B --> C
    C --> D
    D --> E
    E --> F

    classDef inputStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px,color:#000
    classDef schemaStyle fill:#e3f2fd,stroke:#1565c0,stroke-width:3px,color:#000
    classDef processStyle fill:#fce4ec,stroke:#c2185b,stroke-width:3px,color:#000
    classDef outputStyle fill:#fff8e1,stroke:#f57c00,stroke-width:3px,color:#000

    class A inputStyle
    class G schemaStyle
    class B,C,D,E processStyle
    class F outputStyle

Estrutura do Wire Format:

  • 🎯 Magic Byte (0x00): Identificador do formato Confluent

  • 🔢 Schema ID (4 bytes): Permite evolução de schema

  • 📦 Payload: Dados binários Avro compactos

Este formato permite que consumidores identifiquem o schema usado e deserializem corretamente as mensagens.

Configuração de Ambientes

A solução é configurada para dois ambientes distintos com diferentes estratégias de execução:

AspectoHMLPRDJustificativa
TriggerON_DEMANDSCHEDULED (3x/dia)Testes manuais vs. automação
Bucket S3hml-generic-integration-dataprd-generic-integration-dataIsolamento de ambientes
WorkersG.1X, 4 Workers (Flex)G.1X, 4 Workers (Flex)Configuração consistente
BookmarksHabilitadoHabilitadoProcessamento incremental
gantt
    title 📅 Cronograma Diário de Sincronização - Ambiente de Produção
    dateFormat HH:mm
    axisFormat %H:%M

    section 🌙 Turno Noturno
    Processamento Noturno    :done, night, 02:00, 03:00

    section 🌅 Pico Matutino  
    Sincronização Meio-dia   :done, morning, 10:00, 11:00

    section 🌆 Fim do Expediente
    Fechamento Comercial     :done, evening, 18:00, 19:00

    section 📊 Detalhes de Processamento
    Volume Baixo            :crit, 02:00, 02:30
    Volume Médio            :active, 10:00, 10:45  
    Volume Alto             :milestone, 18:00, 18:30

Padrão de Execução:

  • 🌙 02:00-03:00: Transações noturnas + prep. do dia (Volume baixo)

  • 🌅 10:00-11:00: Sincronização vendas matinais (Volume médio)

  • 🌆 18:00-19:00: Processamento completo do dia comercial (Volume alto)

Adaptação por Cenário de Negócio

A frequência de 3 execuções diárias é adequada para operações B2B e manufatura, mas outros cenários podem exigir ajustes:

Tipo de NegócioFrequência RecomendadaJustificativa
🏭 Manufatura/B2B2-3x/diaMovimentações planejadas, menos críticas
🏪 Retail Tradicional4-6x/diaHorário comercial concentrado
🛒 E-commerce Alto VolumeA cada 15-30 minVendas contínuas, evitar overselling
📦 Logística/FulfillmentA cada 1-2 horasEntrada/saída constante de produtos

💡 Implementação Flexível: O mesmo job pode ser reconfigurado alterando apenas o cron expression no trigger, mantendo toda a lógica de processamento incremental intacta.

Otimizações de Performance

Configurações do Worker

  • WorkerType: G.1X (1 DPU, 4 vCPU, 16 GB RAM, 64 GB disco)

  • ExecutionClass: FLEX (usa capacidade sobressalente para reduzir custos)

  • Auto-scaling: Habilitado para ajuste dinâmico de recursos

Tuning do Spark

As configurações mais importantes para performance:

spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: 2
spark.dynamicAllocation.maxExecutors: 8
spark.dynamicAllocation.initialExecutors: 4
spark.executor.cores: 4
spark.sql.shuffle.partitions: 16
spark.rpc.message.maxSize: 1024  # 1GB para grandes transferências
spark.shuffle.service.enabled: true  # Robustez com alocação dinâmica

Alocação Dinâmica de Recursos

flowchart TD
    subgraph "Fluxo de Auto Scaling"
        A[("🚀 Início do Job<br/><b>4 Executores</b><br/><i>Capacidade Inicial</i>")]
        B{{"🔍 Monitor de Carga<br/><b>CPU & Memória</b><br/><i>Análise Tempo Real</i>"}}
    end

    subgraph "Decisões de Escalonamento"
        C[("📈 Escalar Para Cima<br/><b>+2 Executores</b><br/><i>Máx: 8 Total</i>")]
        D[("📉 Escalar Para Baixo<br/><b>-1 Executor</b><br/><i>Mín: 2 Total</i>")]
        E[("⚖️ Manter<br/><b>Nível Atual</b><br/><i>Carga Ótima</i>")]
    end

    subgraph "Monitoramento"
        F[("🔄 Monitor Contínuo<br/><b>Uso de Recursos</b><br/><i>A cada 30s</i>")]
    end

    A --> B
    B -->|"Carga Alta"| C
    B -->|"Carga Baixa"| D  
    B -->|"Carga Normal"| E

    C --> F
    D --> F
    E --> F
    F --> B

    classDef startStyle fill:#e8f5e8,stroke:#388e3c,stroke-width:3px,color:#000
    classDef monitorStyle fill:#e3f2fd,stroke:#1976d2,stroke-width:3px,color:#000
    classDef scaleStyle fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#000
    classDef maintainStyle fill:#fce4ec,stroke:#c2185b,stroke-width:3px,color:#000

    class A startStyle
    class B,F monitorStyle
    class C,D scaleStyle
    class E maintainStyle

🎯 Gerenciamento Inteligente de Recursos:

  • Faixa Dinâmica: 2-8 executores baseado na carga de trabalho

  • Otimização de Custos: Pague apenas pelos recursos utilizados

  • Performance: Auto-ajuste para picos de volume de dados

Monitoramento e Observabilidade

A solução inclui configurações abrangentes de monitoramento:

  • CloudWatch Logs: Logs contínuos habilitados

  • Métricas: Coleta detalhada para análise de performance

  • Headers Kafka: Metadados para rastreabilidade

Headers Padrão para Rastreabilidade

headers = {
    "source": "inventory-sync-job",
    "timestamp": str(int(time.time())),
    "job_run_id": glue_context.get_job_run_id(),
    "schema_version": str(schema_id),
    "warehouse": row["warehouse_location"],
    "movement_type": row["movement_type"]  # IN/OUT/ADJUSTMENT
}

Benefícios dos Headers:

  • Rastreabilidade: Identificação única de cada processamento

  • Debugging: Correlação entre mensagens e execuções do job

  • Roteamento: Consumidores podem filtrar por depósito específico

  • Versionamento: Controle de compatibilidade de schema

Benefícios da Arquitetura para Gestão de Inventário

Escalabilidade

  • Processamento distribuído para milhões de movimentações diárias

  • Alocação dinâmica ajusta recursos conforme volume de transações

  • Particionamento inteligente por warehouse e categoria de produto

Resiliência Operacional

  • Job Bookmarks garantem que falhas não causem reprocessamento desnecessário

  • Processamento incremental assegura que apenas novas movimentações sejam processadas

  • Retry automático em caso de falhas temporárias de rede ou sistema

Eficiência de Custos

  • ExecutionClass FLEX reduz custos usando capacidade sobressalente

  • Processamento incremental evita scan completo da base diariamente

  • Auto-scaling paga apenas pelos recursos efetivamente utilizados

Tempo Real para Negócio

  • Latência baixa entre movimentação física e atualização dos sistemas

  • Alertas automáticos de baixo estoque em tempo real

  • Sincronização multi-sistema via Kafka para ERP, WMS, e-commerce

Governança e Auditoria

  • Schema Registry garante consistência de dados entre sistemas

  • Headers padronizados permitem rastreabilidade completa

  • CloudWatch integration fornece métricas detalhadas de performance

Considerações Finais

Esta implementação demonstra como integrar efetivamente o AWS Glue com Apache Kafka para pipelines de inventário em tempo real, combinando o poder do processamento distribuído do Spark com a flexibilidade dos streams de eventos. A solução aborda desafios reais como:

  • Sincronização de inventário multi-warehouse em escala empresarial

  • Serialização Avro com Schema Registry para garantir compatibilidade

  • Alertas inteligentes baseados em regras de negócio (reorder points, safety stock)

  • Otimização de custos através de processamento incremental

Principais Lições Aprendidas

  1. Job Bookmarks são fundamentais para pipelines de inventário eficientes e evitam reprocessamento de milhões de movimentações

  2. Serialização manual pode ser necessária quando integrações específicas são requeridas (como Confluent Wire Format)

  3. Transformações de negócio complexas (status de estoque, cálculos de reposição) se beneficiam enormemente do processamento distribuído

  4. Configurações de tuning corretas podem reduzir o tempo de execução em até 60%

  5. Headers Kafka bem estruturados são essenciais para debugging e roteamento downstream

Casos de Uso Práticos

Esta arquitetura suporta cenários empresariais reais como:

  • E-commerce: Sincronização de estoque entre loja física e online

  • Supply Chain: Alertas automáticos para reposição de produtos críticos

  • Manufatura: Controle de matéria-prima e produtos acabados

  • Retail: Otimização de estoque sazonal e gestão multi-loja

A solução apresentada serve como base sólida para pipelines de inventário empresariais, oferecendo escalabilidade para processar milhões de transações diárias, confiabilidade para operações 24/7, e eficiência de custos através de otimizações inteligentes.


Este artigo foi baseado em uma implementação real de produção. Para mais detalhes sobre configurações específicas ou adaptações para seu caso de uso, sinta-se à vontade para entrar em contato.

1
Subscribe to my newsletter

Read articles from Felipe Rodrigues directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Felipe Rodrigues
Felipe Rodrigues