Copy Data from Cosmos DB to Microsoft Fabric Lakehouse

Cosmos DB offers a robust set of REST APIs that work well for general data operations. I wondered if it would be possible to push data from Cosmos DB into a Fabric Lakehouse. In Microsoft Fabric Data Factory I came across a Cosmos DB connector for Fabric's data pipeline, but it came with a warning that it is a third-party service.

I also realized that directly referencing the native Cosmos DB REST APIs within a Fabric data pipeline doesn't seem straightforward. This led me to the idea of creating a custom REST API for Cosmos DB, which could then be used in the Fabric data pipeline to copy data into a Lakehouse table. In essence, this custom API would serve as a wrapper around Cosmos DB's underlying data operations.

To explore this, I decided to experiment with a simple schema for CustomerDetails that includes the following fields (a sample document is shown after the list):

  • id

  • CustomerName

  • CustomerDOB

  • CustomerGender

  • CustomerRegion (acting as the partition key)
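For reference, a document in the container would look something like this (the values are hypothetical):

    {
        "id": "1",
        "CustomerName": "John Doe",
        "CustomerDOB": "1985-06-15T00:00:00",
        "CustomerGender": "Male",
        "CustomerRegion": "West"
    }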

Setup

To get started, first create an Azure Function. Exposing the data to the Fabric data pipeline is easier when it is available as an Azure Function or as an API hosted on Azure.

Add the necessary references to the Azure Function:

using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

Then create a method that creates the Cosmos DB database and container. The details on how this is done can be found here.
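As a minimal sketch, assuming the placeholder endpoint, key and names used throughout this post, and /CustomerRegion as the partition key path, it could look something like this:

    private static async Task CreateDatabaseAndContainerAsync()
    {
        // Placeholder endpoint and key; in practice these should come from configuration.
        CosmosClient cosmosClient = new CosmosClient("CosmosdbURI", "CosDBKey");

        // Create the database and the container only if they don't already exist.
        Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync("CosmosDbName");
        await database.CreateContainerIfNotExistsAsync(
            id: "CosmosDbContainerName",
            partitionKeyPath: "/CustomerRegion");
    }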

In the next step, create a method that returns the Cosmos DB container object.

   private static Container ContainerClient()
   {
       CosmosClient cosmosClient = new CosmosClient("CosmosdbURI", "CosDBKey");
       Container container = cosmosClient.GetContainer("CosmosDbName", "CosmosDbContainerName");
       return container;
   }

Next create a POCO for the customer and a Cosmos DB record object with similar properties:

    public class CustomerClass
    {
        public string id { get; set; }
        public string CustomerName { get; set; }
        public DateTime CustomerDOB { get; set; }
        public string CustomerGender { get; set; }
        public string CustomerRegion { get; set; }
    }

    public record CustomerRecord(
        string id,
        string CustomerName,
        DateTime CustomerDOB,
        string CustomerGender,
        string CustomerRegion
    );

First, create a POST method to insert data into Cosmos DB.

[FunctionName("CreateCustomer")]
public static async Task<IActionResult> CreateCustomer(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "customer")] HttpRequest req,
    ILogger log)
{
    Container container = ContainerClient();

    // Read the request body and deserialize it into the POCO.
    string post = await new StreamReader(req.Body).ReadToEndAsync();
    CustomerClass postedData = JsonConvert.DeserializeObject<CustomerClass>(post);

    // Map the POCO onto the record that gets stored in Cosmos DB.
    CustomerRecord item = new(
        id: postedData.id,
        CustomerName: postedData.CustomerName,
        CustomerDOB: postedData.CustomerDOB,
        CustomerGender: postedData.CustomerGender,
        CustomerRegion: postedData.CustomerRegion
    );

    // Insert the item, using CustomerRegion as the partition key.
    CustomerRecord createdItem = await container.CreateItemAsync<CustomerRecord>(
        item: item,
        partitionKey: new PartitionKey(postedData.CustomerRegion)
    );
    return new OkObjectResult(postedData);
}

In this method, the body of the incoming HTTP request is read asynchronously using a StreamReader and converted into a string. The JSON from the request body is then deserialized into an object of type CustomerClass using JsonConvert.DeserializeObject. A new instance of CustomerRecord is created and populated with data from the postedData object.

Fields like CustomerName, CustomerGender, CustomerDOB and CustomerRegion are assigned directly from the deserialized postedData, and the resulting CustomerRecord object is then asynchronously inserted into the Cosmos DB container. The partitionKey parameter is set to the CustomerRegion.

Now let's test the function through Postman.
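For example, assuming a hypothetical function app name (api is the default route prefix for Azure Functions), send a POST request with a JSON body matching the sample document shown earlier:

    POST https://<your-function-app>.azurewebsites.net/api/customer
    Content-Type: application/json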

Next create two functions, GetCustomer and GetAllCustomers, to retrieve a single customer record and the records of all customers, respectively.

  [FunctionName("GetCustomer")]
  public static async Task<IActionResult> GetCustomer(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "customer/{id}")] HttpRequest req,
    ILogger log, string id)

  {
      Microsoft.Azure.Cosmos.Container container = await Createconnection();
      var parameterizedQuery = new QueryDefinition(query: "SELECT * FROM CustomerContainer c WHERE c.id = @id")
                                  .WithParameter("@id", id);

      List<CustomerClass> customers = new List<CustomerClass>();
      using FeedIterator<CustomerClass> filteredFeed = container.GetItemQueryIterator<CustomerClass>(
          queryDefinition: parameterizedQuery
      );

      while (filteredFeed.HasMoreResults)
      {
          FeedResponse<CustomerClass> response = await filteredFeed.ReadNextAsync();

          foreach (CustomerClass item in response)
          {
              customers.Add(item);
          }
      }
      return new OkObjectResult(JsonConvert.SerializeObject(customers, Formatting.Indented));
  }

  [FunctionName("GetAllCustomers")]
  public static async Task<IActionResult> GetAllCustomers(
   [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
   ILogger log)

  {
      Microsoft.Azure.Cosmos.Container container = await Createconnection();
      var parameterizedQuery = new QueryDefinition(query: "SELECT * FROM c");

      List<CustomerClass> customers = new List<CustomerClass>();

      using FeedIterator<CustomerClass> filteredFeed = container.GetItemQueryIterator<CustomerClass>(
          queryDefinition: parameterizedQuery
      );

      while (filteredFeed.HasMoreResults)
      {
          FeedResponse<CustomerClass> response = await filteredFeed.ReadNextAsync();
          foreach (CustomerClass item in response)
          {
              customers.Add(item);
          }
      }
      return new OkObjectResult(JsonConvert.SerializeObject(customers, Formatting.Indented));
  }

Test the functions through Postman.
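Assuming the same hypothetical function app name, the request URLs would look like this (GetAllCustomers falls back to its function name because its Route is null):

    GET https://<your-function-app>.azurewebsites.net/api/customer/1
    GET https://<your-function-app>.azurewebsites.net/api/GetAllCustomers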

Azure Functions can also be exposed as an API if required, which provides an additional level of abstraction and security. The screenshot below shows the API I created for the Azure Function above.

Now that everything is set, it's time to configure the Fabric data pipeline that will insert customers from the Cosmos DB container into the Lakehouse table.

Start by adding a Copy Data activity to the Fabric Data Factory canvas. In the source settings, configure the connection to point to the Azure Function or the API, as applicable, and set the Relative URL to the Azure Function/API method that returns the customer list, which in this case is GetAllCustomers.
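With the hypothetical function app name from the earlier examples, the source settings would break down roughly as follows:

    Base URL:     https://<your-function-app>.azurewebsites.net
    Relative URL: api/GetAllCustomers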

Set the mappings between the source and destination, and then point the destination to the Lakehouse table. Once everything is configured, execute the pipeline and you'll see the Cosmos DB data successfully copied into the Lakehouse table.

Conclusion

This approach seems to ensure seamless integration between your data sources on Cosmos DB and the Fabric environment. It might not be a 100% foolproof approach, and there might be pitfalls I am not aware of, but at least it sets a foundation for further analyzing and exploring alternative methods to move data from Cosmos DB to Fabric Lakehouses.

Let me know your thoughts. Thanks for reading!
