Building a Real-Time Data Pipeline with Kafka and PySpark on AWS

Introduction

Hey there! ๐Ÿ‘‹ I'm excited to share my journey of building a real-time data pipeline using Apache Kafka for streaming data ingestion and PySpark for processing. This project leverages AWS for deployment and showcases my passion for data engineering and cloud computing. If you're looking to get your hands dirty with some cutting-edge technologies, this project is a great start!

Project Overview

This project demonstrates how to set up a real-time data pipeline using Kafka for data ingestion and PySpark for data processing. We'll deploy everything on AWS and automate the deployment process with CI/CD.

Directory Structure

Here's a sneak peek into the directory structure of our project:

Real-Time-Data-Pipeline/
โ”‚
โ”œโ”€โ”€ data/
โ”‚   โ””โ”€โ”€ sample_data.txt
โ”‚
โ”œโ”€โ”€ kafka/
โ”‚   โ”œโ”€โ”€ kafka_producer.py
โ”‚   โ””โ”€โ”€ kafka_consumer.py
โ”‚
โ”œโ”€โ”€ pyspark/
โ”‚   โ””โ”€โ”€ pyspark_processing.py
โ”‚
โ””โ”€โ”€ scripts/
    โ”œโ”€โ”€ setup_kafka.ps1
    โ””โ”€โ”€ deploy_pipeline.ps1
README.md
LICENSE

Setting Up Kafka

We'll use AWS Managed Streaming for Apache Kafka (MSK) for our Kafka cluster. Here's a step-by-step guide to get it up and running:

  1. Create an MSK Cluster:

    • Navigate to the Amazon MSK console.

    • Click on Create cluster.

    • Choose the Custom create option.

    • Configure your cluster settings and click Create cluster.

  2. Configure IAM Policies:

    • Create an IAM policy for MSK.

    • Attach this policy to the roles that will access the MSK cluster.

Kafka Producer and Consumer

We'll create a simple Kafka producer and consumer in Python.

kafka_producer.py:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='your_msk_bootstrap_servers',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

data = {"event": "test_event", "value": 100}
producer.send('test_topic', value=data)
producer.flush()
producer.close()

kafka_consumer.py:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers='your_msk_bootstrap_servers',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(message.value)

PySpark Processing

We'll use PySpark to process the data ingested by Kafka.

pyspark_processing.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("KafkaPySpark").getOrCreate()

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "your_msk_bootstrap_servers") \
    .option("subscribe", "test_topic") \
    .load()

df = df.selectExpr("CAST(value AS STRING)")

schema = "event STRING, value INT"
json_df = df.select(from_json(col("value"), schema).alias("data")).select("data.*")

query = json_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

Deployment with CI/CD

We'll use GitLab CI/CD to automate the deployment process.

.gitlab-ci.yml:

stages:
  - deploy

deploy:
  stage: deploy
  script:
    - ./scripts/deploy_pipeline.ps1

Conclusion

This project has been an incredible learning journey, and I'm thrilled to share it with you. By combining Kafka, PySpark, and AWS, we can create powerful real-time data processing pipelines. I hope this guide helps you get started with your own projects. Feel free to reach out if you have any questions or need further assistance!

Happy coding! ๐Ÿš€


I hope this blog post inspires you to dive into the world of real-time data processing. Let's keep pushing the boundaries of what's possible with data engineering and cloud computing!


You can find the full code and detailed instructions in my GitHub repository.

Enjoy building and optimizing your real-time data pipelines!


If you liked this project, consider starring the repository on GitHub and sharing it with your network!

1
Subscribe to my newsletter

Read articles from Tanishka Marrott directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Tanishka Marrott
Tanishka Marrott

I'm a results-oriented cloud architect passionate about designing resilient cloud solutions. I specialize in building scalable architectures that meet business needs and are agile. With a strong focus on scalability, performance, and security, I ensure solutions are adaptable. My DevSecOps foundation allows me to embed security into CI/CD pipelines, optimizing deployments for security and efficiency. At Quantiphi, I led security initiatives, boosting compliance from 65% to 90%. Expertise in data engineering, system design, serverless solutions, and real-time data analytics drives my enthusiasm for transforming ideas into impactful solutions. I'm dedicated to refining cloud infrastructures and continuously improving designs. If our goals align, feel free to message me. I'd be happy to connect!