Azure Event Hubs with Microsoft Fabric

Azure Event Hubs, Microsoft’s cloud-based event streaming platform, is a high-throughput distributed service capable of ingesting millions of events per second, whether it’s tracking user activity, monitoring IoT devices, or collecting log data.

Just a brief background on Azure Stream Analytics: it operates around the concept of a Job, which consists of an input, a query, and an output. It can ingest data from sources like Azure Event Hubs (including Event Hubs for Apache Kafka), Azure IoT Hub, and Azure Blob Storage. The query, built on SQL syntax, lets you filter, aggregate, and transform the stream over defined time windows.

In this write-up, I will walk through a fairly simple setup that covers the entire data pipeline, from ingestion to processing and finally storage. We’ll start by generating sample data with a C# library called Bogus and ingesting it into an Azure Event Hub, then process it with Azure Stream Analytics, applying transformations through SQL-based queries, store the results in an Azure SQL database, and finally test whether the data can be replicated to Microsoft Fabric. I hope this will provide a clear understanding of how to build an end-to-end solution for real-time data streaming and analytics across Azure and the Fabric ecosystem.

Bogus is a great fake data generator for .NET languages. You can check its details here. To install the NuGet package, use Install-Package Bogus.

To get started, we will mimic the data of a product ordering system where there are customers and order details for those customers. The Bogus library will generate JSON objects containing customer information along with an array of order details associated with that customer.

Setup

Create a new console application, add the Azure.Messaging.EventHubs and Bogus NuGet packages, and ensure the following references are added to the project:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Bogus;
using System.Text.Json;

Define constants to specify the number of records to generate through Bogus. We will generate 50 customer records, each with 2 orders.

  public const int NumberOfCustomers = 50;
  public const int NumberOfOrdersPerCustomer = 2;

Create POCOs for Customers and Orders.

   public class Customer
   {
       public Guid Id { get; set; }
       public string FirstName { get; set; } = default!;
       public string LastName { get; set; } = default!;
       public string Address { get; set; } = default!;
       public string Email { get; set; } = default!;
       public string AboutMe { get; set; } = default!;
       public int YearsOld { get; set; }
       public IEnumerable<Order>? Orders { get; set; }
       public override string ToString()
       {
           return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
       }
   }

   public sealed class Order
   {
       public Guid OrderId { get; set; }
       public Guid CustomerId { get; set; }
       public string ProductName { get; set; } = default!;

       public override string ToString()
       {
           return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
       }
   }
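The generator methods that follow append to two static collections that aren’t shown elsewhere in the snippets. Assuming they live on the same class as the generators, a minimal declaration would be:

   // Backing collections for the generated data (assumed; not shown in the original snippets)
   public static readonly List<Customer> Customers = new();
   public static readonly List<Order> Orders = new();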

Next, use Bogus’s Faker<T> class to define the rules for generating customer data.

 private static Faker<Customer> CustomerGenerator()
 {
     return new Faker<Customer>()
         .RuleFor(e => e.Id, _ => Guid.NewGuid())
         .RuleFor(e => e.FirstName, f => f.Name.FirstName())
         .RuleFor(e => e.LastName, f => f.Name.LastName())
         .RuleFor(e => e.Address, f => f.Address.FullAddress())
         .RuleFor(e => e.Email, (f, e) => f.Internet.Email(e.FirstName, e.LastName))
         .RuleFor(e => e.AboutMe, f => f.Lorem.Paragraph(1))
         .RuleFor(e => e.YearsOld, f => f.Random.Int(18, 60))
         .RuleFor(e => e.Orders, (_, e) =>
         {
             return GenerateOrderData(e.Id);
         });
 }

public static void GenerateCustomerData()
{
    var customerGenerator = CustomerGenerator();
    var generatedCustomers = customerGenerator.Generate(NumberOfCustomers);
    Customers.AddRange(generatedCustomers);
}

Do the same for Orders

 private static Faker<Order> OrderGenerator(Guid customerId)
 {
     return new Faker<Order>()
         .RuleFor(v => v.OrderId, _ => Guid.NewGuid())
         .RuleFor(v => v.CustomerId, _ => customerId)
         .RuleFor(v => v.ProductName, f => f.Commerce.ProductName());
 }

 private static List<Order> GenerateOrderData(Guid customerId)
 {
     var orderGenerator = OrderGenerator(customerId);
     var generatedOrders = orderGenerator.Generate(NumberOfOrdersPerCustomer);
     Orders.AddRange(generatedOrders);
     return generatedOrders;
 }

Execute the data generation method.

GenerateCustomerData();

Here is how the output generated by Bogus looks: 50 customer JSON records, each with 2 order details.
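An illustrative record (the actual values will differ on every run, since Bogus randomizes them) has this shape:

 {
   "Id": "9f8e7d6c-0000-0000-0000-000000000001",
   "FirstName": "Jane",
   "LastName": "Doe",
   "Address": "123 Example Street, Sample City",
   "Email": "Jane.Doe@example.com",
   "AboutMe": "Lorem ipsum dolor sit amet.",
   "YearsOld": 34,
   "Orders": [
     {
       "OrderId": "9f8e7d6c-0000-0000-0000-000000000002",
       "CustomerId": "9f8e7d6c-0000-0000-0000-000000000001",
       "ProductName": "Handcrafted Wooden Chair"
     },
     {
       "OrderId": "9f8e7d6c-0000-0000-0000-000000000003",
       "CustomerId": "9f8e7d6c-0000-0000-0000-000000000001",
       "ProductName": "Generic Steel Table"
     }
   ]
 }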

We will then feed this data to Azure Event Hubs and create a Stream Analytics job to push it to an Azure SQL database.

So first create an Event Hubs namespace and an event hub under that namespace. You can check the step-by-step process to create one detailed here.

Ensure that you are using at least the Standard tier for your Event Hubs namespace; otherwise you will run into issues with consumer group configuration (the Basic tier only supports the default consumer group).

Once everything is set up, we can push the data generated by the Bogus library to the event hub. I am doing this through the Main method of the console application.

 static async Task Main(string[] args)
 {
     Console.WriteLine("Bogus has started creating data");
     GenerateCustomerData();
     Console.WriteLine("Bogus ended creating data");

      var connectionString = "EventHubConnectionString";
     var eventHubName = "EventHubName";

     Console.WriteLine(">>>>>>>>>>>>>>>>>>");
     Console.WriteLine("Started data upload to Event Hub");
     var producer = new EventHubProducerClient(connectionString, eventHubName);

     try
     {
         var eventsToSend = new List<EventData>();

         for (var index = 0; index < Customers.Count; ++index)
         {
             var eventData = new EventData(Customers[index].ToString());
             eventsToSend.Add(eventData);
         }

         await producer.SendAsync(eventsToSend);
     }
     finally
     {
         await producer.CloseAsync();
     }
      Console.WriteLine("Data upload to Event Hub ended");
 }
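As a side note, sending a plain list of events works fine for a small sample like this, but the service enforces a maximum batch size. For larger volumes, a safer pattern is the EventDataBatch API of the same producer client. Here is a minimal sketch, assuming the producer and Customers list from above; TryAdd reports whether an event still fits in the batch:

 // Sketch: batch the events so the payload size limit is respected.
 using EventDataBatch batch = await producer.CreateBatchAsync();

 foreach (var customer in Customers)
 {
     var eventData = new EventData(customer.ToString());

     if (!batch.TryAdd(eventData))
     {
         // The batch is full; in a real producer you would send it and start a new one.
         throw new InvalidOperationException("Event is too large for the current batch.");
     }
 }

 await producer.SendAsync(batch);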

Query the data in Event Hubs to confirm that it has been transferred. You can do this through the Data Explorer for your Event Hubs namespace

or through the Stream Analytics query window.
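You can also verify from code. A minimal sketch using EventHubConsumerClient (from the same Azure.Messaging.EventHubs package, reading with the default consumer group and assuming the connectionString and eventHubName values shown earlier) might look like this:

 // Requires: using Azure.Messaging.EventHubs.Consumer;
 var consumer = new EventHubConsumerClient(
     EventHubConsumerClient.DefaultConsumerGroupName,
     connectionString,
     eventHubName);

 // Stop reading after a short window; otherwise ReadEventsAsync keeps waiting for new events.
 using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

 try
 {
     await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cts.Token))
     {
         Console.WriteLine(partitionEvent.Data.EventBody.ToString());
     }
 }
 catch (OperationCanceledException)
 {
     // Expected once the timeout above expires.
 }
 finally
 {
     await consumer.CloseAsync();
 }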

One interesting point to note is that the Orders array is stored as a string, and this isn’t desirable.

Let’s try to push the data into Azure SQL DB. First I tried doing it using the "Transform and store data to SQL database" option. This is a no-code feature available to analyze and process the event hub data and push it to different destinations.

I tested this feature by using the Expand transformation step in the flow to parse the Orders array in the JSON. But it only separates the individual array elements across rows, still in JSON format, which wasn’t helpful at all.

The only option left was to use Stream Analytics SQL to parse and split the Orders array from the JSON.

So I created a new Stream Analytics job and set up the job topology with the input as the Event Hub data and the output set to Azure SQL DB.

I then used the following query for data transformation.

 SELECT
     i.Id,
     i.FirstName,
     i.LastName,
     i.Address,
     i.Email,
     i.AboutMe,
     i.YearsOld,
     OrderData.ArrayValue.OrderId AS OrderId,
     OrderData.ArrayValue.CustomerId AS CustomerId,
     OrderData.ArrayValue.ProductName AS ProductName
 INTO
     [EventHubs-DB]    -- output alias (the Azure SQL database)
 FROM
     [evenr-hub-1] i   -- input alias (the event hub)
 CROSS APPLY GetArrayElements(i.Orders) AS OrderData

and it gave me the output that I was looking for.

This way, OrderId, CustomerId, and ProductName from the Orders array of the input JSON are parsed and flattened into separate rows and columns.

Out of curiosity, I then decided to try the Eventstream feature of Microsoft Fabric to check whether data from Azure Event Hubs gets faithfully replicated to an eventstream in Fabric,

and to my surprise, the Orders array from the input JSON was completely missing.

I am not sure if this is a bug or if there is something I missed while configuring Azure Event Hubs with the Fabric eventstream.

An update on the above issue:

I deleted and recreated the eventstream, and the missing Orders column became available in the output.

So in the next step I went ahead and created an Eventhouse in Fabric

and then created a KQL database and ingested the data from Azure Event Hubs into the Fabric eventstream and on into the newly created KQL database through the Eventhouse.

And once the data was available in the KQL database, I used the following KQL query to flatten the Orders JSON data.

Customer_Orders
| extend ExtProp = parse_json(Orders)
| mv-expand ExtProp
| project Id, FirstName, LastName, Address, Email, AboutMe, YearsOld,
    OrderId = ExtProp.OrderId,
    CustomerId = ExtProp.CustomerId,
    ProductName = ExtProp.ProductName

Conclusion

Through this blog, I tried to demonstrate a very simple example of quickly generating sample data and testing processing and storage options, using Azure Event Hubs for ingestion, Azure Stream Analytics for processing, and Azure SQL DB for storage.

As it wasn’t possible to port the data from Stream Analytics directly into a Fabric eventstream, the only option left was to pull the data from Azure Event Hubs into a Fabric eventstream and store it in a KQL database through a Fabric Eventhouse.

I hope this article gets you started on setting up the interaction between Azure and Fabric for streaming data, and on the limitations you might face during the setup.

Thanks for reading!!!


Written by

Sachin Nandanwar