Integrating Livy API with Microsoft Fabric


Apache Livy is an open-source REST API for interacting with an Apache Spark cluster. It is a lightweight interface through which developers can submit Spark jobs, manage their execution remotely, and retrieve results programmatically without direct access to the Spark cluster.
Image credit: https://livy.incubator.apache.org
But what has the Livy API got to do with Microsoft Fabric?
With Livy you can configure and submit Spark jobs written in .NET, Scala, Python, R, or Java and execute them on the Fabric tenant. This comes in handy when you want to interact with the Spark environment from outside, without having to work in the Fabric environment itself. For example, you can submit Spark jobs from your web or mobile apps without needing a Spark client.
Apart from submitting and running Spark jobs from web or mobile apps, another use case I can think of is a Windows service or a Web API that periodically submits Spark jobs to perform a certain set of actions in the lakehouse.
The Livy API acts like a job scheduler or executor through which you can submit, monitor, and retrieve the results of Spark jobs in Fabric. Think of the Livy API as an interface through which you submit jobs to a Spark cluster, similar to how SQL Server Agent manages and executes SQL jobs in SQL Server.
Fabric and Livy API
In Fabric, you can submit Spark jobs for your lakehouse through the Livy API. Fabric lakehouses have two types of endpoints, SQL analytics endpoints and Livy endpoints, and there are two types of Livy jobs: session jobs and batch jobs.
In this article I will cover creating a session-level Spark job through an Azure Function that uses the Livy API.
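To make the two job types concrete, here is a minimal sketch of how the session and batch endpoints of a lakehouse differ. The URL pattern and the API version string are assumptions based on the shape of the connection strings shown in the lakehouse settings, and `LivyEndpoints` is a hypothetical helper, so always copy the real endpoint from your own lakehouse rather than relying on this pattern.

```csharp
using System;

// Hypothetical helper: builds Livy endpoint URLs for a Fabric lakehouse.
public static class LivyEndpoints
{
    // Assumed base address and API version; verify both against the
    // connection strings shown in your lakehouse settings.
    private const string FabricApiBase = "https://api.fabric.microsoft.com/v1";
    private const string LivyApiVersion = "2023-12-01";

    // Endpoint for interactive session jobs (the type used in this article).
    public static string ForSessions(Guid workspaceId, Guid lakehouseId) =>
        Build(workspaceId, lakehouseId, "sessions");

    // Endpoint for batch jobs.
    public static string ForBatches(Guid workspaceId, Guid lakehouseId) =>
        Build(workspaceId, lakehouseId, "batches");

    private static string Build(Guid workspaceId, Guid lakehouseId, string jobType) =>
        $"{FabricApiBase}/workspaces/{workspaceId}/lakehouses/{lakehouseId}" +
        $"/livyapi/versions/{LivyApiVersion}/{jobType}";
}
```

Under these assumptions, the only difference between the two endpoints is the final path segment, `sessions` or `batches`.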
The video of the demo walkthrough of the setup is posted here.
Setup
We will create a Livy Spark job using Azure Functions, the Microsoft Authentication Library (MSAL), and a service principal.
To read more about the Microsoft Authentication Library (MSAL) in Fabric, you can refer to my article on the topic here, and to read about service principals in Fabric, please refer to my article here.
In case you aren't familiar with Azure Functions, you can read about them in detail here.
I am going to use a service principal created specifically for the lakehouse workspace, through which the Livy API will interact and create Spark jobs.
In case it is unclear what I mean by "a service principal specific to the lakehouse workspace", please refer to my article here.
Basically, creating a workspace identity automatically creates a managed service principal in your Entra account associated with that workspace.
Next, note down the Application (client) ID of the service principal.
Code
Create a new Azure Functions application and create three classes that hold the structure of the Livy job's output data.
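// Top-level object of a Livy session or statement response
public class LivyClass
{
    public string id { get; set; }
    public string state { get; set; }
    public Output Output { get; set; }
}

// The "output" object of a statement response
public class Output
{
    public Data Data { get; set; }
}

// The "data" object nested inside "output"
public class Data
{
    [JsonProperty("text/plain")]
    public string TextPlain { get; set; }
}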
LivyClass: This is the main class of a Livy session or statement response.
Output: This class contains the actual results of the job.
Data: This class represents the actual data returned by the Livy job or session.
Output and Data are separate classes because, in the final output of the job execution exposed by the Livy API, the output object contains a nested data property. The classes mirror that nesting.
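To see how the three classes map onto a response, here is a small sketch that deserializes a sample statement response. The JSON below is a hand-written illustration that follows the general shape of an Apache Livy statement response, not output captured from Fabric, and `LivyResponseDemo` is a hypothetical name. The three classes are repeated only so that the block is self-contained.

```csharp
using System;
using Newtonsoft.Json;

public class LivyClass
{
    public string id { get; set; }
    public string state { get; set; }
    public Output Output { get; set; }
}

public class Output
{
    public Data Data { get; set; }
}

public class Data
{
    [JsonProperty("text/plain")]
    public string TextPlain { get; set; }
}

public static class LivyResponseDemo
{
    // Illustrative payload only; real responses carry additional fields,
    // which Json.NET ignores by default.
    public const string SampleJson = @"{
        ""id"": 0,
        ""state"": ""available"",
        ""output"": {
            ""status"": ""ok"",
            ""data"": { ""text/plain"": ""+--------+\n|Quantity|\n+--------+"" }
        }
    }";

    public static LivyClass Parse(string json) =>
        JsonConvert.DeserializeObject<LivyClass>(json);
}
```

Json.NET matches property names case-insensitively, so the lowercase `output` and `data` keys bind to the `Output` and `Data` properties, while `text/plain` needs the explicit `JsonProperty` attribute because it is not a valid C# identifier.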
Then we define a custom service class that will handle the GET and POST methods through MSAL.
To get started, first import the necessary namespaces
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;
using Newtonsoft.Json;
using System.Net.Http.Headers;
using System.Text;
Declare a bunch of variables in the Service class
public static string livyBaseUrl = "Livy endpoints";
private static string clientId = "Client Id of the Service Principal";
private static string RedirectURI = "http://localhost";
private static string Authority = "https://login.microsoftonline.com/organizations";
private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/.default" };
//private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/Lakehouse.Execute.All https://api.fabric.microsoft.com/Lakehouse.Read.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All https://api.fabric.microsoft.com/Code.AccessStorage.All https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All https://api.fabric.microsoft.com/Code.AccessFabric.All" };
private static readonly HttpClient client = new HttpClient();
public HttpClient Client => client;
If you wish, you can define granular scopes instead of .default. Each scope should be a separate element of the array.
private static string[] scopes = new string[]
{
    "https://api.fabric.microsoft.com/Lakehouse.Execute.All",
    "https://api.fabric.microsoft.com/Lakehouse.Read.All",
    "https://api.fabric.microsoft.com/Item.ReadWrite.All",
    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All",
    "https://api.fabric.microsoft.com/Code.AccessStorage.All",
    "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All",
    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All",
    "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All",
    "https://api.fabric.microsoft.com/Code.AccessFabric.All"
};
The Livy endpoints for the lakehouse are available here. We will be using the Session job connection string.
Define the custom GET method
public async static Task<string> GetAsync(string url)
{
    PublicClientApplicationBuilder PublicClientAppBuilder =
        PublicClientApplicationBuilder.Create(clientId)
            .WithAuthority(Authority)
            .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
            .WithRedirectUri(RedirectURI);
    IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
    var accounts = await PublicClientApplication.GetAccountsAsync();
    AuthenticationResult result;
    try
    {
        // Try the token cache first; FirstOrDefault avoids an exception when no account is cached yet
        result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    catch (MsalUiRequiredException)
    {
        // No usable cached token, so fall back to an interactive sign-in
        result = await PublicClientApplication.AcquireTokenInteractive(scopes)
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
    HttpResponseMessage response = await client.GetAsync(url);
    response.EnsureSuccessStatusCode();
    return await response.Content.ReadAsStringAsync();
}
I have enabled MSAL's shared token cache (CacheOptions.EnableSharedCacheOptions) so that there are no repeated prompts to fetch access tokens. The prompt appears only once, at the start.
Define the custom POST method
public async static Task<string> PostAsync(string url, HttpContent content)
{
    PublicClientApplicationBuilder PublicClientAppBuilder =
        PublicClientApplicationBuilder.Create(clientId)
            .WithAuthority(Authority)
            .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
            .WithRedirectUri(RedirectURI);
    IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
    var accounts = await PublicClientApplication.GetAccountsAsync();
    AuthenticationResult result;
    try
    {
        // Try the token cache first; FirstOrDefault avoids an exception when no account is cached yet
        result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    catch (MsalUiRequiredException)
    {
        // No usable cached token, so fall back to an interactive sign-in
        result = await PublicClientApplication.AcquireTokenInteractive(scopes)
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
    HttpResponseMessage response = await client.PostAsync(url, content);
    if (response.IsSuccessStatusCode)
    {
        return await response.Content.ReadAsStringAsync();
    }
    // Log the error body and return null so that the caller can decide to retry
    Console.WriteLine(await response.Content.ReadAsStringAsync());
    return null;
}
Next, create a method in the Service class to generate a Livy session.
public static async Task<string> CreateLivySession()
{
    // An empty JSON body creates the session with default settings
    var content = new StringContent("{}", Encoding.UTF8, "application/json");
    var createLivySessionResponse = await Service.PostAsync(livyBaseUrl, content);
    return createLivySessionResponse;
}
Then add a method to terminate the session once the job has finished executing.
public static async Task CloseLivySession(string sessionId)
{
    // Reuses the Authorization header set by the preceding GET or POST call
    HttpResponseMessage result = await client.DeleteAsync(livyBaseUrl + "/" + sessionId);
    if (!result.IsSuccessStatusCode)
    {
        throw new Exception("Livy close session failed");
    }
}
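Since the session is only closed after a successful run, a failed statement can leave it open. One way to guard against that is a try/finally wrapper, sketched below. `LivySessionScope` and its delegate parameters are hypothetical names introduced for illustration; in this article's code the delegates would wrap `Service.CreateLivySession` (after extracting the id from its response) and `Service.CloseLivySession`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: guarantees that a session is closed even when the work fails.
public static class LivySessionScope
{
    public static async Task<T> RunAsync<T>(
        Func<Task<string>> createSession,   // returns the new session id
        Func<string, Task<T>> work,         // runs statements against the session
        Func<string, Task> closeSession)    // deletes the session
    {
        string sessionId = await createSession();
        try
        {
            return await work(sessionId);
        }
        finally
        {
            // Runs on success and on failure alike
            await closeSession(sessionId);
        }
    }
}
```

The helper takes delegates rather than calling the Service class directly, which keeps it independent of the HTTP code and easy to exercise with fakes.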
The complete Service class would look like this:
public class Service
{
    public static string livyBaseUrl = "Livy endpoints";
    private static string clientId = "Client Id of the Service Principal";
    private static string RedirectURI = "http://localhost";
    private static string Authority = "https://login.microsoftonline.com/organizations";
    private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/.default" };
    //private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/Lakehouse.Execute.All https://api.fabric.microsoft.com/Lakehouse.Read.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All https://api.fabric.microsoft.com/Code.AccessStorage.All https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All https://api.fabric.microsoft.com/Code.AccessFabric.All" };
    private static readonly HttpClient client = new HttpClient();
    public HttpClient Client => client;

    public async static Task<string> PostAsync(string url, HttpContent content)
    {
        PublicClientApplicationBuilder PublicClientAppBuilder =
            PublicClientApplicationBuilder.Create(clientId)
                .WithAuthority(Authority)
                .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
                .WithRedirectUri(RedirectURI);
        IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
        var accounts = await PublicClientApplication.GetAccountsAsync();
        AuthenticationResult result;
        try
        {
            // Try the token cache first; FirstOrDefault avoids an exception when no account is cached yet
            result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
                .ExecuteAsync()
                .ConfigureAwait(false);
        }
        catch (MsalUiRequiredException)
        {
            // No usable cached token, so fall back to an interactive sign-in
            result = await PublicClientApplication.AcquireTokenInteractive(scopes)
                .ExecuteAsync()
                .ConfigureAwait(false);
        }
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
        HttpResponseMessage response = await client.PostAsync(url, content);
        if (response.IsSuccessStatusCode)
        {
            return await response.Content.ReadAsStringAsync();
        }
        // Log the error body and return null so that the caller can decide to retry
        Console.WriteLine(await response.Content.ReadAsStringAsync());
        return null;
    }

    public async static Task<string> GetAsync(string url)
    {
        PublicClientApplicationBuilder PublicClientAppBuilder =
            PublicClientApplicationBuilder.Create(clientId)
                .WithAuthority(Authority)
                .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
                .WithRedirectUri(RedirectURI);
        IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
        var accounts = await PublicClientApplication.GetAccountsAsync();
        AuthenticationResult result;
        try
        {
            result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
                .ExecuteAsync()
                .ConfigureAwait(false);
        }
        catch (MsalUiRequiredException)
        {
            result = await PublicClientApplication.AcquireTokenInteractive(scopes)
                .ExecuteAsync()
                .ConfigureAwait(false);
        }
        client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
        HttpResponseMessage response = await client.GetAsync(url);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }

    public static async Task<string> CreateLivySession()
    {
        // An empty JSON body creates the session with default settings
        var content = new StringContent("{}", Encoding.UTF8, "application/json");
        var createLivySessionResponse = await Service.PostAsync(livyBaseUrl, content);
        return createLivySessionResponse;
    }

    public static async Task CloseLivySession(string sessionId)
    {
        // Reuses the Authorization header set by the preceding GET or POST call
        HttpResponseMessage result = await client.DeleteAsync(livyBaseUrl + "/" + sessionId);
        if (!result.IsSuccessStatusCode)
        {
            throw new Exception("Livy close session failed");
        }
    }
}
The body of the Run method of the Azure Function:
string createLivySessionResponse = await Service.CreateLivySession();
var obj = JsonConvert.DeserializeObject<LivyClass>(createLivySessionResponse);
string livy_session_id = obj.id;
string livy_session_url = Service.livyBaseUrl + "/" + livy_session_id;
string execute_statement = livy_session_url + "/statements";
var payload = new
{
    code = "spark.sql(\"SELECT * FROM sales_data where Quantity = 6\").show()",
    kind = "spark"
};
string jsonString = JsonConvert.SerializeObject(payload);
string execute_statement_response = null;
while (execute_statement_response == null) // Retry until the session is ready to accept the statement
{
    StringContent content = new StringContent(jsonString, Encoding.UTF8, "application/json");
    execute_statement_response = await Service.PostAsync(execute_statement, content);
    if (execute_statement_response == null)
    {
        await Task.Delay(5000);
    }
}
var obj_response_status = JsonConvert.DeserializeObject<LivyClass>(execute_statement_response);
string query_id = obj_response_status.id;
string get_statement = livy_session_url + "/statements/" + query_id;
while (obj_response_status.state != "available") // Wait until the statement result becomes available
{
    await Task.Delay(5000);
    string get_statement_response = await Service.GetAsync(get_statement);
    obj_response_status = JsonConvert.DeserializeObject<LivyClass>(get_statement_response);
}
await Service.CloseLivySession(livy_session_id);
return new ContentResult
{
    Content = obj_response_status.Output.Data.TextPlain,
    ContentType = "text/plain",
    StatusCode = 200
};
The above code first creates a Livy session and then submits, as a Spark job in the form of a payload, a SQL query that fetches data from a table in the lakehouse.
var payload = new
{
    code = "spark.sql(\"SELECT * FROM sales_data where Quantity = 6\").show()",
    kind = "spark"
};
Once the job is submitted, the code runs in a while loop that checks the status of the Spark job. In each iteration it pauses for about five seconds and then rechecks the status, until the status becomes "available".
while (obj_response_status.state != "available")
{
    await Task.Delay(5000);
    string get_statement_response = await Service.GetAsync(get_statement);
    obj_response_status = JsonConvert.DeserializeObject<LivyClass>(get_statement_response);
}
You can change the delay to any value you wish, based on the load.
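The loop in the article waits only for the "available" state, so a statement that fails would keep it polling indefinitely. Below is a minimal sketch of a polling helper that also stops on the other terminal statement states defined by Apache Livy ("error" and "cancelled") and gives up after a fixed number of attempts. `StatementPoller`, `getState`, and `maxAttempts` are hypothetical names, and whether Fabric reports exactly these state strings is an assumption worth verifying.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: polls a statement until it reaches a terminal state.
public static class StatementPoller
{
    public static async Task<string> WaitForTerminalStateAsync(
        Func<Task<string>> getState,   // fetches the current statement state
        TimeSpan pollInterval,         // delay between two checks
        int maxAttempts)               // upper bound on the number of checks
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            string state = await getState();
            // Terminal states as documented for Apache Livy statements
            if (state == "available" || state == "error" || state == "cancelled")
            {
                return state;
            }
            await Task.Delay(pollInterval);
        }
        throw new TimeoutException(
            $"Statement did not reach a terminal state after {maxAttempts} attempts.");
    }
}
```

In the Run method, `getState` could wrap the `Service.GetAsync` call and return the `state` property of the deserialized response. The caller would then read the output only when the returned state is "available".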
Once this is done, the Spark job is created on the Fabric tenant.
The result set of the job execution is returned to the browser.
Here is the complete code:
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Identity.Client;
using Newtonsoft.Json;
using System.Net.Http.Headers;
using System.Text;

namespace FunctionApp_Lyvi
{
    public class ExecuteLyviJob
    {
        private readonly ILogger<ExecuteLyviJob> _logger;

        public ExecuteLyviJob(ILogger<ExecuteLyviJob> logger)
        {
            _logger = logger;
        }

        [Function("ExecuteLyviJob")]
        public static async Task<ContentResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req, ILogger log)
        {
            string createLivySessionResponse = await Service.CreateLivySession();
            var obj = JsonConvert.DeserializeObject<LivyClass>(createLivySessionResponse);
            string livy_session_id = obj.id;
            string livy_session_url = Service.livyBaseUrl + "/" + livy_session_id;
            string execute_statement = livy_session_url + "/statements";
            var payload = new
            {
                code = "spark.sql(\"SELECT * FROM sales_data where Quantity = 6\").show()",
                kind = "spark"
            };
            string jsonString = JsonConvert.SerializeObject(payload);
            string execute_statement_response = null;
            while (execute_statement_response == null) // Retry until the session is ready to accept the statement
            {
                StringContent content = new StringContent(jsonString, Encoding.UTF8, "application/json");
                execute_statement_response = await Service.PostAsync(execute_statement, content);
                if (execute_statement_response == null)
                {
                    await Task.Delay(5000);
                }
            }
            var obj_response_status = JsonConvert.DeserializeObject<LivyClass>(execute_statement_response);
            string query_id = obj_response_status.id;
            string get_statement = livy_session_url + "/statements/" + query_id;
            while (obj_response_status.state != "available") // Wait until the statement result becomes available
            {
                await Task.Delay(5000);
                string get_statement_response = await Service.GetAsync(get_statement);
                obj_response_status = JsonConvert.DeserializeObject<LivyClass>(get_statement_response);
            }
            await Service.CloseLivySession(livy_session_id);
            return new ContentResult
            {
                Content = obj_response_status.Output.Data.TextPlain,
                ContentType = "text/plain",
                StatusCode = 200
            };
        }
    }

    public class Service
    {
        public static string livyBaseUrl = "Livy endpoints";
        private static string clientId = "Client Id of the Service Principal";
        private static string RedirectURI = "http://localhost";
        private static string Authority = "https://login.microsoftonline.com/organizations";
        private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/.default" };
        //private static string[] scopes = new string[] { "https://api.fabric.microsoft.com/Lakehouse.Execute.All https://api.fabric.microsoft.com/Lakehouse.Read.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All https://api.fabric.microsoft.com/Code.AccessStorage.All https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All https://api.fabric.microsoft.com/Code.AccessFabric.All" };
        private static readonly HttpClient client = new HttpClient();
        public HttpClient Client => client;

        public async static Task<string> PostAsync(string url, HttpContent content)
        {
            PublicClientApplicationBuilder PublicClientAppBuilder =
                PublicClientApplicationBuilder.Create(clientId)
                    .WithAuthority(Authority)
                    .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
                    .WithRedirectUri(RedirectURI);
            IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
            var accounts = await PublicClientApplication.GetAccountsAsync();
            AuthenticationResult result;
            try
            {
                // Try the token cache first; FirstOrDefault avoids an exception when no account is cached yet
                result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
                    .ExecuteAsync()
                    .ConfigureAwait(false);
            }
            catch (MsalUiRequiredException)
            {
                // No usable cached token, so fall back to an interactive sign-in
                result = await PublicClientApplication.AcquireTokenInteractive(scopes)
                    .ExecuteAsync()
                    .ConfigureAwait(false);
            }
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
            HttpResponseMessage response = await client.PostAsync(url, content);
            if (response.IsSuccessStatusCode)
            {
                return await response.Content.ReadAsStringAsync();
            }
            // Log the error body and return null so that the caller can decide to retry
            Console.WriteLine(await response.Content.ReadAsStringAsync());
            return null;
        }

        public async static Task<string> GetAsync(string url)
        {
            PublicClientApplicationBuilder PublicClientAppBuilder =
                PublicClientApplicationBuilder.Create(clientId)
                    .WithAuthority(Authority)
                    .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
                    .WithRedirectUri(RedirectURI);
            IPublicClientApplication PublicClientApplication = PublicClientAppBuilder.Build();
            var accounts = await PublicClientApplication.GetAccountsAsync();
            AuthenticationResult result;
            try
            {
                result = await PublicClientApplication.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
                    .ExecuteAsync()
                    .ConfigureAwait(false);
            }
            catch (MsalUiRequiredException)
            {
                result = await PublicClientApplication.AcquireTokenInteractive(scopes)
                    .ExecuteAsync()
                    .ConfigureAwait(false);
            }
            client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);
            HttpResponseMessage response = await client.GetAsync(url);
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsStringAsync();
        }

        public static async Task<string> CreateLivySession()
        {
            // An empty JSON body creates the session with default settings
            var content = new StringContent("{}", Encoding.UTF8, "application/json");
            var createLivySessionResponse = await Service.PostAsync(livyBaseUrl, content);
            return createLivySessionResponse;
        }

        public static async Task CloseLivySession(string sessionId)
        {
            // Reuses the Authorization header set by the preceding GET or POST call
            HttpResponseMessage result = await client.DeleteAsync(livyBaseUrl + "/" + sessionId);
            if (!result.IsSuccessStatusCode)
            {
                throw new Exception("Livy close session failed");
            }
        }
    }

    public class LivyClass
    {
        public string id { get; set; }
        public string state { get; set; }
        public Output Output { get; set; }
    }

    public class Output
    {
        public Data Data { get; set; }
    }

    public class Data
    {
        [JsonProperty("text/plain")]
        public string TextPlain { get; set; }
    }
}
Conclusion
As we saw, Livy allows remote applications to run Spark jobs through a RESTful interface, without directly interacting with the Spark cluster. It's a great way to simplify the integration of Spark with other applications, and its availability in Microsoft Fabric makes it an ideal approach for remotely executing Spark jobs.
Thanks for reading!
Written by Sachin Nandanwar