Real-Time IoT Data Ingestion to S3
The architecture of this project is simple. We will write a Lambda function that publishes some dummy data to IoT Core, which acts as the message transport service.
IoT Core
We connect IoT devices to IoT Core using certificates and supported protocols. The devices then send their real-time data to IoT Core.
In IoT Core, messages are published to topics, and our data is transmitted through those topics.
Once IoT Core receives data on a topic, a rule can forward it to the delivery destinations we have set up, such as an S3 bucket.
Amazon Firehose
Firehose is a delivery stream. We can send real-time data to it from multiple sources, e.g. Python code that ingests our data.
Firehose can perform basic transformations using AWS Lambda functions. It automatically sends the received real-time data to the Lambda function, and the Lambda function returns the transformed data to the delivery stream.
The transformed data can then be delivered to multiple destinations such as Redshift, S3, HTTP endpoints (for example, one behind API Gateway), etc.
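To make "Python code that ingests our data" concrete, here is a minimal sketch of sending one record straight to a Firehose stream with the boto3 put_record call. The helper name send_reading, the stream name, and the sample reading are all hypothetical; the client is passed in so the function can be tried without AWS credentials.

```python
import json


def send_reading(firehose_client, stream_name, reading):
    """Send one reading to a Firehose stream as a newline-terminated JSON record."""
    # Firehose expects the record data as bytes
    data = (json.dumps(reading) + "\n").encode("utf-8")
    return firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": data},
    )


# Usage (assumes AWS credentials and an existing stream; names are placeholders):
# import boto3
# send_reading(boto3.client("firehose"), "my-solar-stream", {"DCPower": 750})
```

In this post we do not call Firehose directly; IoT Core does the PUT for us through a rule.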
Now that we have an understanding, let's begin.
Step 1
First, we create our S3 bucket. For this demo, you can leave the settings at their defaults.
Next, we create an IoT topic to send our IoT data to.
Select IoT Core in your AWS console.
Here we can connect our devices. In this demo, the device is our solar panel.
We will be sending our data with a Lambda function because we are simulating the data.
We will be sending the data to an IoT rule.
Go to Message routing ---> Rules and create a new rule.
On the next screen we set up our SQL statement. Here we select all the data from our topic (for example, SELECT * FROM 'solar_data', where solar_data is the topic name used in the simulator code later in this post), then click Next.
Now we need to configure the rule action: whenever data arrives at the rule, the action determines where that data is sent.
We will be sending our data to Kinesis Firehose, but a rule can send data to many other services.
Sending to Firehose requires a Firehose stream, so we will create a new one.
AWS IoT issues a PUT request to send your data into Kinesis Firehose, so we choose Direct PUT for the source and S3 for the destination.
Next, we're going to check the box to enable basic transformation. This requires us to provide a Lambda function for the transformation.
Let's open a new tab and create a new Lambda function.
Step 2
In our Lambda console, we'll be creating two Lambda functions. The first one is the IoT test simulator, which sends the data to IoT Core.
Create a new function like below and leave advanced settings on default options.
In the code tab, we will be using the following code.
import boto3
import time
import json
import random
iot_client = boto3.client('iot-data')
topic = 'solar_data'
def lambda_handler(event, context):
The code above does the following:
It imports the necessary modules: boto3 for the AWS SDK, time for time-related operations, json for JSON handling, and random for generating random data.
It sets up an IoT client using boto3.client('iot-data').
It defines a Lambda handler function named lambda_handler, which is the entry point for the Lambda function.
Inside lambda_handler we run the following:
    for i in range(60):
        payload_rows = []
        print("Sending message", i)
        # Build 5 or 6 rows of random solar readings for this message
        for _ in range(random.randint(5, 6)):
            DCPower = random.randint(500, 1000)
            ACPower = random.randint(500, 1000)
            SunlightIntensity = random.randint(1000, 5000)
            DailyYield = random.randint(90000, 110000)
            temperature = random.randint(80, 120)
            city = random.choice(["H", "S", "D", "A", "F", "E", "A", "C", "P", "L", "L"])
            masterid = random.randint(1, 10)
            state = "Texas"
            row = f"{DCPower},{ACPower},{SunlightIntensity},{DailyYield},{temperature},{state},{city},{masterid}"
            payload_rows.append(row)
        payload = ';'.join(payload_rows)  # Join rows with a semicolon
        print(payload)
        response = iot_client.publish(topic=topic, qos=1, payload=payload)
        time.sleep(1)  # Pause for 1 second between messages
It runs a loop for 60 iterations, one per second, so it takes about 60 seconds in total.
Within each iteration, it generates 5 or 6 rows of random data for simulated solar metrics: DC Power, AC Power, Sunlight Intensity, Daily Yield, temperature, city, masterid, and state.
It constructs a string representing a row of data in the format DCPower,ACPower,SunlightIntensity,DailyYield,temperature,state,city,masterid.
It joins multiple rows together with a semicolon delimiter.
It publishes the payload to the specified IoT topic using the IoT client (iot_client.publish).
It pauses for 1 second between messages using time.sleep(1).
You can test it by invoking the code, but comment out the response line first; otherwise you will get a client error, because the function does not yet have permission to publish to IoT Core. You may also get a timeout error, which is caused by the default 3-second timeout in the Lambda general configuration. (You can set it to 5 minutes for the sake of the test.)
So, this code simulates sending solar data messages to our AWS IoT topic.
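If you want to check the payload format on your own machine before touching IoT Core, the row-building logic can be pulled into a small function. This is only a sketch: build_payload is a hypothetical helper, not part of the Lambda code above, and it takes a random.Random instance so a run can be repeated with a fixed seed.

```python
import random

CITIES = ["H", "S", "D", "A", "F", "E", "A", "C", "P", "L", "L"]


def build_payload(rng):
    """Return 5 or 6 comma-separated rows joined with semicolons."""
    rows = []
    for _ in range(rng.randint(5, 6)):
        fields = [
            rng.randint(500, 1000),      # DCPower
            rng.randint(500, 1000),      # ACPower
            rng.randint(1000, 5000),     # SunlightIntensity
            rng.randint(90000, 110000),  # DailyYield
            rng.randint(80, 120),        # temperature
            "Texas",                     # state
            rng.choice(CITIES),          # city
            rng.randint(1, 10),          # masterid
        ]
        rows.append(",".join(str(f) for f in fields))
    return ";".join(rows)


payload = build_payload(random.Random(42))
rows = payload.split(";")
print(len(rows), "rows; first row:", rows[0])
```

Each row should have eight fields in the same order as the schema used later by the transformation function.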
Next, we grant our Lambda function permission so that it can successfully send the information to IoT Core.
In the IAM console, open the function's execution role, choose to attach a policy, select the IoT full access policy (AWSIoTFullAccess), and add the permission.
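Full access is fine for a demo. As a tighter alternative, a policy that only allows publishing to our topic could look roughly like the sketch below. The region and account ID are placeholders, and publish_policy is a hypothetical helper that just builds the policy document so you can paste it into IAM.

```python
import json


def publish_policy(region, account_id, topic):
    """Build an IAM policy document that only allows iot:Publish on one topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iot:Publish",
                "Resource": f"arn:aws:iot:{region}:{account_id}:topic/{topic}",
            }
        ],
    }


# Placeholder region and account ID; replace with your own
print(json.dumps(publish_policy("us-east-1", "123456789012", "solar_data"), indent=2))
```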
That should be it for our first lambda function.
Make sure to uncomment the response line in the code and redeploy if you did a test.
Step 3
Now we create our transformation function.
This code takes in the records sent by Firehose, decodes each one, converts it into a specific format (JSON), encodes it again, and returns the list of results to Firehose.
import base64
import json
import pandas as pd
import boto3
import io
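def convert_to_json(payload, schema):
    # Split the comma-separated row into its individual values
    values = payload.strip().split(',')
    # Only build the item when the row has exactly one value per schema field
    if len(values) == len(schema):
        item = {}
        for i in range(len(schema)):
            item[schema[i]] = values[i]
        return item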
Imports:
It imports the necessary modules: base64 for Base64 encoding and decoding, json for JSON handling, pandas for data manipulation, boto3 for the AWS SDK, and io for input and output operations.
Conversion Function:
It defines a function convert_to_json(payload, schema) to convert a payload (a comma-separated string) into a JSON-style object based on a given schema (a list of field names).
It splits the payload into values on commas, then iterates through the schema to create an object with field names as keys and the corresponding values from the payload.
Next part of the code is the lambda handler.
def lambda_handler(event, context):
    output = []
    schema = ["DCPower","ACPower","SunlightIntensity","DailyYield","temperature","state","city","masterid"]
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print("payload_is:", payload)
        json_data = convert_to_json(payload, schema)
        print(json_data)
        output_record = {
            'recordId': f"{record['recordId']}",
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(json_data).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)
    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
Lambda Handler:
The lambda_handler function is the entry point for the Lambda function.
It initializes an empty list called output to store processed records.
It defines a schema representing the fields in the data, which determines how the data should look after conversion.
It iterates over each record in event['records'], provided by Amazon Kinesis Firehose. For each record:
It decodes the Base64-encoded data in the record.
It converts the decoded payload into JSON format using the convert_to_json function.
It prepares an output record containing the original record ID, a result status ("Ok"), and the transformed data, Base64-encoded.
It adds the output record to the output list.
It prints out how many records were processed.
It returns the transformed records.
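One thing to be aware of: when a row does not have exactly eight values, convert_to_json returns nothing and the handler above still marks the record as "Ok" with a JSON null as its data. A possible variation, sketched here under the assumption that you would rather have Firehose treat such rows as failures, returns the 'ProcessingFailed' result with the original data instead. transform_record is a hypothetical helper name.

```python
import base64
import json

SCHEMA = ["DCPower", "ACPower", "SunlightIntensity", "DailyYield",
          "temperature", "state", "city", "masterid"]


def transform_record(record):
    """Convert one Firehose record to JSON, or flag it as failed if malformed."""
    payload = base64.b64decode(record["data"]).decode("utf-8")
    values = payload.strip().split(",")
    if len(values) != len(SCHEMA):
        # Hand the untouched data back so Firehose can route it to the error output
        return {"recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"]}
    item = dict(zip(SCHEMA, values))
    data = base64.b64encode(json.dumps(item).encode("utf-8")).decode("utf-8")
    return {"recordId": record["recordId"], "result": "Ok", "data": data}


def lambda_handler(event, context):
    return {"records": [transform_record(r) for r in event["records"]]}
```

This version also leaves out the pandas, boto3, and io imports, since the transformation itself only needs base64 and json.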
Step 4
Next, we continue setting up our stream. Add the transformation Lambda function by clicking Browse.
Add the S3 bucket you created and enable dynamic partitioning.
Enable the new line delimiter.
We will also enable multi record deaggregation, which Firehose performs for us.
We will be using the Delimited deaggregation type.
The delimiter is our semicolon, but Firehose expects the delimiter to be Base64-encoded.
Go to the link below --> Base64 Encode and Decode - Online, and encode the semicolon.
Copy the encoded value into the Delimiter Deaggregation box.
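If you would rather not use an online tool, the same Base64 value can be produced with a couple of lines of Python. This is just a convenience sketch using the standard library.

```python
import base64

delimiter = ";"
encoded = base64.b64encode(delimiter.encode("utf-8")).decode("ascii")
print(encoded)  # Ow==
```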
Inline parsing lets Firehose extract partitioning keys from each record, and those keys determine how your data is partitioned in the S3 bucket. So we enable that.
The example record shows how the inline parsing keys are specified.
We will be adding the following key names and expressions and applying them.
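To get a feel for what the partitioning keys do, here is a rough Python illustration of how a transformed record could map to an S3 prefix. It assumes two keys named city and masterid (matching the folders we see later in the bucket) and a key=value/ folder layout; both the names and the layout are assumptions for illustration, and Firehose does this work itself from the key expressions you enter.

```python
import json

# Hypothetical partition key name -> field in the transformed JSON record
PARTITION_KEYS = {"city": "city", "masterid": "masterid"}


def s3_prefix(record_json):
    """Illustrate the folder prefix a record would land under."""
    record = json.loads(record_json)
    return "".join(f"{key}={record[field]}/" for key, field in PARTITION_KEYS.items())


print(s3_prefix('{"DCPower": "750", "city": "H", "masterid": "10"}'))
```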
In case of errors, we specify an error output prefix for our S3 bucket. Change the retry duration to 0, as we don't need retries currently.
We set the buffer size to 64 MB, the minimum, and the buffer interval to 60 seconds. So after 60 seconds we should see our data in the S3 bucket.
We can leave the rest of the settings as they are and go ahead and create our stream. It will take a few minutes for everything to process.
Step 5
Back in IoT Core, to continue the setup, we select the Firehose stream we just created.
For the separator we choose new line, so that a separator (\n) is added between the records. Then we create an IAM role and give it a name.
Once that is done, go ahead and create the rule.
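For reference, the rule we just built in the console corresponds roughly to the payload below, which could be passed to the boto3 create_topic_rule call. This is a sketch only: firehose_rule_payload is a hypothetical helper, and the stream name, role ARN, and rule name are placeholders you would replace with your own.

```python
def firehose_rule_payload(topic, stream_name, role_arn):
    """Build a topic rule payload that forwards every message to Firehose."""
    return {
        "sql": f"SELECT * FROM '{topic}'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "firehose": {
                    "roleArn": role_arn,
                    "deliveryStreamName": stream_name,
                    "separator": "\n",  # new line between records
                }
            }
        ],
    }


payload = firehose_rule_payload(
    "solar_data",
    "my-solar-stream",                                # placeholder stream name
    "arn:aws:iam::123456789012:role/my-iot-fh-role",  # placeholder role ARN
)

# Usage (assumes AWS credentials; rule name is a placeholder):
# import boto3
# boto3.client("iot").create_topic_rule(ruleName="solar_rule", topicRulePayload=payload)
```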
Step 6
To see everything in action, we're going to go to our simulation lambda function and invoke the code.
This invocation runs for 60 seconds, pushing data to our IoT topic, which in turn sends the data to our Firehose delivery stream.
Next, we go to our transformation Lambda function. In the CloudWatch logs you can see how Firehose is sending the information through to our transformation function.
If you get the error below, it means you need to add the pandas Python package as a layer to your function.
Visit this link to learn how to do that.
We can now see the output of our data in the S3 bucket we created.
You may have to wait a few minutes for the output to populate in S3.
Once it's created, you will see the generated folder(s). When you drill into them, you'll notice Firehose has created the partitions for our data.
You can see the separate folders it has created for the different cities, and in them you'll find multiple masterid folders. I have opened masterid 10 in the example below.
We can now go ahead and download the data stream file.
When you download it, open it in Notepad to see the data. You can see the deaggregated data and the separator, just as we specified earlier.
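Because each record is a JSON object on its own line, the downloaded file is also easy to read back in Python. Below is a minimal sketch; parse_records is a hypothetical helper and the sample text is made-up data in the same shape as our records. Note that all values come back as strings, since the transformation function does not convert them to numbers.

```python
import json


def parse_records(text):
    """Parse newline-delimited JSON into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Made-up sample in the same shape as the file contents
sample = (
    '{"DCPower": "750", "city": "H", "masterid": "10"}\n'
    '{"DCPower": "812", "city": "H", "masterid": "10"}\n'
)

for rec in parse_records(sample):
    print(rec["DCPower"], rec["city"], rec["masterid"])
```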
Conclusion
We set up a data pipeline for processing and storing simulated solar power data using AWS services:
Data Simulation: A Lambda function simulates solar power data and sends it to an AWS IoT topic using the IoT Core service.
Data Routing: The IoT Core forwards the data to a Kinesis Data Firehose delivery stream.
Data Transformation: Another Lambda function processes the data from the delivery stream, converting it to JSON format and encoding it before returning it to Firehose, which delivers it to an S3 bucket.
Data Storage: The transformed data is stored in the S3 bucket, partitioned based on specific criteria like city and master ID.
Data Access: The stored data can be accessed and analyzed for further insights.
Overall, this architecture allows for the collection, processing, and storage of solar power data, enabling analysis and decision-making based on real-time and historical information.
Written by teju fam