Microsoft Fabric Real-Time Intelligence with Eventstream Custom Endpoints


Overview
Navigating the world of real-time data processing can be complex, but Microsoft Fabric's Real-Time Intelligence with Eventstream makes it surprisingly intuitive. This guide walks through a practical scenario: transforming randomized, locally generated humidity sensor data using custom endpoints as both the input and output sources.
System Architecture
The architecture consists of the following stages:
Custom Endpoint as the input data source: Sensor data is ingested via a custom endpoint (generated on the local system via VS Code).
Microsoft Fabric Eventstream: The data flows through transformations to refine and enhance its usability.
Derived stream as the output data source: Processed data is sent to a secondary stream, which in turn acts as a custom endpoint that serves the data to consumers.
Eventhouse KQL Database: Data is stored for further analytics and historical tracking.
Prerequisites
- Fabric Eventstream configured in Microsoft Fabric (we need its Event Hub name and primary connection string for the producer code)
Step 1: Simulating Humidity Sensor Data
A producer script simulates sensor data and sends it to the custom endpoint. It is executed in VS Code. As a best practice, the Event Hub name and primary connection string should be stored securely, for example in Azure Key Vault, and never exposed in code. For demonstration purposes, however, I will keep them inline here.
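If you prefer not to hard-code the secrets, the connection details could instead be pulled from Azure Key Vault. A minimal sketch is shown below; the vault URL and secret names are placeholders I chose for illustration.

# Optional: load the Event Hub details from Azure Key Vault instead of hard-coding them
# pip install azure-identity azure-keyvault-secrets
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Placeholder vault URL and secret names - replace with your own
vault_client = SecretClient(
    vault_url="https://<your-key-vault-name>.vault.azure.net/",
    credential=DefaultAzureCredential()
)
EVENTHUB_NAME = vault_client.get_secret("eventhub-name").value
CONNECTION_STR = vault_client.get_secret("eventhub-connection-string").value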
# INSTALL azure-eventhub before running!
# pip command:
# pip install azure-eventhub
# Primary Stream
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timezone
import hashlib
import random
import time
import json
# Replace the placeholders with your Event Hubs connection string and event hub name
EVENTHUB_NAME = 'es_d8ecd8c2-45c2-41e8-8f67-2d37e0c8112b'
CONNECTION_STR = 'Endpoint=sb:/'
# Configuration variables
MIN_HUMIDITY = 30.0 # Minimum humidity value
MAX_HUMIDITY = 60.0 # Maximum humidity value
COUNTRY = "Canada" # Country
CITY = "Toronto" # City
SLEEP_TIME = 5 # Sleep time before sending next event
# Example message
'''
{
    "country": "Canada",
    "city": "Toronto",
    "timestamp": "2025-01-25T12:36:03.826769+00:00",
    "humidity_readings": [
        {
            "sensor": "sensor_1",
            "humidity": "45.34"
        },
        {
            "sensor": "sensor_2",
            "humidity": "50.21"
        },
        {
            "sensor": "sensor_3",
            "humidity": "55.69"
        }
    ],
    "event_id": "88709af29e138d8d906e009a800ebaddacd46d89d20ec91151e2ab91557170c5"
}
'''
# Create a producer client to send messages to the event hub
producer = EventHubProducerClient.from_connection_string(conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME)

def generate_fake_humidity(min_humidity, max_humidity):
    """Simulate a fake humidity reading within the specified range."""
    return str(round(random.uniform(min_humidity, max_humidity), 2))

def get_random_sensor_readings(min_humidity, max_humidity):
    """Generate a list of humidity readings for a random subset of sensors."""
    sensors = ["sensor_1", "sensor_2", "sensor_3"]
    selected_sensors = random.sample(sensors, random.randint(1, len(sensors)))
    return [{"sensor": sensor, "humidity": generate_fake_humidity(min_humidity, max_humidity)}
            for sensor in selected_sensors]

def generate_event_id(payload):
    """Generate a SHA-256 hash as a unique event ID."""
    hash_object = hashlib.sha256(json.dumps(payload, sort_keys=True).encode('utf-8'))
    return hash_object.hexdigest()

def get_current_timestamp():
    """Return the current UTC timestamp in ISO 8601 format."""
    return datetime.now(timezone.utc).isoformat()
try:
    # Continuously generate and send fake humidity readings
    while True:
        # Create a batch
        event_data_batch = producer.create_batch()

        # Generate random sensor readings
        humidity_readings = get_random_sensor_readings(MIN_HUMIDITY, MAX_HUMIDITY)

        # Create the payload
        payload = {
            "country": COUNTRY,
            "city": CITY,
            "timestamp": get_current_timestamp(),
            "humidity_readings": humidity_readings
        }

        # Generate an event_id
        payload["event_id"] = generate_event_id(payload)

        # Format the message as JSON
        message = json.dumps(payload)

        # Add the JSON-formatted message to the batch
        event_data_batch.add(EventData(message))

        # Send the batch of events to the event hub
        producer.send_batch(event_data_batch)
        print(json.dumps(json.loads(message), indent=4))
        print(event_data_batch)

        # Wait for a bit before sending the next reading
        time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
    print("Stopped by the user")
except Exception as e:
    print(f"Error: {e}")
finally:
    # Close the producer
    producer.close()
Here’s a brief breakdown of the code snippet:
It generates random humidity readings between 30% and 60% for up to three sensors.
The data includes location (Canada, Toronto) and a timestamp.
The payload is serialized into JSON and assigned a unique event ID using SHA-256 hashing.
The script continuously generates, sends, and prints this data every 5 seconds, simulating real-time sensor readings.
It runs in an infinite loop until manually stopped (KeyboardInterrupt).
The Event Hub name and connection string (primary key) are obtained from the Eventstream's custom endpoint details (that is why we first create an Eventstream and then select Custom Endpoint as the source).
The following is the payload output in VS Code.
Step 2: Transforming Data in Microsoft Fabric Eventstream
In the following steps, I will walk through the different transformations applied to the generated Eventstream output.
2.1 Expanding Humidity Readings
This transformation expands the humidity_readings array into individual events using the Expand transformation, which creates a new row for each value within an array, as shown below.
{
    "sensor": "sensor_1",
    "humidity": 45.5,
    "country": "Canada",
    "timestamp": "2025-02-07T12:00:00Z",
    "event_id": "<unique_hash>"
}
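Conceptually, the Expand operation behaves like the Python sketch below. This is only an illustration of the row-per-array-value semantics; the actual operator is configured in the Eventstream designer.

# Illustration only: one input event with an array becomes one row per array element
def expand_humidity_readings(event):
    base = {k: v for k, v in event.items() if k != "humidity_readings"}
    return [{**base, **reading} for reading in event["humidity_readings"]]

sample_event = {
    "country": "Canada",
    "city": "Toronto",
    "timestamp": "2025-02-07T12:00:00Z",
    "humidity_readings": [
        {"sensor": "sensor_1", "humidity": "45.34"},
        {"sensor": "sensor_2", "humidity": "50.21"}
    ],
    "event_id": "<unique_hash>"
}
for row in expand_humidity_readings(sample_event):
    print(row)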
2.2 Managed Field Transformations
Column renames and data type changes (for example, converting humidity to a numeric type) are performed in this step.
{
    "sensor": "sensor_1",
    "humidity": 45.5,
    "country": "Canada",
    "event_id": "<unique_hash>"
}
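As a rough sketch of what this step does (the exact renames and casts are configured in the Eventstream designer, so the field names here are assumptions):

# Illustration only: keep the selected columns and convert humidity to a numeric type
def manage_fields(row):
    return {
        "sensor": row["sensor"],
        "humidity": float(row["humidity"]),  # string -> float
        "country": row["country"],
        "event_id": row["event_id"],
    }

print(manage_fields({"sensor": "sensor_1", "humidity": "45.5", "country": "Canada",
                     "timestamp": "2025-02-07T12:00:00Z", "event_id": "<unique_hash>"}))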
2.3 Group by Transformation
We will add a new column containing the average humidity for each event_id. To achieve this, first perform a group-by operation and then left-join the result with the original stream.
2.4 Left Join and Managed Fields Transformations
(A new AVG_humidity column has been added.)
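The combined group-by and left-join logic can be pictured with the Python sketch below; it is only an illustration with assumed field names, not how Eventstream executes these operators.

from collections import defaultdict

# Illustration only: average humidity per event_id (group by),
# then left-join the result back onto every row as AVG_humidity
def add_avg_humidity(rows):
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        sums[row["event_id"]] += float(row["humidity"])
        counts[row["event_id"]] += 1
    return [
        {**row, "AVG_humidity": round(sums[row["event_id"]] / counts[row["event_id"]], 2)}
        for row in rows
    ]

sample_rows = [
    {"sensor": "sensor_1", "humidity": 45.5, "country": "Canada", "event_id": "abc"},
    {"sensor": "sensor_2", "humidity": 50.5, "country": "Canada", "event_id": "abc"},
]
for row in add_avg_humidity(sample_rows):
    print(row)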
The processed data is then routed to:
A derived (secondary) stream, which acts as the source for another custom endpoint.
An Eventhouse KQL database (its usage is not covered here, since this post focuses on custom endpoints).
Step 3: Consuming the derived (secondary) stream via a custom endpoint
The following code block, when executed in VS Code, consumes the grouped and transformed data pushed through the secondary stream. As before, the new Event Hub name and connection string need to be supplied.
# Secondary Stream
# INSTALL azure-eventhub before running!
# pip install azure-eventhub
from azure.eventhub import EventHubConsumerClient
import json

# Replace the placeholders with your Event Hubs connection string and event hub name
EVENTHUB_NAME = 'des_cee26d55-449b-4c4e-a0dc-ed481eaed80c'
CONNECTION_STR = 'Endpoint=sb://.windows.net/;'
CONSUMER_GROUP = '$Default'  # Change if using a different consumer group

def on_event(partition_context, event):
    """Callback function to process received events."""
    try:
        # Decode event data
        event_data = json.loads(event.body_as_str())

        # Print received event
        print(json.dumps(event_data, indent=4))

        # Update checkpoint
        partition_context.update_checkpoint(event)
    except Exception as e:
        print(f"Error processing event: {e}")

# Create a consumer client
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENTHUB_NAME
)

try:
    print("Listening for events...")
    with consumer:
        consumer.receive(
            on_event=on_event,
            starting_position="-1"  # Read from the beginning
        )
except KeyboardInterrupt:
    print("Stopped by user.")
except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()
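Note that this consumer keeps its checkpoints in memory, so it re-reads from the configured starting position on every restart. If durable checkpoints are needed, one option (not part of this walkthrough; the storage details below are placeholders) is the Azure Blob Storage checkpoint store:

# Optional: persist checkpoints in Azure Blob Storage so restarts resume where they left off
# pip install azure-eventhub-checkpointstoreblob
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Placeholder storage connection string and container name - replace with your own
checkpoint_store = BlobCheckpointStore.from_connection_string(
    "<storage-account-connection-string>",
    container_name="eventhub-checkpoints"
)
consumer = EventHubConsumerClient.from_connection_string(
    conn_str=CONNECTION_STR,
    consumer_group=CONSUMER_GROUP,
    eventhub_name=EVENTHUB_NAME,
    checkpoint_store=checkpoint_store  # update_checkpoint() now persists to blob storage
)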
Conclusion
Adding custom endpoints as inputs and outputs in Microsoft Fabric Eventstream provides greater flexibility for real-time data ingestion and distribution, making it easier to integrate with various systems and applications.
As an Input Source
Bring in real-time data from external systems, IoT devices, or third-party applications.
Connect to proprietary or industry-specific data sources that aren’t natively supported.
Ingest data from APIs, webhooks, or other event-driven sources.
As an Output Destination
Route transformed event data to external applications, analytics platforms, or databases.
Send data to business systems, dashboards, or machine learning models for further processing.
Integrate with third-party APIs or messaging services for real-time automation.
By using custom endpoints, you can seamlessly extend Eventstream’s capabilities to fit your specific needs, ensuring your data flows where it’s needed in real-time.
Thanks for Reading !!!
Written by
Nalaka Wanniarachchi
Nalaka Wanniarachchi is an accomplished data analytics and data engineering professional with over 18 years of experience. As a CIMA(ACMA/CGMA) UK qualified ex-banker with strong analytical skills, he transitioned into building robust data solutions. Nalaka specializes in Microsoft Fabric and Power BI, delivering advanced analytics and engineering solutions. He holds a Microsoft certification as a Fabric Analytic Engineer and Power BI Professional, combining technical expertise with a deep understanding of financial and business analytics.