PYTHON & NoSQL ft. Atlas MongoDB

Ankur

The last article discussed relational databases, which store data in tables with defined columns. This article covers another kind of database: the non-relational, document-oriented kind, commonly called NoSQL ("Not Only SQL"). A well-known example is MongoDB. Because it does not impose a fixed table schema on how data is arranged, MongoDB offers a lot of flexibility, and it has grown into a wider data platform built around scalability, high performance and reliability.

What is MongoDB

MongoDB is a cross-platform, document-oriented NoSQL database that is used to store and manage large amounts of unstructured data. Unlike traditional SQL databases, MongoDB does not rely on tables and rows to organize data. Instead, it uses a document model, where data is stored in JSON-like documents that can have varying structures and fields. This allows for greater flexibility in how data is organized and stored, making it easier to handle large and complex datasets.
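As a small illustration of the document model, the sketch below uses plain Python dictionaries as stand-ins for two documents in the same collection. The field names and values are made up for this example; the only point is that the two documents do not need to share the same fields.

```python
import json

# Two hypothetical documents that could live in the same collection.
# Unlike rows in a SQL table, they do not need identical fields.
user_a = {"name": "Asha", "email": "asha@example.com"}
user_b = {
    "name": "Ravi",
    "phones": ["1234567890", "0987654321"],  # array field
    "address": {"city": "Montreal", "country": "Canada"},  # nested document
}

collection_like = [user_a, user_b]


def fields_used(documents):
    """Return the set of top-level field names appearing in any document."""
    names = set()
    for doc in documents:
        names.update(doc.keys())
    return names


if __name__ == "__main__":
    print(json.dumps(collection_like, indent=2))
    print(sorted(fields_used(collection_like)))
```

A real collection would hold BSON documents rather than Python dictionaries, but PyMongo and Motor accept and return dictionaries shaped like these.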

Learn more about MongoDB here

What is PyMongo

PyMongo is a Python library that provides a client interface for MongoDB. It is used to interact with MongoDB databases using Python code. PyMongo supports all MongoDB features and operations, including CRUD operations (create, read, update, and delete), aggregation pipelines, indexes, and more.

PyMongo allows you to connect to a MongoDB instance, query and manipulate data, and execute administrative operations such as creating and dropping databases and collections. The library provides a rich API for interacting with MongoDB, including helper functions for common operations and utilities for working with BSON (Binary JSON) data.

Learn more about PyMongo here

Install PyMongo & BSON

pip install pymongo

Note - Use only this command. PyMongo ships with its own bson package, so installing the separate bson package from PyPI is unnecessary and can conflict with the one bundled in PyMongo.

What is Motor

Motor is a Python asynchronous driver for MongoDB that is built on top of the PyMongo library. It allows you to write non-blocking and high-performance applications using MongoDB by providing an asynchronous API that is compatible with the asyncio library in Python.

Motor works by using Python coroutines and the asyncio event loop to provide asynchronous access to MongoDB operations. It supports all the features and operations of PyMongo, including CRUD operations, aggregation pipelines, indexes, and more. Motor also provides a rich API for working with BSON data, including utilities for encoding and decoding BSON data to Python objects.
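To see why an asynchronous driver can help, here is a minimal sketch that does not touch MongoDB at all: `asyncio.sleep` stands in for a database round trip, and `fake_query` is a hypothetical name used only for this illustration. Three "queries" are awaited together, so their waiting time overlaps instead of adding up.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    """Stand-in for a database round trip; asyncio.sleep yields to the event loop."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently() -> list:
    # gather schedules all three coroutines at once, so the waits overlap
    return await asyncio.gather(
        fake_query("q1", 0.2),
        fake_query("q2", 0.2),
        fake_query("q3", 0.2),
    )


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(run_concurrently())
    elapsed = time.perf_counter() - start
    # Elapsed time should be close to one delay, not the sum of all three
    print(results, f"{elapsed:.2f}s")
```

With Motor, the awaited calls would be real operations such as `collection.find_one(...)`, but the scheduling idea is the same.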

Learn more about Motor here

Install Motor

pip install motor

Let's Begin

This article is not a NoSQL tutorial (here is one if you want a refresher); familiarity with the basics of non-relational, document-based databases is a prerequisite. This article focuses on deploying a remote MongoDB instance using Atlas on Azure and working with it from Python.

Deployment Diagram

An instance of MongoDB will be deployed on the cloud and accessed by the Python code through a public endpoint.

Step 1 Deployment

  1. Creating a Database

    Visit the Atlas page, sign in and click Build a Database.

  2. Deploy the Database

    Choose a configuration (this article uses M0 Free), a cloud provider (this article uses Azure) and a region, then enter a cluster name and click Create.

  3. Security & Auth

    Choose Username & Password as the authentication method and create a database user by entering a username and password. Then choose your preferred connection type (this article uses local).

  4. Networking

    Add your current IP address and any other IP addresses that need access, then click Finish & Close. More IP addresses can be added later in the Network section.

  5. Dashboard

    The dashboard has buttons to monitor the database, explore collections, check database and network access, etc.

  6. Connect

    Click Connect to reveal the Connect tab, then choose Drivers.

  7. Connecting...

    Choose Python and copy the URI; it is needed at a later stage. Note that the username and password still have to be filled into the URI.
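One way to fill the username and password into the copied URI is sketched below, using only the standard library. The helper name `build_mongo_uri`, the environment variable names and the example host are assumptions made for this illustration, not something Atlas prescribes. Percent-encoding the credentials matters when they contain characters such as `@` or `/`.

```python
import os
from urllib.parse import quote_plus


def build_mongo_uri(username: str, password: str, host: str) -> str:
    """Build an SRV connection string, percent-encoding the credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{host}/?retryWrites=true&w=majority"


if __name__ == "__main__":
    # MONGO_USER, MONGO_PASSWORD and MONGO_HOST are hypothetical variable names;
    # the fallback values are placeholders, not real credentials.
    uri = build_mongo_uri(
        os.environ.get("MONGO_USER", "admin"),
        os.environ.get("MONGO_PASSWORD", "p@ss/word"),
        os.environ.get("MONGO_HOST", "cluster0.example.mongodb.net"),
    )
    print(uri)
```

Reading the credentials from the environment also keeps them out of the source file.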

Step 2 Connecting to DB & Setting up _id factory

  1. Install the necessary libraries

    pip install pymongo motor certifi

  2. Import Necessary Packages

     import certifi
     import motor.motor_asyncio
     import time
     import pymongo
     import logging
     import asyncio
     from bson import ObjectId #install bson with pymongo - pip install pymongo
     #LOGGING
     logging.basicConfig(level=logging.INFO)
     logger = logging.getLogger(__name__)
    
  3. Connect to MongoDB

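     # Replace the username, password and host with the values from your Atlas URI
     uri = "mongodb+srv://admin:admin@cluster0.ihzdk52.mongodb.net/?retryWrites=true&w=majority"
     try:
         # Motor opens the server connection lazily, on the first operation, so
         # reaching the print below does not prove the credentials or network access work
         mongo_client = motor.motor_asyncio.AsyncIOMotorClient(uri, tlsCAFile=certifi.where())
         print("Client created successfully!!!")
     except Exception as e:
         print(e)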
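Because creating the client does not by itself contact the database, a quick way to confirm that the deployment is reachable is to send a `ping` command. The sketch below assumes `client` is an `AsyncIOMotorClient`; the helper name `check_connection` is hypothetical.

```python
import asyncio


async def check_connection(client) -> bool:
    """Return True if the server answers a ping, False otherwise.

    `client` is expected to be an AsyncIOMotorClient (or anything exposing
    an awaitable `client.admin.command`).
    """
    try:
        await client.admin.command("ping")
        return True
    except Exception as exc:  # e.g. timeouts or authentication failures
        print(f"Connection check failed: {exc}")
        return False


# Usage, assuming `mongo_client` was created as in the step above:
#     ok = asyncio.run(check_connection(mongo_client))
```

If the ping fails, the usual suspects are a wrong password in the URI or an IP address missing from the Atlas network access list.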
    
  4. Setup ObjectId factory

    In MongoDB, ObjectId is the default unique identifier for a document: if a document is inserted without an _id, one is generated automatically. It is a 12-byte value, usually displayed as a 24-character hexadecimal string, that consists of:

    • A 4-byte timestamp, representing the time the ObjectId was created, in seconds since the Unix epoch.

    • A 5-byte random value, generated once per process, to avoid collisions between ObjectIds created at the same time on different machines or processes.

    • A 3-byte incrementing counter, starting from a random value, to avoid collisions between ObjectIds created with the same timestamp and random value.
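The three parts listed above can be pulled apart with a few lines of standard-library Python. This is only an illustrative sketch (the helper name `split_object_id` is made up); in real code the `bson.ObjectId` class that ships with PyMongo is the thing to use. The hex string is the ID that appears in this article's own output further down.

```python
from datetime import datetime, timezone


def split_object_id(hex_id: str) -> dict:
    """Split a 24-character ObjectId hex string into its three parts."""
    raw = bytes.fromhex(hex_id)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes (24 hex characters)")
    seconds = int.from_bytes(raw[0:4], "big")  # 4-byte timestamp
    return {
        "created_at": datetime.fromtimestamp(seconds, tz=timezone.utc),
        "random": raw[4:9].hex(),  # 5-byte random value
        "counter": int.from_bytes(raw[9:12], "big"),  # 3-byte counter
    }


if __name__ == "__main__":
    parts = split_object_id("644c36fcaa0cefd233a94240")
    print(parts["created_at"].isoformat())
    print(parts["random"], parts["counter"])
```

The decoded timestamp is 1682716412 seconds since the epoch, which lines up with the `created_at` value stored in the same document later in the article. With PyMongo installed, `ObjectId(hex_id).generation_time` exposes the same timestamp.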

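Every document needs an _id that is unique within its collection, and the _id field is indexed by default, so ObjectIds serve as the default primary key. When a document is inserted without an _id, the driver (or the server) generates an ObjectId for it. Because of the way they are built, ObjectIds are extremely unlikely to collide even when generated by several machines, which makes them suitable for distributed systems. Thus, we need an ObjectId factory that validates and produces ObjectIds. The class below uses Pydantic v1-style hooks (__get_validators__ and __modify_schema__) so that it can be used as a field type in Pydantic models.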

    #Object ID Factory
    class PyObjectId(ObjectId):
        @classmethod
        def __get_validators__(cls):
            yield cls.validate

        @classmethod
        def validate(cls, v):
            if not ObjectId.is_valid(v):
                raise ValueError("Invalid objectid")
            return ObjectId(v)

        @classmethod
        def __modify_schema__(cls, field_schema):
            field_schema.update(type="string")

  5. Create a Database & a Collection

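     # Get a handle to the database (MongoDB creates it on the first write)
     db = mongo_client["test"]
    
     # Get a handle to the collection (also created lazily on the first write)
     collection = db["test_collection"]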
    

Step 3 Getting started with MongoDB

  1. Create Functions for different Operations

    This section defines a function for each operation; how the functions are used is discussed later.

    Learn more about operations here

     #inserting a document
     async def insert_document(document:dict):
         try:
             result = await collection.insert_one(document)
             return result
         except Exception as e:
             logger.error(e)
             return None
    
     #updating a document
     async def update_document(document:dict):
         try:
             result = await collection.update_one({"_id":document["_id"]},{"$set":document},upsert=True)
             return result
         except Exception as e:
             logger.error(e)
             return None
    
     # Get a document / Find one document 
     async def get_document(document_id:str):
         try:
             result = await collection.find_one({"_id":document_id})
             return result
         except Exception as e:
             logger.error(e)
             return None
    
     # Insert many document
     async def insert_many_documents(documents:list):
         try:
             result = await collection.insert_many(documents)
             return result
         except Exception as e:
             logger.error(e)
             return None
    
     #Create Index for text
     async def create_index(collection):
         try:
             result = await collection.create_index([('text', pymongo.TEXT)])
             return result
         except Exception as e:
             logger.error(e)
             return None
    
     # Find many with Query
     async def find_with_query(collection,query):
         try:
             cursor = collection.find(query)
             docs=[]
             async for document in cursor:
                 docs.append(document)
             return docs
         except Exception as e:
             logger.error(e)
             return None
    
     #Delete many
     async def delete_many_documents(collection):
         try:
             result = await collection.delete_many({})
             return result
         except Exception as e:
             logger.error(e)
             return None
    
  2. Create Async Main function that uses all functions

    The async main function calls all the functions defined above from a single coroutine. The next section explains each component.

     async def main():
         # Create a document
         document = {
             "_id": str(ObjectId()),
             "name": "test",
             "email": "tester@gmail.com",
             "password": "test123",
             "phone": "1234567890",
             "address": "test address",
             "city": "test city",
             "state": "test state",
             "country": "test country",
             "pincode": "123456",
             "created_at": time.time(),
         }

         # Working with a single document
         print("Inserting a document")
         # running with asyncio
         result = await insert_document(document)

         # Print the ID of the inserted document
         print(f'Insertion Done with ID: {result.inserted_id}')
         idx = result.inserted_id
         print("Finding a document")
         result = await get_document(idx)
         print(result)

         print("Updating a document")
         document["name"] = "test1"
         result = await update_document(document)

         print("Finding a document")
         result = await get_document(idx)
         print(result)

         print("Deleting a document")
         result = await collection.delete_one({"_id":idx})
         print(result)

         # Inserting multiple documents to search with $text
         print("Inserting multiple documents")

         # Each document has a `text` field, which gets a text index below
         doc1 = {'_id': str(ObjectId()), 'text': 'This is the first document.', 'age': 25}
         doc2 = {'_id': str(ObjectId()), 'text': 'This is the second document.', 'age': 30}
         doc3 = {'_id': str(ObjectId()), 'text': 'This is the third document.', 'age': 60}

         # Insert the documents
         result = await insert_many_documents([doc1, doc2, doc3])
         print(f'Insertion Done with ID: {result.inserted_ids}')

         # Create the text index
         result = await create_index(collection)
         print(f'Index created: {result}')

         # Search for a document using text
         query = {'$text': {'$search': 'second'}}
         result = await find_with_query(collection,query)
         for doc in result:
             print(doc)

         # Filter by age
         query1 = {'age': {'$gt': 25}} #greater than 25
         query2 = {'age': {'$lt': 60}} #less than 60
         query3 = {'age': {'$gte': 25}} #greater than or equal to 25
         query4 = {'age': {'$lte': 60}} #less than or equal to 60
         query5 = {'age': {'$ne': 30}} #not equal to 30
         result = await find_with_query(collection,query1)
         for doc in result:
             print(doc)
         result = await find_with_query(collection,query2)
         for doc in result:
             print(doc)
         result = await find_with_query(collection,query3)
         for doc in result:
             print(doc)
         result = await find_with_query(collection,query4)
         for doc in result:
             print(doc)
         result = await find_with_query(collection,query5)
         for doc in result:
             print(doc)

         # Combining 2 queries
         query = {'$and': [{'age': {'$gt': 25}}, {'age': {'$lt': 60}}]}
         result = await find_with_query(collection,query)
         for doc in result:
             print(doc)

         # Delete the documents
         result = await delete_many_documents(collection)


  3. Driver Function

    A single-line asyncio.run call runs the main function.

     if __name__ == "__main__":
         asyncio.run(main())

  4. The Outputs

    This section includes code snippets from main() and respective outputs

    1. Insert One

       #inserting a document
       async def insert_document(document:dict):
           try:
               result = await collection.insert_one(document)
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       result = await insert_document(document)
       # Print the ID of the inserted document
       print(f'Insertion Done with ID: {result.inserted_id}')
      

      The function inserts one dict (JSON-like) document into the collection.

      Output - the _id of the inserted document is printed (here, the string form of an ObjectId)

       Insertion Done with ID: 644c36fcaa0cefd233a94240
      
    2. Find One

       # Get a document / Find one document 
       async def get_document(document_id:str):
           try:
               result = await collection.find_one({"_id":document_id})
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       print("Finding a document")
       result = await get_document(idx)
      

      The function finds one document by its _id. Since the _id is unique for every document in a collection, it is the usual key for find_one.

      Output - Document is returned

       Finding a document
       {'_id': '644c36fcaa0cefd233a94240', 'name': 'test', 'email': 'tester@gmail.com', 'password': 'test123', 'phone': '1234567890', 'address': 'test address', 'city': 'test city', 'state': 'test state', 'country': 'test country', 'pincode': '123456', 'created_at': 1682716412.1300101}
      
    3. Update One

       #updating a document
       async def update_document(document:dict):
           try:
               result = await collection.update_one({"_id":document["_id"]},{"$set":document},upsert=True)
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       print("Updating a document")
       document["name"] = "test1"
       result = await update_document(document)
      

      This function updates one document matched by _id. The $set operator ensures that only the fields passed get updated rather than the entire document being replaced; in this case only the key "name" changes and the remaining key-value pairs stay intact. upsert=True only matters when no document matches the filter: a new document is then inserted instead.

      Output - the updated document, as returned by the find that follows the update

       {'_id': '644c36fcaa0cefd233a94240', 'name': 'test1', 'email': 'tester@gmail.com', 'password': 'test123', 'phone': '1234567890', 'address': 'test address', 'city': 'test city', 'state': 'test state', 'country': 'test country', 'pincode': '123456', 'created_at': 1682716412.1300101}
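A minimal sketch of how the filter and the update arguments of `update_one` fit together: `$set` names the fields to change, while `upsert` only decides what happens when nothing matches the filter. The helper name `build_update` is hypothetical; it leaves `_id` out of `$set`, since `_id` is immutable and already used in the filter.

```python
def build_update(document: dict) -> tuple:
    """Return (filter, update) for update_one, leaving `_id` out of $set."""
    fields = {key: value for key, value in document.items() if key != "_id"}
    return {"_id": document["_id"]}, {"$set": fields}


if __name__ == "__main__":
    doc = {"_id": "644c36fcaa0cefd233a94240", "name": "test1"}
    query, update = build_update(doc)
    print(query)
    print(update)
    # With Motor this pair could then be passed on as:
    #     await collection.update_one(query, update, upsert=True)
```

Passing only the changed fields (here just `name`) keeps the update small; fields that are not mentioned in `$set` are left as they are.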
      
    4. Insert many

       # Insert many document
       async def insert_many_documents(documents:list):
           try:
               result = await collection.insert_many(documents)
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       # Inserting multiple documents to search with $text
       print("Inserting multiple documents")
       # Each document has a `text` field, which gets a text index in the next step
       doc1 = {'_id': str(ObjectId()), 'text': 'This is the first document.', 'age': 25}
       doc2 = {'_id': str(ObjectId()), 'text': 'This is the second document.', 'age': 30}
       doc3 = {'_id': str(ObjectId()), 'text': 'This is the third document.', 'age': 60}
      
       # Insert the documents
       result = await insert_many_documents([doc1, doc2, doc3])
       print(f'Insertion Done with ID: {result.inserted_ids}')
      

      This function inserts multiple documents into the collection and returns a result whose inserted_ids attribute lists the _id of every inserted document.

      Output -

       Inserting multiple documents
       Insertion Done with ID: ['644c36fdaa0cefd233a94241', '644c36fdaa0cefd233a94242', '644c36fdaa0cefd233a94243']
      
    5. $Text Index creation for searching

      $text performs a text search on the content of the fields indexed with a text index. Learn more about evaluation query operators like $regex, $mod etc. here

       #Create Index for text
       async def create_index(collection):
           try:
               result = await collection.create_index([('text', pymongo.TEXT)])
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       # Create the text index
       result = await create_index(collection)
       print(f'Index created: {result}')
      
    6. $Search with $Text

       # Find many with Query
       async def find_with_query(collection,query):
           try:
               cursor = collection.find(query)
               docs=[]
               async for document in cursor:
                   docs.append(document)
               return docs
           except Exception as e:
               logger.error(e)
               return None
       # Search for a document using text
       query = {'$text': {'$search': 'second'}}
       #searching the string "second" in all documents (find many)
       result = await find_with_query(collection,query)
       for doc in result:
           print(doc)
      

      This query searches the text-indexed field (using the index created in the last step) for the term "second".

      Output -

       {'_id': '644c36fdaa0cefd233a94242', 'text': 'This is the second document.', 'age': 30}
      
    7. Filter for Numerical Data

       # Find many with Query
       async def find_with_query(collection,query):
           try:
               cursor = collection.find(query)
               docs=[]
               async for document in cursor:
                   docs.append(document)
               return docs
           except Exception as e:
               logger.error(e)
               return None
       # Filter by age
       query1 = {'age': {'$gt': 25}} #greater than 25
       query2 = {'age': {'$lt': 60}} #less than 60
       query3 = {'age': {'$gte': 25}} #greater than or equal to 25
       query4 = {'age': {'$lte': 60}} #less than or equal to 60
       query5 = {'age': {'$ne': 30}} #not equal to 30
       result = await find_with_query(collection,query1)
       for doc in result:
           print(doc)
       result = await find_with_query(collection,query2)
       for doc in result:
           print(doc)
       result = await find_with_query(collection,query3)
       for doc in result:
           print(doc)
       result = await find_with_query(collection,query4)
       for doc in result:
           print(doc)
       result = await find_with_query(collection,query5)
       for doc in result:
           print(doc)
      

      Filter documents based on the values of keys holding numerical data.

      Output - the results of query1 to query5, printed one after another (2, 2, 3, 3 and 2 documents respectively)

       Filtering
       {'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
       {'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
       {'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
       {'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
       {'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
       {'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
       {'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
       {'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
       {'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
       {'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
       {'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
       {'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
      
    8. Combining Queries

       # Find many with Query
       async def find_with_query(collection,query):
           try:
               cursor = collection.find(query)
               docs=[]
               async for document in cursor:
                   docs.append(document)
               return docs
           except Exception as e:
               logger.error(e)
               return None
      
       # Combining 2 queries
       query = {'$and': [{'age': {'$gt': 25}}, {'age': {'$lt': 60}}]}
       result = await find_with_query(collection,query)
       for doc in result:
           print(doc)
      

      Use $and, $not, $nor & $or to combine different queries. Learn more here

      Output -

       Combining 2 queries
       {'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
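To make the meaning of these filters concrete, here is a toy, pure-Python matcher for a small subset of the query syntax used above. It is only a sketch for reasoning about which documents a filter selects, not how MongoDB evaluates queries; `matches`, `ages` and `DOCS` are names invented for this illustration.

```python
import operator

# Comparison operators used in the filters above, mapped to Python functions
COMPARISONS = {
    "$gt": operator.gt,
    "$lt": operator.lt,
    "$gte": operator.ge,
    "$lte": operator.le,
    "$ne": operator.ne,
}


def matches(doc: dict, query: dict) -> bool:
    """Evaluate a small subset of MongoDB filter syntax against a dict."""
    for key, condition in query.items():
        if key == "$and":
            if not all(matches(doc, sub) for sub in condition):
                return False
        elif key == "$or":
            if not any(matches(doc, sub) for sub in condition):
                return False
        elif isinstance(condition, dict):
            # every operator listed for this field must hold
            for op, expected in condition.items():
                if key not in doc:
                    # $ne also matches documents that lack the field
                    if op != "$ne":
                        return False
                elif not COMPARISONS[op](doc[key], expected):
                    return False
        elif doc.get(key) != condition:
            return False
    return True


# The same three documents as in the article, without the _id field
DOCS = [
    {"text": "This is the first document.", "age": 25},
    {"text": "This is the second document.", "age": 30},
    {"text": "This is the third document.", "age": 60},
]


def ages(query: dict) -> list:
    """Ages of the sample documents selected by `query`."""
    return [doc["age"] for doc in DOCS if matches(doc, query)]


if __name__ == "__main__":
    print(ages({"age": {"$gt": 25}}))  # [30, 60]
    print(ages({"$and": [{"age": {"$gt": 25}}, {"age": {"$lt": 60}}]}))  # [30]
```

The two printed lists agree with the outputs shown above for query1 and for the combined `$and` query.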
      
    9. Delete one

       print("Deleting a document")
       result = await collection.delete_one({"_id":idx})
       print(result)

      Deletes one document based on its _id.

    10. Delete All

       #Delete many
       async def delete_many_documents(collection):
           try:
               result = await collection.delete_many({})
               return result
           except Exception as e:
               logger.error(e)
               return None
      
       # Delete the documents
       result = await delete_many_documents(collection)
      

      The parameter {} passed to collection.delete_many can be replaced by a query (similar to those discussed above) to delete only the matching documents. With an empty filter {}, it deletes every document in the collection.
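Since an empty filter removes every document, a small guard can make accidental wipes harder. The sketch below is one possible shape for it, assuming `collection` is a Motor collection; the helper name `delete_matching` is hypothetical.

```python
import asyncio


async def delete_matching(collection, query: dict) -> int:
    """Delete documents matching `query`; refuse an empty filter.

    `collection` is expected to be a Motor collection (anything with an
    awaitable `delete_many` whose result has a `deleted_count` attribute).
    """
    if not query:
        raise ValueError("empty filter would delete every document")
    result = await collection.delete_many(query)
    return result.deleted_count


# Usage inside an async function, e.g. to remove everyone aged 60 or more:
#     deleted = await delete_matching(collection, {"age": {"$gte": 60}})
```

The returned `deleted_count` tells how many documents were actually removed, which is handy for logging.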

Step 4 Browse Collection

In the Browse section of MongoDB Atlas, we can inspect all collections. Alternatively, we can use DataGrip or TablePlus to check the data.

You can also write filter queries here to filter documents.

The Indexes sub-tab gives information on the collection's indexes, such as the text index created above. You can also create indexes from here.

The Search tab provides a platform for building search indexes.

Step 5 Metrics & Health Monitoring

The Metrics tab provides many important metrics such as opcounters, connections & network.

Select Zoom level & Members (Nodes of Database)

Step 6 API & MongoDB

We have discussed FastAPI before, and the functions defined above can be called from a FastAPI endpoint by passing in the ObjectId and database information. The document can be a parameter to the API, on which the operations are then performed, and the API can return the document fetched from the database as JSON. Because documents are already JSON-like, integrating MongoDB with a REST API is often more direct than doing the same with a SQL database.
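One practical detail when returning documents as JSON: values such as `ObjectId` or `datetime` are not JSON-serialisable by default. This article sidesteps that for `_id` by storing it as a string. The standard-library sketch below shows one simple fallback, `default=str`, for any remaining non-JSON values; `to_json` is a hypothetical helper name, and a web framework may offer its own encoders.

```python
import json
from datetime import datetime, timezone


def to_json(document: dict) -> str:
    """Serialise a document, falling back to str() for non-JSON types."""
    return json.dumps(document, default=str)


if __name__ == "__main__":
    doc = {
        "_id": "644c36fcaa0cefd233a94240",
        "name": "test1",
        "created_at": datetime(2023, 4, 28, tzinfo=timezone.utc),
    }
    print(to_json(doc))
```

The trade-off is that the client receives plain strings and has to know which fields to parse back into dates or IDs.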

Conclusion

In conclusion, Python and MongoDB together provide a powerful platform for building modern web applications that handle large amounts of data and scale with demand. Python's rich ecosystem of libraries and tools combines well with MongoDB's flexibility and scalability, and the PyMongo and Motor drivers provide a comprehensive set of APIs for developing high-performance applications in Python.


Written by Ankur

I am a Master's student at Concordia University, Montreal, pursuing a Master of Engineering in Electrical & Computer Engineering (Computer Systems & Signal Processing Major). Previously I worked with an international consulting firm, Wipro, as a Senior Software Engineer, where I supported a telecom giant by developing and maintaining Ab Initio data pipelines for customer data extraction and transformation, for business intelligence purposes, and Airflow DAGs for Bharti Airtel. My team and I have won over 7 international and national hackathons as a result of our commitment, perseverance and technical skills. The hackathons were related to Data Science, Software Engineering & Blockchain. Presently, I am working on projects based on BigData, Business Intelligence, and Data Analytics. My research interests are in Artificial Intelligence, Computer Vision and Data Analytics, and I have published 3 research papers in the domains of IoT, Software Engineering, Machine Learning and Image Processing during my undergrad. I am skilled at Python programming, SQL, UNIX, Tableau, Machine Learning, Deep Learning, Azure, Git, BigData, Jenkins, Data Structures & Algorithms.