An Introduction to Real-Time MongoDB with Change Streams and Node.js

Nitin Saini
7 min read

In the world of modern applications, data is rarely static. It's constantly being created, updated, and deleted. For many applications, knowing about these changes as they happen is crucial. This is where MongoDB Change Streams come in. They provide a powerful and efficient way to subscribe to and react to real-time changes in your database.

Think of a change stream as a continuous, live feed of all the modifications happening in a collection, database, or even an entire deployment. Instead of constantly polling the database for updates, which is inefficient and can be resource-intensive, your application can simply "listen" for changes and receive notifications as soon as they occur.

How Do They Work?

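Change streams leverage MongoDB's oplog (operations log), a special capped collection used for replication. The oplog records all the operations that modify data in a MongoDB replica set. When you open a change stream, MongoDB essentially tails this oplog and sends the relevant change events to your application. Because they depend on the oplog, change streams are available only on replica sets and sharded clusters, not on standalone servers. MongoDB Atlas clusters are deployed as replica sets, so this requirement is already met there.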

Each event in a change stream is a change document that contains detailed information about the modification, including:

  • Operation type: Was it an insert, update, delete, or something else?

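  • Full document: Insert events always include the new document. For updates, you can opt in to receiving the full current document by opening the stream with the fullDocument: "updateLookup" option.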

  • Document key: The unique identifier of the document that was changed.

  • Update description: For updates, this describes which fields were modified.

  • Cluster time: A timestamp that allows you to resume a change stream from a specific point in time.

This rich information allows you to build sophisticated real-time applications, such as a live dashboard showing new orders, a system that sends notifications to users when their data is updated, or a service that synchronizes data between different microservices.
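As a rough illustration of how these fields might be read in application code, here is a minimal sketch. The helper name `describeChange` is hypothetical (it is not part of MongoDB or Mongoose), and the sample event is hand-written rather than real server output. It assumes only the event shape described above.

```javascript
// Hypothetical helper: turns a change event into a one-line summary.
// It relies only on the fields listed above (operationType, documentKey,
// fullDocument, updateDescription).
function describeChange(change) {
  const id = String(change.documentKey?._id ?? "unknown");

  switch (change.operationType) {
    case "insert":
      // Insert events carry the newly created document.
      return `insert ${id}: ${JSON.stringify(change.fullDocument)}`;
    case "update": {
      // updatedFields maps each changed field to its new value.
      const fields = Object.keys(change.updateDescription?.updatedFields ?? {});
      return `update ${id}: changed ${fields.join(", ") || "(no fields)"}`;
    }
    case "delete":
      // Delete events carry only the document key by default.
      return `delete ${id}`;
    default:
      return `${change.operationType} ${id}`;
  }
}

// Example with a hand-written event:
console.log(
  describeChange({
    operationType: "update",
    documentKey: { _id: "abc123" },
    updateDescription: { updatedFields: { price: 100 }, removedFields: [] },
  })
); // prints "update abc123: changed price"
```

A summary function like this could be called from a change handler in place of dumping the whole event as JSON.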

Further, you can also use an optional pipeline to filter and transform the change events before they are sent to your application. This is done using the same aggregation framework you're already familiar with. For example, you could filter for only update operations or only changes to a specific field.
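To make the "only changes to a specific field" case concrete, here is a small sketch of what such a pipeline could look like. The function name `buildFieldUpdatePipeline` is an assumption made for illustration. The stage itself uses the standard `$match` operator against the `updateDescription.updatedFields` sub-document of update events, and it only covers top-level fields.

```javascript
// Hypothetical helper: builds a change stream pipeline that keeps only
// update events in which the given top-level field was modified.
function buildFieldUpdatePipeline(fieldName) {
  return [
    {
      $match: {
        operationType: "update",
        // updatedFields contains one key per modified field, so an
        // $exists check selects updates that touched this field.
        [`updateDescription.updatedFields.${fieldName}`]: { $exists: true },
      },
    },
  ];
}

// Show the pipeline that would be sent to the server for the "price" field.
console.log(JSON.stringify(buildFieldUpdatePipeline("price"), null, 2));
```

The returned array is what you would pass as the first argument when opening a stream, for example `collection.watch(buildFieldUpdatePipeline("price"))`.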

Getting Started

Using change streams is straightforward. Most MongoDB drivers provide a simple API for opening a stream. In this guide, you'll learn how to:

  • Set up MongoDB Atlas and connect with Mongoose

  • Create a Property schema for real estate listings

  • Use Change Streams with Aggregation Pipeline to filter meaningful changes

  • Simulate data changes using setInterval (insert, update, delete)

  • Log real-time changes directly on the back-end console

Step 1: Project Setup

mkdir mongodb-change-stream-demo
cd mongodb-change-stream-demo
npm init -y
npm install express mongoose dotenv nodemon

Folder Structure:

mongodb-change-stream-demo/
├── models/
│   └── Property.js
├── services/
│   └── changeStreamService.js
├── index.js
├── simulator.js
├── .env
└── package.json

Step 2: Set Up MongoDB Atlas and .env

Go to MongoDB Atlas, create a cluster, and copy your connection string.

Create a .env file:

# .env file
MONGODB_URI=mongodb+srv://<username>:<password>@cluster.mongodb.net/realestate?retryWrites=true&w=majority
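If the variable is missing or misspelled, the connection attempt later fails with a less obvious error, so it can help to check for it up front. The following is a minimal sketch. `requireEnv` is a hypothetical helper (not part of dotenv or Mongoose), and the connection string in the usage example is a made-up placeholder.

```javascript
// Hypothetical helper: returns the named environment variable, or throws
// a descriptive error if it is missing or empty.
function requireEnv(name, env = process.env) {
  const value = env[name];
  if (typeof value !== "string" || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Usage with an explicit object standing in for process.env:
const uri = requireEnv("MONGODB_URI", {
  MONGODB_URI: "mongodb+srv://user:pass@cluster.example.net/realestate",
});
console.log(uri.startsWith("mongodb+srv://")); // true
```

In the application itself, you would call `requireEnv("MONGODB_URI")` after `require("dotenv").config()` and pass the result to `mongoose.connect()`.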

Step 3: Define the Real Estate Property Schema (models/Property.js)

const mongoose = require("mongoose");

const propertySchema = new mongoose.Schema({
  title: String,
  price: Number,
  location: String,
  type: {
    type: String,
    enum: ["Apartment", "House", "Land", "Commercial"],
    default: "Apartment",
  },
  listedAt: {
    type: Date,
    default: Date.now,
  },
  status: {
    type: String,
    enum: ["available", "sold", "pending"],
    default: "available",
  },
});

module.exports = mongoose.model("Property", propertySchema);

Step 4: Listen to Real-Time Changes with Aggregation (services/changeStreamService.js)

const Property = require("../models/Property");

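// Opens a change stream on the Property collection and logs each event.
function listenToPropertyChanges() {
  const pipeline = [
    {
      // Keep inserts and updates; drop events whose fullDocument has status "sold".
      $match: {
        operationType: { $in: ["insert", "update"] },
        "fullDocument.status": { $ne: "sold" },
      },
    },
    {
      // Trim each event down to the fields worth logging.
      $project: {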
        operationType: 1,
        fullDocument: {
          title: 1,
          price: 1,
          location: 1,
          status: 1,
        },
        documentKey: 1,
        updateDescription: 1,
      },
    },
  ];

  // The pipeline runs on the server, so filtered-out events never reach this process.
  const changeStream = Property.watch(pipeline);

  changeStream.on("change", (change) => {
    console.log("Filtered property change:");
    console.log(JSON.stringify(change, null, 2));
  });

  changeStream.on("error", (err) => {
    console.error("Change Stream error:", err);
  });
}

module.exports = { listenToPropertyChanges };
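One subtlety in the `$match` stage above: update events do not include `fullDocument` unless the stream is opened with the `fullDocument: "updateLookup"` option, and `$ne` also matches when the field is absent. The sketch below mirrors the filter in plain JavaScript to show which events would pass. `passesFilter` is a hypothetical, simplified approximation for illustration only (it is not how MongoDB evaluates the pipeline), and the sample events are hand-written.

```javascript
// Hypothetical, simplified mirror of the $match stage, for illustration only.
function passesFilter(change) {
  const wantedOperation = ["insert", "update"].includes(change.operationType);
  // Like $ne, this is also true when fullDocument (or its status) is absent.
  const notSold = change.fullDocument?.status !== "sold";
  return wantedOperation && notSold;
}

// Hand-written sample events (not real server output):
const samples = [
  { operationType: "insert", fullDocument: { status: "available" } },
  { operationType: "insert", fullDocument: { status: "sold" } },
  // An update without fullDocument passes, even though it sets status to "sold".
  { operationType: "update", updateDescription: { updatedFields: { status: "sold" } } },
  { operationType: "delete" },
];
console.log(samples.map(passesFilter)); // [ true, false, true, false ]
```

If updates to sold listings should be excluded as well, one option is to open the stream with `Property.watch(pipeline, { fullDocument: "updateLookup" })`, so that update events carry the current document and the status check applies to them too.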

Step 5: Start the server (index.js)

require("dotenv").config();
const mongoose = require("mongoose");
const express = require("express");
const { listenToPropertyChanges } = require("./services/changeStreamService");

const app = express();
const PORT = 3000;

async function start() {
  try {
    // useNewUrlParser and useUnifiedTopology are no longer needed in Mongoose 6+.
    await mongoose.connect(process.env.MONGODB_URI);

    console.log("Connected to MongoDB Atlas");

    listenToPropertyChanges();
    app.listen(PORT, () => {
      console.log(`Server running at PORT ${PORT}`);
    });
  } catch (err) {
    console.error("MongoDB connection error:", err);
    process.exit(1); // Nothing useful can run without a database connection.
  }
}

start();

Step 6: Simulate Property Insert/Update/Delete with Timers (simulator.js)

const mongoose = require("mongoose");
const Property = require("./models/Property");
require("dotenv").config();

const LOCATIONS = ["Mumbai", "Delhi", "Bangalore", "Hyderabad", "Chennai"];
const TYPES = ["Apartment", "House", "Commercial", "Land"];

let runCount = 0;
const maxRuns = 10;

async function randomInsert() {
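  // Note: the title and the location field are drawn independently, so they
  // can name different cities (as in the sample output in Step 7).
  const property = new Property({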
    title: `${Math.floor(Math.random() * 5) + 1} BHK in ${
      LOCATIONS[Math.floor(Math.random() * LOCATIONS.length)]
    }`,
    price: Math.floor(Math.random() * 10000000) + 5000000,
    location: LOCATIONS[Math.floor(Math.random() * LOCATIONS.length)],
    type: TYPES[Math.floor(Math.random() * TYPES.length)],
    status: "available",
  });
  await property.save();
  console.log("🟢 Inserted property:", property.title);
}

async function randomUpdate() {
  const props = await Property.find();
  if (props.length === 0) return;

  const randomProp = props[Math.floor(Math.random() * props.length)];
  const newPrice = Math.floor(Math.random() * 5000000) + 5000000;

  await Property.findByIdAndUpdate(randomProp._id, { price: newPrice });
  console.log("🟡 Updated price for:", randomProp.title);
}

async function randomDelete() {
  const props = await Property.find();
  if (props.length === 0) return;

  const randomProp = props[Math.floor(Math.random() * props.length)];
  await Property.findByIdAndDelete(randomProp._id);
  console.log("🔴 Deleted property:", randomProp.title);
}

async function startSimulation() {
  // useNewUrlParser and useUnifiedTopology are no longer needed in Mongoose 6+.
  await mongoose.connect(process.env.MONGODB_URI);

  console.log("Connected to MongoDB Atlas. Starting simulation...");

  const insertInterval = setInterval(async () => {
    await randomInsert();
    checkStop();
  }, 5000);

  const updateInterval = setInterval(async () => {
    await randomUpdate();
    checkStop();
  }, 7000);

  const deleteInterval = setInterval(async () => {
    await randomDelete();
    checkStop();
  }, 15000);

  function checkStop() {
    runCount++;
    if (runCount >= maxRuns) {
      clearInterval(insertInterval);
      clearInterval(updateInterval);
      clearInterval(deleteInterval);
      console.log("✅ Simulation completed after", maxRuns, "runs.");
      // Let the connection close before exiting the process.
      mongoose.disconnect().finally(() => process.exit(0));
    }
  }
}

startSimulation();

Step 7: Start the server and simulator

To start the server, open a terminal in the project folder (for example, the integrated terminal in VS Code) and run:

node index.js

Open a second terminal and start the simulator:

node simulator.js

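With both processes running, you'll see output like the following: first the change events logged by the server, then the simulator's own log from the second terminal. The delete performed by the simulator does not appear among the server's events because the pipeline matches only insert and update operations: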

Connected to MongoDB Atlas
Filtered property change:
{
  "_id": {
    "_data": "82688E1F6A000000012B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064688E1F6973EA24CB4AE2AD98000004"
  },
  "operationType": "insert",
  "fullDocument": {
    "title": "3 BHK in Hyderabad",
    "price": 13833099,
    "location": "Mumbai",
    "status": "available"
  },
  "documentKey": {
    "_id": "688e1f6973ea24cb4ae2ad98"
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F6C000000022B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064688E1F6973EA24CB4AE2AD98000004"
  },
  "operationType": "update",
  "documentKey": {
    "_id": "688e1f6973ea24cb4ae2ad98"
  },
  "updateDescription": {
    "updatedFields": {
      "price": 7872085
    },
    "removedFields": [],
    "truncatedArrays": []
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F6F000000012B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064688E1F6E73EA24CB4AE2AD9C000004"
  },
  "operationType": "insert",
  "fullDocument": {
    "title": "5 BHK in Mumbai",
    "price": 8461944,
    "location": "Hyderabad",
    "status": "available"
  },
  "documentKey": {
    "_id": "688e1f6e73ea24cb4ae2ad9c"
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F73000000022B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064688E1F6973EA24CB4AE2AD98000004"
  },
  "operationType": "update",
  "documentKey": {
    "_id": "688e1f6973ea24cb4ae2ad98"
  },
  "updateDescription": {
    "updatedFields": {
      "price": 8228419
    },
    "removedFields": [],
    "truncatedArrays": []
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F74000000032B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064688E1F7373EA24CB4AE2ADA1000004"
  },
  "operationType": "insert",
  "fullDocument": {
    "title": "1 BHK in Chennai",
    "price": 8180896,
    "location": "Chennai",
    "status": "available"
  },
  "documentKey": {
    "_id": "688e1f7373ea24cb4ae2ada1"
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F79000000032B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064688E1F7873EA24CB4AE2ADA4000004"
  },
  "operationType": "insert",
  "fullDocument": {
    "title": "2 BHK in Mumbai",
    "price": 11849709,
    "location": "Chennai",
    "status": "available"
  },
  "documentKey": {
    "_id": "688e1f7873ea24cb4ae2ada4"
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F7A000000022B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064688E1F6E73EA24CB4AE2AD9C000004"
  },
  "operationType": "update",
  "documentKey": {
    "_id": "688e1f6e73ea24cb4ae2ad9c"
  },
  "updateDescription": {
    "updatedFields": {
      "price": 6573961
    },
    "removedFields": [],
    "truncatedArrays": []
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F7E000000012B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064688E1F7D73EA24CB4AE2ADA8000004"
  },
  "operationType": "insert",
  "fullDocument": {
    "title": "5 BHK in Mumbai",
    "price": 11690761,
    "location": "Chennai",
    "status": "available"
  },
  "documentKey": {
    "_id": "688e1f7d73ea24cb4ae2ada8"
  }
}
Filtered property change:
{
  "_id": {
    "_data": "82688E1F81000000022B042C0100296E5A1004DBC831BB101F4556B0909ADBFC931735463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064688E1F6E73EA24CB4AE2AD9C000004"
  },
  "operationType": "update",
  "documentKey": {
    "_id": "688e1f6e73ea24cb4ae2ad9c"
  },
  "updateDescription": {
    "updatedFields": {
      "price": 8099526
    },
    "removedFields": [],
    "truncatedArrays": []
  }
}
Connected to MongoDB Atlas. Starting simulation...
🟢 Inserted property: 3 BHK in Hyderabad
🟡 Updated price for: 3 BHK in Hyderabad
🟢 Inserted property: 5 BHK in Mumbai
🟡 Updated price for: 3 BHK in Hyderabad
🔴 Deleted property: 3 BHK in Hyderabad
🟢 Inserted property: 1 BHK in Chennai
🟢 Inserted property: 2 BHK in Mumbai
🟡 Updated price for: 5 BHK in Mumbai
🟢 Inserted property: 5 BHK in Mumbai
🟡 Updated price for: 5 BHK in Mumbai
✅ Simulation completed after 10 runs.

Additional: Send Events to Front-end

You can use Socket.IO to send these real-time events to the front-end.
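A minimal sketch of that idea follows. The helper `forwardChanges` and the event name `property:change` are assumptions made for illustration. The function only needs a change stream that emits "change" events and an object with an `emit(eventName, payload)` method, which is the shape a Socket.IO server instance provides. Stand-in objects are used here so the snippet runs without MongoDB or Socket.IO installed.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical helper: forwards every change event to all connected clients.
// `changeStream` is anything that emits "change" events (such as the stream
// returned by Property.watch()), and `io` is anything with an
// emit(eventName, payload) method (such as a Socket.IO server instance).
function forwardChanges(changeStream, io, eventName = "property:change") {
  changeStream.on("change", (change) => {
    // Send only the fields the front-end is likely to need.
    io.emit(eventName, {
      operationType: change.operationType,
      documentKey: change.documentKey,
      fullDocument: change.fullDocument ?? null,
      updateDescription: change.updateDescription ?? null,
    });
  });
}

// Stand-ins so the sketch runs on its own.
const fakeStream = new EventEmitter();
const fakeIo = {
  emit: (name, payload) => console.log(name, payload.operationType),
};

forwardChanges(fakeStream, fakeIo);
fakeStream.emit("change", {
  operationType: "insert",
  documentKey: { _id: "abc123" },
}); // prints "property:change insert"
```

In the real application, `io` would be created from the `socket.io` package (for example, `new Server(httpServer)`), and the browser would subscribe with `socket.on("property:change", handler)`.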

Key Benefits

  • Real-time Reactivity: Build applications that respond instantly to data changes without the overhead of constant polling.

  • Efficiency: Reduce the load on your database by eliminating unnecessary queries.

  • Simplicity: The API is easy to use and integrates seamlessly with existing MongoDB drivers.

  • Reliability: Change streams are resumable. If your application disconnects, it can pick up right where it left off, thanks to the resume token included in each change document.
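To illustrate the resumability point, here is a minimal sketch of tracking the resume token, which is the `_id` field of each change event. The class `ResumeTokenTracker` is hypothetical and keeps the token in memory only, and the token value in the usage example is a placeholder. `resumeAfter` is the standard change stream option for resuming after a given token.

```javascript
// Hypothetical in-memory tracker for the most recent resume token.
// A real service would persist the token (for example, in a file or a
// collection) so that it survives a restart.
class ResumeTokenTracker {
  constructor() {
    this.token = null;
  }

  // Call from the "change" handler: each event's _id is its resume token.
  record(change) {
    this.token = change._id;
  }

  // Options to pass when opening or reopening the stream.
  watchOptions() {
    return this.token ? { resumeAfter: this.token } : {};
  }
}

const tracker = new ResumeTokenTracker();
console.log(tracker.watchOptions()); // {}

// Placeholder token, not a real one:
tracker.record({ _id: { _data: "example-token" }, operationType: "insert" });
console.log(tracker.watchOptions()); // { resumeAfter: { _data: 'example-token' } }
```

When reopening the stream, the options could be passed as the second argument, for example `Property.watch(pipeline, tracker.watchOptions())`. Resuming works only while the event behind the token is still present in the oplog.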

MongoDB Change Streams are a foundational tool for building modern, event-driven architectures. They empower you to move beyond a traditional request-response model and build applications that are more responsive, efficient, and dynamic. Whether you're building a real-time analytics dashboard, a data synchronization service, or a notification system, change streams can simplify your architecture and help you keep pace with your data.

Happy coding! If you have any questions or suggestions you'd like to explore further, feel free to drop a comment below.

Written by

Nitin Saini

A Full Stack Web Developer, possessing a strong command of React.js, Node.js, Express.js, MongoDB, and AWS, alongside Next.js, Redux, and modern JavaScript (ES6+)