Expose your Microsoft Fabric Lakehouse data through REST API


As many of you may know, Microsoft Fabric offers a GraphQL API to access data from a Warehouse, from a Fabric Lakehouse through its SQL analytics endpoint, and from Fabric SQL databases. However, it lacks a built-in feature to expose data as a REST API.
Imagine you have an existing application designed to consume REST APIs and you need to pull data from a lakehouse. The only option available is GraphQL, but that would require refactoring the application and rewriting existing logic to make it compatible with GraphQL, a process that is both time-consuming and labor-intensive.
In this article I will show how to use an Azure Function, hosted as an API, to query your lakehouse data. A few months ago I posted an article on a related topic, where I designed a process to move data from Cosmos DB to a lakehouse: the Cosmos DB data was exposed through a REST API and a Fabric data pipeline inserted the REST API data into the lakehouse. You can check that article here.
The idea in this article is to create a separate Azure Function method for each table in the lakehouse and then pass values through route parameters that filter the data in the lakehouse. I created individual GET methods to fetch the data from all the tables of a given lakehouse and developed them as an Azure Function. You can deploy them as an Azure Function on Azure or convert them to an API and deploy it as an Azure API.
I used Parquet.Net to iterate across the individual row groups of a lakehouse table. For a very large table it is quite possible that a given value spans multiple row groups, so the code should be versatile enough to fetch a given value wherever it exists across all the row groups (see the short sketch below).
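Before getting into the full Azure Function code, here is a minimal standalone sketch (not part of the function itself, and the parquet file name is hypothetical) that shows how Parquet.Net exposes row groups by simply counting the row groups and rows in a single parquet file downloaded from a table:
using Parquet;
// Open a local copy of any parquet file that belongs to the lakehouse table (hypothetical file name).
using var stream = File.OpenRead("part-00000.parquet");
using var reader = await ParquetReader.CreateAsync(stream);
Console.WriteLine($"Row groups: {reader.RowGroupCount}");
for (int i = 0; i < reader.RowGroupCount; i++)
{
    // Each row group is read independently, so a filtered value may appear in several of them.
    using var rowGroup = reader.OpenRowGroupReader(i);
    Console.WriteLine($"Row group {i}: {rowGroup.RowCount} rows");
}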
You can watch the code walkthrough here.
Setup
The lakehouse that I used in this code has three tables. Row counts of each table:
category_data: ~100K rows
nyc_taxi: ~20 million rows
trip_data: ~3 million rows
category_data has ID, Name, Age and Category columns, is partitioned on the Age column, and suffers from the small file problem. Every partition in the category_data table has at least 3 parquet files, and there are about 165 such parquet files in total. Here as well, the code should be able to search for a given value across all the partitions and parquet files of the table.
Each lakehouse table has its own route URL, with values passed through it to filter the required data from the table. These are the sample routes for the three lakehouse tables.
Sample values:
http://localhost:7254/api/Id/9950&Category/Category_17
http://localhost:7254/api/hvfhs_license_num/HV0005&PULocationID/127
http://localhost:7254/api/VendorId/2&TripDistance/5.00
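For completeness, below is a minimal client sketch that calls one of these endpoints and prints the serialized result. It assumes the function host is running locally on port 7254 as in the sample routes above; when deployed to Azure with AuthorizationLevel.Function, a function key would also have to be supplied with the request.
using System.Net.Http;
// Hypothetical console client: query the trip_data route for VendorID = 2 and trip_distance = 5.00.
using var http = new HttpClient();
string url = "http://localhost:7254/api/VendorId/2&TripDistance/5.00";
string json = await http.GetStringAsync(url);
// The function returns the matching rows as an indented JSON array.
Console.WriteLine(json);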
Code
Create a new Azure Function and add the following to the local.settings.json of the project:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated"
  },
  "AllowedHosts": "*",
  "ClientId": "Service Principal Id",
  "WorkSpace": "Your Workspace",
  "LakeHouse": "Your Lakehouse"
}
Main.cs (comments for each method are added in the code)
//Required references. The only third-party library we need is Parquet.Net
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Parquet;
using Parquet.Serialization;
using System.Net.Http.Headers;
namespace ParquetData
{
public class Main
{
private static string RedirectURI = "http://localhost";
private static string clientId = "";
private static string[] scopes = new string[] { "https://storage.azure.com/.default" };
private static string Authority = "https://login.microsoftonline.com/organizations";
private static readonly HttpClient client = new HttpClient();
private static string workSpace = "";
private static string lakeHouse = "";
private static string table = "";
public HttpClient Client => client;
private readonly ILogger<Main> _logger;
private static string dfsendpoint;
private static string FileName;
private static string FolderName;
// IList and List Objects for the nyc_taxi table
private static IList<NycTableData> Nyc_Data;
private static List<NycTableData> Nyc_filteredata = new();
// IList and List Objects for the category_data table
private static IList<CategoryTableData> Category_Data;
private static List<CategoryTableData> Category_filteredata = new();
// IList and List Objects for the trip_data table
private static IList<TripTableData> Trip_Data;
private static List<TripTableData> Trip_filteredata = new();
public Main(ILogger<Main> logger)
{
_logger = logger;
}
//Read local.settings.json
public static void ReadConfig()
{
var builder = new ConfigurationBuilder()
.AddJsonFile($"local.settings.json", true, true);
var config = builder.Build();
clientId = config["ClientId"];
workSpace = config["WorkSpace"];
lakeHouse = config["LakeHouse"];
}
//Helper function to get the file name from the response returned by the ADLS Gen2 API
public static async Task<string> GetFileName(string value)
{
int lastSlashIndex_n = value.ToString().LastIndexOf('/');
string dir_n = value.ToString().Substring(lastSlashIndex_n + 1);
return dir_n;
}
//Helper function to get the folder name from the response returned by the ADLS Gen2 API
public static async Task<string> GetFolderName(string value)
{
int startIndex = value.IndexOf("Tables/") + "Tables/".Length;
int endIndex = value.IndexOf("/part");
string dir_n = value.Substring(startIndex, endIndex - startIndex);
return dir_n;
}
// Route function for nyc_taxi table
[Function("Nyc_Taxi_Data")]
public static async Task<OkObjectResult> Get_Nyc_Taxi_Data([HttpTrigger(AuthorizationLevel.Function, "get", Route = "hvfhs_license_num/{hvfhs_license_num}&PULocationID/{PULocationID}")] HttpRequest req, string hvfhs_license_num, Int32? PULocationID)
{
ReadConfig();
table = "nyc_taxi";
client.Timeout = TimeSpan.FromMilliseconds(-1);
dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/Tables/{table}/?resource=filesystem&recursive=false";
NycTableData.req_hvfhs_license_num = hvfhs_license_num;
NycTableData.req_PULocationID = PULocationID;
var obj = new OkObjectResult(await TraverseTable(dfsendpoint));
return obj;
}
// Route function for category_data table
[Function("Category_Data")]
public static async Task<OkObjectResult> Get_Category_Data([HttpTrigger(AuthorizationLevel.Function, "get", Route = "Id/{Id}&Category/{Category}")] HttpRequest req, Int32? ID, string Category)
{
ReadConfig();
table = "category_data";
client.Timeout = TimeSpan.FromMilliseconds(-1);
CategoryTableData.req_ID = ID;
CategoryTableData.req_Category = Category;
dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/Tables/{table}/?resource=filesystem&recursive=false";
var obj = new OkObjectResult(await TraverseTable(dfsendpoint));
return obj;
}
// Route function for trip_data table
[Function("Trip_Data")]
public static async Task<OkObjectResult> Get_Trip_Data([HttpTrigger(AuthorizationLevel.Function, "get", Route = "VendorId/{VendorID}&TripDistance/{TripDistance}")] HttpRequest req, Int32? VendorID, double TripDistance)
{
ReadConfig();
table = "trip_data";
client.Timeout = TimeSpan.FromMilliseconds(-1);
TripTableData.req_VendorID = VendorID;
TripTableData.req_trip_distance = TripDistance;
dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/Tables/{table}/?resource=filesystem&recursive=false";
var obj = new OkObjectResult(await TraverseTable(dfsendpoint));
return obj;
}
// Get a list of files and folders at root of the table
public static async Task<string> TraverseTable(string dfsendpoint)
{
string response = await GetAsync(dfsendpoint);
JObject jsonObject_dir = JObject.Parse(response);
JArray dirArray = (JArray)jsonObject_dir["paths"];
string rs = null;
foreach (JObject dir in dirArray)
{
if (!dir["name"].ToString().Contains("_delta_log"))
{
rs = await TraverseAllFilesInTable(await GetFileName(dir["name"].ToString()));
}
}
return rs;
}
// Get a response for every file across all folders that exist for a table and return
// the parquet data in a serialized format. This is done through the ReadParquetFile function
// that is called from within this function
public static async Task<string> TraverseAllFilesInTable(string file)
{
string responsename;
dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.LakeHouse/Tables/{table}/{file}?resource=filesystem&recursive=true";
responsename = await GetAsync(dfsendpoint);
string res = null;
if (responsename != "")
{
JObject jsonObject = JObject.Parse(responsename);
JArray pathsArray = (JArray)jsonObject["paths"];
foreach (JObject path_ in pathsArray)
{
if (path_["name"].ToString().Contains("parquet") && path_["IsDirectory"] == null)
{
FolderName = await GetFolderName(path_["name"].ToString());
FileName = await GetFileName(path_["name"].ToString());
var downloadMessage = new HttpRequestMessage
{
Method = HttpMethod.Get,
RequestUri = new Uri($"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.LakeHouse/Tables/{FolderName}/{FileName}")
};
var response = await SendAsync(downloadMessage);
res = await ReadParquetFile(response);
}
}
return res;
}
else
{
return null;
}
}
//Read the parquet data across every rowgroup of each file for a table and return the data in a serialized format.
public static async Task<string> ReadParquetFile(Stream stream)
{
using ParquetReader parquetReader = await ParquetReader.CreateAsync(stream); // dispose the reader once all rowgroups are processed
for (int rowGroupIndex = 0; rowGroupIndex < parquetReader.RowGroupCount; rowGroupIndex++)
{
using (ParquetRowGroupReader reader = parquetReader.OpenRowGroupReader(rowGroupIndex))
{
if (table == "nyc_taxi")
{
Nyc_Data = await ParquetSerializer.DeserializeAsync<NycTableData>(stream, rowGroupIndex);
var filteredWithIndex = Nyc_Data
.Where((details) => details.hvfhs_license_num == NycTableData.req_hvfhs_license_num && details.PULocationID == NycTableData.req_PULocationID);
Nyc_filteredata.AddRange(filteredWithIndex);
}
if (table == "category_data")
{
Category_Data = await ParquetSerializer.DeserializeAsync<CategoryTableData>(stream, rowGroupIndex);
var filteredWithIndex = Category_Data
.Where((details) => (details.ID == CategoryTableData.req_ID && details.Category == CategoryTableData.req_Category));
Category_filteredata.AddRange(filteredWithIndex);
}
if (table == "trip_data")
{
Trip_Data = await ParquetSerializer.DeserializeAsync<TripTableData>(stream, rowGroupIndex);
var filteredWithIndex = Trip_Data
.Where((details) => details.VendorID == TripTableData.req_VendorID && details.trip_distance == TripTableData.req_trip_distance);
Trip_filteredata.AddRange(filteredWithIndex);
}
}
}
if (table == "nyc_taxi") { return JsonConvert.SerializeObject(Nyc_filteredata, Newtonsoft.Json.Formatting.Indented); }
if (table == "category_data") { return JsonConvert.SerializeObject(Category_filteredata, Newtonsoft.Json.Formatting.Indented); }
if (table == "trip_data") { return JsonConvert.SerializeObject(Trip_filteredata, Newtonsoft.Json.Formatting.Indented); }
return null;
}
//Http SendAsync function
public async static Task<Stream> SendAsync(HttpRequestMessage httprequestMessage)
{
AuthenticationResult result = await ReturnAuthenticationResult();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
HttpResponseMessage response = await client.SendAsync(httprequestMessage);
response.EnsureSuccessStatusCode();
try
{
return await response.Content.ReadAsStreamAsync();
}
catch
{
Console.WriteLine(response.Content.ReadAsStreamAsync().Result);
return null;
}
}
//Http Get Async function
public async static Task<string> GetAsync(string url)
{
AuthenticationResult result = await ReturnAuthenticationResult();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
HttpResponseMessage response = await client.GetAsync(url);
try
{
response.EnsureSuccessStatusCode();
return await response.Content.ReadAsStringAsync();
}
catch
{
Console.WriteLine(response.Content.ReadAsStringAsync().Result);
return null;
}
}
// Authentication function
public async static Task<AuthenticationResult> ReturnAuthenticationResult()
{
string AccessToken;
PublicClientApplicationBuilder PublicClientAppBuilder =
PublicClientApplicationBuilder.Create(clientId)
.WithAuthority(Authority)
.WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
.WithRedirectUri(RedirectURI);
IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
var accounts = await PublicClientApplication.GetAccountsAsync();
AuthenticationResult result;
try
{
result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.First())
.ExecuteAsync()
.ConfigureAwait(false);
}
catch
{
result = await PublicClientApplication.AcquireTokenInteractive(scopes)
.ExecuteAsync()
.ConfigureAwait(false);
}
return result;
}
//Properties for the nyc_taxi table
public class NycTableData
{
public static string req_hvfhs_license_num { get; set; }
public static Int32? req_PULocationID { get; set; }
public string hvfhs_license_num { get; set; }
public Int32? PULocationID { get; set; }
public DateTime? request_datetime { get; set; }
public Int32? DOLocationID { get; set; }
public double? trip_miles { get; set; }
}
//Properties for category_data table
public class CategoryTableData
{
public static Int32? req_ID { get; set; }
public static string req_Category { get; set; }
public Int32? ID { get; set; }
public string Name { get; set; }
public string Category { get; set; }
}
//Properties for trip_data table
public class TripTableData
{
public static Int32? req_VendorID { get; set; }
public static double? req_trip_distance { get; set; }
public Int32? VendorID { get; set; }
public double? trip_distance { get; set; }
public DateTime? tpep_pickup_datetime { get; set; }
public DateTime? tpep_dropoff_datetime { get; set; }
}
}
}
Note: in this example performance is NOT a priority, as the focus is centered on exposing the lakehouse data through a REST API. Performance can possibly be improved by leveraging an Azure Redis cache (a simple sketch follows below).
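As an illustration only (this is not part of the code above), a response cache around the GetAsync helper could look roughly like the sketch below. It uses Microsoft.Extensions.Caching.Memory for brevity; a distributed cache such as Azure Cache for Redis would follow the same pattern, with the listing and file responses keyed by URL and an arbitrary ten-minute expiry.
using Microsoft.Extensions.Caching.Memory;
// Hedged sketch: cache ADLS Gen2 / OneLake responses by URL so repeated requests
// for the same table listing do not always round-trip to the storage endpoint.
private static readonly MemoryCache ResponseCache = new(new MemoryCacheOptions());
public static async Task<string> GetAsyncCached(string url)
{
    if (ResponseCache.TryGetValue(url, out string cached))
        return cached;
    string response = await GetAsync(url);                      // existing helper in Main.cs
    ResponseCache.Set(url, response, TimeSpan.FromMinutes(10)); // arbitrary expiry for the sketch
    return response;
}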
I have created a detailed video on the step-by-step process. In the video I tested the process on the trip_data table, which has approximately 3 million rows.
In the writeup below I have detailed the overall performance on the nyc_taxi table, with high-level insights into the response time of each step and the process involved.
To shed some light on the performance of the process, I tested the code to filter a given value on the biggest table in the lakehouse, i.e. nyc_taxi, which holds about 20 million rows. The values queried were hvfhs_license_num = HV0005 and PULocationID = 127:
http://localhost:7254/api/hvfhs_license_num/HV0005&PULocationID/127
Following are some screenshots:
The first step was to utilize the ADLS Gen2 API to fetch a response from every parquet file of the table, across every partition (if any) of the table.
Notice the red box on the right side of the image. Fetching a response from a table of 20 million rows took about 137 seconds, a little over two minutes. This is the time-consuming part of the process, and if we use some type of caching mechanism (Redis or memcached) the duration can be reduced dramatically.
Deserialization is the next step and took about 24 seconds. The first rowgroup has about 15,310,000 rows. The results are saved in an IList<T> Nyc_Data.
Check the green box at the bottom of the screenshot for the count and the current rowgroup in operation.
Please note I have used Parquet.Net's built-in deserialization method and not a custom one. Details are here.
Filtering the data from the IList<T> was quite fast, taking only 1 ms. I used LINQ method syntax to filter the required data from the IList<T>.
Adding the filtered output to the list Nyc_filteredata took a little over one second. The filtered record count for this rowgroup is 13,170.
The process then moves to the next rowgroup. The second rowgroup holds 3,818,392 rows and the deserialization took about ten seconds.
Filtering the data took one ms.
Adding the filtered data to a List object took about three seconds. This rowgroup contains 16,558 rows qualifying for our predicate.
Returning the final output took about two seconds.
In total we have 29,728 qualifying rows (13,170 rows from the first rowgroup and 16,558 from the second rowgroup).
Overall, the entire process for a lakehouse table of about 20 million rows took a little over three minutes, with the bulk of the time spent fetching a response from the parquet files of the table through the ADLS Gen2 API. As mentioned earlier, this process can be heavily optimized with a caching mechanism so that it does not always have to fetch a response via the ADLS Gen2 API.
Conclusion
Through this article I tried to demonstrate a very basic method by which lakehouse data can be exposed through an API. Though not very optimal, it does provide the necessary insights into the different components and artifacts involved and the underlying challenges faced.
Thanks for reading !!!