Building a Real-Time Data Pipeline with Kafka and PySpark on AWS
Introduction
Hey there! ๐ I'm excited to share my journey of building a real-time data pipeline using Apache Kafka for streaming data ingestion and PySpark for processing. This project leverages AWS for deployment and showcases my passion for data engineering and cloud computing. If you're looking to get your hands dirty with some cutting-edge technologies, this project is a great start!
Project Overview
This project demonstrates how to set up a real-time data pipeline using Kafka for data ingestion and PySpark for data processing. We'll deploy everything on AWS and automate the deployment process with CI/CD.
Directory Structure
Here's a sneak peek into the directory structure of our project:
Real-Time-Data-Pipeline/
โ
โโโ data/
โ โโโ sample_data.txt
โ
โโโ kafka/
โ โโโ kafka_producer.py
โ โโโ kafka_consumer.py
โ
โโโ pyspark/
โ โโโ pyspark_processing.py
โ
โโโ scripts/
โโโ setup_kafka.ps1
โโโ deploy_pipeline.ps1
README.md
LICENSE
Setting Up Kafka
We'll use AWS Managed Streaming for Apache Kafka (MSK) for our Kafka cluster. Here's a step-by-step guide to get it up and running:
Create an MSK Cluster:
Navigate to the Amazon MSK console.
Click on Create cluster.
Choose the Custom create option.
Configure your cluster settings and click Create cluster.
Configure IAM Policies:
Create an IAM policy for MSK.
Attach this policy to the roles that will access the MSK cluster.
Kafka Producer and Consumer
We'll create a simple Kafka producer and consumer in Python.
kafka_producer.py:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers='your_msk_bootstrap_servers',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
data = {"event": "test_event", "value": 100}
producer.send('test_topic', value=data)
producer.flush()
producer.close()
kafka_consumer.py:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'test_topic',
bootstrap_servers='your_msk_bootstrap_servers',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for message in consumer:
print(message.value)
PySpark Processing
We'll use PySpark to process the data ingested by Kafka.
pyspark_processing.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
spark = SparkSession.builder.appName("KafkaPySpark").getOrCreate()
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "your_msk_bootstrap_servers") \
.option("subscribe", "test_topic") \
.load()
df = df.selectExpr("CAST(value AS STRING)")
schema = "event STRING, value INT"
json_df = df.select(from_json(col("value"), schema).alias("data")).select("data.*")
query = json_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
Deployment with CI/CD
We'll use GitLab CI/CD to automate the deployment process.
.gitlab-ci.yml:
stages:
- deploy
deploy:
stage: deploy
script:
- ./scripts/deploy_pipeline.ps1
Conclusion
This project has been an incredible learning journey, and I'm thrilled to share it with you. By combining Kafka, PySpark, and AWS, we can create powerful real-time data processing pipelines. I hope this guide helps you get started with your own projects. Feel free to reach out if you have any questions or need further assistance!
Happy coding! ๐
I hope this blog post inspires you to dive into the world of real-time data processing. Let's keep pushing the boundaries of what's possible with data engineering and cloud computing!
You can find the full code and detailed instructions in my GitHub repository.
Enjoy building and optimizing your real-time data pipelines!
If you liked this project, consider starring the repository on GitHub and sharing it with your network!
Subscribe to my newsletter
Read articles from Tanishka Marrott directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by
Tanishka Marrott
Tanishka Marrott
I'm a results-oriented cloud architect passionate about designing resilient cloud solutions. I specialize in building scalable architectures that meet business needs and are agile. With a strong focus on scalability, performance, and security, I ensure solutions are adaptable. My DevSecOps foundation allows me to embed security into CI/CD pipelines, optimizing deployments for security and efficiency. At Quantiphi, I led security initiatives, boosting compliance from 65% to 90%. Expertise in data engineering, system design, serverless solutions, and real-time data analytics drives my enthusiasm for transforming ideas into impactful solutions. I'm dedicated to refining cloud infrastructures and continuously improving designs. If our goals align, feel free to message me. I'd be happy to connect!