Building NeuroStash - IV

In Part III, I covered the RBAC system that controls who can do what in NeuroStash. Now comes the exciting part - building the core functionality that users actually interact with: document management and knowledge base creation.
This isn't just about storing files. Document management requires handling:
Atomic uploads with proper rollback mechanisms
Conflict resolution when uploads fail midway
Presigned URL generation for secure direct-to-S3 uploads
Document lifecycle management with proper state tracking
Knowledge base associations with many-to-many relationships
Let me walk you through how I solved these challenges.
The Document Upload Challenge
Most engineers think document upload is simple: "Just accept the file and save it." But in production, you quickly discover the edge cases:
What if the upload fails halfway through?
What if the same filename is uploaded twice?
How do you prevent orphaned database records?
How do you handle large files without timeouts?
I designed a two-phase upload system that solves all these problems.
Phase 1: Presigned URL Generation
Instead of uploading through the API server (which creates bottlenecks), I generate presigned URLs for direct S3 uploads:
@router.post("/upload", response_model=GeneratedPresignedUrls)
def upload_documents(
req: GeneratePresignedUrlsReq,
db: SessionDep,
payload: TokenPayloadDep,
aws_client: AwsDep,
):
list_of_documents: List[CreateDocument] = []
url_by_filename: Dict[str, str] = {}
for file in req.files:
# Generate unique filename to prevent conflicts
unique_id = uuid.uuid4()
filename = f"{file}-{unique_id}"
object_key = f"{payload.user_id}/{filename}"
# Generate presigned URL for direct S3 upload
content_type = aws_client.extract_content_type(filename=file)
url = aws_client.generate_presigned_upload_url(
object_key=object_key, content_type=content_type
)
url_by_filename[filename] = url
document = CreateDocument(
user_id=payload.user_id,
file_name=filename,
object_key=object_key
)
list_of_documents.append(document)
# Create database records in PENDING state
created_documents = create_document(db=db, files=list_of_documents)
final_response: Dict[int, str] = {}
for doc_id, filename in created_documents:
presigned_url = url_by_filename.get(filename)
if presigned_url:
final_response[doc_id] = presigned_url
return GeneratedPresignedUrls(
message="generated presigned urls successfully",
urls=final_response
)
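The aws_client helpers are not shown above. Under the hood, generate_presigned_upload_url is essentially boto3's generate_presigned_url for put_object; here is a minimal sketch of what such a client could look like (the bucket name, expiry, and class layout are my assumptions, not the repo's exact code):
import mimetypes

import boto3


class AwsClient:
    def __init__(self, bucket_name: str):
        # Assumed constructor; the real client likely pulls this from settings
        self.bucket_name = bucket_name
        self.s3 = boto3.client("s3")

    def extract_content_type(self, *, filename: str) -> str:
        # Guess the MIME type from the extension, fall back to a generic binary type
        content_type, _ = mimetypes.guess_type(filename)
        return content_type or "application/octet-stream"

    def generate_presigned_upload_url(self, *, object_key: str, content_type: str) -> str:
        # Presigned PUT: the uploader must send the same Content-Type it was signed with
        return self.s3.generate_presigned_url(
            ClientMethod="put_object",
            Params={
                "Bucket": self.bucket_name,
                "Key": object_key,
                "ContentType": content_type,
            },
            ExpiresIn=3600,  # assumed expiry; the real value belongs in config
        )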
Key Design Decisions:
Unique filenames: Append UUID to prevent conflicts
User-scoped object keys: Isolate user data with {user_id}/{filename}
PENDING state: Documents start locked until upload completes
Atomic database operations: All records created in single transaction
The Database Schema: Built for Reliability
The document registry schema handles the complex state management:
class DocumentRegistry(Base, TimestampMixin):
    __tablename__ = "documents_registry"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("user_clients.id", onupdate="CASCADE", ondelete="RESTRICT"),
        nullable=False,
    )
    file_name: Mapped[str] = mapped_column(String(100), nullable=False)
    object_key: Mapped[str] = mapped_column(String(150), nullable=False)
    lock_status: Mapped[bool] = mapped_column(Boolean, nullable=False)
    op_status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value,
    )

    __table_args__ = (
        UniqueConstraint("user_id", "file_name", name="idx_unique_filename"),
        Index("idx_file_registry_user_id", "user_id", "lock_status", "op_status"),
    )
The state machine works like this:
PENDING + locked: Upload in progress
SUCCESS + unlocked: Ready for use
FAILED + unlocked: Upload failed, can retry
PENDING + unlocked: Conflict state (needs cleanup)
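The op_status values come from a small enum, and "ready" is simply the unlocked + SUCCESS combination. A minimal sketch (the exact definition in the repo may differ):
import enum


class OperationStatusEnum(str, enum.Enum):
    # String values so that server_default=OperationStatusEnum.PENDING.value works
    PENDING = "PENDING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


def is_ready(doc) -> bool:
    # The only combination in which a document can be listed or attached to a knowledge base
    return not doc.lock_status and doc.op_status == OperationStatusEnum.SUCCESS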
Phase 2: Upload Finalization
After S3 upload completes (or fails), the client calls the finalize endpoint:
@router.put("/finalize", response_model=StandardResponse)
def post_upload_documents(req: FinalizeDocumentReq, db: SessionDep):
try:
finalize_documents(
db=db,
successful=req.successful,
failed=req.failed
)
return StandardResponse(message="successfully finalized documents")
except Exception:
logger.exception("error finalizing document")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="error finalizing documents",
)
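FinalizeDocumentReq is just two lists of document IDs, the same IDs the upload endpoint returned as keys of the urls map. The exact Pydantic model isn't shown in this post, but a sketch would be:
from typing import List

from pydantic import BaseModel


class FinalizeDocumentReq(BaseModel):
    # Document registry IDs, grouped by how the direct-to-S3 upload went
    successful: List[int]
    failed: List[int]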
So what does the finalize_documents function actually do? Here it is:
def finalize_documents(*, db: Session, successful: List[int], failed: List[int]):
    try:
        all_ids = successful + failed
        stmt = (
            update(DocumentRegistry)
            .where(DocumentRegistry.id.in_(all_ids))
            .values(
                op_status=case(
                    (
                        DocumentRegistry.id.in_(successful),
                        cast(
                            OperationStatusEnum.SUCCESS.value,
                            DocumentRegistry.op_status.type,
                        ),
                    ),
                    (
                        DocumentRegistry.id.in_(failed),
                        cast(
                            OperationStatusEnum.FAILED.value,
                            DocumentRegistry.op_status.type,
                        ),
                    ),
                ),
                lock_status=False,
            )
        )
        db.execute(stmt)
        db.commit()
    except Exception:
        db.rollback()
        raise
This single query updates hundreds of documents atomically: successful ones get SUCCESS status, failed ones get FAILED status, and all of them get unlocked.
Knowledge Base Creation: Simple but Robust
@router.post("/create", response_model=CreatedKb)
def create_knowledge_base(
req: CreateKbReq,
db: SessionDep,
token_payload: TokenPayloadDep
):
try:
args = CreateKbInDb(user_id=token_payload.user_id, name=req.name)
created_kb = create_kb_db(db=db, kb=args)
return CreatedKb(
message="successfully created knowledge base",
id=created_kb.id,
name=created_kb.name,
)
except KnowledgeBaseAlreadyExists:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="knowledge base already exists"
)
The database function handles the uniqueness constraint:
def create_kb_db(*, db: Session, kb: CreateKbInDb) -> KnowledgeBase:
    try:
        knowledge_base = KnowledgeBase(**kb.model_dump())
        db.add(knowledge_base)
        db.commit()
        db.refresh(knowledge_base)
        return knowledge_base
    except IntegrityError as e:
        db.rollback()
        if isinstance(e.orig, psycopg.errors.UniqueViolation):
            raise KnowledgeBaseAlreadyExists(knowledg_base_name=kb.name)
        else:
            raise RuntimeError(f"database integrity error: {e}")
Custom exceptions make error handling clean - the API layer can catch specific exceptions and return appropriate HTTP status codes.
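The exception itself is trivial; something along these lines (a sketch, the definition in the repo may differ):
class KnowledgeBaseAlreadyExists(Exception):
    # Raised when a user already owns a knowledge base with the same name
    def __init__(self, knowledg_base_name: str):
        self.knowledg_base_name = knowledg_base_name
        super().__init__(f"knowledge base already exists: {knowledg_base_name}")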
Knowledge Base Architecture: Many-to-Many Done Right
Documents alone aren't useful - you need to organize them into knowledge bases. This requires a many-to-many relationship where:
One document can belong to multiple knowledge bases
One knowledge base can contain multiple documents
Each association has its own status tracking
The Junction Table Pattern
class KnowledgeBaseDocument(Base, TimestampMixin):
    __tablename__ = "knowledge_base_documents"

    id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
    knowledge_base_id: Mapped[int] = mapped_column(
        ForeignKey("knowledge_bases.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    document_id: Mapped[int] = mapped_column(
        ForeignKey("documents_registry.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    )
    status: Mapped[OperationStatusEnum] = mapped_column(
        SQLEnum(OperationStatusEnum),
        nullable=False,
        server_default=OperationStatusEnum.PENDING.value,
    )

    __table_args__ = (
        UniqueConstraint(
            "knowledge_base_id", "document_id",
            name="idx_unique_kb_doc_combination",
        ),
    )
Why not just use a plain junction table? Because ingestion runs as an asynchronous job, and having a state machine on each association makes it possible to track progress and clean up after failures (see the sketch after this list). An association might be:
PENDING: Being processed for the knowledge base
SUCCESS: Successfully added and indexed
FAILED: Processing failed, needs retry
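Creating the associations themselves belongs to the ingestion system in Part V, but to make the pattern concrete, here is a minimal sketch of how documents could be linked to a knowledge base in bulk, relying on the unique constraint above to make re-linking a no-op (the helper name and its exact shape are my assumptions):
from typing import List

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session


def link_documents_to_kb(*, db: Session, kb_id: int, document_ids: List[int]) -> None:
    # Hypothetical helper: rows start in PENDING (the column's server default)
    # and the asynchronous ingestion job flips them to SUCCESS or FAILED later.
    stmt = insert(KnowledgeBaseDocument).values(
        [
            {"knowledge_base_id": kb_id, "document_id": doc_id}
            for doc_id in document_ids
        ]
    ).on_conflict_do_nothing(constraint="idx_unique_kb_doc_combination")
    try:
        db.execute(stmt)
        db.commit()
    except Exception:
        db.rollback()
        raise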
Document Lifecycle: The Cleanup Problem
In production, things go wrong. Networks fail, users close browsers, S3 timeouts occur. This creates "conflicted" documents - database records that don't match S3 reality.
I built an automated cleanup system, triggered by an API endpoint:
@router.get("/cleanup", response_model=StandardResponse)
def cleanup_files(db: SessionDep, payload: TokenPayloadDep, aws_client: AwsDep):
conflicting_docs = conflicted_docs(db=db, user_id=payload.user_id)
if not conflicting_docs:
return StandardResponse(message="no conflicting files found")
to_be_unlocked = []
to_be_deleted = []
for doc in conflicting_docs:
exists: bool = aws_client.object_exists(object_key=doc.object_key)
if not exists:
# Database record but no S3 object - delete record
to_be_deleted.append(doc.id)
else:
# S3 object exists but record is conflicted - fix status
to_be_unlocked.append(doc.id)
if to_be_deleted or to_be_unlocked:
cleanup_docs(
db=db,
user_id=payload.user_id,
to_be_unlocked=to_be_unlocked,
to_be_deleted=to_be_deleted,
)
return StandardResponse(message="cleanup successfully finished")
The conflicted_docs function finds documents in conflicted states:
def conflicted_docs(*, db: Session, user_id: int) -> List[DocumentRegistry]:
    # Every combination except (False, SUCCESS) - the only valid "ready" state -
    # indicates a document that needs attention
    conflicted_combinations = [
        (True, OperationStatusEnum.PENDING),   # Upload in progress
        (True, OperationStatusEnum.SUCCESS),   # Processing
        (True, OperationStatusEnum.FAILED),    # Failed processing
        (False, OperationStatusEnum.PENDING),  # Failed upload
        (False, OperationStatusEnum.FAILED),   # Available failed
    ]
    stmt = select(DocumentRegistry).where(
        and_(
            DocumentRegistry.user_id == user_id,
            or_(
                *[
                    and_(
                        DocumentRegistry.lock_status == lock_status,
                        DocumentRegistry.op_status == op_status,
                    )
                    for lock_status, op_status in conflicted_combinations
                ]
            ),
        )
    )
    result = db.execute(stmt)
    return result.scalars().all()
This finds documents that need attention and reconciles database state with S3 reality.
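The object_exists check on the aws_client is essentially an S3 HEAD request. Continuing the hypothetical AwsClient sketch from earlier, it could look like this:
from botocore.exceptions import ClientError


class AwsClient:  # continued sketch
    def object_exists(self, *, object_key: str) -> bool:
        # HEAD is cheap: it returns only metadata, never the object body
        try:
            self.s3.head_object(Bucket=self.bucket_name, Key=object_key)
            return True
        except ClientError as e:
            # head_object reports a missing key as a 404 error code
            if e.response["Error"]["Code"] == "404":
                return False
            raise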
Listing and Pagination: Performance Matters
When you have thousands of documents, naive listing kills performance. I implemented efficient pagination:
def list_files(
    *, db: Session, user_id: int, limit: int, offset: int
) -> Tuple[List[DocumentRegistry], int]:
    try:
        # Base query for ready documents only
        stmt = select(DocumentRegistry).where(
            DocumentRegistry.user_id == user_id,
            DocumentRegistry.lock_status == False,
            DocumentRegistry.op_status == OperationStatusEnum.SUCCESS,
        )
        # Count total for pagination metadata
        count_stmt = select(func.count()).select_from(stmt.subquery())
        total_count = db.execute(count_stmt).scalar()
        # Apply pagination
        stmt = stmt.limit(limit).offset(offset)
        documents = db.execute(stmt).scalars().all()
        return documents, total_count
    except Exception:
        logger.error("error listing user documents")
        raise
Key optimizations:
Filter at database level: Only query documents in SUCCESS state
Separate count query: Get total count for pagination metadata
Index support: The composite index on (user_id, lock_status, op_status) makes this blazing fast
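The route that sits on top of list_files just echoes the pagination parameters back alongside the page of documents. A sketch of it (ListedDocuments and its field names are my assumptions; the repo's response schema may differ):
@router.get("/list", response_model=ListedDocuments)
def list_documents(
    db: SessionDep,
    payload: TokenPayloadDep,
    limit: int = 20,
    offset: int = 0,
):
    documents, total = list_files(
        db=db, user_id=payload.user_id, limit=limit, offset=offset
    )
    return ListedDocuments(
        message="successfully listed documents",
        total=total,
        limit=limit,
        offset=offset,
        documents=[{"id": doc.id, "file_name": doc.file_name} for doc in documents],
    )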
Real-World Usage Flow
Here's how the complete system works in practice:
# 1. Request presigned URLs for upload
POST /documents/upload
Authorization: Bearer <jwt-token>
{
"files": ["document1.pdf", "document2.docx"]
}
# Returns:
{
"urls": {
"123": "https://s3.amazonaws.com/bucket/presigned-url-1",
"124": "https://s3.amazonaws.com/bucket/presigned-url-2"
}
}
# 2. Upload directly to S3 using presigned URLs
PUT https://s3.amazonaws.com/bucket/presigned-url-1
Content-Type: application/pdf
<binary data>
# 3. Finalize the upload status
PUT /documents/finalize
{
"successful": [123],
"failed": [124]
}
# 4. Create knowledge base
POST /kb/create
{
"name": "Engineering Docs"
}
# 5. List available documents
GET /documents/list?limit=20&offset=0
# 6. Add documents to knowledge base
# (This would be in the ingestion system - Part V!)
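From the client's side, step 2 is nothing more than an HTTP PUT to the presigned URL, with the same Content-Type the URL was signed for. For example, with the requests library:
import requests


def upload_to_presigned_url(url: str, file_path: str, content_type: str) -> bool:
    # Stream the file straight to S3; the API server never touches the bytes
    with open(file_path, "rb") as f:
        resp = requests.put(url, data=f, headers={"Content-Type": content_type})
    # Report this outcome back via /documents/finalize (successful vs. failed)
    return resp.status_code == 200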
Lessons Learned & Best Practices
1. State Machines Save Your Sanity
Don't use boolean flags for complex states. Enums with clear state transitions make debugging infinitely easier.
2. Always Plan for Failure
Networks fail, uploads timeout, users close browsers. Design your system to handle partial failures gracefully.
3. Presigned URLs Scale Better
Direct-to-S3 uploads bypass your API servers entirely, eliminating bottlenecks and reducing costs.
4. Cleanup is Not Optional
In production, orphaned records accumulate. Build cleanup mechanisms from day one.
5. Index Your Queries
The composite index on (user_id, lock_status, op_status) makes document listing sub-millisecond even with millions of records.
Want to see the full implementation? Check out the NeuroStash repository and follow along as we build something amazing!
Code: https://github.com/DEVunderdog/NeuroStash
Connect with me: https://x.com/fkhoja098