Azure Event Hubs with Microsoft Fabric
Azure Event Hubs, Microsoft's cloud-based event streaming platform, addresses the need for a high-throughput distributed service capable of ingesting millions of events per second, whether that is tracking user activity, monitoring IoT devices, or collecting log data.
First, a brief background on Azure Stream Analytics. It operates around the concept of a job, which consists of an input, a query, and an output. A job can ingest data from sources such as Azure Event Hubs (including events produced by Apache Kafka clients), Azure IoT Hub, and Azure Blob Storage. The query, built on SQL syntax, enables efficient processing of streaming data over a defined time window.
In this write-up, I will walk through a simple setup that covers the entire data pipeline from ingestion through processing to storage. We'll start by generating data with a C# library called Bogus and ingesting it into Azure Event Hubs, then process it with Azure Stream Analytics, applying transformations through SQL-based queries, and conclude by storing the results in an Azure SQL database and testing whether the data can be replicated to Microsoft Fabric. I hope this provides a clear understanding of how to build an end-to-end solution for real-time data streaming and analytics across Azure and the Fabric ecosystem.
Bogus is a great fake data generator for .NET languages. You can check its details here. To install the NuGet package, use Install-Package Bogus.
To get started, we'll mimic the data of a product ordering system where there are customers and order details for those customers. The Bogus library will generate JSON objects containing customer information along with an array of order details associated with that customer.
Setup
Create a new console application and ensure the following references are added to the project:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Bogus;
using System.Text.Json;
Define constants to specify the number of records to generate through Bogus. We'll generate 50 customer records, each with 2 orders.
public const int NumberOfCustomers = 50;
public const int NumberOfOrdersPerCustomer = 2;
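The generator methods further down add their results to two static collections, Customers and Orders, which are not shown explicitly in this post. A minimal declaration, assuming everything lives in the same Program class, would be:

public static readonly List<Customer> Customers = new(); // holds all generated customers
public static readonly List<Order> Orders = new();       // holds all generated orders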
Create POCOs for Customer and Order:
public class Customer
{
    public Guid Id { get; set; }
    public string FirstName { get; set; } = default!;
    public string LastName { get; set; } = default!;
    public string Address { get; set; } = default!;
    public string Email { get; set; } = default!;
    public string AboutMe { get; set; } = default!;
    public int YearsOld { get; set; }
    public IEnumerable<Order>? Orders { get; set; }

    public override string ToString()
    {
        return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
    }
}

public sealed class Order
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public string ProductName { get; set; } = default!;

    public override string ToString()
    {
        return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
    }
}
Next, create an instance of the Faker<T> class to generate the customer data:
private static Faker<Customer> CustomerGenerator()
{
    return new Faker<Customer>()
        .RuleFor(e => e.Id, _ => Guid.NewGuid())
        .RuleFor(e => e.FirstName, f => f.Name.FirstName())
        .RuleFor(e => e.LastName, f => f.Name.LastName())
        .RuleFor(e => e.Address, f => f.Address.FullAddress())
        .RuleFor(e => e.Email, (f, e) => f.Internet.Email(e.FirstName, e.LastName))
        .RuleFor(e => e.AboutMe, f => f.Lorem.Paragraph(1))
        .RuleFor(e => e.YearsOld, f => f.Random.Int(18, 60))
        .RuleFor(e => e.Orders, (_, e) => GenerateOrderData(e.Id));
}

public static void GenerateCustomerData()
{
    var customerGenerator = CustomerGenerator();
    var generatedCustomers = customerGenerator.Generate(NumberOfCustomers);
    Customers.AddRange(generatedCustomers);
}
Do the same for Orders
private static Faker<Order> OrderGenerator(Guid customerId)
{
    return new Faker<Order>()
        .RuleFor(v => v.OrderId, _ => Guid.NewGuid())
        .RuleFor(v => v.CustomerId, _ => customerId)
        .RuleFor(v => v.ProductName, f => f.Commerce.ProductName());
}

private static List<Order> GenerateOrderData(Guid customerId)
{
    var orderGenerator = OrderGenerator(customerId);
    var generatedOrders = orderGenerator.Generate(NumberOfOrdersPerCustomer);
    Orders.AddRange(generatedOrders);
    return generatedOrders;
}
Execute the data generation method.
GenerateCustomerData();
Here is how the output generated by Bogus looks. You will have 50 customer JSON records, each with 2 order details.
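For reference, a single customer event has the shape below. The values here are illustrative placeholders only; Bogus randomizes the actual output on every run.

{
  "Id": "00000000-0000-0000-0000-000000000001",
  "FirstName": "Jane",
  "LastName": "Doe",
  "Address": "123 Sample Street, Example City",
  "Email": "Jane.Doe@example.com",
  "AboutMe": "Lorem ipsum dolor sit amet.",
  "YearsOld": 34,
  "Orders": [
    {
      "OrderId": "00000000-0000-0000-0000-000000000002",
      "CustomerId": "00000000-0000-0000-0000-000000000001",
      "ProductName": "Handcrafted Granite Chair"
    },
    {
      "OrderId": "00000000-0000-0000-0000-000000000003",
      "CustomerId": "00000000-0000-0000-0000-000000000001",
      "ProductName": "Ergonomic Wooden Shoes"
    }
  ]
}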
We'll then feed this data to Azure Event Hubs and create a Stream Analytics job to push it to an Azure SQL database.
So first, create an Event Hubs namespace and an event hub under that namespace. You can check the step-by-step process to create one detailed here. Ensure that you use at least the Standard tier for your Event Hubs namespace, otherwise you will face issues with consumer group configuration.
Once everything is set up, we'll push the data generated by the Bogus library to the event hub. I am doing this through the Main method of the console application.
static async Task Main(string[] args)
{
    Console.WriteLine("Bogus has started creating data");
    GenerateCustomerData();
    Console.WriteLine("Bogus ended creating data");

    var connectionString = "EventHubConnectionString";
    var eventHubName = "EventHubName";

    Console.WriteLine(">>>>>>>>>>>>>>>>>>");
    Console.WriteLine("Started data upload to Event Hub");

    var producer = new EventHubProducerClient(connectionString, eventHubName);
    try
    {
        // Each customer (with its nested orders) is serialized to JSON and sent as one event
        var eventsToSend = new List<EventData>();
        for (var index = 0; index < Customers.Count; ++index)
        {
            var eventData = new EventData(Customers[index].ToString());
            eventsToSend.Add(eventData);
        }
        await producer.SendAsync(eventsToSend);
    }
    finally
    {
        await producer.CloseAsync();
    }
    Console.WriteLine("Data upload to Event Hub ended");
}
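A side note on the send call: passing a plain List<EventData> to SendAsync is fine for 50 small events, but the call will fail if the combined payload ever exceeds the event hub's maximum batch size. For larger volumes, a client-created batch validates the size as events are added. A minimal sketch that could replace the body of the try block above (handling of a full batch is omitted for brevity):

// create a size-aware batch and add events until it is full
using EventDataBatch eventBatch = await producer.CreateBatchAsync();
foreach (var customer in Customers)
{
    // TryAdd returns false once the next event would exceed the batch size limit
    if (!eventBatch.TryAdd(new EventData(customer.ToString())))
    {
        // in a real pipeline you would send this batch and start a new one
        break;
    }
}
await producer.SendAsync(eventBatch);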
Query the data in Event Hubs to ensure that the data has been transferred. You can do this through the Data Explorer for your Event Hubs namespace or through the Stream Analytics query window.
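If you would rather verify from code than from the portal, the consumer client in the same Azure.Messaging.EventHubs package can read the events back. A quick sketch, assuming the default consumer group and the same placeholder connection string and event hub name used above:

using Azure.Messaging.EventHubs.Consumer;

await using var consumer = new EventHubConsumerClient(
    EventHubConsumerClient.DefaultConsumerGroupName,
    "EventHubConnectionString",
    "EventHubName");

// stop reading if nothing new arrives within a few seconds
using var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10));

try
{
    await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cancellation.Token))
    {
        Console.WriteLine(partitionEvent.Data.EventBody.ToString());
    }
}
catch (OperationCanceledException)
{
    // expected once the timeout above expires
}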
One interesting point to note is that the Orders array is stored as a string, which isn't desirable.
Let's try to push the data into an Azure SQL database. First, I tried doing it using the Transform and store data to SQL database option. This is a no-code feature available to analyze and process the event hub data and push it to different destinations.
I tested this feature by using the Expand transformation step in the flow to parse the Orders array in the JSON, but it only separates the individual elements of the array across rows in JSON format, which wasn't helpful at all.
The only option left was to use Stream Analytics SQL to parse and split the Orders array out of the JSON.
So I created a new Stream Analytics job and set up the job topology with the input as the Event Hub data and the output set to the Azure SQL database.
I then used the following query for data transformation.
SELECT
    i.id,
    i.firstname,
    i.lastname,
    i.Address,
    i.Email,
    i.AboutMe,
    i.YearsOld,
    OrderData.ArrayValue.OrderId as OrderId,
    OrderData.ArrayValue.CustomerId as CustomerId,
    OrderData.ArrayValue.ProductName as ProductName
INTO
    [EventHubs-DB] -- name of the Azure SQL database
FROM
    [evenr-hub-1] i -- name of the event hub
CROSS APPLY GetArrayElements(Orders) AS OrderData
and it gave me the output that I was looking for.
This way, OrderId, CustomerId, and ProductName from the Orders array of the input JSON are parsed and flattened into separate rows and columns.
Out of curiosity, I then decided to try the Eventstream feature of Microsoft Fabric to check whether data from the Azure event hub gets perfectly replicated to an Eventstream in Fabric, and to my surprise the Orders array from the JSON input was completely missing. I am not sure if this is a bug or if there is something I missed while configuring the Azure event hub with the Fabric Eventstream.
An update on the above issue: I deleted and recreated the Eventstream, and the missing Orders column became available in the output.
So in the next step, I went ahead and created an Eventhouse in Fabric, then created a KQL database and ingested the data from the Azure event hub into a Fabric Eventstream and from there into the newly created KQL database through the Eventhouse. Once the data was available in the KQL database, I used the following KQL query to flatten the Orders JSON data.
Customer_Orders
| extend ExtProp = parse_json(Orders)
| mv-expand ExtProp
| project Id, FirstName, LastName, Address, Email, AboutMe, YearsOld,
          OrderId = ExtProp.OrderId,
          CustomerId = ExtProp.CustomerId,
          ProductName = ExtProp.ProductName
Conclusion
Through this blog, I tried to demonstrate a simple example of generating quick sample data to test processing and storage options, using Azure Event Hubs for data ingestion and Azure Stream Analytics for data processing in Azure, and storing that data in an Azure SQL database.
As it wasn't possible to port the data from Stream Analytics directly into a Fabric Eventstream, the only option left was to pull the data from the Azure event hub into a Fabric Eventstream and store it in a KQL database through a Fabric Eventhouse.
I hope this article gets you started on setting up the interaction between Azure and Fabric with respect to streaming data, and on the limitations you might face during the setup.
Thanks for reading!