From Sensors to Streams: Finalizing the IMU Integration and Introducing the Orchestrator

Table of contents
- IMU Sensor Integration via MQTT
- Sensor Layer Architecture Overview
- Creating a Reusable MQTT Client: fp-mqtt-broker
- Buffering IMU Data
- Updated RFID Service Behavior
- Integration Results
- Introducing: The Orchestrator
- Data Lake Design
- fp-orchestrator-utils Python Package
- Further Considerations
- Next steps
- Wrapping Up

Welcome to the third post in my ongoing series documenting the development of my Computer Science dissertation: a privacy-aware, multimodal Human Activity Recognition (HAR) system built to run on resource-constrained edge devices.
This sprint focused on two major components:
- Finalizing the Inertial Measurement Unit (IMU) sensor integration using Message Queuing Telemetry Transport (MQTT) for mobile streaming.
- Designing the first layer of the orchestrator, which bridges the sensor and cloud layers to build a curated dataset.
IMU Sensor Integration via MQTT
To integrate motion data from mobile devices, I used the Sensor Logger app for Android. Its premium version supports MQTT, allowing seamless publishing of IMU data from the phone to a central broker.
The following diagram illustrates the MQTT publish/subscribe architecture:
Source: https://mqtt.org
In short, MQTT clients publish messages to topics on a broker, and the broker forwards each message to every client subscribed to that topic.
I used Eclipse Mosquitto as my MQTT broker, hosted on the Raspberry Pi.
Here's a sample configuration that accepts anonymous connections from any host on the network, for testing only:
# Allow connections from network
listener 1883 0.0.0.0
# Allow anonymous connections (for testing)
allow_anonymous true
# Logging
log_dest stdout
log_type all
# Persistence
persistence false
Then we start the broker:
rodrigo@raspberrypi:~/fp-imu-service $ mosquitto -c mosquitto.conf -v
1752622740: mosquitto version 2.0.11 starting
1752622740: Config loaded from mosquitto.conf.
1752622740: Opening ipv4 listen socket on port 1883.
1752622740: mosquitto version 2.0.11 running
There might already be a mosquitto process running after installation. To run the broker with the desired configuration, stop the existing process first with the command: pkill mosquitto
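Before pointing the phone at the broker, it can help to confirm that something is actually listening on the configured port. The following is a minimal sketch using only the Python standard library. The function name `broker_reachable` and its default host, port, and timeout are illustrative assumptions, and a successful TCP connection only shows that a listener exists, not that it is Mosquitto.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 1883, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper: it only checks that something accepts connections
    on the port; it does not speak MQTT.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

Calling `broker_reachable("raspberrypi.local")` from another machine on the network (the hostname here is an assumption) gives a quick yes/no answer before debugging the app configuration.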
Sensor Layer Architecture Overview
The following diagram depicts the architecture of the sensor layer with the MQTT broker:
MQTT Topics:
- recording_control: instructs mobile devices when to start/stop logging.
- data_stream: receives IMU payloads from devices.
Participants:
- RFID service → Publishes control messages (start/stop)
- Mobile device → Subscribes to control, publishes IMU data
- IMU service → Subscribes to data, then buffers and processes payloads
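On the subscriber side, the control topic boils down to a small state machine driven by `recording_control` messages. The sketch below is transport-agnostic and only illustrates that logic. The class name `RecordingController` is hypothetical, and the JSON shape (`action`, `session_id`) is taken from the control messages the RFID service publishes later in this post. How the mobile app implements this internally is not covered here.

```python
import json


class RecordingController:
    """Tracks whether a device should currently be streaming IMU data."""

    def __init__(self) -> None:
        self.is_recording = False
        self.session_id = None

    def handle_control(self, raw_payload: str) -> bool:
        """Apply one control message; return True if the state changed."""
        message = json.loads(raw_payload)
        action = message.get("action")
        session_id = message.get("session_id")

        if action == "start" and not self.is_recording:
            self.is_recording = True
            self.session_id = session_id
            return True
        if action == "stop" and self.is_recording and session_id == self.session_id:
            self.is_recording = False
            self.session_id = None
            return True
        # Unknown actions, duplicate starts, and stops for another session are ignored.
        return False
```

Ignoring a stop for a different session is a design choice in this sketch: it keeps a stale or duplicated control message from ending the wrong recording.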
Creating a Reusable MQTT Client: fp-mqtt-broker
Several devices and services need to talk to the MQTT broker. To reduce boilerplate across them (connection, disconnection, and message handling logic), I created and published a Python package: fp-mqtt-broker on PyPI.
This package offers a simple interface for connecting a device to an MQTT broker. It ships with a ready-to-use implementation based on the paho-mqtt client, but it is not tied to that client: a factory creates the broker object, which takes care of:
- Connection setup
- Topic subscriptions
- Message handling (via a factory + handler interface)
This basic example uses the package to create a new client for the MQTT broker and assign a custom message handler:
from fp_mqtt_broker import BrokerFactory
from fp_mqtt_broker.abstractions import MessageHandler

class MyHandler(MessageHandler):
    def get_subscribed_topics(self):
        return ['my/topic']

    def handle_message(self, topic, payload):
        print(f"→ {payload}")

# `config` holds the broker connection settings and is assumed to be loaded elsewhere.
broker = BrokerFactory.create_broker(config, [MyHandler()])
broker.connect()
broker.publish_message("my/topic", "hello world")
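To make the handler idea concrete without depending on the package internals, here is a self-contained sketch of the pattern: handlers declare their topics, and a broker object routes each message to the handlers subscribed to it. `InMemoryBroker` and `CollectingHandler` are hypothetical stand-ins that deliver messages locally. This is not how fp-mqtt-broker or paho-mqtt are implemented; only the method names `get_subscribed_topics`, `handle_message`, and `publish_message` mirror the example above.

```python
from abc import ABC, abstractmethod
from collections import defaultdict


class MessageHandler(ABC):
    """Interface each service implements to receive messages."""

    @abstractmethod
    def get_subscribed_topics(self) -> list:
        ...

    @abstractmethod
    def handle_message(self, topic: str, payload: str) -> None:
        ...


class InMemoryBroker:
    """Hypothetical stand-in for a broker client: routes messages locally."""

    def __init__(self, handlers):
        # Map each topic to the handlers that declared interest in it.
        self._routes = defaultdict(list)
        for handler in handlers:
            for topic in handler.get_subscribed_topics():
                self._routes[topic].append(handler)

    def publish_message(self, topic: str, payload: str) -> int:
        """Deliver payload to every handler subscribed to topic; return the count."""
        handlers = self._routes.get(topic, [])
        for handler in handlers:
            handler.handle_message(topic, payload)
        return len(handlers)


class CollectingHandler(MessageHandler):
    """Example handler that stores everything it receives."""

    def __init__(self, topics):
        self._topics = list(topics)
        self.received = []

    def get_subscribed_topics(self):
        return self._topics

    def handle_message(self, topic, payload):
        self.received.append((topic, payload))
```

Because services only depend on the handler interface, a local stand-in like this can replace the real broker in unit tests.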
Buffering IMU Data
The new IMU service includes a buffer system for:
- Accelerometer
- Gyroscope
- Gravity
- Orientation (quaternions, pitch/roll/yaw)
Each reading is validated before being added to a capped buffer.
class IMUBuffer:
    """Class to manage the IMU data buffer."""

    def __init__(self, max_size):
        # Assumed constructor: the excerpt uses self.max_size without showing where it is set.
        self.max_size = max_size

    def validate_sensor_values(self, values, name):
        """Validate the structure of sensor values."""
        if not isinstance(values, dict):
            raise ValueError("Values must be a JSON object")
        # Check for required fields in values.
        # Orientation requires a different structure (not shown in this excerpt).
        required_fields = ['x', 'y', 'z']
        for field in required_fields:
            if field not in values:
                raise ValueError(f"Missing required field: {field}")

    def add_to_buffer(self, data, buffer):
        """Add new IMU data to the buffer."""
        if len(buffer) >= self.max_size:
            buffer.pop(0)  # Remove oldest data
        buffer.append(data)
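One possible refinement, not taken from the project code: `list.pop(0)` shifts every remaining element, so eviction cost grows with the buffer size. A `collections.deque` created with `maxlen` drops the oldest item automatically in constant time. The class name `CappedBuffer` is illustrative.

```python
from collections import deque


class CappedBuffer:
    """Fixed-capacity FIFO buffer: appending to a full buffer evicts the oldest item."""

    def __init__(self, max_size: int):
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self._items = deque(maxlen=max_size)

    def add(self, reading) -> None:
        # deque(maxlen=...) discards the oldest entry once capacity is reached.
        self._items.append(reading)

    def snapshot(self) -> list:
        """Return the buffered readings, oldest first."""
        return list(self._items)

    def __len__(self) -> int:
        return len(self._items)
```

With a capacity of 3, adding the values 0 through 4 leaves 2, 3, and 4 in the buffer.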
An IMUMessageHandler subscribes to the data_stream topic and pushes incoming data into the buffer. Payloads are expected in the form:
{
  "payload": [
    { "name": "accelerometer", "values": { "x": ..., "y": ..., "z": ... } },
    ...
  ]
}
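A sketch of how a handler might unpack that payload and route each reading to a per-sensor buffer. Several things here are assumptions for illustration: the function name `route_payload`, the lowercase sensor names (derived from the list of buffered sensors above), and the choice to skip invalid entries instead of raising. Orientation readings are passed through without the x/y/z check because their structure is not shown in this post.

```python
import json

XYZ_SENSORS = {"accelerometer", "gyroscope", "gravity"}
KNOWN_SENSORS = XYZ_SENSORS | {"orientation"}


def route_payload(raw_message: str, buffers: dict) -> int:
    """Append each valid reading to buffers[name]; return how many were stored."""
    message = json.loads(raw_message)
    stored = 0
    for entry in message.get("payload", []):
        if not isinstance(entry, dict):
            continue  # malformed entry
        name = entry.get("name")
        values = entry.get("values")
        if name not in KNOWN_SENSORS or not isinstance(values, dict):
            continue  # unknown sensor or values that are not a JSON object
        if name in XYZ_SENSORS and not all(axis in values for axis in ("x", "y", "z")):
            continue  # missing at least one axis
        buffers.setdefault(name, []).append(values)
        stored += 1
    return stored
```

Skipping bad entries keeps one malformed reading from discarding the rest of a batch; a stricter service could raise instead, as the `validate_sensor_values` method does.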
Updated RFID Service Behavior
The RFID service now:
- Connects to the MQTT broker using fp-mqtt-broker
- Publishes start or stop commands based on tag swipes
- Simultaneously triggers audio recognition via gRPC
If you don't recall the details and tasks of the RFID service, you can revisit the previous entry at https://typo.hashnode.dev/from-rfid-to-recognition. In short, it is a service that manages GPIO connections with the Raspberry Pi, including an RC522 RFID reader, and interacts with devices like an LCD screen, buzzers, and LEDs.
Once the connection is set, the RFID service publishes a message to the recording_control topic to signal the start or end of IMU data gathering, depending on whether a recording is already in progress when the tag is swiped.
if not IS_READING:
    # ==================== MQTT BROKER MESSAGE PUBLICATION ===========================
    mqtt_broker.publish_message(
        topic=config['mqtt']['topics']['recording_control'],
        payload=json.dumps({"action": "start", "session_id": id})
    )
else:
    # ==================== MQTT BROKER MESSAGE PUBLICATION ===========================
    mqtt_broker.publish_message(
        topic=config['mqtt']['topics']['recording_control'],
        payload=json.dumps({"action": "stop", "session_id": id})
    )
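The two branches differ only in the `action` value, so the payload could be built in one place. A minimal sketch; `build_control_payload` is a hypothetical helper, not part of the RFID service.

```python
import json


def build_control_payload(is_reading: bool, session_id: str) -> str:
    """Return the JSON control message: 'stop' if a recording is active, else 'start'."""
    action = "stop" if is_reading else "start"
    return json.dumps({"action": action, "session_id": session_id})
```

The publish call would then become a single `mqtt_broker.publish_message(...)` with `payload=build_control_payload(IS_READING, id)`.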
Integration Results
The video below shows the result of integrating MQTT with the sensor layer.
As the video shows, the sensor layer is fully connected, and each RFID tag swipe tells the system to start or stop gathering IMU and audio data.
Introducing: The Orchestrator
With the sensor layer complete, it’s time to go up the stack. The orchestrator bridges the edge and cloud layers. It will:
- Provide a User Interface (UI) for labeling activity data.
- Store sensor data in an AWS S3-based Data Lake.
- Provide a communication bridge between the sensor layer and the cloud.
The next diagram shows the planned architecture with the orchestrator:
Data Lake Design
A Data Lake is a central place to store all your data, both structured and unstructured, at any scale. It keeps data in its raw form until needed for analysis, offering flexibility and scalability.
In this Human Activity Recognition system, a Data Lake will store large amounts of diverse data, like IMU and audio data, from various sensors. This data collection is vital for creating a curated, annotated dataset to train machine learning models for recognizing human activities in multi-household settings. The Data Lake's ability to handle different data types and scale makes it perfect for this system's data needs.
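One common way to keep raw data navigable in an S3-based lake is to encode the sensor, date, and session in the object key. The layout below is purely an assumption for illustration, since only the `datalake/raw/` prefix appears elsewhere in this post. The function name `raw_object_key` and the partition names `sensor=` and `date=` are hypothetical.

```python
from datetime import datetime, timezone


def raw_object_key(sensor: str, session_id: str, captured_at: datetime, extension: str = "csv") -> str:
    """Build an object key partitioned by sensor and UTC date (hypothetical layout)."""
    # Normalize to UTC so that keys do not depend on the device's local timezone.
    day = captured_at.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return f"datalake/raw/sensor={sensor}/date={day}/{session_id}.{extension}"
```

For a session `s1` captured on 16 July 2025 (UTC) by the IMU, this yields `datalake/raw/sensor=imu/date=2025-07-16/s1.csv`.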
fp-orchestrator-utils Python Package
All services on the sensor layer need the orchestrator's new proto definitions to communicate, and duplicating these definitions on each service repository is inefficient. So, I created a public Python package, fp-orchestrator-utils, to provide utilities for the orchestrator. Unlike the fp-mqtt-broker package, this is more tailored to the project and supports only AWS S3, but the code is designed to be expandable for other use cases and cloud vendors.
Features:
- CLI for downloading, generating, and uploading gRPC protos stored in S3
- Programmatic S3 data operations with boto3 under the hood

fp-orchestrator-utils proto download
fp-orchestrator-utils proto generate

from fp_orchestrator_utils import S3Service
# `s3` is an S3Service instance; its construction is not shown here.
s3.save("data", "datalake/raw/data.csv")
Further Considerations
Previously, the RFID service controlled and triggered the sensor layer. This role will pass to the orchestrator, so the planned refactoring includes:
- RFID will identify household members via tag IDs.
- The orchestrator will manage the start/stop logic via UI.
- All services (IMU, Audio) will send data directly to the orchestrator over gRPC.
Security is also an important consideration for the future.
- The MQTT broker's allow_anonymous setting allows unwanted anonymous connections. To ensure privacy, we must block these connections. After testing, we should also implement TLS (Transport Layer Security) and authentication for the MQTT broker.
Next steps
In the next sprint, we will focus on finalizing changes to the sensor layer so that each service can communicate directly with the orchestrator. Additionally, I will set up the data lake to collect enough data to create a curated, annotated, and open-source dataset. This dataset will be used for this project and others related to Human Activity Recognition in multi-household settings.
Wrapping Up
This sprint marked the transition from low-level integration to system-wide coordination. The sensor layer is now modular, testable, and fully interconnected. With MQTT and gRPC in place, the system is ready to scale and support richer functionality like annotation and dataset curation.
Follow the series at typo.hashnode.dev or explore the code on GitHub.
Written by Rodrigo Caballero