Building NeuroStash - IV

In Part III, I covered the RBAC system that controls who can do what in NeuroStash. Now comes the exciting part - building the core functionality that users actually interact with: document management and knowledge base creation.

This isn't just about storing files. Document management requires handling:

  • Atomic uploads with proper rollback mechanisms

  • Conflict resolution when uploads fail midway

  • Presigned URL generation for secure direct-to-S3 uploads

  • Document lifecycle management with proper state tracking

  • Knowledge base associations with many-to-many relationships

Let me walk you through how I solved these challenges.

The Document Upload Challenge

Most engineers think document upload is simple: "Just accept the file and save it." But in production, you quickly discover the edge cases:

  • What if the upload fails halfway through?

  • What if the same filename is uploaded twice?

  • How do you prevent orphaned database records?

  • How do you handle large files without timeouts?

I designed a two-phase upload system that solves all these problems.

Phase 1: Presigned URL Generation

Instead of uploading through the API server (which creates bottlenecks), I generate presigned URLs for direct S3 uploads:

@router.post("/upload", response_model=GeneratedPresignedUrls)
def upload_documents(
    req: GeneratePresignedUrlsReq,
    db: SessionDep,
    payload: TokenPayloadDep,
    aws_client: AwsDep,
):
    list_of_documents: List[CreateDocument] = []
    url_by_filename: Dict[str, str] = {}

    for file in req.files:
        # Generate unique filename to prevent conflicts
        unique_id = uuid.uuid4()
        filename = f"{file}-{unique_id}"
        object_key = f"{payload.user_id}/{filename}"

        # Generate presigned URL for direct S3 upload
        content_type = aws_client.extract_content_type(filename=file)
        url = aws_client.generate_presigned_upload_url(
            object_key=object_key, content_type=content_type
        )

        url_by_filename[filename] = url
        document = CreateDocument(
            user_id=payload.user_id, 
            file_name=filename, 
            object_key=object_key
        )
        list_of_documents.append(document)

    # Create database records in PENDING state
    created_documents = create_document(db=db, files=list_of_documents)

    final_response: Dict[int, str] = {}
    for doc_id, filename in created_documents:
        presigned_url = url_by_filename.get(filename)
        if presigned_url:
            final_response[doc_id] = presigned_url

    return GeneratedPresignedUrls(
        message="generated presigned urls successfully", 
        urls=final_response
    )

Key Design Decisions:

  • Unique filenames: Append UUID to prevent conflicts

  • User-scoped object keys: Isolate user data with {user_id}/{filename}

  • PENDING state: Documents start locked until upload completes

  • Atomic database operations: All records created in single transaction
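
The URL generation itself lives in the AWS client wrapper. Below is a minimal sketch of what generate_presigned_upload_url and extract_content_type might look like with boto3; the bucket name, expiry, and mimetypes-based fallback are assumptions, not the repo's exact implementation:

import mimetypes

import boto3


class AwsClient:
    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name
        self.s3_client = boto3.client("s3")

    def extract_content_type(self, *, filename: str) -> str:
        # Guess the MIME type from the extension, falling back to a generic binary type
        content_type, _ = mimetypes.guess_type(filename)
        return content_type or "application/octet-stream"

    def generate_presigned_upload_url(self, *, object_key: str, content_type: str) -> str:
        # Presigned PUT URL so the client uploads straight to S3, pinned to the
        # declared content type and valid for a limited window
        return self.s3_client.generate_presigned_url(
            ClientMethod="put_object",
            Params={
                "Bucket": self.bucket_name,
                "Key": object_key,
                "ContentType": content_type,
            },
            ExpiresIn=900,  # 15 minutes
        )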

The Database Schema: Built for Reliability

The document registry schema handles the complex state management:

class DocumentRegistry(Base, TimestampMixin):
    __tablename__ = "documents_registry"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("user_clients.id", onupdate="CASCADE", ondelete="RESTRICT"),
        nullable=False,
    )
    file_name: Mapped[str] = mapped_column(String(100), nullable=False)
    object_key: Mapped[str] = mapped_column(String(150), nullable=False)
    lock_status: Mapped[bool] = mapped_column(Boolean, nullable=False)
    op_status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value,
    )

    __table_args__ = (
        UniqueConstraint("user_id", "file_name", name="idx_unique_filename"),
        Index("idx_file_registry_user_id", "user_id", "lock_status", "op_status"),
    )

The lock_status and op_status columns together form a small state machine (the enum behind op_status is sketched after this list):

  • PENDING + locked: Upload in progress

  • SUCCESS + unlocked: Ready for use

  • FAILED + unlocked: Upload failed, can retry

  • PENDING + unlocked: Conflict state (needs cleanup)
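
For reference, the status enum behind op_status can be as small as this; a sketch assuming the three values seen in the models (the exact string values are an assumption):

import enum


class OperationStatusEnum(str, enum.Enum):
    # Lifecycle states shared by documents and knowledge base associations
    PENDING = "PENDING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"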

Phase 2: Upload Finalization

After S3 upload completes (or fails), the client calls the finalize endpoint:

@router.put("/finalize", response_model=StandardResponse)
def post_upload_documents(req: FinalizeDocumentReq, db: SessionDep):
    try:
        finalize_documents(
            db=db, 
            successful=req.successful, 
            failed=req.failed
        )
        return StandardResponse(message="successfully finalized documents")
    except Exception:
        logger.exception("error finalizing document")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="error finalizing documents",
        )

So what does the finalize_documents function actually do? Here it is:

def finalize_documents(*, db: Session, successful: List[int], failed: List[int]):
    try:
        all_ids = successful + failed

        stmt = (
            update(DocumentRegistry)
            .where(DocumentRegistry.id.in_(all_ids))
            .values(
                op_status=case(
                    (
                        DocumentRegistry.id.in_(successful),
                        cast(
                            OperationStatusEnum.SUCCESS.value,
                            DocumentRegistry.op_status.type,
                        ),
                    ),
                    (
                        DocumentRegistry.id.in_(failed),
                        cast(
                            OperationStatusEnum.FAILED.value,
                            DocumentRegistry.op_status.type,
                        ),
                    ),
                ),
                lock_status=False,
            )
        )

        db.execute(stmt)
        db.commit()
    except Exception:
        db.rollback()
        raise

This single query updates hundreds of documents atomically - successful ones get SUCCESS status, failed ones get FAILED status, and all get unlocked.

Knowledge Base Creation: Simple but Robust

@router.post("/create", response_model=CreatedKb)
def create_knowledge_base(
    req: CreateKbReq, 
    db: SessionDep, 
    token_payload: TokenPayloadDep
):
    try:
        args = CreateKbInDb(user_id=token_payload.user_id, name=req.name)
        created_kb = create_kb_db(db=db, kb=args)
        return CreatedKb(
            message="successfully created knowledge base",
            id=created_kb.id,
            name=created_kb.name,
        )
    except KnowledgeBaseAlreadyExists:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT, 
            detail="knowledge base already exists"
        )

The database function handles the uniqueness constraint:

def create_kb_db(*, db: Session, kb: CreateKbInDb) -> KnowledgeBase:
    try:
        knowledge_base = KnowledgeBase(**kb.model_dump())
        db.add(knowledge_base)
        db.commit()
        db.refresh(knowledge_base)
        return knowledge_base
    except IntegrityError as e:
        db.rollback()
        if isinstance(e.orig, psycopg.errors.UniqueViolation):
            raise KnowledgeBaseAlreadyExists(knowledg_base_name=kb.name)
        else:
            raise RuntimeError(f"database integrity error: {e}")

Custom exceptions make error handling clean - the API layer can catch specific exceptions and return appropriate HTTP status codes.
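
The exception itself can be a thin class that just carries the conflicting name; a minimal sketch (the real class in the repo may hold more context):

class KnowledgeBaseAlreadyExists(Exception):
    # Raised when a user already has a knowledge base with this name;
    # the keyword argument mirrors the call site in create_kb_db above
    def __init__(self, knowledg_base_name: str):
        self.knowledg_base_name = knowledg_base_name
        super().__init__(f"knowledge base '{knowledg_base_name}' already exists")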

Knowledge Base Architecture: Many-to-Many Done Right

Documents alone aren't useful - you need to organize them into knowledge bases. This requires a many-to-many relationship where:

  • One document can belong to multiple knowledge bases

  • One knowledge base can contain multiple documents

  • Each association has its own status tracking

The Junction Table Pattern

class KnowledgeBaseDocument(Base, TimestampMixin):
    __tablename__ = "knowledge_base_documents"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    knowledge_base_id: Mapped[int] = mapped_column(
        ForeignKey("knowledge_bases.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    document_id: Mapped[int] = mapped_column(
        ForeignKey("documents_registry.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value,
    )

    __table_args__ = (
        UniqueConstraint(
            "knowledge_base_id", "document_id", 
            name="idx_unique_kb_doc_combination"
        ),
    )

Why not just use a bare junction table? Because ingestion jobs run asynchronously, and giving each association its own state machine makes it far easier to track progress and clean up failures (see the sketch after this list). A document's association with a knowledge base might be:

  • PENDING: Being processed for the knowledge base

  • SUCCESS: Successfully added and indexed

  • FAILED: Processing failed, needs retry
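
To make that concrete, here is a hypothetical helper for linking documents to a knowledge base: new association rows start in PENDING, and the asynchronous ingestion job (Part V) flips them to SUCCESS or FAILED later. The function name and shape are illustrative, not the repo's actual API:

from typing import List

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session


def add_documents_to_kb(*, db: Session, kb_id: int, document_ids: List[int]) -> None:
    # Hypothetical helper: create association rows in PENDING state
    try:
        db.add_all(
            [
                KnowledgeBaseDocument(
                    knowledge_base_id=kb_id,
                    document_id=doc_id,
                    status=OperationStatusEnum.PENDING,
                )
                for doc_id in document_ids
            ]
        )
        db.commit()
    except IntegrityError:
        # Unique constraint on (knowledge_base_id, document_id): already linked
        db.rollback()
        raise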

Document Lifecycle: The Cleanup Problem

In production, things go wrong. Networks fail, users close browsers, S3 timeouts occur. This creates "conflicted" documents - database records that don't match S3 reality.

I built a cleanup system, triggered through an API endpoint:

@router.get("/cleanup", response_model=StandardResponse)
def cleanup_files(db: SessionDep, payload: TokenPayloadDep, aws_client: AwsDep):
    conflicting_docs = conflicted_docs(db=db, user_id=payload.user_id)

    if not conflicting_docs:
        return StandardResponse(message="no conflicting files found")

    to_be_unlocked = []
    to_be_deleted = []

    for doc in conflicting_docs:
        exists: bool = aws_client.object_exists(object_key=doc.object_key)
        if not exists:
            # Database record but no S3 object - delete record
            to_be_deleted.append(doc.id)
        else:
            # S3 object exists but record is conflicted - fix status
            to_be_unlocked.append(doc.id)

    if to_be_deleted or to_be_unlocked:
        cleanup_docs(
            db=db,
            user_id=payload.user_id,
            to_be_unlocked=to_be_unlocked,
            to_be_deleted=to_be_deleted,
        )

    return StandardResponse(message="cleanup successfully finished")

The conflicted_docs function finds documents in conflicted states:

def conflicted_docs(*, db: Session, user_id: int) -> List[DocumentRegistry]:
    # State combinations that indicate a document needs attention;
    # (False, SUCCESS) - unlocked and successful - is the only healthy "ready" state and is excluded
    conflicted_combinations = [
        (True, OperationStatusEnum.PENDING),   # Upload in progress
        (True, OperationStatusEnum.SUCCESS),   # Processing
        (True, OperationStatusEnum.FAILED),    # Failed processing
        (False, OperationStatusEnum.PENDING),  # Failed upload
        (False, OperationStatusEnum.FAILED),   # Available failed
    ]

    stmt = select(DocumentRegistry).where(
        and_(
            DocumentRegistry.user_id == user_id,
            or_(
                *[
                    and_(
                        DocumentRegistry.lock_status == lock_status,
                        DocumentRegistry.op_status == op_status,
                    )
                    for lock_status, op_status in conflicted_combinations
                ]
            ),
        )
    )

    result = db.execute(stmt)
    return result.scalars().all()

This query finds every document that needs attention; the endpoint above then reconciles database state with S3 reality.
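
The S3 side of that check is a simple HEAD request. A sketch of how object_exists might be implemented inside the AWS client wrapper (the 404 handling is the standard boto3 pattern; the actual implementation may differ):

from botocore.exceptions import ClientError


def object_exists(self, *, object_key: str) -> bool:
    # HEAD the object: cheap existence check without downloading the body
    try:
        self.s3_client.head_object(Bucket=self.bucket_name, Key=object_key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        # Real errors (permissions, throttling) should surface, not be swallowed
        raise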

Listing and Pagination: Performance Matters

When you have thousands of documents, naive listing kills performance. I implemented efficient pagination:

def list_files(
    *, db: Session, user_id: int, limit: int, offset: int
) -> Tuple[List[DocumentRegistry], int]:
    try:
        # Base query for ready documents only
        stmt = select(DocumentRegistry).where(
            DocumentRegistry.user_id == user_id,
            DocumentRegistry.lock_status == False,
            DocumentRegistry.op_status == OperationStatusEnum.SUCCESS,
        )

        # Count total for pagination metadata
        count_stmt = select(func.count()).select_from(stmt.subquery())
        total_count = db.execute(count_stmt).scalar()

        # Apply pagination
        stmt = stmt.limit(limit=limit).offset(offset=offset)
        documents = db.execute(stmt).scalars().all()

        return documents, total_count
    except Exception:
        logger.error("error listing user documents")
        raise

Key optimizations:

  • Filter at database level: Only query documents in SUCCESS state

  • Separate count query: Get total count for pagination metadata

  • Index support: The composite index on (user_id, lock_status, op_status) makes this blazing fast
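
On the API side, the (documents, total_count) pair maps directly onto pagination metadata. A hypothetical caller-side sketch (field names and page size are assumptions):

documents, total = list_files(db=db, user_id=payload.user_id, limit=20, offset=40)

# Clients can tell whether another page exists without an extra round trip
has_more = 40 + len(documents) < total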

Real-World Usage Flow

Here's how the complete system works in practice:

# 1. Request presigned URLs for upload
POST /documents/upload
Authorization: Bearer <jwt-token>
{
  "files": ["document1.pdf", "document2.docx"]
}

# Returns:
{
  "urls": {
    "123": "https://s3.amazonaws.com/bucket/presigned-url-1",
    "124": "https://s3.amazonaws.com/bucket/presigned-url-2"
  }
}

# 2. Upload directly to S3 using presigned URLs
PUT https://s3.amazonaws.com/bucket/presigned-url-1
Content-Type: application/pdf
<binary data>

# 3. Finalize the upload status
PUT /documents/finalize
{
  "successful": [123],
  "failed": [124]
}

# 4. Create knowledge base
POST /kb/create
{
  "name": "Engineering Docs"
}

# 5. List available documents
GET /documents/list?limit=20&offset=0

# 6. Add documents to knowledge base
# (This would be in the ingestion system - Part V!)

Lessons Learned & Best Practices

1. State Machines Save Your Sanity

Don't use boolean flags for complex states. Enums with clear state transitions make debugging infinitely easier.

2. Always Plan for Failure

Networks fail, uploads timeout, users close browsers. Design your system to handle partial failures gracefully.

3. Presigned URLs Scale Better

Direct-to-S3 uploads bypass your API servers entirely, eliminating bottlenecks and reducing costs.

4. Cleanup is Not Optional

In production, orphaned records accumulate. Build cleanup mechanisms from day one.

5. Index Your Queries

The composite index on (user_id, lock_status, op_status) makes document listing sub-millisecond even with millions of records.

Want to see the full implementation? Check out the NeuroStash repository and follow along as we build something amazing! 🚀

Code: https://github.com/DEVunderdog/NeuroStash
Connect with me: https://x.com/fkhoja098
