Data Operations In Cosmos DB - Part 2

In a previous write-up, I demonstrated various ways to perform CRUD operations in Cosmos DB, including examples of iterating over, inserting, updating and deleting data across the JSON nodes within a document. The way the data was created, and the operations performed on it, were somewhat academic, because the primary focus of that article was to introduce the fundamental techniques for performing CRUD operations across JSON nodes. As a result, those methods may not be particularly useful in practical applications.

In Part 1 we created a couple of sample records and inserted them into the Cosmos DB container. In practical scenarios, however, you might need to bulk upload a whole collection of JSON documents.

Updates raise a similar problem. When a document is updated, you usually don't know in advance where exactly in the document the changes were made.

For instance, take this method from Part 1 of the article:

 public static async Task<string> UpdateSuppliertDetails()
 {
     string partitionvalue = "region1";
     PartitionKey partitionKey = new(partitionvalue);

     // id refers to the document id defined in Part 1; the array index (0) is hard-coded
     ItemResponse<ProductRecord> response = await container.PatchItemAsync<ProductRecord>(
         id: id,
         partitionKey: partitionKey,
         patchOperations: new[] {
             PatchOperation.Replace("/Suppliers/0/SupplierName", "SupplierForProductOne")
         });

     return response.StatusCode.ToString();
 }

Here we knew from the outset that the element at index 0 of the Suppliers array needed to be updated. In most practical scenarios that is rarely the case, because updates are ad hoc and spread across a set of documents. The update process should therefore identify what changed and apply those changes to the existing documents in the container.
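To make the contrast concrete, here is a minimal sketch of the kind of detection the update process needs: instead of hard-coding index 0, compare the old and new versions of an array and derive a patch path for each element that changed. It is plain C# with no Cosmos DB calls. The function name `ChangedElementPaths` and the supplier names are hypothetical, and the sketch assumes both versions of the array have the same length.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: returns a patch path for every array element whose value differs.
// Assumes both versions of the array have the same number of elements.
static List<string> ChangedElementPaths(string arrayName, string propertyName,
                                        IReadOnlyList<string> before, IReadOnlyList<string> after)
{
    var paths = new List<string>();
    for (int i = 0; i < before.Count; i++)
    {
        if (before[i] != after[i])
        {
            // Same shape as the hard-coded "/Suppliers/0/SupplierName", but with a computed index
            paths.Add($"/{arrayName}/{i}/{propertyName}");
        }
    }
    return paths;
}

var oldNames = new[] { "SupplierA", "SupplierB", "SupplierC" };
var newNames = new[] { "SupplierA", "SupplierForProductTwo", "SupplierC" };

foreach (string changedPath in ChangedElementPaths("Suppliers", "SupplierName", oldNames, newNames))
{
    Console.WriteLine(changedPath);   // prints /Suppliers/1/SupplierName
}
```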

The Setup

The sample JSON files used in this article are available here. The files contain recipe data, including the ingredients, preparation methods and cooking instructions for each recipe, in JSON format. Here's a snapshot of the contents of one of the JSON files:

We have 13 such files. Newly added documents are placed in the New folder, and documents that need updating are placed in the Update folder.

Let's first see how to bulk upload all the files to a container.

Project Dependencies

using KellermanSoftware.CompareNetObjects;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using PartitionKey = Microsoft.Azure.Cosmos.PartitionKey;

The KellermanSoftware.CompareNetObjects NuGet package performs a deep comparison of two .NET objects. It is simpler than first serializing both objects into JSON strings and comparing them with JObject, and it reports each difference individually (the property name together with the old and new values), which is exactly what we need to identify the changes between two versions of a document.

Additional Steps

The following code, adapted from Part 1, sets the COSMOS_ENDPOINT and COSMOS_KEY environment variables used by CosmosClient and then creates the database and the container.

 static CosmosClient cosmosclient;
 static Container container;

 public static async Task Main(string[] args)
 {
     // Replace the placeholders with your account's URI and key
     Set_Env_Variables("COSMOS_ENDPOINT", "Cosmos URI");
     Set_Env_Variables("COSMOS_KEY", "Cosmos Keys");

     cosmosclient = new(
         accountEndpoint: Environment.GetEnvironmentVariable("COSMOS_ENDPOINT"),
         authKeyOrResourceToken: Environment.GetEnvironmentVariable("COSMOS_KEY"));

     Database database = await cosmosclient.CreateDatabaseIfNotExistsAsync(id: "Recipe");
     // The partition key path is /id, so every recipe lives in its own logical partition
     container = await database.CreateContainerIfNotExistsAsync(id: "RecipeContainer", partitionKeyPath: "/id");
 }

 public static void Set_Env_Variables(string name, string value)
 {
     Environment.SetEnvironmentVariable(name, value, EnvironmentVariableTarget.Process);
 }

The database is named Recipe and the container RecipeContainer; the container uses /id as its partition key path.

Next, create a class named Recipe that maps the properties of the source JSON documents:

  public class Recipe
  {

      public string id { get; set; }
      public string name { get; set; }
      public string description { get; set; }
      public string cuisine { get; set; }
      public string difficulty { get; set; }
      public string prepTime { get; set; }
      public string cookTime { get; set; }
      public string totalTime { get; set; }
      public int servings { get; set; }
      public List<string> ingredients { get; set; }
      public List<string> instructions { get; set; }
  }

The partition key is the recipe id, which the ParseDocuments method derives from the recipe name. ParseDocuments reads the contents of every JSON document in the source folder, deserializes each one into a Recipe object, sets its id and returns the results as a list.

public static List<Recipe> ParseDocuments(string folderPath)
{
    List<Recipe> ret = new List<Recipe>();

    foreach (string f in Directory.GetFiles(folderPath))
    {
        string jsonString = System.IO.File.ReadAllText(f);
        Recipe recipe = JsonConvert.DeserializeObject<Recipe>(jsonString);
        if (recipe == null) continue;   // skip empty files

        // The id (and therefore the partition key) is the recipe name in lower case without spaces
        recipe.id = recipe.name.ToLower().Replace(" ", "");
        ret.Add(recipe);
    }

    return ret;
}
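ParseDocuments derives each id by lower-casing the recipe name and removing spaces, so a name such as "Chicken Tikka Masala" would become "chickentikkamasala". Because the container's partition key path is /id, this value is also the partition key. A slightly more defensive variant is sketched below as an assumption, not the article's code: it strips all whitespace (tabs as well as spaces), lower-cases with the invariant culture, and removes the characters '/', '\\', '?' and '#', which Cosmos DB does not allow in an item id. The name `DeriveId` and the sample names are hypothetical.

```csharp
using System;
using System.Linq;

// Hypothetical variant of the id derivation used in ParseDocuments.
static string DeriveId(string recipeName)
{
    // Characters that Cosmos DB does not accept in an item id
    char[] disallowed = { '/', '\\', '?', '#' };

    return new string(recipeName
        .ToLowerInvariant()
        .Where(c => !char.IsWhiteSpace(c) && !disallowed.Contains(c))
        .ToArray());
}

Console.WriteLine(DeriveId("Chicken Tikka Masala"));   // chickentikkamasala
Console.WriteLine(DeriveId("Mac & Cheese / Baked"));   // mac&cheesebaked
```

Note that, as with the original derivation, two names that differ only in case or spacing map to the same id, so the second upload of such a pair would conflict with the first.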

Insert New Documents

Add an AddDocuments() method that uploads the documents in the New folder to the container:

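 public static async Task AddDocuments()
 {
     List<Recipe> recipes = ParseDocuments("..\\..\\..\\DataSet\\New");
     int cnt = 0;
     foreach (Recipe recipe in recipes)
     {
         // A transactional batch is scoped to one partition key value. With /id as the
         // partition key path, each batch therefore holds exactly one document.
         PartitionKey partitionKey = new(recipe.id);
         TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey)
                                             .CreateItem<Recipe>(recipe);
         using TransactionalBatchResponse response = await batch.ExecuteAsync();

         // A failed batch (for example 409 Conflict when the document already exists)
         // does not throw, so check the status before counting the document as uploaded
         if (response.IsSuccessStatusCode)
         {
             cnt += response.Count;
         }
         else
         {
             Console.WriteLine($"Failed to upload {recipe.id}: {response.StatusCode}");
         }
     }
     Console.WriteLine("No of documents uploaded : " + cnt);
 }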

You can call AddDocuments() from the Main method in Program.cs, after the container has been created:

await AddDocuments();
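A transactional batch can only contain operations that share one partition key value, and Cosmos DB caps a batch at 100 operations. With /id as the partition key, every recipe is its own logical partition, which is why AddDocuments ends up with one batch per document. The sketch below is plain C# with no SDK calls; `PlanBatches` and the sample ids are hypothetical. It groups items by partition key and splits each group into batches of at most 100, so you can see how many round trips a given partition key choice would cost.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: groups items by their partition key value and splits each group
// into batches of at most maxOperations items (100 is the transactional batch limit).
static List<List<string>> PlanBatches(IEnumerable<(string Id, string PartitionKey)> items,
                                      int maxOperations = 100)
{
    return items
        .GroupBy(item => item.PartitionKey)
        .SelectMany(group => group.Select(item => item.Id).Chunk(maxOperations))
        .Select(chunk => chunk.ToList())
        .ToList();
}

// Partition key = id (as in RecipeContainer): every item gets its own batch
var byId = new[] { "pasta", "curry", "salad" }.Select(id => (id, id));
Console.WriteLine(PlanBatches(byId).Count);        // 3

// Hypothetical alternative partition key such as cuisine: items sharing a value share a batch
var byCuisine = new[] { ("pasta", "italian"), ("risotto", "italian"), ("curry", "indian") };
Console.WriteLine(PlanBatches(byCuisine).Count);   // 2
```

If throughput matters more than atomicity, the v3 .NET SDK's bulk mode (setting `CosmosClientOptions.AllowBulkExecution = true` and issuing concurrent `CreateItemAsync` calls) is the usual alternative for high-volume inserts; this article sticks with transactional batches.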

Upload Updated Documents

Next, we modify a couple of the JSON files so that the corresponding documents in the container can be updated.

Add an UpdateDocuments() method that applies the changes in the modified files to the documents in the container:

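 public static async Task UpdateDocuments()
 {
     int updated = 0;
     List<Recipe> recipes = ParseDocuments("..\\..\\..\\DataSet\\Update");

     CompareLogic compareLogic = new();
     compareLogic.Config.MaxDifferences = 200000;

     for (int i = 0; i < recipes.Count; i++)
     {
         Recipe recipe = recipes[i];

         // Read the version of the document currently stored in the container
         Recipe original_recipes = await GetDocumentByIdAsync(recipe.id, recipe.id);
         if (original_recipes == null)
         {
             Console.WriteLine($"Skipping {recipe.id}: it does not exist in the container");
             continue;
         }

         ComparisonResult rslt = compareLogic.Compare(expectedObject: original_recipes, actualObject: recipes[i]);
         if (rslt.AreEqual) continue;

         // diff.Object2Value is a string, so patching with it would store numbers such as
         // servings as strings; read each new value from the serialized recipe instead
         JObject updatedJson = JObject.FromObject(recipe);

         var patchOperations = new List<PatchOperation>();
         foreach (var diff in rslt.Differences)
         {
             // Property names look like "servings" or "ingredients[2]"
             JToken newValue = updatedJson.SelectToken(diff.PropertyName);
             if (newValue == null) continue;   // no matching value in the updated document

             string path;
             if (diff.PropertyName.Contains("["))
             {
                 string[] parts = diff.PropertyName.Split(new[] { '[', ']' }, StringSplitOptions.RemoveEmptyEntries);
                 path = "/" + parts[0].Trim() + "/" + parts[1].Trim();   // e.g. /ingredients/2
             }
             else { path = "/" + diff.PropertyName; }

             // Replace requires the path to exist, so array lengths are assumed unchanged
             patchOperations.Add(PatchOperation.Replace(path, newValue));
         }

         // Patch once per document (not once per difference), in chunks of at most 10
         // operations, the maximum Cosmos DB accepts in a single patch request
         foreach (PatchOperation[] chunk in patchOperations.Chunk(10))
         {
             await container.PatchItemAsync<JObject>(
                 id: recipe.id,
                 partitionKey: new PartitionKey(recipe.id),
                 patchOperations: chunk);
         }
         if (patchOperations.Count > 0) updated++;
     }
     Console.WriteLine("No of documents updated : " + updated);
 }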

In short, the code above reads every document in the Update folder, compares it with the version already stored in the container, and patches only the properties that changed.

recipes is the list of Recipe objects that ParseDocuments returns for the Update folder:

recipes = ParseDocuments("..\\..\\..\\DataSet\\Update");

The following line uses the KellermanSoftware.CompareNetObjects package to compute the differences between two objects: original_recipes, the POCO read from the container, and recipes[i], the Recipe at the current position in the list parsed from the Update folder. Each entry in rslt.Differences carries the name of the changed property, and the code expects names such as servings or, for a list element, ingredients[2].

ComparisonResult rslt = compareLogic.Compare(expectedObject: original_recipes, actualObject: recipes[i]);
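The update code turns each property name into a Cosmos DB patch path by splitting on the square brackets. That translation can be pulled out into a small function and checked in isolation. This sketch assumes property names look like "servings" or "ingredients[2]", the shape the UpdateDocuments code expects; it also handles a nested form such as "Suppliers[0].SupplierName", which is an assumption about how nested differences would be named and is not something the Recipe class produces. Empty segments are ignored, so a stray leading dot does no harm. The name `ToPatchPath` is hypothetical.

```csharp
using System;
using System.Text;

// Hypothetical helper: converts a property name such as "ingredients[2]" or
// "Suppliers[0].SupplierName" into a patch path such as "/ingredients/2".
static string ToPatchPath(string propertyName)
{
    var path = new StringBuilder();
    foreach (string segment in propertyName.Split('.', StringSplitOptions.RemoveEmptyEntries))
    {
        // "ingredients[2]" -> "ingredients", "2"
        foreach (string part in segment.Split(new[] { '[', ']' }, StringSplitOptions.RemoveEmptyEntries))
        {
            path.Append('/').Append(part.Trim());
        }
    }
    return path.ToString();
}

Console.WriteLine(ToPatchPath("servings"));                    // /servings
Console.WriteLine(ToPatchPath("ingredients[2]"));              // /ingredients/2
Console.WriteLine(ToPatchPath("Suppliers[0].SupplierName"));   // /Suppliers/0/SupplierName
```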

The GetDocumentByIdAsync method performs a point read: it fetches the single document whose id and partition key both equal recipe.id, which is why recipe.id is passed twice.

 original_recipes = await GetDocumentByIdAsync(recipe.id, recipe.id);

   public static async Task<Recipe> GetDocumentByIdAsync(string id, string partitionKey)
   {
       try
       {
           ItemResponse<Recipe> response = await container.ReadItemAsync<Recipe>(id, new PartitionKey(partitionKey));
           return response.Resource;
       }
       catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
       {
           // The document does not exist; other errors (throttling, authorization) are not swallowed
           Console.WriteLine($"Document {id} not found: {ex.Message}");
           return null;
       }
   }

Finally, call UpdateDocuments() from Main to apply the changes to the documents in the container:

await UpdateDocuments();
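One caveat with element-level Replace operations: Replace fails if the target path does not exist, so if an updated recipe gains an extra ingredient and a difference is reported for an index the stored array does not have, that patch is rejected. A hedged alternative, not what the code above does, is to collapse every difference to its top-level property and replace that property as a whole. The sketch below only computes the distinct top-level paths; `TopLevelPaths` and the sample property names are hypothetical, and each resulting path would then be paired with the full new value of that property in a `PatchOperation.Replace` call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: maps each reported difference to the top-level property it belongs to,
// e.g. "ingredients[5]" -> "/ingredients", and removes duplicates.
static List<string> TopLevelPaths(IEnumerable<string> propertyNames)
{
    return propertyNames
        .Select(name => name.TrimStart('.').Split('.', '[')[0])
        .Distinct()
        .Select(top => "/" + top)
        .ToList();
}

var differences = new[] { "ingredients[2]", "ingredients[5]", "servings", "instructions[0]" };
Console.WriteLine(string.Join(", ", TopLevelPaths(differences)));
// /ingredients, /servings, /instructions
```

The trade-off is a larger payload (the whole array is resent) in exchange for patches that stay valid when arrays grow or shrink.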

Conclusion

In this article I demonstrated how to bulk insert a folder of JSON documents into Cosmos DB and how to detect the changes in updated documents with CompareNetObjects, so that only the modified properties are patched. Together, these techniques help streamline the management of large volumes of documents in Cosmos DB.


Written by

Sachin Nandanwar