Automate conversion of CSV files to Parquet and upload to a Lakehouse table


Let's say you have one or more CSV files that you want to convert to Parquet format and upload to a Lakehouse table. The available options in the Fabric environment are a notebook or a data pipeline; there aren't any pre-built, out-of-the-box solutions.
For instance, you might have an application that generates CSV files, and you want to upload the CSV data directly to the Lakehouse at that moment. One approach could be to have the application store the CSV files in ADLS Gen2 storage and use an event-based pipeline, triggered by storage events, to upload the data to the Lakehouse. But what if storing the files in cloud storage isn't an option and the files will always live on on-premises storage?
In this post, I've put together a solution that automates the conversion of CSV files to Parquet and uploads them to a Lakehouse table. You might ask, "Why convert to Parquet?" There's no strict need to, but if you ever have to integrate the data with other applications, having the Parquet files available can be useful.
In this solution I have used the following components/libraries:
CsvHelper: An amazing tool if you are dealing with CSV files; it parses records extremely quickly. All you have to do is open the file through a file stream and pass the stream to the library's CsvReader class to get the data back.
Example:

using CsvHelper;
using System.Globalization;

void Main()
{
    using (var reader = new StreamReader("path\\to\\file.csv"))
    using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
    {
        // GetRecords is lazy, so materialize the records while the reader is still open.
        var records = csv.GetRecords<MyClass>().ToList();
    }
}

public class MyClass
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Parquet.Net: A very versatile .NET library that provides out-of-the-box abilities to work with the Parquet file format, with extremely fast serialization and deserialization. I have used this library extensively in my previous articles here and here.
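To illustrate how little code the library's class-based API needs, here is a minimal round-trip sketch; the Record class, the values, and the file path are illustrative placeholders, not from this post's repository:

// Minimal Parquet.Net round-trip sketch. Record, the values, and the file
// path are placeholders, not part of the post's repository.
using Parquet.Serialization;

var data = new List<Record> { new Record { Id = 1, Name = "alpha" } };

// Serialize a list of POCOs straight to a parquet file...
using (var fs = File.Create("sample.parquet"))
{
    await ParquetSerializer.SerializeAsync(data, fs);
}

// ...and deserialize it back into the same class.
using (var fs = File.OpenRead("sample.parquet"))
{
    IList<Record> roundTrip = await ParquetSerializer.DeserializeAsync<Record>(fs);
    Console.WriteLine(roundTrip.Count); // 1
}

public class Record
{
    public int Id { get; set; }
    public string Name { get; set; }
}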
ADLS Gen2 API: Provides the ability to interact with Azure Blob Storage through a file system interface.
Fabric REST API: An extensive set of APIs to interact with, manipulate, and automate Fabric objects and processes.
Approach
The approach is to first fetch the CSV data through the CsvHelper library and convert it to a List<T>. The List<T> is then sent to Parquet.Net, which serializes it to the Parquet format and returns the output as a stream. I used a MemoryStream to hold the serialized Parquet data; you can use a FileStream instead if you require a physical Parquet file, as sketched below.
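For example, a hypothetical swap to a FileStream would look like this, using the NycTaxiTableData class defined later in the post (the output path and the pre-populated record list are placeholders):

// Sketch: write the serialized parquet bytes to a physical file instead of
// the MemoryStream used in this post. Path and records are placeholders.
var records = new List<NycTaxiTableData>(); // filled from CsvHelper in the real flow
using var fileStream = File.Create("path\\to\\output.parquet");
await ParquetSerializer.SerializeAsync(records, fileStream,
    new ParquetSerializerOptions
    {
        CompressionMethod = CompressionMethod.Gzip,
        CompressionLevel = CompressionLevel.Fastest
    });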
I then used the ADLS Gen2 API to push this stream as a file to the Files folder of the Lakehouse, similar to what I did in my previous article. In that article I had a physical file available; in this instance I used the MemoryStream to append and flush the file to the Lakehouse folder.
Once the file is available in the Lakehouse, I used the Fabric API Load Table method to move the file into the table. I covered that process in my earlier article on the Fabric REST API; you can find the details here.
Note: We require two different scopes for authentication, one for the ADLS Gen2 API and another for the Fabric REST API.
You can watch the code walkthrough here.
Code
I have uploaded the code to GitHub. You can find the repository here.
To get started, add the following references to a new console application.
using CsvHelper;
using CsvHelper.Configuration;
using Parquet;
using Parquet.Serialization;
using Microsoft.Identity.Client;
using Newtonsoft.Json.Linq;
using Spectre.Console;
using System.Globalization;
using System.IO.Compression;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.Configuration;
NuGet packages for CsvHelper and Parquet.Net:
dotnet add package CsvHelper --version 33.0.1
dotnet add package Parquet.Net --version 5.0.2
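The using list above also pulls in MSAL, Newtonsoft.Json, Spectre.Console, and the JSON configuration provider. If you are starting from a blank console app, those packages need to be added too (version numbers omitted here; use the latest available):
dotnet add package Microsoft.Identity.Client
dotnet add package Newtonsoft.Json
dotnet add package Spectre.Console
dotnet add package Microsoft.Extensions.Configuration.Json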
local.settings.json
{
  "AllowedHosts": "*",
  "ClientId": "Your Service Principal Id",
  "WorkSpace": "Your Workspace",
  "LakeHouse": "Your LakeHouse",
  "FileLocation": "File location of your csv file including the filename",
  "Table": "tablename",
  "Directory": "Directory on lakehouse",
  "Mode": "Append" // Options are Append or Overwrite
}
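Note that the // comment next to the Mode setting is safe here: the JSON configuration provider behind AddJsonFile skips comments when parsing.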
Declare a bunch of variables
private static string RedirectURI = "http://localhost";
private static string[] scopes_s = new string[] { "https://storage.azure.com/.default" };
private static string[] scopes_f = new string[] { "https://api.fabric.microsoft.com/.default" };
private static string Authority = "https://login.microsoftonline.com/organizations";
private static readonly HttpClient client = new HttpClient();
private static MemoryStream DataStream = new();
private static string clientId = "";
private static string workSpace = "";
private static string workSpaceId = "";
private static string lakeHouse = "";
private static string lakeHouseId = "";
private static string directory = "";
private static string csvFilePath = "";
private static string tablename = "";
private static string filename = "";
private static string mode = "";
Create a class with properties that align with the data
public class NycTaxiTableData
{
    public string VendorID { get; set; }
    public Int32 passenger_count { get; set; }
    public double trip_distance { get; set; }
    public double RatecodeID { get; set; }
    public string store_and_fwd_flag { get; set; }
    public string PULocationID { get; set; }
    public string DOLocationID { get; set; }
    public string payment_type { get; set; }
    public double fare_amount { get; set; }
    public double extra { get; set; }
    public double mta_tax { get; set; }
    public double tip_amount { get; set; }
    public double tolls_amount { get; set; }
    public double improvement_surcharge { get; set; }
    public double total_amount { get; set; }
    public double congestion_surcharge { get; set; }
    public double airport_fee { get; set; }
}
Method to read local.settings.json
public static void ReadConfig()
{
    var builder = new ConfigurationBuilder()
        .AddJsonFile($"local.settings.json", true, true);
    var config = builder.Build();
    clientId = config["ClientId"];
    workSpace = config["WorkSpace"];
    lakeHouse = config["LakeHouse"];
    csvFilePath = config["FileLocation"];
    tablename = config["Table"];
    directory = config["Directory"];
    mode = config["Mode"];
    // Derive the file name from the configured path; the upload and
    // table-load steps refer to it through this variable.
    filename = Path.GetFileName(csvFilePath);
}
Next, create a method ReturnAuthenticationResult that uses MSAL to return the bearer token and cache it. Note that the method takes scopes as an argument and, based on the calling method, returns the corresponding bearer token.
public async static Task<AuthenticationResult> ReturnAuthenticationResult(string[] scopes)
{
    PublicClientApplicationBuilder PublicClientAppBuilder =
        PublicClientApplicationBuilder.Create(clientId)
            .WithAuthority(Authority)
            .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
            .WithRedirectUri(RedirectURI);
    IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
    var accounts = await PublicClientApplication.GetAccountsAsync();
    AuthenticationResult result;
    try
    {
        // Try the token cache first; fall back to an interactive login on failure.
        result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.First())
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    catch
    {
        result = await PublicClientApplication.AcquireTokenInteractive(scopes)
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    return result;
}
Create GET/POST/PUT HTTP helper methods
public async static Task<string> PutAsync(string url, HttpContent content, string[] scopes)
{
    // Acquire a token for the requested scope and attach it to the shared client.
    AuthenticationResult result = await ReturnAuthenticationResult(scopes);
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
    HttpResponseMessage response = await client.PutAsync(url, content);
    try
    {
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
    catch
    {
        Console.WriteLine(await response.Content.ReadAsStringAsync());
        return null;
    }
}
public async static Task<string> GetAsync(string url, string[] scopes)
{
    AuthenticationResult result = await ReturnAuthenticationResult(scopes);
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
    HttpResponseMessage response = await client.GetAsync(url);
    try
    {
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
    catch
    {
        Console.WriteLine(await response.Content.ReadAsStringAsync());
        return null;
    }
}
public async static Task<string> PostAsync(string url, HttpContent content, string[] scopes)
{
    AuthenticationResult result = await ReturnAuthenticationResult(scopes);
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
    HttpResponseMessage response = await client.PostAsync(url, content);
    try
    {
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
    catch
    {
        Console.WriteLine(await response.Content.ReadAsStringAsync());
        return null;
    }
}
Fetch the relevant workspaceId and lakehouseId, based on the settings in local.settings.json, using the Fabric REST API
public async static Task<string> GetWorkspaceId(string workspaceName)
{
    string baseUrl = $"https://api.fabric.microsoft.com/v1/workspaces";
    string response = await GetAsync(baseUrl, scopes_f);
    JObject workspace_jobject = JObject.Parse(response);
    JArray workspace_array = (JArray)workspace_jobject["value"];
    // Match the workspace by display name and return its id.
    foreach (JObject workspace in workspace_array)
    {
        if (workspace["displayName"].ToString() == workspaceName)
        {
            return workspace["id"].ToString();
        }
    }
    return null;
}
public async static Task<string> GetLakeHouseId(string WorkspaceId)
{
    string baseUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{WorkspaceId}/lakehouses";
    string response = await GetAsync(baseUrl, scopes_f);
    JObject lakehouse_jobject = JObject.Parse(response);
    JArray lakehouse_array = (JArray)lakehouse_jobject["value"];
    // Match the lakehouse by display name and return its id.
    foreach (JObject lakehouse in lakehouse_array)
    {
        if (lakehouse["displayName"].ToString() == lakeHouse)
        {
            return lakehouse["id"].ToString();
        }
    }
    return null;
}
Deserialize the CSV file, convert it to the Parquet storage format, and save it in the MemoryStream
public static async Task ConvertToParquetAsync(bool hasHeader, string delimiter)
{
    try
    {
        using var reader = new StreamReader(csvFilePath);
        var csvConfiguration = new CsvConfiguration(CultureInfo.InvariantCulture)
        {
            Delimiter = $"{delimiter}",
            HasHeaderRecord = hasHeader
        };
        using (var csv = new CsvReader(reader, csvConfiguration))
        {
            // With HasHeaderRecord = false the fields map by position, so
            // skip the header row manually before materializing the records.
            csv.Read();
            await ParquetSerializer.SerializeAsync(
                csv.GetRecords<NycTaxiTableData>().ToList(),
                DataStream,
                new ParquetSerializerOptions
                {
                    CompressionMethod = CompressionMethod.Gzip,
                    CompressionLevel = CompressionLevel.Fastest
                });
        }
    }
    catch (Exception)
    {
        // Rethrow without resetting the stack trace (throw ex would).
        throw;
    }
}
Next, send the MemoryStream to the Lakehouse and upload the stream as a Parquet file. This happens in three steps: first, PUT an empty file into the file system; second, append the stream through a PATCH request with position 0; third, flush the stream with another PATCH request, with position equal to the stream length.
public async static Task<string> FileStreamSendAsync(Stream stream, string directory, string filename)
{
    // Step 1: create an empty file in the lakehouse file system.
    string RequestUri = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directory}/{filename}?resource=file";
    string jsonString = System.String.Empty;
    var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
    var response = await PutAsync(RequestUri, content, scopes_s);

    // Step 2: append the stream contents starting at position 0.
    stream.Seek(0, SeekOrigin.Begin);
    var content_s = new StreamContent(stream);
    string url = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directory}/{filename}?action=append&position=0";
    var streamMessage = new HttpRequestMessage
    {
        Method = HttpMethod.Patch,
        RequestUri = new Uri(url),
        Content = content_s
    };
    HttpResponseMessage response_s = await client.SendAsync(streamMessage);
    try
    {
        response_s.EnsureSuccessStatusCode();
        await response_s.Content.ReadAsStringAsync();
    }
    catch
    {
        Console.WriteLine(await response_s.Content.ReadAsStringAsync());
        return null;
    }

    // Step 3: flush the appended data; position must equal the stream length.
    if (response_s.IsSuccessStatusCode)
    {
        url = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directory}/{filename}?action=flush&position={stream.Length}";
        var flushMessage = new HttpRequestMessage
        {
            Method = HttpMethod.Patch,
            RequestUri = new Uri(url)
        };
        response_s = await client.SendAsync(flushMessage);
        response_s.EnsureSuccessStatusCode();
    }
    return null;
}
Next, load the file into a Lakehouse table
public static async Task LoadTable(string tablename)
{
    string baseUrl = $"https://api.fabric.microsoft.com/v1/workspaces/{workSpaceId}/lakehouses/{lakeHouseId}/tables/{tablename}/load";
    // Point the load request at the parquet file uploaded in the previous step.
    var jsonData = new Dictionary<string, object>
    {
        { "relativePath", string.Concat(directory, "/", filename.Replace("csv", "parquet")) },
        { "pathType", "File" },
        { "mode", mode },
        { "recursive", false },
        { "formatOptions", new Dictionary<string, object>
            {
                { "format", "Parquet" }
            }
        }
    };
    string jsonString = Newtonsoft.Json.JsonConvert.SerializeObject(jsonData);
    var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
    string response = await PostAsync(baseUrl, content, scopes_f);
    Console.WriteLine(response);
}
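For reference, with the sample settings above and an input file named, say, data.csv (the name is illustrative), the request body this method serializes and posts looks like:

{
  "relativePath": "Directory on lakehouse/data.parquet",
  "pathType": "File",
  "mode": "Append",
  "recursive": false,
  "formatOptions": {
    "format": "Parquet"
  }
}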
Finally, the Main method that acts as the entry point to the application
async static Task Main(string[] args)
{
    ReadConfig();
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine($"Fetching workspaceId for workspace [Red]{workSpace}[/] and lakehouseId for lakehouse [Red]{lakeHouse}[/]");
    AnsiConsole.MarkupLine("");
    workSpaceId = await GetWorkspaceId(workSpace);
    lakeHouseId = await GetLakeHouseId(workSpaceId);
    AnsiConsole.MarkupLine($"WorkspaceId for [Red]{workSpace}[/] is [Blue]{workSpaceId}[/] and lakehouseId for [Red]{lakeHouse}[/] is [Blue]{lakeHouseId}[/]");
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine("");
    Thread.Sleep(1000);
    AnsiConsole.MarkupLine($"Converting the csv file [Red]{filename}[/] to parquet");
    AnsiConsole.MarkupLine("");
    await ConvertToParquetAsync(false, ",");
    AnsiConsole.MarkupLine($"Conversion completed");
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine($"Uploading the parquet file [Red]{filename.Replace("csv", "parquet")}[/] to lakehouse [Red]{lakeHouse}[/]");
    AnsiConsole.MarkupLine("");
    await FileStreamSendAsync(DataStream, directory, filename.Replace("csv", "parquet"));
    Thread.Sleep(1000);
    AnsiConsole.MarkupLine($"Upload to lakehouse completed");
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine("");
    AnsiConsole.MarkupLine($"Uploading the parquet file [Red]{filename.Replace("csv", "parquet")}[/] to table [Red]{tablename}[/]");
    await LoadTable(tablename);
    Thread.Sleep(1000);
    AnsiConsole.MarkupLine($"Upload to table completed");
}
Conclusion
The aim of this post was to show how to automate uploading data to the Lakehouse and generating a table from the newly uploaded data, end to end.
Do let me know if you have any feedback or comments. Thanks for reading!