Integrando AWS Glue com Apache Kafka: Pipeline de Inventário em Tempo Real com Spark e Avro

Neste artigo, vou compartilhar uma implementação real de integração entre AWS Glue e Apache Kafka para processamento de dados de inventário empresarial. Esta solução aborda desde a extração de movimentações de estoque de um PostgreSQL (RDS) até a publicação em tópicos Kafka com serialização Avro para sistemas downstream.
Visão Geral da Arquitetura
A arquitetura implementada segue um fluxo clássico de ETL distribuído, onde integramos diferentes tecnologias da AWS com o Apache Kafka para criar um pipeline robusto e escalável.
flowchart TD
subgraph "Fontes de Dados"
A[("🗄️ PostgreSQL RDS<br/>inventory_db<br/><i>Tabela: inventory_movements</i>")]
E[("🔐 AWS Secrets Manager<br/><i>Credenciais & Config</i>")]
end
subgraph "Camada de Processamento"
B[("⚡ AWS Glue Job<br/>Spark Distribuído<br/><i>4 Workers G.1X</i>")]
end
subgraph "Stream de Eventos"
D[("📋 Schema Registry<br/><i>Validação Schema Avro</i>")]
C[("🚀 Apache Kafka<br/>inventory.stock.updates.v1<br/><i>Eventos Tempo Real</i>")]
end
subgraph "Monitoramento"
F[("📊 CloudWatch<br/><i>Logs & Métricas</i>")]
end
A -->|"Job Bookmarks<br/>Leitura Incremental"| B
E -->|"Conexão Segura"| B
B -->|"Buscar Schema Mais Recente"| D
B -->|"Mensagens Serializadas<br/>em Avro"| C
D -.->|"Validação de Schema"| C
B -->|"Logs de Execução"| F
classDef sourceStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000
classDef processStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000
classDef streamStyle fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px,color:#000
classDef monitorStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000
class A,E sourceStyle
class B processStyle
class C,D streamStyle
class F monitorStyle
Componentes Principais
Origem dos Dados: Tabela
inventory_movements
no bancoinventory_db
Processamento: AWS Glue com Apache Spark
Destino: Tópico Kafka
inventory.stock.updates.v1
Serialização: Formato Avro com Schema Registry
Extração de Dados do RDS
A extração é implementada pela classe DataReader
, que utiliza as melhores práticas de segurança e performance do AWS Glue.
Conexão Segura com PostgreSQL
dynamic_frame = glue_context.create_dynamic_frame.from_options(
connection_type="postgresql",
connection_options={
"url": jdbc_url,
"user": username,
"password": password,
"dbtable": "inventory_movements"
}
)
As credenciais são recuperadas do AWS Secrets Manager (database-connection-secret
), garantindo que informações sensíveis nunca sejam expostas no código.
Processamento Incremental com Job Bookmarks
Um dos recursos mais poderosos implementados é o Job Bookmarks do AWS Glue:
flowchart TD
subgraph "Fluxo de Execução do Job"
A[("🔖 Job Bookmark<br/><b>Último ID: 1000</b><br/><i>Processamento Incremental</i>")]
B[("📝 Consulta SQL<br/><b>WHERE id > 1000</b><br/><i>Filtrar Novos Registros</i>")]
C[("⚙️ Processamento Spark<br/><b>Apenas Dados Novos</b><br/><i>Transformação Distribuída</i>")]
D[("✅ Atualizar Bookmark<br/><b>Novo Max ID: 1500</b><br/><i>Persistência de Estado</i>")]
end
A -->|"Gerar Filtro"| B
B -->|"Executar Query"| C
C -->|"Processamento Concluído"| D
D -.->|"Próxima Execução"| A
classDef bookmarkStyle fill:#e3f2fd,stroke:#0d47a1,stroke-width:3px,color:#000
classDef queryStyle fill:#f1f8e9,stroke:#33691e,stroke-width:3px,color:#000
classDef processStyle fill:#fce4ec,stroke:#880e4f,stroke-width:3px,color:#000
classDef updateStyle fill:#fff8e1,stroke:#ff6f00,stroke-width:3px,color:#000
class A bookmarkStyle
class B queryStyle
class C processStyle
class D updateStyle
Vantagens do Job Bookmark:
🚀 Performance: Reduz volume de dados em até 95%
💰 Custo: Menor tempo de execução = menor custo
🔄 Confiabilidade: Estado persistido automaticamente pelo Glue
Configuração:
Coluna monitorada:
id
Ordenação:
asc
Argumento:
--job-bookmark-option: job-bookmark-enable
Esta abordagem reduz drasticamente o volume de dados processados, otimizando tempo e custos ao processar apenas registros novos ou modificados.
Processamento e Transformação com Spark
A classe DataProcessor
implementa a lógica de negócio utilizando a API DataFrame do Spark de forma distribuída e declarativa.
Transformações Implementadas
def process_inventory_data(self, df):
return df \
.withColumn("product_code_normalized", F.upper(F.col("product_code"))) \
.withColumn("warehouse_location", F.coalesce(F.col("warehouse"), F.lit("MAIN"))) \
.withColumn("stock_status",
F.when(F.col("current_stock") <= F.col("reorder_point"), "LOW_STOCK")
.when(F.col("current_stock") <= F.col("safety_stock"), "CRITICAL")
.when(F.col("current_stock") >= F.col("max_stock"), "OVERSTOCK")
.otherwise("NORMAL")
) \
.withColumn("movement_value", F.col("quantity") * F.col("unit_cost")) \
.withColumn("restock_needed",
F.when(F.col("current_stock") <= F.col("reorder_point"),
F.col("max_stock") - F.col("current_stock"))
.otherwise(F.lit(0))
) \
.cast({"current_stock": "int", "movement_value": "decimal(10,2)"}) \
.select("movement_id", "product_code_normalized", "warehouse_location",
"current_stock", "stock_status", "movement_value", "restock_needed",
"movement_timestamp")
Lógica de Negócio Aplicada:
Normalização: Códigos de produto em maiúsculo para consistência
Classificação de Status: Baseada em pontos de reposição e estoque de segurança
Cálculos Financeiros: Valor total da movimentação (quantidade × custo unitário)
Alertas Automáticos: Identificação de itens que precisam reposição
Tratamento de Nulos: Warehouse padrão para locais não especificados
Distribuição e Paralelização
O Spark particiona automaticamente o DataFrame e distribui as transformações entre os workers:
flowchart TD
subgraph "Distribuição do Cluster Spark"
A[("📊 DataFrame de Entrada<br/><b>1M Registros</b><br/><i>inventory_movements</i>")]
end
subgraph "Nós de Trabalho"
B[("⚡ Worker 1<br/><b>Partições 1-4</b><br/><i>G.1X - 4 cores</i>")]
C[("⚡ Worker 2<br/><b>Partições 5-8</b><br/><i>G.1X - 4 cores</i>")]
D[("⚡ Worker 3<br/><b>Partições 9-12</b><br/><i>G.1X - 4 cores</i>")]
E[("⚡ Worker 4<br/><b>Partições 13-16</b><br/><i>G.1X - 4 cores</i>")]
end
subgraph "Saída"
F[("✨ Resultado Processado<br/><b>Dados Transformados</b><br/><i>Pronto para Kafka</i>")]
end
A -->|"Partição Automática"| B
A -->|"Partição Automática"| C
A -->|"Partição Automática"| D
A -->|"Partição Automática"| E
B -->|"Processamento Paralelo"| F
C -->|"Processamento Paralelo"| F
D -->|"Processamento Paralelo"| F
E -->|"Processamento Paralelo"| F
classDef inputStyle fill:#e8eaf6,stroke:#3f51b5,stroke-width:3px,color:#000
classDef workerStyle fill:#e0f2f1,stroke:#00695c,stroke-width:3px,color:#000
classDef outputStyle fill:#fff3e0,stroke:#ef6c00,stroke-width:3px,color:#000
class A inputStyle
class B,C,D,E workerStyle
class F outputStyle
Paralelização Inteligente:
🔄 16 Partições processadas simultaneamente
⚡ 4 Workers com 4 cores cada = 16 threads
📈 Throughput: ~250K registros/worker em paralelo
Publicação no Apache Kafka
A integração com Kafka é implementada pela classe KafkaProducer
, que resolve um desafio importante: a serialização Avro com Schema Registry usando o sink nativo do Spark.
Integração com Schema Registry
def get_latest_schema(self):
response = requests.get(
f"{self.schema_registry_url}/subjects/{self.topic}-value/versions/latest",
auth=(self.username, self.password)
)
return response.json()
O sistema busca dinamicamente a versão mais recente, garantindo compatibilidade automática com mudanças de schema.
Serialização Avro com Confluent Wire Format
Uma das partes mais técnicas da implementação é a serialização manual no formato Confluent Wire:
def avro_confluent_serializer(row, schema, schema_id):
# Serialização Avro
avro_bytes = avro.io.BinaryEncoder(io.BytesIO())
avro.io.DatumWriter(schema).write(row_dict, avro_bytes)
# Confluent Wire Format: Magic Byte (1) + Schema ID (4) + Payload
magic_byte = b'\x00'
schema_id_bytes = struct.pack('>I', schema_id)
return magic_byte + schema_id_bytes + avro_bytes.getvalue()
Fluxo de Serialização
flowchart TD
subgraph "Pipeline de Serialização"
A[("📋 Linha do DataFrame<br/><b>Dados de Inventário</b><br/><i>{product_id, stock, status}</i>")]
G[("🏗️ Schema Registry<br/><b>Schema Mais Recente</b><br/><i>Lookup de Versão & ID</i>")]
end
subgraph "Processamento Avro"
B[("🔄 Serialização Avro<br/><b>Codificação Binária</b><br/><i>Formato Compacto</i>")]
C[("🎯 Magic Byte<br/><b>0x00</b><br/><i>Identificador Confluent</i>")]
D[("🔢 Schema ID<br/><b>4 bytes</b><br/><i>Versão do Schema</i>")]
E[("📦 Payload Avro<br/><b>Dados Binários</b><br/><i>Registro Serializado</i>")]
end
subgraph "Saída"
F[("🚀 Mensagem Kafka<br/><b>Formato Wire Completo</b><br/><i>Pronto para Consumidores</i>")]
end
A -->|"Obter Schema"| G
G -->|"Info do Schema"| B
A -->|"Serializar"| B
B --> C
C --> D
D --> E
E --> F
classDef inputStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:3px,color:#000
classDef schemaStyle fill:#e3f2fd,stroke:#1565c0,stroke-width:3px,color:#000
classDef processStyle fill:#fce4ec,stroke:#c2185b,stroke-width:3px,color:#000
classDef outputStyle fill:#fff8e1,stroke:#f57c00,stroke-width:3px,color:#000
class A inputStyle
class G schemaStyle
class B,C,D,E processStyle
class F outputStyle
Estrutura do Wire Format:
🎯 Magic Byte (0x00): Identificador do formato Confluent
🔢 Schema ID (4 bytes): Permite evolução de schema
📦 Payload: Dados binários Avro compactos
Este formato permite que consumidores identifiquem o schema usado e deserializem corretamente as mensagens.
Configuração de Ambientes
A solução é configurada para dois ambientes distintos com diferentes estratégias de execução:
Aspecto | HML | PRD | Justificativa |
Trigger | ON_DEMAND | SCHEDULED (3x/dia) | Testes manuais vs. automação |
Bucket S3 | hml-generic-integration-data | prd-generic-integration-data | Isolamento de ambientes |
Workers | G.1X, 4 Workers (Flex) | G.1X, 4 Workers (Flex) | Configuração consistente |
Bookmarks | Habilitado | Habilitado | Processamento incremental |
gantt
title 📅 Cronograma Diário de Sincronização - Ambiente de Produção
dateFormat HH:mm
axisFormat %H:%M
section 🌙 Turno Noturno
Processamento Noturno :done, night, 02:00, 03:00
section 🌅 Pico Matutino
Sincronização Meio-dia :done, morning, 10:00, 11:00
section 🌆 Fim do Expediente
Fechamento Comercial :done, evening, 18:00, 19:00
section 📊 Detalhes de Processamento
Volume Baixo :crit, 02:00, 02:30
Volume Médio :active, 10:00, 10:45
Volume Alto :milestone, 18:00, 18:30
Padrão de Execução:
🌙 02:00-03:00: Transações noturnas + prep. do dia (Volume baixo)
🌅 10:00-11:00: Sincronização vendas matinais (Volume médio)
🌆 18:00-19:00: Processamento completo do dia comercial (Volume alto)
Adaptação por Cenário de Negócio
A frequência de 3 execuções diárias é adequada para operações B2B e manufatura, mas outros cenários podem exigir ajustes:
Tipo de Negócio | Frequência Recomendada | Justificativa |
🏭 Manufatura/B2B | 2-3x/dia | Movimentações planejadas, menos críticas |
🏪 Retail Tradicional | 4-6x/dia | Horário comercial concentrado |
🛒 E-commerce Alto Volume | A cada 15-30 min | Vendas contínuas, evitar overselling |
📦 Logística/Fulfillment | A cada 1-2 horas | Entrada/saída constante de produtos |
💡 Implementação Flexível: O mesmo job pode ser reconfigurado alterando apenas o cron
expression no trigger, mantendo toda a lógica de processamento incremental intacta.
Otimizações de Performance
Configurações do Worker
WorkerType: G.1X (1 DPU, 4 vCPU, 16 GB RAM, 64 GB disco)
ExecutionClass: FLEX (usa capacidade sobressalente para reduzir custos)
Auto-scaling: Habilitado para ajuste dinâmico de recursos
Tuning do Spark
As configurações mais importantes para performance:
spark.dynamicAllocation.enabled: true
spark.dynamicAllocation.minExecutors: 2
spark.dynamicAllocation.maxExecutors: 8
spark.dynamicAllocation.initialExecutors: 4
spark.executor.cores: 4
spark.sql.shuffle.partitions: 16
spark.rpc.message.maxSize: 1024 # 1GB para grandes transferências
spark.shuffle.service.enabled: true # Robustez com alocação dinâmica
Alocação Dinâmica de Recursos
flowchart TD
subgraph "Fluxo de Auto Scaling"
A[("🚀 Início do Job<br/><b>4 Executores</b><br/><i>Capacidade Inicial</i>")]
B{{"🔍 Monitor de Carga<br/><b>CPU & Memória</b><br/><i>Análise Tempo Real</i>"}}
end
subgraph "Decisões de Escalonamento"
C[("📈 Escalar Para Cima<br/><b>+2 Executores</b><br/><i>Máx: 8 Total</i>")]
D[("📉 Escalar Para Baixo<br/><b>-1 Executor</b><br/><i>Mín: 2 Total</i>")]
E[("⚖️ Manter<br/><b>Nível Atual</b><br/><i>Carga Ótima</i>")]
end
subgraph "Monitoramento"
F[("🔄 Monitor Contínuo<br/><b>Uso de Recursos</b><br/><i>A cada 30s</i>")]
end
A --> B
B -->|"Carga Alta"| C
B -->|"Carga Baixa"| D
B -->|"Carga Normal"| E
C --> F
D --> F
E --> F
F --> B
classDef startStyle fill:#e8f5e8,stroke:#388e3c,stroke-width:3px,color:#000
classDef monitorStyle fill:#e3f2fd,stroke:#1976d2,stroke-width:3px,color:#000
classDef scaleStyle fill:#fff3e0,stroke:#f57c00,stroke-width:3px,color:#000
classDef maintainStyle fill:#fce4ec,stroke:#c2185b,stroke-width:3px,color:#000
class A startStyle
class B,F monitorStyle
class C,D scaleStyle
class E maintainStyle
🎯 Gerenciamento Inteligente de Recursos:
Faixa Dinâmica: 2-8 executores baseado na carga de trabalho
Otimização de Custos: Pague apenas pelos recursos utilizados
Performance: Auto-ajuste para picos de volume de dados
Monitoramento e Observabilidade
A solução inclui configurações abrangentes de monitoramento:
CloudWatch Logs: Logs contínuos habilitados
Métricas: Coleta detalhada para análise de performance
Headers Kafka: Metadados para rastreabilidade
Headers Padrão para Rastreabilidade
headers = {
"source": "inventory-sync-job",
"timestamp": str(int(time.time())),
"job_run_id": glue_context.get_job_run_id(),
"schema_version": str(schema_id),
"warehouse": row["warehouse_location"],
"movement_type": row["movement_type"] # IN/OUT/ADJUSTMENT
}
Benefícios dos Headers:
Rastreabilidade: Identificação única de cada processamento
Debugging: Correlação entre mensagens e execuções do job
Roteamento: Consumidores podem filtrar por depósito específico
Versionamento: Controle de compatibilidade de schema
Benefícios da Arquitetura para Gestão de Inventário
Escalabilidade
Processamento distribuído para milhões de movimentações diárias
Alocação dinâmica ajusta recursos conforme volume de transações
Particionamento inteligente por warehouse e categoria de produto
Resiliência Operacional
Job Bookmarks garantem que falhas não causem reprocessamento desnecessário
Processamento incremental assegura que apenas novas movimentações sejam processadas
Retry automático em caso de falhas temporárias de rede ou sistema
Eficiência de Custos
ExecutionClass FLEX reduz custos usando capacidade sobressalente
Processamento incremental evita scan completo da base diariamente
Auto-scaling paga apenas pelos recursos efetivamente utilizados
Tempo Real para Negócio
Latência baixa entre movimentação física e atualização dos sistemas
Alertas automáticos de baixo estoque em tempo real
Sincronização multi-sistema via Kafka para ERP, WMS, e-commerce
Governança e Auditoria
Schema Registry garante consistência de dados entre sistemas
Headers padronizados permitem rastreabilidade completa
CloudWatch integration fornece métricas detalhadas de performance
Considerações Finais
Esta implementação demonstra como integrar efetivamente o AWS Glue com Apache Kafka para pipelines de inventário em tempo real, combinando o poder do processamento distribuído do Spark com a flexibilidade dos streams de eventos. A solução aborda desafios reais como:
Sincronização de inventário multi-warehouse em escala empresarial
Serialização Avro com Schema Registry para garantir compatibilidade
Alertas inteligentes baseados em regras de negócio (reorder points, safety stock)
Otimização de custos através de processamento incremental
Principais Lições Aprendidas
Job Bookmarks são fundamentais para pipelines de inventário eficientes e evitam reprocessamento de milhões de movimentações
Serialização manual pode ser necessária quando integrações específicas são requeridas (como Confluent Wire Format)
Transformações de negócio complexas (status de estoque, cálculos de reposição) se beneficiam enormemente do processamento distribuído
Configurações de tuning corretas podem reduzir o tempo de execução em até 60%
Headers Kafka bem estruturados são essenciais para debugging e roteamento downstream
Casos de Uso Práticos
Esta arquitetura suporta cenários empresariais reais como:
E-commerce: Sincronização de estoque entre loja física e online
Supply Chain: Alertas automáticos para reposição de produtos críticos
Manufatura: Controle de matéria-prima e produtos acabados
Retail: Otimização de estoque sazonal e gestão multi-loja
A solução apresentada serve como base sólida para pipelines de inventário empresariais, oferecendo escalabilidade para processar milhões de transações diárias, confiabilidade para operações 24/7, e eficiência de custos através de otimizações inteligentes.
Este artigo foi baseado em uma implementação real de produção. Para mais detalhes sobre configurações específicas ou adaptações para seu caso de uso, sinta-se à vontade para entrar em contato.
Subscribe to my newsletter
Read articles from Felipe Rodrigues directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
