PYTHON & NoSQL ft. Atlas MongoDB
Table of contents
- What is MongoDB
- What is PyMongo
- What is Motor
- Let's Begin
The last article discussed relational databases, which store data in tables with defined columns. This article covers another kind of database: non-relational, document-oriented databases, also called NoSQL ("Not Only SQL") databases. A well-known example is MongoDB. Because it places no fixed restrictions on how collections are structured or how data is arranged, MongoDB has grown into a wider data platform that offers scalability, high performance, reliability, and flexibility.
What is MongoDB
MongoDB is a cross-platform, document-oriented NoSQL database that is used to store and manage large amounts of unstructured data. Unlike traditional SQL databases, MongoDB does not rely on tables and rows to organize data. Instead, it uses a document model, where data is stored in JSON-like documents that can have varying structures and fields. This allows for greater flexibility in how data is organized and stored, making it easier to handle large and complex datasets.
Learn more about MongoDB here
What is PyMongo
PyMongo is a Python library that provides a client interface for MongoDB. It is used to interact with MongoDB databases using Python code. PyMongo supports all MongoDB features and operations, including CRUD operations (create, read, update, and delete), aggregation pipelines, indexes, and more.
PyMongo allows you to connect to a MongoDB instance, query and manipulate data, and execute administrative operations such as creating and dropping databases and collections. The library provides a rich API for interacting with MongoDB, including helper functions for common operations and utilities for working with BSON (Binary JSON) data.
Learn more about PyMongo here
Install PyMongo & BSON
pip install pymongo
Note - Use only this command to install both PyMongo and BSON. PyMongo ships its own bson package, and installing the separate bson package from PyPI alongside it can cause conflicts.
What is Motor
Motor is a Python asynchronous driver for MongoDB that is built on top of the PyMongo library. It allows you to write non-blocking and high-performance applications using MongoDB by providing an asynchronous API that is compatible with the asyncio library in Python.
Motor works by using Python coroutines and the asyncio event loop to provide asynchronous access to MongoDB operations. It supports all the features and operations of PyMongo, including CRUD operations, aggregation pipelines, indexes, and more. Motor also provides a rich API for working with BSON data, including utilities for encoding and decoding BSON data to Python objects.
Learn more about Motor here
Install Motor
pip install motor
Let's Begin
This article is not a NoSQL tutorial (here is one if you want a revision); the basics of non-relational, document-based databases are a prerequisite. It focuses on deploying a remote MongoDB instance using Atlas on Azure and working with it from Python.
Deployment Diagram
An instance of MongoDB will be deployed on the cloud and accessed by the Python code via a public endpoint.
Step 1 Deployment
Creating a Database
Visit Atlas Page, sign in and Build a Database
Deploy the Database
Choose from the config options (this article uses M0 Free), choose a provider (this article uses Azure), choose a region, enter a cluster name, and click Create.
Security & Auth
Choose Username & Password as the authentication method, then add a username and password. Choose your preferred connection type (this article uses local).
Networking
Add your current IP address and any other IPs you need, then click Finish & Close. You can add more IPs later in the Network section.
Dashboard
The dashboard features buttons to Monitor the Database, Explore collections, Check Db and Network access etc.
Connect
Click on Connect to reveal the Connect tab. Choose Drivers.
Connecting...
Choose Python and copy the URI; we will need it at a later stage. Also note that the username and password need to be added to the URI.
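Special characters such as @, : or / in the username or password must be percent-escaped before they go into the URI. The following is a minimal sketch, assuming the credentials live in environment variables; the function name build_uri, the variable names, and the example host are placeholders, not values taken from Atlas.

```python
import os
from urllib.parse import quote_plus


def build_uri(username: str, password: str, host: str) -> str:
    """Assemble a mongodb+srv URI with percent-escaped credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb+srv://{user}:{pwd}@{host}/?retryWrites=true&w=majority"


if __name__ == "__main__":
    # MONGO_USER, MONGO_PASSWORD and MONGO_HOST are hypothetical variable names
    uri = build_uri(
        os.environ.get("MONGO_USER", "admin"),
        os.environ.get("MONGO_PASSWORD", "p@ss/word"),
        os.environ.get("MONGO_HOST", "cluster0.example.mongodb.net"),
    )
    print(uri)
```

Reading the credentials from the environment also keeps them out of the source file.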
Step 2 Connecting to DB & Setting up _id factory
Install the necessary libraries
pip install pymongo motor certifi
Import Necessary Packages
import asyncio
import logging
import time

import certifi
import motor.motor_asyncio
import pymongo
from bson import ObjectId  # bson is installed with pymongo (pip install pymongo)

# LOGGING
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Connect to MongoDB
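uri = "mongodb+srv://admin:admin@cluster0.ihzdk52.mongodb.net/?retryWrites=true&w=majority"
try:
    # Motor connects lazily: this only creates the client object,
    # the first real network call happens on the first operation
    mongo_client = motor.motor_asyncio.AsyncIOMotorClient(uri, tlsCAFile=certifi.where())
    print("Connected successfully!!!")
except Exception as e:
    print(e)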
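Because Motor creates the client lazily, the success message prints even if the credentials or network settings are wrong. The following is a minimal sketch of an explicit check, assuming the standard ping admin command; check_connection is a hypothetical helper name, not part of Motor.

```python
import logging

logger = logging.getLogger(__name__)


async def check_connection(client) -> bool:
    """Return True if the server answers a ping, False otherwise.

    `client` is expected to be a Motor client such as AsyncIOMotorClient.
    """
    try:
        # The first real round trip to the server happens here
        await client.admin.command("ping")
        return True
    except Exception as e:
        logger.error("MongoDB ping failed: %s", e)
        return False


# Usage inside a coroutine (mongo_client is the client created earlier):
#     if not await check_connection(mongo_client):
#         raise SystemExit("Could not reach MongoDB")
```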
Setup ObjectId factory
In MongoDB, an ObjectId is a unique identifier that is generated automatically for each document inserted without an _id. It is a 12-byte value, usually displayed as a 24-character hexadecimal string, that consists of:
A 4-byte timestamp value, representing the time of creation of the ObjectId.
A 5-byte random value, to avoid collisions between ObjectIds created at the same time.
A 3-byte counter, starting with a random value, to avoid collisions between ObjectIds created with the same timestamp and random value.
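The three parts listed above can be read straight out of the hexadecimal form. The sketch below uses only the standard library and one of the IDs that appears in this article's output; parse_object_id is a hypothetical helper written for illustration, not a bson function.

```python
from datetime import datetime, timezone


def parse_object_id(hex_id: str) -> dict:
    """Split a 24-character ObjectId hex string into its three parts."""
    raw = bytes.fromhex(hex_id)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes (24 hex characters)")
    seconds = int.from_bytes(raw[0:4], "big")  # 4-byte timestamp
    return {
        "timestamp": seconds,
        "created": datetime.fromtimestamp(seconds, tz=timezone.utc),
        "random": raw[4:9].hex(),  # 5-byte random value
        "counter": int.from_bytes(raw[9:12], "big"),  # 3-byte counter
    }


parts = parse_object_id("644c36fcaa0cefd233a94240")
print(parts["timestamp"])  # 1682716412
print(parts["random"])     # aa0cefd233
```

The timestamp 1682716412 agrees with the created_at value stored in the sample document shown later in the article.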
ObjectIds are generated when a new document is inserted without an _id; in practice the driver usually creates them on the client side before sending the document. They are used as the default primary key (_id) for MongoDB documents, and the _id field is indexed by default. The timestamp, random value, and counter make collisions extremely unlikely even across multiple MongoDB instances, which makes ObjectIds suitable for distributed systems. Thus, we need an ObjectId factory that validates and produces ObjectIds.
# ObjectId factory (the two dunder hooks are Pydantic v1 validator hooks)
class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        # Reject anything that is not a valid ObjectId representation
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid objectid")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")
Create a Database & a Collection
# Creating a database
db = mongo_client["test"]
# Creating a collection
collection = db["test_collection"]
Step 3 Getting started with MongoDB
Create Functions for different Operations
This step defines a function for each operation; how the functions are used is discussed later.
Learn more about operations here
# inserting a document
async def insert_document(document: dict):
    try:
        result = await collection.insert_one(document)
        return result
    except Exception as e:
        logger.error(e)
        return None

# updating a document
async def update_document(document: dict):
    try:
        result = await collection.update_one({"_id": document["_id"]}, {"$set": document}, upsert=True)
        return result
    except Exception as e:
        logger.error(e)
        return None

# Get a document / Find one document
async def get_document(document_id: str):
    try:
        result = await collection.find_one({"_id": document_id})
        return result
    except Exception as e:
        logger.error(e)
        return None

# Insert many documents
async def insert_many_documents(documents: list):
    try:
        result = await collection.insert_many(documents)
        return result
    except Exception as e:
        logger.error(e)
        return None

# Create Index for text
async def create_index(collection):
    try:
        result = await collection.create_index([('text', pymongo.TEXT)])
        return result
    except Exception as e:
        logger.error(e)
        return None

# Find many with Query
async def find_with_query(collection, query):
    try:
        cursor = collection.find(query)
        docs = []
        async for document in cursor:
            docs.append(document)
        return docs
    except Exception as e:
        logger.error(e)
        return None

# Delete many
async def delete_many_documents(collection):
    try:
        result = await collection.delete_many({})
        return result
    except Exception as e:
        logger.error(e)
        return None
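The helpers above all repeat the same try/except and return None on failure, so callers should check for None before touching attributes such as inserted_id. One way to factor out the repetition is a small decorator. This is only a sketch: the name log_errors and the extra collection parameter are assumptions, not part of the original code.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def log_errors(func):
    """Wrap a coroutine so exceptions are logged and None is returned."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            logger.error(e)
            return None
    return wrapper


@log_errors
async def insert_document(collection, document: dict):
    # Same insert_one call as before, without the repeated try/except
    return await collection.insert_one(document)
```

Passing the collection as an argument, instead of relying on a module-level variable, also makes the helper easy to exercise with a stand-in object in tests.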
Create Async Main function that uses all functions
The async main function calls all the functions defined above inside a single coroutine. The next section explains each component.
```python
async def main():
    # create a document
    document = {
        "_id": str(ObjectId()),
        "name": "test",
        "email": "tester@gmail.com",
        "password": "test123",
        "phone": "1234567890",
        "address": "test address",
        "city": "test city",
        "state": "test state",
        "country": "test country",
        "pincode": "123456",
        "created_at": time.time(),
    }

    # Working with a Single Document
    print("Inserting a document")
    # running with asyncio
    result = await insert_document(document)

    # Print the ID of the inserted document
    print(f'Insertion Done with ID: {result.inserted_id}')
    idx = result.inserted_id

    print("Finding a document")
    result = await get_document(idx)
    print(result)

    print("Updating a document")
    document["name"] = "test1"
    result = await update_document(document)

    print("Finding a document")
    result = await get_document(idx)
    print(result)

    print("Deleting a document")
    result = await collection.delete_one({"_id": idx})
    print(result)

    # Inserting multiple documents with $Text
    print("Inserting multiple documents")

    # Insert documents with the `$text` field
    doc1 = {'_id': str(ObjectId()), 'text': 'This is the first document.', 'age': 25}
    doc2 = {'_id': str(ObjectId()), 'text': 'This is the second document.', 'age': 30}
    doc3 = {'_id': str(ObjectId()), 'text': 'This is the third document.', 'age': 60}

    # Insert the documents
    result = await insert_many_documents([doc1, doc2, doc3])
    print(f'Insertion Done with ID: {result.inserted_ids}')

    # Create the text index
    result = await create_index(collection)
    print(f'Index created: {result}')

    # Search for a document using text
    query = {'$text': {'$search': 'second'}}
    result = await find_with_query(collection, query)
    for doc in result:
        print(doc)

    # Filter by age
    query1 = {'age': {'$gt': 25}}   # greater than 25
    query2 = {'age': {'$lt': 60}}   # less than 60
    query3 = {'age': {'$gte': 25}}  # greater than or equal to 25
    query4 = {'age': {'$lte': 60}}  # less than or equal to 60
    query5 = {'age': {'$ne': 30}}   # not equal to 30
    for age_query in (query1, query2, query3, query4, query5):
        result = await find_with_query(collection, age_query)
        for doc in result:
            print(doc)

    # Combining 2 queries
    query = {'$and': [{'age': {'$gt': 25}}, {'age': {'$lt': 60}}]}
    result = await find_with_query(collection, query)
    for doc in result:
        print(doc)

    # Delete the documents
    result = await delete_many_documents(collection)
```
3. Driver Function
A single asyncio.run call runs the main function.
```python
if __name__ == "__main__":
    asyncio.run(main())
```
The Outputs
This section includes code snippets from main() and their respective outputs.
Insert One
# inserting a document
async def insert_document(document: dict):
    try:
        result = await collection.insert_one(document)
        return result
    except Exception as e:
        logger.error(e)
        return None

result = await insert_document(document)
# Print the ID of the inserted document
print(f'Insertion Done with ID: {result.inserted_id}')
The function inserts one document (a Python dict, similar to JSON) into the collection.
Output - the _id of the inserted document is printed
Insertion Done with ID: 644c36fcaa0cefd233a94240
Find One
# Get a document / Find one document
async def get_document(document_id: str):
    try:
        result = await collection.find_one({"_id": document_id})
        return result
    except Exception as e:
        logger.error(e)
        return None

print("Finding a document")
result = await get_document(idx)
The function finds one document based on its _id. Since the _id is unique for every document in a collection, it is generally used with find one.
Output - Document is returned
Finding a document
{'_id': '644c36fcaa0cefd233a94240', 'name': 'test', 'email': 'tester@gmail.com', 'password': 'test123', 'phone': '1234567890', 'address': 'test address', 'city': 'test city', 'state': 'test state', 'country': 'test country', 'pincode': '123456', 'created_at': 1682716412.1300101}
Update One
# updating a document
async def update_document(document: dict):
    try:
        result = await collection.update_one({"_id": document["_id"]}, {"$set": document}, upsert=True)
        return result
    except Exception as e:
        logger.error(e)
        return None

print("Updating a document")
document["name"] = "test1"
result = await update_document(document)
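This function updates one document based on _id. The $set operator ensures that only the fields passed get updated rather than replacing the entire document. Here the whole document is passed to $set, but only the key "name" has a new value, so the rest of the key-value pairs stay intact. upsert=True additionally inserts the document as a new one if no document with that _id exists.
Output - the updated document, as returned by the find one call that follows the update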
{'_id': '644c36fcaa0cefd233a94240', 'name': 'test1', 'email': 'tester@gmail.com', 'password': 'test123', 'phone': '1234567890', 'address': 'test address', 'city': 'test city', 'state': 'test state', 'country': 'test country', 'pincode': '123456', 'created_at': 1682716412.1300101}
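To send only the fields that actually changed, the filter and the $set document can be built separately. This is a minimal sketch; build_set_update and its changed_keys parameter are hypothetical names introduced for illustration.

```python
def build_set_update(document: dict, changed_keys: list) -> tuple:
    """Return (filter, update) that sets only the listed keys of a document."""
    if "_id" not in document:
        raise KeyError("document needs an _id to be matched")
    query = {"_id": document["_id"]}
    update = {"$set": {key: document[key] for key in changed_keys}}
    return query, update


document = {"_id": "644c36fcaa0cefd233a94240", "name": "test1", "city": "test city"}
query, update = build_set_update(document, ["name"])
print(query)   # {'_id': '644c36fcaa0cefd233a94240'}
print(update)  # {'$set': {'name': 'test1'}}

# With a Motor collection this pair would be passed as:
#     await collection.update_one(query, update)
```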
Insert many
# Insert many documents
async def insert_many_documents(documents: list):
    try:
        result = await collection.insert_many(documents)
        return result
    except Exception as e:
        logger.error(e)
        return None

# Inserting multiple documents with $Text
print("Inserting multiple documents")
# Insert documents with the `$text` field
doc1 = {'_id': str(ObjectId()), 'text': 'This is the first document.', 'age': 25}
doc2 = {'_id': str(ObjectId()), 'text': 'This is the second document.', 'age': 30}
doc3 = {'_id': str(ObjectId()), 'text': 'This is the third document.', 'age': 60}
# Insert the documents
result = await insert_many_documents([doc1, doc2, doc3])
print(f'Insertion Done with ID: {result.inserted_ids}')
This function inserts multiple documents into the collection and returns their _id values.
Output -
Inserting multiple documents
Insertion Done with ID: ['644c36fdaa0cefd233a94241', '644c36fdaa0cefd233a94242', '644c36fdaa0cefd233a94243']
$Text Index creation for searching
$text performs a text search on the content of the fields indexed with a text index. Learn more about evaluation query operators like $regex, $mod etc. here
# Create Index for text
async def create_index(collection):
    try:
        result = await collection.create_index([('text', pymongo.TEXT)])
        return result
    except Exception as e:
        logger.error(e)
        return None

# Create the text index
result = await create_index(collection)
print(f'Index created: {result}')
$Search with $Text
# Find many with Query
async def find_with_query(collection, query):
    try:
        cursor = collection.find(query)
        docs = []
        async for document in cursor:
            docs.append(document)
        return docs
    except Exception as e:
        logger.error(e)
        return None

# Search for a document using text
query = {'$text': {'$search': 'second'}}  # searching the string "second" in all documents (find many)
result = await find_with_query(collection, query)
for doc in result:
    print(doc)
This query searches the field covered by the text index created in the last step for the term "second".
Output -
{'_id': '644c36fdaa0cefd233a94242', 'text': 'This is the second document.', 'age': 30}
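The $search string accepts more than a single word: several terms are combined with OR, a phrase in double quotes must match exactly, and a term with a leading hyphen is excluded. A small sketch of a builder for such strings follows; text_query is a hypothetical helper, not a PyMongo or Motor function.

```python
def text_query(terms=(), phrase=None, exclude=()):
    """Build a $text filter from plain terms, an exact phrase and excluded terms."""
    parts = list(terms)
    if phrase:
        parts.append(f'"{phrase}"')  # a quoted phrase must match exactly
    parts.extend(f"-{word}" for word in exclude)  # a leading hyphen negates a term
    return {"$text": {"$search": " ".join(parts)}}


print(text_query(terms=["second"]))
# {'$text': {'$search': 'second'}}
print(text_query(terms=["document"], exclude=["third"]))
# {'$text': {'$search': 'document -third'}}
print(text_query(phrase="first document"))
# {'$text': {'$search': '"first document"'}}
```

Each returned dict can be passed to find_with_query in place of the hand-written query.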
Filter for Numerical Data
# Find many with Query
async def find_with_query(collection, query):
    try:
        cursor = collection.find(query)
        docs = []
        async for document in cursor:
            docs.append(document)
        return docs
    except Exception as e:
        logger.error(e)
        return None

# filter by age
query1 = {'age': {'$gt': 25}}   # greater than 25
query2 = {'age': {'$lt': 60}}   # less than 60
query3 = {'age': {'$gte': 25}}  # greater than or equal to 25
query4 = {'age': {'$lte': 60}}  # less than or equal to 60
query5 = {'age': {'$ne': 30}}   # not equal to 30
for age_query in (query1, query2, query3, query4, query5):
    result = await find_with_query(collection, age_query)
    for doc in result:
        print(doc)
Filter documents based on keys that hold numerical values.
Output - Observe the output
Filtering
{'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
{'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
{'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
{'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
{'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
{'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
{'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
{'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
{'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
{'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
{'_id': '644c81ebcbb5b7e05e26aa95', 'text': 'This is the first document.', 'age': 25}
{'_id': '644c81ebcbb5b7e05e26aa97', 'text': 'This is the third document.', 'age': 60}
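To see why each query returns the documents it does, the comparison operators can be emulated locally. The sketch below is a plain-Python stand-in for illustration only; it is not how the MongoDB server evaluates queries, and matches is a hypothetical helper.

```python
import operator

# Local stand-ins for MongoDB's comparison operators (an emulation, not server code)
COMPARATORS = {
    "$gt": operator.gt,
    "$lt": operator.lt,
    "$gte": operator.ge,
    "$lte": operator.le,
    "$ne": operator.ne,
}


def matches(doc: dict, query: dict) -> bool:
    """Check a document against a flat query like {'age': {'$gt': 25}}."""
    for field, conditions in query.items():
        for op, expected in conditions.items():
            if not COMPARATORS[op](doc[field], expected):
                return False
    return True


ages = [{"age": 25}, {"age": 30}, {"age": 60}]
for query in ({"age": {"$gt": 25}}, {"age": {"$lt": 60}}, {"age": {"$ne": 30}}):
    print(query, [d["age"] for d in ages if matches(d, query)])
# {'age': {'$gt': 25}} [30, 60]
# {'age': {'$lt': 60}} [25, 30]
# {'age': {'$ne': 30}} [25, 60]
```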
Combining Queries
# Find many with Query
async def find_with_query(collection, query):
    try:
        cursor = collection.find(query)
        docs = []
        async for document in cursor:
            docs.append(document)
        return docs
    except Exception as e:
        logger.error(e)
        return None

# Combining 2 queries
query = {'$and': [{'age': {'$gt': 25}}, {'age': {'$lt': 60}}]}
result = await find_with_query(collection, query)
for doc in result:
    print(doc)
Use $and, $not, $nor & $or to combine different queries. Learn more here
Output -
Combining 2 queries
{'_id': '644c81ebcbb5b7e05e26aa96', 'text': 'This is the second document.', 'age': 30}
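When both conditions apply to the same field, the explicit $and can also be written as a single filter with two operators, which MongoDB treats as an implicit AND. The helpers below are a sketch; between and any_of are hypothetical names.

```python
def between(field: str, low, high) -> dict:
    """Implicit AND: both conditions on one field in a single document."""
    return {field: {"$gt": low, "$lt": high}}


def any_of(*filters: dict) -> dict:
    """Match documents that satisfy at least one of the given filters."""
    return {"$or": list(filters)}


print(between("age", 25, 60))
# {'age': {'$gt': 25, '$lt': 60}}
print(any_of({"age": {"$lt": 30}}, {"age": {"$gte": 60}}))
# {'$or': [{'age': {'$lt': 30}}, {'age': {'$gte': 60}}]}
```

Against the three sample documents, the first filter matches only the document with age 30, and the second matches the documents with ages 25 and 60.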
Delete One
print("Deleting a document")
result = await collection.delete_one({"_id":idx})
print(result)
Delete one document based on ObjectId
Delete All
# Delete many
async def delete_many_documents(collection):
    try:
        result = await collection.delete_many({})
        return result
    except Exception as e:
        logger.error(e)
        return None

# Delete the documents
result = await delete_many_documents(collection)
The empty filter {} passed to collection.delete_many can be replaced by a query (similar to those discussed above) to delete only the matching documents. With an empty filter, it deletes every document in the collection.
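Since an empty filter removes everything, a variant that takes the query as a parameter and refuses an empty one can prevent accidents. This is a sketch under that assumption; delete_matching is a hypothetical helper, while delete_many and deleted_count are the regular Motor/PyMongo names.

```python
async def delete_matching(collection, query: dict) -> int:
    """Delete documents matching a non-empty query and return how many were removed."""
    if not query:
        # Guard against wiping the collection by accident
        raise ValueError("refusing to delete with an empty filter")
    result = await collection.delete_many(query)
    return result.deleted_count


# Usage inside a coroutine, e.g. removing every document with age above 50:
#     removed = await delete_matching(collection, {"age": {"$gt": 50}})
```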
Step 4 Browse Collection
In the Browse Collections section of Atlas, we can check all collections. Alternatively, we can use DataGrip or TablePlus to inspect the data.
You can also write filter queries here to filter documents.
The Indexes sub-tab gives information on the various indexes, such as the $text index. You can also create indexes from here.
The Search tab provides a platform to build search indexes.
Step 5 Metrics & Health Monitoring
The Metrics tab provides many important metrics such as opcounters, connections, and network.
Select Zoom level & Members (Nodes of Database)
Step 6 API & MongoDB
We have discussed FastAPI, and the functions defined above can be called from a FastAPI app by passing the ObjectId and database information. The document can be a parameter to the API, and the operations can then be performed on it. The API can return the document fetched from the database as JSON. Because MongoDB documents already map closely to JSON, integrating MongoDB with a REST API can be comparatively easier than with SQL.
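One practical detail when returning documents from an API is that some stored values, such as datetime objects or bson ObjectId instances, are not JSON types. A minimal standard-library sketch of one way to handle this follows; document_to_json is a hypothetical helper, and the sample values are illustrative.

```python
import json
from datetime import datetime, timezone


def document_to_json(document: dict) -> str:
    """Serialize a document, turning non-JSON types into strings."""
    # default=str is called for values json cannot encode natively,
    # e.g. datetime objects or bson ObjectId instances
    return json.dumps(document, default=str)


doc = {
    "_id": "644c36fcaa0cefd233a94240",
    "name": "test1",
    "created_at": datetime(2023, 4, 28, 21, 13, 32, tzinfo=timezone.utc),
}
print(document_to_json(doc))
```

Converting everything unknown with str is a blunt choice; an API with a stricter contract would usually convert each type explicitly.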
Conclusion
In conclusion, using Python with MongoDB provides a powerful platform for building modern web applications that can handle large amounts of data and scale to meet the demands of modern business. The combination of Python's rich ecosystem of libraries and tools with MongoDB's flexibility and scalability allows developers to create innovative and compelling applications that meet the needs of today's businesses and organizations. The use of PyMongo and Motor drivers for MongoDB provides a comprehensive set of APIs and features for developing high-performance and scalable applications using Python, making Python and MongoDB a winning combination for building modern web applications.
Written by
Ankur