API GeoJSON menggunakan FastAPI + PostgreSQL + MongoDB

HidayatullahHidayatullah
3 min read

Berikut API GeoJSON menggunakan FastAPI dengan MongoDB yang seeding otomatis ke PostgreSQL.

main.py

from fastapi import FastAPI, HTTPException
import asyncpg
import motor.motor_asyncio
import json
import logging
import asyncio
import os
from pymongo import UpdateOne

app = FastAPI()

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

pg_pool = None

async def init_pg_pool():
    global pg_pool
    pg_pool = await asyncpg.create_pool(
        user=os.getenv("POSTGRES_USER", "postgres"),
        password=os.getenv("POSTGRES_PASSWORD", "password"),
        database=os.getenv("POSTGRES_DB", "geodb"),
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=int(os.getenv("POSTGRES_PORT", 5432)),
        min_size=1, max_size=10  
    )

MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:12345")
logging.info(f"Menggunakan MongoDB URI: {MONGO_URI}")

mongo_client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_URI)
mongo_db = mongo_client["db1"]
mongo_collection = mongo_db["polygon"]

async def fetch_pg_data():
    async with pg_pool.acquire() as conn:
        query = """
            SELECT id, ST_AsGeoJSON(geom) AS geometry, name, description, photo 
            FROM public.buildings;
        """
        rows = await conn.fetch(query)
        return rows

async def sync_postgres_to_mongo():
    rows = await fetch_pg_data()

    if not rows:
        logging.info("⚠ Tidak ada data yang ditemukan di PostgreSQL")
        return

    update_operations = []
    new_ids = set()

    for row in rows:
        new_ids.add(row["id"])
        new_data = {
            "geometry": json.loads(row["geometry"]),
            "name": row["name"],
            "description": row["description"],
            "photo": row["photo"]
        }

        update_operations.append(
            UpdateOne({"_id": row["id"]}, {"$set": new_data}, upsert=True)
        )

    try:
        if update_operations:
            result = await mongo_collection.bulk_write(update_operations)
            logging.info(f"🟢 {result.matched_count} diperbarui, {result.upserted_count} ditambahkan.")
    except Exception as e:
        logging.error(f"⚠️ Gagal menyimpan ke MongoDB: {e}")

    # Hapus data di MongoDB yang tidak ada di PostgreSQL
    await mongo_collection.delete_many({"_id": {"$nin": list(new_ids)}})
    logging.info("🟢 Data PostgreSQL berhasil disinkronkan ke MongoDB")

async def continuous_sync():
    while True:
        await sync_postgres_to_mongo()
        await asyncio.sleep(5)  # Sinkronisasi setiap 5 detik

@app.get("/api/mongo-geojson")
async def get_mongo_geojson():
    try:
        cursor = mongo_collection.find()
        documents = await cursor.to_list(length=None)

        features = [{
            "type": "Feature",
            "geometry": doc["geometry"],
            "properties": {
                "id": doc["_id"],
                "name": doc.get("name"),
                "description": doc.get("description"),
                "photo": doc.get("photo")
            }
        } for doc in documents]

        if not features:
            raise HTTPException(status_code=404, detail="No features found in MongoDB")

        return {"type": "FeatureCollection", "features": features}
    except Exception as e:
        logging.error(f"Exception: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/check-count")
async def check_count():
    try:
        async with pg_pool.acquire() as conn:
            pg_count = await conn.fetchval("SELECT COUNT(*) FROM public.buildings;")
        mongo_count = await mongo_collection.estimated_document_count()
        return {"postgre_count": pg_count, "mongo_count": mongo_count}
    except Exception as e:
        logging.error(f"Error saat mengecek jumlah data: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.on_event("startup")
async def startup_event():
    await init_pg_pool()
    asyncio.create_task(continuous_sync())

@app.on_event("shutdown")
async def shutdown_event():
    if pg_pool:
        await pg_pool.close()
        logging.info("PostgreSQL pool telah ditutup.")

File ini ceritanya bernama main.py. Keuntungannya menggunakan MongoDB sebagai NoSQL kabarnya ialah kecepatan aksesnya. setelah kita pelajari kecepatannya sebagai berikut:

Wajar bila cepat, karena:
- backend yang digunakan menggunakan FastAPI + NGINX
- data yang digunakan berisi 1467 bidang bangunan dengan attribut yang sedikit
- sifatnya hanya menampilkan (READ), tidak dalam mode editing (CREATE, READ, UPDATE dan DELETE / CRUD). tetapi ini masih menjadi PR bila data yang ditangani lebih dari jutaan polygon.

Tapi dari skema diatas, didapatkan keuntungan diantaranya:
- metode ini melindungi postgresql secara langsung karena mekanisme ditangani langsung oleh MongoDB
- transaksional ini bisa berakibat pada kecepatan postgresql yang stabil karena seeding tidak dilakukan oleh postgre tapi melalui fastapi
- request api ditangani fastapi dengan mengambil data dari MongoDB saja

Jadi disarankan penggunaan ditingkatkan untuk editing GeoJSON web browser by Boundary Screen Browser dengan Skala Detil (1:200 - 1:5.000). Sangat berguna sekali apabila dilakukan digitasi polygon bangunan melalui peta Foto Udara Resolusi Tinggi Presisi (FURTP).

0
Subscribe to my newsletter

Read articles from Hidayatullah directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Hidayatullah
Hidayatullah

Hi, my name is Hidayatullah. I am a GIS Engineer, Analyst, Specialist, and everything related to GIS. With over 5 years of experience, I am highly proficient in ArcGIS and QGIS. I specialize in spatial topology methods, least square adjustment measurement methods, PostGIS with PostgreSQL, RDBMS databases, spatial generalization by scale, WebGIS Geoserveer/Mapserver/Mapproxy, and more. If you're interested in my services, feel free to reach out via email at genhidayatullah@icloud.com.