Data Operations In Cosmos DB - Part 2
In a previous write-up, I demonstrated various ways to perform CRUD operations in Cosmos DB, including examples of iterating over, inserting, updating and deleting data across the JSON nodes within a document. The way the data was created, and the operations performed on it, were somewhat academic, because the primary focus of that article was to introduce the fundamental techniques for performing CRUD operations across JSON nodes.
As a result, those methods may not be particularly useful in practical applications.
In Part 1, we created a couple of sample records and inserted them into the Cosmos DB container. In practical scenarios, you might need to bulk upload a collection of JSON documents.
Document updates raise a similar problem: when a document has been updated, you usually don't know in advance where in the document the changes were made.
For instance, consider this method from Part 1:
public static async Task<string> UpdateSuppliertDetails()
{
    string partitionvalue = "region1";
    PartitionKey partitionKey = new(partitionvalue);
    // id refers to the document id used in Part 1
    ItemResponse<ProductRecord> response = await container.PatchItemAsync<ProductRecord>(
        id: id,
        partitionKey: partitionKey,
        patchOperations: new[] {
            // The path is hard-coded: element 0 of the Suppliers array
            PatchOperation.Replace("/Suppliers/0/SupplierName", "SupplierForProductOne")
        });
    return response.StatusCode.ToString();
}
Here, we knew from the outset that the element at index 0 of the Suppliers array needed to be updated. In most practical scenarios, however, requirements like this are rare: updates tend to be ad hoc and spread across a set of documents. The update process itself should therefore identify what has changed and apply those changes to the existing documents in the container.
The Setup
The sample JSON files used in this article are available here. These files contain recipe data in JSON format, including ingredients, preparation methods and cooking instructions. Here's a snapshot of the contents of one of the JSON files:
There are 13 such files. Newly added documents are placed in the New folder, and documents that need updating are placed in the Update folder.
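The relative paths used in this article, such as "..\\..\\..\\DataSet\\New", use Windows separators and are resolved against the current working directory. As a sketch, assuming the DataSet folder sits three levels above the build output directory (which is what those paths imply when the program runs from something like bin/Debug/net8.0), the hypothetical helpers below build the New and Update paths with Path.Combine and pick up only .json files:

```csharp
using System;
using System.IO;
using System.Linq;

// Hypothetical helper: resolve a DataSet subfolder relative to a base directory.
// Assumption: DataSet lives three levels up from the build output directory,
// mirroring the "..\\..\\..\\DataSet" paths used in this article.
static string DataSetFolder(string baseDir, string subFolder) =>
    Path.GetFullPath(Path.Combine(baseDir, "..", "..", "..", "DataSet", subFolder));

// Hypothetical helper: return only the JSON files in a folder, sorted for a stable order.
static string[] JsonFiles(string folder) =>
    Directory.GetFiles(folder, "*.json").OrderBy(f => f, StringComparer.Ordinal).ToArray();

string newFolder = DataSetFolder(AppContext.BaseDirectory, "New");
string updateFolder = DataSetFolder(AppContext.BaseDirectory, "Update");
Console.WriteLine(newFolder);
Console.WriteLine(updateFolder);
```

Resolving against AppContext.BaseDirectory instead of the working directory keeps the paths stable when the program is launched from a different folder, and Path.Combine picks the right separator on each operating system.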
Let's first see how to bulk upload all the files to a container.
Project Dependencies
using KellermanSoftware.CompareNetObjects;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using PartitionKey = Microsoft.Azure.Cosmos.PartitionKey;
The KellermanSoftware.CompareNetObjects NuGet package is a very useful library for comparing two objects. It is simpler than first serializing both objects into JSON strings and then comparing them as JObject instances, and it is particularly handy when we need to identify exactly what changed between two objects.
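To make the idea concrete, here is a much simplified, hand-rolled comparison. It is not how CompareNetObjects is implemented; it only illustrates the kind of result the library returns: a list of differences, each with a property name (list elements reported as, for example, ingredients[1]), the expected value and the actual value. The Diff helper and the sample data are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Simplified illustration (NOT the CompareNetObjects implementation): compare the public
// properties of two objects of the same type. List<string> properties are compared
// element by element and reported as "name[index]".
static List<(string Property, string Expected, string Actual)> Diff<T>(T expected, T actual)
{
    var diffs = new List<(string Property, string Expected, string Actual)>();
    foreach (PropertyInfo p in typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance))
    {
        object e = p.GetValue(expected);
        object a = p.GetValue(actual);
        if (e is IList<string> el && a is IList<string> al)
        {
            // Walk up to the longer list; a missing element is treated as null
            for (int i = 0; i < Math.Max(el.Count, al.Count); i++)
            {
                string ev = i < el.Count ? el[i] : null;
                string av = i < al.Count ? al[i] : null;
                if (ev != av) diffs.Add(($"{p.Name}[{i}]", ev, av));
            }
        }
        else if (!Equals(e, a))
        {
            diffs.Add((p.Name, e?.ToString(), a?.ToString()));
        }
    }
    return diffs;
}

// Hypothetical sample data, shaped like the article's Recipe documents.
var stored = new { name = "Pancakes", servings = 4, ingredients = new List<string> { "flour", "milk", "eggs" } };
var edited = new { name = "Pancakes", servings = 6, ingredients = new List<string> { "flour", "oat milk", "eggs" } };
foreach (var d in Diff(stored, edited))
    Console.WriteLine($"{d.Property}: {d.Expected} -> {d.Actual}");
```

Knowing which property (or list element) changed is what lets the update step later patch only those paths instead of rewriting the whole document.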
Additional Steps
The following code, which creates the database and container and sets the COSMOS_ENDPOINT and COSMOS_KEY environment variables used by CosmosClient, is taken from Part 1.
static CosmosClient cosmosclient;
static Container container;
public static async Task Main(string[] args)
{
    // Replace the placeholder values with your account's URI and key
    Set_Env_Variables("COSMOS_ENDPOINT", "Cosmos URI");
    Set_Env_Variables("COSMOS_KEY", "Cosmos Keys");
    cosmosclient = new(
        accountEndpoint: Environment.GetEnvironmentVariable("COSMOS_ENDPOINT"),
        authKeyOrResourceToken: Environment.GetEnvironmentVariable("COSMOS_KEY")
    );
    Database database = await cosmosclient.CreateDatabaseIfNotExistsAsync(id: "Recipe");
    container = await database.CreateContainerIfNotExistsAsync(id: "RecipeContainer", partitionKeyPath: "/id");
}
public static void Set_Env_Variables(string name, string value)
{
    Environment.SetEnvironmentVariable(name, value, EnvironmentVariableTarget.Process);
}
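Set_Env_Variables stores the values in process-scoped environment variables, which are discarded when the program exits. If a variable is missing, null is passed to the CosmosClient constructor and the failure surfaces there rather than pointing at the missing setting. A hypothetical GetRequiredSetting helper, sketched below, reads a variable and fails fast with a descriptive message instead; the endpoint URL shown is a placeholder.

```csharp
using System;

// Hypothetical helper: read a required setting from the environment and fail fast
// with a descriptive message instead of passing null on to the client constructor.
static string GetRequiredSetting(string name)
{
    string value = Environment.GetEnvironmentVariable(name);
    if (string.IsNullOrWhiteSpace(value))
        throw new InvalidOperationException($"Environment variable '{name}' is not set.");
    return value;
}

// Process-scoped variables are visible to this process (and child processes it starts)
// and are discarded when the process exits. The URL is a placeholder.
Environment.SetEnvironmentVariable("COSMOS_ENDPOINT", "https://example.documents.azure.com:443/", EnvironmentVariableTarget.Process);
Console.WriteLine(GetRequiredSetting("COSMOS_ENDPOINT"));
```

In Main, the two Environment.GetEnvironmentVariable calls could then be swapped for GetRequiredSetting so a missing endpoint or key is reported by name.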
The database is named Recipe and the container RecipeContainer, with /id as its partition key path.
Create a class named Recipe to map the properties of the source JSON documents:
public class Recipe
{
    public string id { get; set; }
    public string name { get; set; }
    public string description { get; set; }
    public string cuisine { get; set; }
    public string difficulty { get; set; }
    public string prepTime { get; set; }
    public string cookTime { get; set; }
    public string totalTime { get; set; }
    public int servings { get; set; }
    public List<string> ingredients { get; set; }
    public List<string> instructions { get; set; }
}
The partition key is the recipe id, which is derived from the recipe name in a method named ParseDocuments. This method reads the JSON documents in the source folder, deserializes them and returns them as a List<Recipe>.
public static List<Recipe> ParseDocuments(string Folderpath)
{
    List<Recipe> ret = new List<Recipe>();
    Directory.GetFiles(Folderpath).ToList().ForEach(f =>
    {
        // Read and deserialize each JSON file into a Recipe
        var jsonString = System.IO.File.ReadAllText(f);
        Recipe recipe = JsonConvert.DeserializeObject<Recipe>(jsonString);
        // Derive the id (also the partition key value) from the name,
        // e.g. "Banana Bread" -> "bananabread"
        recipe.id = recipe.name.ToLower().Replace(" ", "");
        ret.Add(recipe);
    });
    return ret;
}
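The id rule in ParseDocuments (lower-case the name and remove spaces) can be checked on its own. The sketch below reproduces it in a hypothetical DeriveId helper, using ToLowerInvariant instead of ToLower as an assumption so the result does not depend on the machine's culture. It also adds a hypothetical FindIdCollisions check: because id doubles as the partition key, two names that map to the same id would clash, and a second create with that id would fail. The recipe names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same rule as ParseDocuments: lower-case the recipe name and strip spaces.
// ToLowerInvariant is an assumption made here for culture-independent ids.
static string DeriveId(string name) => name.ToLowerInvariant().Replace(" ", "");

// Hypothetical check: return every id that more than one recipe name maps to.
static IEnumerable<string> FindIdCollisions(IEnumerable<string> recipeNames) =>
    recipeNames.GroupBy(n => DeriveId(n))
               .Where(g => g.Count() > 1)
               .Select(g => g.Key);

// Hypothetical recipe names; the last two differ only in spacing.
string[] names = { "Chicken Tikka Masala", "Banana Bread", "Banana  Bread" };
foreach (string n in names) Console.WriteLine($"{n} -> {DeriveId(n)}");
Console.WriteLine("Colliding ids: " + string.Join(", ", FindIdCollisions(names)));
```

Running a check like this over a folder before uploading would surface duplicate ids up front instead of as failed creates.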
Insert New Documents
Add a method AddDocuments() to upload the files to the container:
public static async Task AddDocuments()
{
    List<Recipe> recipes = ParseDocuments("..\\..\\..\\DataSet\\New");
    int cnt = 0;
    foreach (Recipe recipe in recipes)
    {
        // The container is partitioned on /id, so each batch targets the recipe's own partition
        PartitionKey partitionKey = new(recipe.id);
        TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey)
            .CreateItem<Recipe>(recipe);
        using TransactionalBatchResponse response = await batch.ExecuteAsync();
        // Count only batches that committed successfully
        if (response.IsSuccessStatusCode)
        {
            cnt += response.Count;
        }
        else
        {
            Console.WriteLine($"Failed to upload {recipe.id}: {response.StatusCode}");
        }
    }
    Console.WriteLine("No of documents uploaded : " + cnt);
}
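A TransactionalBatch is scoped to a single partition key value, so it can only group items that share that value. Since RecipeContainer is partitioned on /id, every batch in AddDocuments carries exactly one item, which behaves much like an individual create. The hypothetical GroupForBatches helper below groups items by partition key value and splits each group into chunks of at most 100 operations (the transactional batch operation limit), which makes it easy to see how many round trips a given key choice implies. For large loads spread across many partitions, the .NET SDK's bulk mode (CosmosClientOptions.AllowBulkExecution set to true, combined with concurrent CreateItemAsync calls) is another option worth evaluating; it is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: group items by partition key value (a transactional batch can only
// contain items that share one value), then split each group into chunks of at most
// maxOpsPerBatch operations.
static List<(string PartitionKey, string[] Items)> GroupForBatches(
    IEnumerable<string> ids, Func<string, string> partitionKeyOf, int maxOpsPerBatch = 100)
{
    return ids.GroupBy(partitionKeyOf)
              .SelectMany(g => g.Chunk(maxOpsPerBatch).Select(chunk => (PartitionKey: g.Key, Items: chunk)))
              .ToList();
}

// Hypothetical recipe ids. With the article's /id partition key, every item is its own partition.
string[] recipeIds = { "pancakes", "bananabread", "chickentikkamasala" };
var byId = GroupForBatches(recipeIds, id => id);
Console.WriteLine($"Partition key /id -> {byId.Count} batches");

// With a coarser (hypothetical) key such as a cuisine value, items could share a batch.
var byCuisine = GroupForBatches(recipeIds, id => id == "chickentikkamasala" ? "indian" : "baking");
Console.WriteLine($"Coarser key -> {byCuisine.Count} batches");
```

The comparison only counts batches; whether a coarser partition key suits this data depends on the query and throughput patterns, which the sketch does not model.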
Call AddDocuments() from the Main method in Program.cs. Since Main is declared async, the call can be awaited directly:
await AddDocuments();
Upload Updated Documents
Next, we modify a couple of the JSON files and apply those changes to the corresponding documents in the container.
Add a method UpdateDocuments() to apply the modified files to the container:
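public static async Task UpdateDocuments()
{
    int i = 0;
    int updated = 0;
    Recipe original_recipes = new();
    List<Recipe> recipes = ParseDocuments("..\\..\\..\\DataSet\\Update");
    CompareLogic compareLogic = new();
    compareLogic.Config.MaxDifferences = 200000;
    foreach (Recipe recipe in recipes)
    {
        // Fetch the stored version; the id is also the partition key value
        original_recipes = await GetDocumentByIdAsync(recipe.id, recipe.id);
        if (original_recipes != null)
        {
            ComparisonResult rslt = compareLogic.Compare(expectedObject: original_recipes, actualObject: recipes[i]);
            if (!rslt.AreEqual)
            {
                var patchOperations = new List<PatchOperation>();
                foreach (var diff in rslt.Differences)
                {
                    // Strip a leading "." defensively before building the patch path
                    string propertyName = diff.PropertyName.TrimStart('.');
                    if (propertyName.Contains("["))
                    {
                        // List element, e.g. "ingredients[2]" -> "/ingredients/2"
                        string[] parts = propertyName.Split(new[] { '[', ']' }, StringSplitOptions.RemoveEmptyEntries);
                        patchOperations.Add(PatchOperation.Replace("/" + parts[0].Trim() + "/" + parts[1].Trim(), diff.Object2Value));
                    }
                    else
                    {
                        // Read the typed value from the updated recipe so that, e.g., servings stays numeric
                        object value = typeof(Recipe).GetProperty(propertyName)?.GetValue(recipes[i]);
                        patchOperations.Add(PatchOperation.Replace("/" + propertyName, value));
                    }
                }
                // Patch once per document (not once per difference), in groups of at most 10
                // operations, the documented per-request patch limit at the time of writing
                foreach (PatchOperation[] chunk in patchOperations.Chunk(10))
                {
                    await container.PatchItemAsync<JObject>(
                        id: recipe.id,
                        partitionKey: new PartitionKey(recipe.id),
                        patchOperations: chunk
                    );
                }
                updated++;
            }
        }
        i++;
    }
    Console.WriteLine("No of documents updated : " + updated);
}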
The code above reads each document in the Update folder, compares it with the version already stored in the container, and patches only the properties that differ.
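Turning a difference name into a patch path is plain string handling, so it can be tested without a Cosmos DB account. The sketch below uses two hypothetical helpers: ToPatchPath maps names such as servings or ingredients[2] to /servings and /ingredients/2 (a leading "." is stripped in case a name is reported with one), and ToRequests groups the resulting paths so each patch request stays within a per-request operation limit, assumed here to be 10, which is the limit documented for Cosmos DB partial document update at the time of writing.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: map a difference name to a JSON-Pointer-style patch path,
// e.g. "servings" -> "/servings", "ingredients[2]" -> "/ingredients/2".
// Splitting on '.', '[' and ']' also drops a leading "." if one is present.
static string ToPatchPath(string propertyName)
{
    string[] parts = propertyName.Trim().Split(new[] { '.', '[', ']' }, StringSplitOptions.RemoveEmptyEntries);
    return "/" + string.Join("/", parts.Select(p => p.Trim()));
}

// Hypothetical helper: group paths so that each patch request carries at most
// maxOpsPerRequest operations (assumed limit: 10).
static List<string[]> ToRequests(IEnumerable<string> propertyNames, int maxOpsPerRequest = 10) =>
    propertyNames.Select(n => ToPatchPath(n)).Chunk(maxOpsPerRequest).ToList();

// Hypothetical difference names, shaped like the ones UpdateDocuments handles.
string[] diffNames = { "servings", "ingredients[2]", ".instructions[0]" };
foreach (string n in diffNames) Console.WriteLine($"{n} -> {ToPatchPath(n)}");
```

Keeping this mapping in a small pure function makes it easy to unit test the path format separately from the network calls.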
The recipes object is a list that stores the documents returned by the ParseDocuments method:
recipes = ParseDocuments("..\\..\\..\\DataSet\\Update");
The following line uses the KellermanSoftware.CompareNetObjects package to return the differences between two objects. Here, original_recipes is the Recipe POCO currently stored in the container, and recipes[i] is the updated Recipe at the current position in the list (the same object as the loop variable recipe):
ComparisonResult rslt = compareLogic.Compare(expectedObject: original_recipes, actualObject: recipes[i]);
The GetDocumentByIdAsync method reads a single document, using the recipe.id passed to it as both the document id and the partition key value:
original_recipes = await GetDocumentByIdAsync(recipe.id, recipe.id);
public static async Task<Recipe> GetDocumentByIdAsync(string id, string partitionKey)
{
    try
    {
        ItemResponse<Recipe> response = await container.ReadItemAsync<Recipe>(id, new PartitionKey(partitionKey));
        return response.Resource;
    }
    catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
    {
        // The document does not exist, so there is nothing to compare against
        Console.WriteLine($"Error retrieving document: {ex.Message}");
        return null;
    }
}
Calling UpdateDocuments from Main applies the changes to the documents in the container:
await UpdateDocuments();
Conclusion
In this article, I demonstrated how to bulk insert documents into Cosmos DB and how to detect and apply changes to existing documents, techniques that can help streamline the management of large volumes of documents in Cosmos DB.
Written by Sachin Nandanwar