From RFID to Recognition: Integrating Sensor Layers for Privacy-Aware HAR on the Edge

Welcome to the second entry in my blog series documenting the development of my dissertation project for a Computer Science degree. This series explores the construction of a privacy-aware, multimodal Human Activity Recognition (HAR) system, designed to run on resource-constrained edge devices. The system monitors activity in a shared environment, using audio, IMU, and RFID sensor inputs.

Sprint Focus (June 16th - 29th)

This sprint covered the foundational infrastructure for the sensor layer, focusing on RFID and integration work. The objectives were:

  • Build and test an RFID sensor layout.

  • Establish unit and end-to-end testing frameworks.

  • Connect the audio and RFID services via the gRPC protocol.

Enhancements from Previous Stage

  1. The full version of Raspberry Pi OS was replaced with the CLI-only Raspberry Pi OS Lite (64-bit). The Lite image uses roughly a tenth of the disk space and frees up volatile memory by running far fewer background processes.

  2. The ONNX model and the best_model.pth file produced by PyTorch were committed directly to Git version control. However, with each file weighing about 30MB, this significantly slowed pulls of the audio service repository. To fix this, I switched to Git Large File Storage (LFS), which keeps lightweight text pointers in version control and downloads the actual files only when the Raspberry Pi needs them, as sketched below.
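As a rough sketch of that migration (the patterns here are illustrative; the actual repository layout may differ), the Git LFS setup looks something like this:

$ git lfs install
$ git lfs track "*.onnx" "*.pth"
$ git add .gitattributes
$ git commit -m "Track model artifacts with Git LFS"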

Sensor Layout Plan

The initial task was to plan the main layout of the sensor layer, which is responsible for collecting data from the environment for further processing. The HAR system will gather environmental data such as sound, Inertial Measurement Unit (IMU) readings, and RFID tag data. The following diagram shows the user flow for collecting this sensor data during the project's development.

The RFID tag swipe process mimics how a smart home environment works when someone arrives home and uses an RFID tag to gain access. In this sequence diagram, two important changes to the initial assumptions are made:

  1. The plan was to use a long-range RFID reader so that data collection would begin automatically when a user enters the kitchen. However, long-range RFID readers are more expensive and harder to set up, so for this stage of the project I chose the short-range, low-cost RC522 reader. It is also easy to connect to the General Purpose Input Output (GPIO) pins of the Raspberry Pi 4.

  2. General-purpose sensors, such as the MPU6050, were going to gather the IMU data. However, these sensors need an always-powered microcontroller to process their readings, which is less convenient than simply using a mobile phone. Instead, an app on the phone sends the IMU data through an HTTP client.

In this entry, I will focus on developing the RFID tag and its logic, which involves the first part of the sequence diagram: from the user to the buzzer.

The breadboard schematic below shows the sensor layout.

List of components

  • 1 × 16×2 LCD screen

  • 1 × I2C backpack for the LCD screen

  • 1 × RFID RC522 reader

  • 1 × active buzzer

  • 1 × red LED

  • 1 × green LED

  • 19 × jumper wires

  • 2 × 220 Ω resistors

The following section explains the development and testing environment used to implement this sensor layer. It's worth noting that this environment is intended to be the standard for the project's other services and integrations.

Development Environment

To interact with the sensors, I will develop a Python package on a Windows machine using Microsoft's WSL (Windows Subsystem for Linux) with an Ubuntu distribution. This setup ensures that the resulting application is tailored to run on a Linux-based system, such as the Raspberry Pi OS Lite. You can find the code in the following repository: https://github.com/RodCaba/fp-rfid-reader-service.

The external dependencies used by the repository are listed in the README.md file.

spidev==3.7
mfrc522==0.0.7
pytest==8.4.1
pytest-mock==3.14.1
RPLCD==1.4.0
smbus2==0.5.0
coverage==7.9.1
pytest-cov==6.2.1

Code Layout

Each sensor interactor in the code includes the following components (I'm using the LCD I2C display interactor as an example):

  1. A service that is started by external callers, such as the script used for end-to-end testing of the interactors or any other external service:

       from .base import Writer
    
       class LCDService:
         """
         A service class for managing LCD operations.
    
         Attributes:
           writer: An instance of a Writer class that handles LCD writing operations.
         """
         def __init__(self, writer: Writer):
           """
           Initializes the LCDService with a specific Writer instance.
    
           Args:
             writer (Writer): An instance of a Writer class that implements the LCD writing functionality.
           """
           self.writer = writer
    
         def write(self, text: str):
           """
           Writes text to the LCD display.
    
           Args:
             text (str): The text to be displayed on the LCD.
           """
           try:
             self.writer.write(text)
           except Exception as e:
             print(f"Error writing to LCD: {e}")
    
         def clear(self):
           """
           Clears the LCD display.
    
           This method calls the clear method of the Writer instance to clear any text currently displayed.
           """
           try:
             self.writer.clear()
           except Exception as e:
             print(f"Error clearing LCD: {e}")
    
  2. An abstract class for the sensor interactor, which is passed to the service. This abstraction applies the dependency inversion principle (one of the SOLID principles) and allows the services to be unit-tested on machines other than the intended hardware. For example, importing the GPIO module on anything other than a Raspberry Pi fails immediately with: RuntimeError: This module can only be run on a Raspberry Pi!

     from abc import ABC, abstractmethod
    
     class Writer(ABC):
         """
         Abstract base class for LCD writers.
    
         This class defines the interface that all concrete LCD writer 
         implementations must follow.
         """
         def __init__(
                 self,
                 i2c_expander="PCF8574",
                 address=0x27,
                 port=1,
                 cols=16,
                 rows=2,
                 dotsize=8,
             ):
             """
             Initialize the LCD writer.
    
             This method can be overridden by subclasses to perform any necessary
             setup for the LCD display.
             """
             self.i2c_expander = i2c_expander
             self.address = address
             self.port = port
             self.cols = cols
             self.rows = rows
             self.dotsize = dotsize
    
         def __del__(self):
             """
             Clean up resources when the LCD writer is deleted.
    
             The LCD display should be cleared to ensure no residual text
             remains when the writer is no longer in use.
             """
             try:
                 self.clear()
             except Exception as e:
                 print(f"Error during cleanup: {e}")
    
         @abstractmethod
         def write(self, text: str):
             """
             Write text to the LCD display.
    
             Args:
                 text (str): The text to display on the LCD.
    
             Raises:
                 Exception: If there's an error writing to the display.
             """
             pass
    
         @abstractmethod
         def clear(self):
             """
             Clear the LCD display.
    
             This method should be called to clear any text currently displayed
             on the LCD.
             Raises:
                 Exception: If there's an error clearing the display.
             """
             pass
    
  3. One or more concrete implementations of the sensor interaction abstraction. These import the external hardware modules and implement the abstract methods.

     from ..base import Writer
     from RPLCD.i2c import CharLCD
    
     class CharLCDWriter(Writer):
         """
         Concrete implementation of the Writer interface for character LCD displays.
         """
    
         def __init__(
                 self,
                 i2c_expander="PCF8574",
                 address=0x27,
                 port=1,
                 cols=16,
                 rows=2,
                 dotsize=8,
             ):
             super().__init__(
                 i2c_expander=i2c_expander,
                 address=address,
                 port=port,
                 cols=cols,
                 rows=rows,
                 dotsize=dotsize,
             )
             self.lcd = CharLCD(
                 i2c_expander=i2c_expander,
                 address=address,
                 port=port,
                 cols=cols,
                 rows=rows,
                 dotsize=dotsize,
             )
    
         def write(self, text: str):
             """
             Write text to the LCD display.
    
             Args:
                 text (str): The text to display on the LCD.
    
             Raises:
                 Exception: If there's an error writing to the display.
             """
             try:
                 self.lcd.write_string(text)
             except Exception as e:
                 raise Exception(f"Error writing to LCD: {e}")
    
         def clear(self):
             """
             Clear the LCD display.
    
             Raises:
                 Exception: If there's an error clearing the display.
             """
             try:
                 self.lcd.clear()
             except Exception as e:
                 raise Exception(f"Error clearing LCD: {e}")
    

In summary, each hardware interaction is abstracted and injected into a service to:

  • Avoid errors from hardware-only modules, such as RPi.GPIO, on non-Pi machines

  • Enable full mocking and testing

  • Follow Dependency Inversion and Open/Closed principles

Testing Environment

Unit Testing

Under the tests folder, the layout of the src folder is mirrored so that each application service has its own set of unit tests. The suite covers each sensor service and each concrete implementation of the sensor interactors.
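As a minimal sketch of this pattern (the module paths match the coverage report below, but the test bodies are illustrative, not copied from the repository), a unit test can hand LCDService a mock Writer so it runs on any machine:

from unittest.mock import MagicMock

from src.lcd.lcd_service import LCDService


def test_write_delegates_to_writer():
    # Inject a mock instead of a hardware-backed Writer, so the test
    # runs on a development machine, not only on a Raspberry Pi.
    writer = MagicMock()
    service = LCDService(writer)

    service.write("RFID detected")

    writer.write.assert_called_once_with("RFID detected")


def test_write_swallows_writer_errors():
    # LCDService catches writer exceptions rather than propagating them.
    writer = MagicMock()
    writer.write.side_effect = Exception("I2C failure")

    LCDService(writer).write("RFID detected")  # must not raise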

The project runs the unit tests on every push and pull request to the master branch using a GitHub Actions workflow. This ensures that everything merged into master passes the unit test suites.

Furthermore, one of the project's goals is to keep statement coverage in unit tests above 80%. The current pytest-cov report shows 95% coverage.
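The report below comes from pytest-cov; a command along these lines produces it (the exact flags in the repository's CI may differ):

$ pytest --cov=src --cov-report=term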

======================================================================= tests coverage =======================================================================
______________________________________________________ coverage: platform linux, python 3.10.12-final-0 ______________________________________________________

Name                                           Stmts   Miss  Cover
------------------------------------------------------------------
src/gpio/gpio_controller.py                       23      0   100%
src/lcd/base.py                                   20      2    90%
src/lcd/implementations/charlcd_writer.py         16      0   100%
src/lcd/lcd_service.py                            14      0   100%
src/reader/base.py                                 8      2    75%
src/reader/implementations/mfrc522_reader.py      16      1    94%
src/reader/reader_service.py                      11      0   100%
------------------------------------------------------------------
TOTAL                                            108      5    95%

Integration Tests

In addition to the unit test setup, the repository includes an integration folder with integration tests, which verify that the interactions between services work as expected. They are labeled with an "integration" marker via the pytest framework and, like the unit tests, run in the GitHub Actions workflow on every push and pull request to the master branch.
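As a small sketch of that labeling (the test body is illustrative; only the marker mechanism is the point), a pytest marker lets the workflow select the integration suite explicitly:

import pytest

from src.lcd.implementations.charlcd_writer import CharLCDWriter
from src.lcd.lcd_service import LCDService


@pytest.mark.integration
def test_lcd_service_with_real_writer():
    # Wires the service to the concrete writer, so this only runs where
    # the hardware exists, e.g. `pytest -m integration` on the Raspberry Pi.
    service = LCDService(CharLCDWriter())
    service.write("Hello")
    service.clear()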

End to End Tests

Finally, a set of end-to-end tests was set up for the user-to-buzzer interaction. A test format was developed, which can be found at this URL; it describes the test scenarios, the execution log, and the software and hardware specifications of the tests. The end-to-end script implements the intended functionality and is run to exercise each scenario.

Building the Sensor Layout

LCD Display I2C

The first step is to solder the I2C backpack to the LCD display. Some versions of the LCD come with the backpack already soldered, which saves you this step (especially welcome if, like me, you're not great at soldering). Next, I enabled the I2C interface using the raspi-config tool and installed the necessary utilities with sudo apt-get install i2c-tools python3-smbus.

After connecting the LCD to the Raspberry Pi, you need to find the display's address on the I2C bus. Run the following command and take note of the address reported:

$ sudo i2cdetect 1

WARNING! This program can confuse your I2C bus, cause data loss and worse!
I will probe file /dev/i2c-1.
I will probe address range 0x08-0x77.
Continue? [Y/n] Y
     0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f
00:                         -- -- -- -- -- -- -- --
10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
20: -- -- -- -- -- -- -- 27 -- -- -- -- -- -- -- --
30: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
50: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
70: -- -- -- -- -- -- -- --

This matrix shows an I2C device detected at address 0x27. You will need to pass this address to the code when starting the LCD service.
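With the address in hand, wiring up the LCD service is short. A minimal sketch using the classes shown earlier (0x27 is the address detected above, which is also the default):

from src.lcd.implementations.charlcd_writer import CharLCDWriter
from src.lcd.lcd_service import LCDService

# Pass the address reported by i2cdetect; the remaining defaults fit a
# 16x2 display behind a PCF8574 I2C backpack.
lcd = LCDService(CharLCDWriter(address=0x27))
lcd.write("Ready to swipe")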

RFID Reader

Once the RFID reader's header pins are soldered, I connect it to the GPIO pins of the Raspberry Pi, enable the SPI interface using the raspi-config tool, and reboot the device. The mfrc522 Python package includes a SimpleMFRC522 class that makes it easy to initialize and communicate with the reader. However, there is a known compatibility issue with newer RFID tags that do not support the MIFARE authentication protocol. When the read function detects such a tag (an NTAG215, for example), the following runtime error occurs:

"AUTH ERROR!!
AUTH ERROR(status2reg & 0x08) != 0"

To solve this, you need to use the lower-level MFRC522 class instead. You can find more details about the issue on the GitHub page where the solution was sourced: https://github.com/pimylifeup/MFRC522-python/issues/31
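A minimal sketch of reading a tag UID with the lower-level class, following the pattern from that issue (request, then anti-collision, with no authentication step to trip up NTAG tags):

from mfrc522 import MFRC522

reader = MFRC522()

while True:
    # Look for any tag in the reader's field.
    (status, tag_type) = reader.MFRC522_Request(reader.PICC_REQIDL)
    if status != reader.MI_OK:
        continue

    # Anti-collision returns the UID; skipping MFRC522_Auth avoids the
    # "AUTH ERROR!!" raised by tags like the NTAG215.
    (status, uid) = reader.MFRC522_Anticoll()
    if status == reader.MI_OK:
        print("Tag UID:", "-".join(str(byte) for byte in uid))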

gRPC Services Connection

To establish a connection between the two services, I decided to use gRPC. This choice was driven by the need for performance and by gRPC's fit with the solution's microservices architecture: the protocol is well suited to efficient communication between distributed systems. In this setup, the audio service is exposed as a server, and the RFID service sends it a request to start the audio recognition loop whenever an RFID tag is detected.

The initial step in this process involves defining the Protocol Buffer, which serves as the interface definition language for the service. This definition will specify the structure of the data and the methods that the services will use to communicate.

syntax = "proto3";

package audio_service;

// Audio processing service
service AudioService {
    // Start audio recording and processing
    rpc StartAudioProcessing(AudioRequest) returns (AudioResponse);

    // Get the status of audio processing
    rpc GetProcessingStatus(StatusRequest) returns (StatusResponse);

    // Health check
    rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
}

// Request message for audio processing
message AudioRequest {
    string session_id = 1;           // Unique session identifier
    int32 recording_duration = 2;    // Duration in seconds
    string output_format = 3;        // Output format (wav, mp3, etc.)
}

In this extract of the definition file, I define the Protocol Buffer messages that serialize the data the service receives and returns, along with the set of functions the RFID service can call. Using the grpc_tools Python package, I compiled the Protocol Buffer definition into Python classes that can be used in the code.
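The compilation step is a single command; assuming the .proto file sits at the repository root and the generated modules land in src/grpc_generated (the package imported below), it looks roughly like:

$ python -m grpc_tools.protoc -I. --python_out=src/grpc_generated --grpc_python_out=src/grpc_generated audio_service.proto

The generated audio_service_pb2 and audio_service_pb2_grpc modules are then imported by the servicer implementation: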

import os
import uuid

from src.grpc_generated import audio_service_pb2, audio_service_pb2_grpc
from src.predictor.predict import AudioPredictor


class AudioService(audio_service_pb2_grpc.AudioServiceServicer):
    def __init__(self):
        # Initialize the predictor with the exported ONNX model
        model_path = os.path.join("exported_models", "model.onnx")
        print(f"Loading model from: {model_path}")
        self.predictor = AudioPredictor(model_path, feature_type="melspectrogram")

        ...

    def StartAudioProcessing(self, request, context):
        """Start audio recording and processing"""
        session_id = request.session_id or str(uuid.uuid4())

        # Process audio code with AudioPredictor...

        return audio_service_pb2.AudioResponse(
                session_id=session_id,
                success=True,
                predicted_class=predicted_class,
                confidence=float(confidence),
                top_predictions=top_predictions
            )

Each function defined in the Protocol Buffer must be implemented in the class that extends the generated AudioServiceServicer class, accepting and returning the corresponding Protocol Buffer message types.
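To make the servicer reachable, the audio service needs the usual gRPC server bootstrapping. A minimal sketch (the port 50051 matches the client default shown below):

from concurrent import futures

import grpc

from src.grpc_generated import audio_service_pb2_grpc


def serve():
    # A small thread pool is enough for a single RFID client.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    audio_service_pb2_grpc.add_AudioServiceServicer_to_server(AudioService(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    serve()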

The same Protocol Buffer definition and gRPC auto-generated Python classes are used in the RFID service. A client is set up to connect and call the functions to communicate with the audio service.

import logging
import os
import uuid
from typing import Dict, Optional

import grpc

from src.grpc_generated import audio_service_pb2, audio_service_pb2_grpc


class AudioServiceClient:
    """gRPC client for Audio Service"""

    def __init__(self, server_address: str = None, timeout: int = 30):
        # Use environment variable or default to Docker service name
        if server_address is None:
            server_address = os.environ.get('AUDIO_SERVICE_URL', 'localhost:50051')

        self.server_address = server_address
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self._connect()

    def _connect(self):
        """Establish connection to audio service"""
        self.logger.info(f"Attempting to connect to audio service at {self.server_address}")
        self.channel = grpc.insecure_channel(self.server_address)
        self.stub = audio_service_pb2_grpc.AudioServiceStub(self.channel)

    def start_audio_processing(self, duration: int = 5, session_id: Optional[str] = None) -> Optional[Dict]:
        """
        Start audio recording and processing
        """
        if session_id is None:
            session_id = str(uuid.uuid4())

        request = audio_service_pb2.AudioRequest(
            session_id=session_id,
            recording_duration=duration,
            output_format="wav"
        )

        response = self.stub.StartAudioProcessing(request, timeout=self.timeout)

        # Flatten the protobuf response into a plain dict for callers
        return {
            "session_id": response.session_id,
            "success": response.success,
            "predicted_class": response.predicted_class,
            "confidence": response.confidence,
            "top_predictions": list(response.top_predictions),
        }

Final Integrated Result

The RFID service waits for an RFID swipe. When one occurs, it creates a new thread using the threading package; this thread asks the audio service, via gRPC, to process audio until the tag is swiped again. The separate thread ensures that swipe detection is never blocked by a long-running prediction on the main thread.
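A condensed sketch of that loop (the class and method names are inferred from the repository layout rather than copied verbatim, and the stop condition is simplified):

import threading

from src.reader.implementations.mfrc522_reader import MFRC522Reader
from src.reader.reader_service import ReaderService

reader_service = ReaderService(MFRC522Reader())
processing_thread = None


def process_audio(session_id: str):
    # Blocks in the worker thread until the audio service returns a prediction.
    client = AudioServiceClient()
    result = client.start_audio_processing(duration=5, session_id=session_id)
    print("Prediction:", result)


while True:
    uid = reader_service.read()  # blocks until a tag is swiped
    # Hand the gRPC call to a worker thread so the main loop can keep
    # watching for the next swipe immediately.
    processing_thread = threading.Thread(target=process_audio, args=(str(uid),))
    processing_thread.start()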

The following video demonstrates the integrated result. You can find the end-to-end test execution in the end-to-end testing format document.

As the test execution log shows, the RFID swipe stress test is failing: swiping a tag repeatedly spawns new threads and produces overlapping predictions. Ideally, the audio processing thread should finish before a new one starts.
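Continuing the sketch above, the likely fix is a guard that refuses to spawn a new thread while the previous one is still alive (a proposed change, not what currently ships):

    # Inside the swipe loop: ignore swipes while a prediction is running.
    if processing_thread is not None and processing_thread.is_alive():
        continue

    processing_thread = threading.Thread(target=process_audio, args=(str(uid),))
    processing_thread.start()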

Known Limitations

  1. The plan was to run the services in Docker containers, with the Raspberry Pi managing their initialization. However, because containers are isolated from the host environment, exposing the audio hardware and GPIO pins inside them proved complicated. As a result, the idea was abandoned in favor of running the Python scripts and starting the servers directly on the local machine.

What’s Next?

  1. To enhance the quality and maintainability of the codebase, implement a linting tool integrated into a GitHub Actions (GHA) workflow. The linter will automatically check the code for adherence to coding standards, ensuring consistency and promoting best practices across the entire project.

  2. Begin implementing the IMU (Inertial Measurement Unit) service, which captures detailed IMU data from the mobile device: accelerometer, gyroscope, and other sensor readings.

  3. As the E2E testing showed, the RFID swipe logic needs to be hardened so that the stress scenarios pass.

Wrapping Up

This sprint marked a pivotal step in bridging the hardware and software layers of the HAR system. From soldering and assembling the sensor layout to implementing abstraction layers and wiring services with gRPC, the project is now equipped with a solid, extensible foundation. The successful integration between RFID and audio modules shows that even in constrained environments, it's possible to build smart, responsive systems with strong architectural principles.

The upcoming sprints will push this foundation further—by incorporating mobile-based IMU data, refining audio recognition, and reinforcing system performance under load.

Thanks for following along. Feel free to explore the repositories, share feedback, or connect if you're navigating similar challenges in edge AI, IoT, or smart environments.

📬 Follow this series on typo.hashnode.dev to see how this HAR system evolves from prototype to production-ready.
